unbloat-code
This commit is contained in:
55
bot/announce.py
Normal file
55
bot/announce.py
Normal file
@@ -0,0 +1,55 @@
|
||||
from bot.api import telegram_api
|
||||
from utils.mention import mention, userdata_extract
|
||||
|
||||
import logging
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
announces = {}
|
||||
|
||||
def get_newcomer_message(msg):
|
||||
lang = msg["from"].get("language_code", "ru")
|
||||
r = "хочет присоединиться к нам здесь" if lang == "ru" else " wants to join us here"
|
||||
_uid, identity, username = userdata_extract(msg["from"])
|
||||
if username:
|
||||
r = "@" + username + " " + r
|
||||
r = identity + " " + r
|
||||
return r
|
||||
|
||||
|
||||
async def show_announce(msg):
|
||||
logger.info("showing announce with photo")
|
||||
chat_id = str(msg["chat"]["id"])
|
||||
from_id = str(msg["from"]["id"])
|
||||
mid = msg.get("message_id", "")
|
||||
newcomer_message = get_newcomer_message(msg)
|
||||
|
||||
userphotos_response = await telegram_api("getUserphotos", user_id=from_id)
|
||||
logger.debug(userphotos_response)
|
||||
|
||||
file_id = ""
|
||||
if userphotos_response["ok"] and userphotos_response["result"]["total_count"] > 0:
|
||||
logger.info("showing button with photo")
|
||||
file_id = userphotos_response["result"]["photos"][0][0]["file_id"]
|
||||
|
||||
r = await telegram_api("sendPhoto",
|
||||
chat_id=chat_id,
|
||||
file_id=file_id,
|
||||
caption=newcomer_message,
|
||||
reply_to=mid
|
||||
)
|
||||
logger.debug(r)
|
||||
announces[from_id] = r.get("message_id")
|
||||
|
||||
|
||||
async def edit_announce(msg):
|
||||
logger.info("editing announce")
|
||||
chat_id = str(msg["chat"]["id"])
|
||||
from_id = str(msg["from"]["id"])
|
||||
mid = msg.get("message_id", "")
|
||||
caption = get_newcomer_message(msg) + msg.get("text")
|
||||
announce_message_id = announces.get(from_id)
|
||||
r = await telegram_api("editMessageCaption", chat_id=chat_id, message_id=announce_message_id, caption=caption)
|
||||
announces[from_id] = r.get("message_id")
|
269
bot/api.py
269
bot/api.py
@@ -1,6 +1,7 @@
|
||||
import aiohttp
|
||||
import json
|
||||
from bot.config import BOT_TOKEN, WEBHOOK
|
||||
from urllib.parse import urlencode
|
||||
from bot.config import BOT_TOKEN
|
||||
import logging
|
||||
|
||||
# Create a logger instance
|
||||
@@ -10,269 +11,9 @@ logging.basicConfig(level=logging.INFO)
|
||||
apiBase = f"https://api.telegram.org/bot{BOT_TOKEN}/"
|
||||
|
||||
|
||||
async def register_webhook():
|
||||
async def telegram_api(endpoint: str, **kwargs):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(apiBase + f"setWebhook?url={WEBHOOK}") as response:
|
||||
async with session.get(apiBase + f"{endpoint}?{urlencode(kwargs)}") as response:
|
||||
data = await response.json()
|
||||
logger.info("Webhook registration response: %s", data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#sendmessage
|
||||
async def send_message(cid: str, body, reply_to=None, reply_markup=None):
|
||||
if not body:
|
||||
return
|
||||
url = apiBase + f"sendMessage"
|
||||
params = {"chat_id": cid, "text": body, "parse_mode": "HTML"}
|
||||
if reply_to:
|
||||
params["reply_to_message_id"] = reply_to
|
||||
if reply_markup:
|
||||
params["reply_markup"] = json.dumps(reply_markup)
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("Message sent to %s: %s", cid, data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#sendchataction
|
||||
async def send_chataction(cid: str, action: str):
|
||||
url = apiBase + f"sendChatAction"
|
||||
params = {"chat_id": cid, "action": action}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("Chat action sent to %s: %s", cid, data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#forwardmessage
|
||||
async def forward_message(cid, mid, to_chat_id):
|
||||
url = apiBase + f"forwardMessage"
|
||||
params = {"chat_id": to_chat_id, "from_chat_id": cid, "message_id": mid}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("Message forwarded from %s to %s: %s", cid, to_chat_id, data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#deletemessage
|
||||
async def delete_message(cid: str, mid: str):
|
||||
url = apiBase + f"deleteMessage"
|
||||
params = {"chat_id": cid, "message_id": mid}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, params=params) as response:
|
||||
data = await response.json()
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#sendphoto
|
||||
async def send_photo(
|
||||
cid: str, file_id: str, caption="", reply_to=None, reply_markup=None
|
||||
):
|
||||
url = apiBase + f"sendPhoto"
|
||||
params = {
|
||||
"chat_id": cid,
|
||||
"photo": file_id,
|
||||
"caption": caption,
|
||||
"parse_mode": "HTML",
|
||||
}
|
||||
if reply_to:
|
||||
params["reply_to_message_id"] = reply_to
|
||||
if reply_markup:
|
||||
params["reply_markup"] = json.dumps(reply_markup)
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("Photo sent to %s: %s", cid, data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#banchatmember
|
||||
async def ban_member(chat_id, user_id, until_date=None):
|
||||
url = apiBase + f"banChatMember"
|
||||
params = {"chat_id": chat_id, "user_id": user_id}
|
||||
if until_date:
|
||||
params["until_date"] = until_date
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("Member banned from %s: %s", chat_id, data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#unbanchatmember
|
||||
async def unban_member(chat_id, user_id):
|
||||
url = apiBase + f"unbanChatMember"
|
||||
params = {"chat_id": chat_id, "user_id": user_id, "only_if_banned": 1}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("Member unbanned from %s: %s", chat_id, data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#addchatmember
|
||||
async def add_member(chat_id, user_id):
|
||||
url = apiBase + f"addChatMember"
|
||||
params = {"chat_id": chat_id, "user_id": user_id}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("Member added to %s: %s", chat_id, data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#restrictchatmember
|
||||
async def mute_member(chat_id, member_id):
|
||||
url = apiBase + f"restrictChatMember"
|
||||
params = {
|
||||
"chat_id": chat_id,
|
||||
"user_id": member_id,
|
||||
"permissions": json.dumps({"can_send_messages": False}),
|
||||
"use_independent_chat_permissions": 1,
|
||||
}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("Member muted in %s: %s", chat_id, data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#restrictchatmember
|
||||
async def unmute_member(chat_id, member_id, chat_permissions=None):
|
||||
if not chat_permissions:
|
||||
chat_permissions = json.dumps(
|
||||
{
|
||||
"can_send_messages": True,
|
||||
"can_send_photos": True,
|
||||
"can_send_other_messages": True,
|
||||
"can_send_polls": True,
|
||||
"can_add_web_page_previews": True,
|
||||
"can_send_audios": True,
|
||||
"can_invite_users": True,
|
||||
"can_send_voice_notes": True,
|
||||
"can_send_video_notes": True,
|
||||
"can_send_videos": True,
|
||||
"can_send_documents": True,
|
||||
}
|
||||
)
|
||||
|
||||
url = apiBase + f"restrictChatMember"
|
||||
params = {
|
||||
"chat_id": chat_id,
|
||||
"user_id": member_id,
|
||||
"permissions": chat_permissions,
|
||||
"use_independent_chat_permissions": 1,
|
||||
}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("Member unmuted in %s: %s", chat_id, data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#approvechatjoinrequest
|
||||
async def approve_chat_join_request(chat_id, user_id):
|
||||
url = apiBase + f"approveChatJoinRequest"
|
||||
params = {"chat_id": chat_id, "user_id": user_id}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("Chat join request approved in %s: %s", chat_id, data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#senddocument
|
||||
async def send_document(chat_id, data="", filename="chart.svg"):
|
||||
url = apiBase + "sendDocument"
|
||||
params = {"chat_id": chat_id}
|
||||
filedata = aiohttp.FormData()
|
||||
filedata.add_field("document", data, filename=filename)
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, params=params, data=filedata) as response:
|
||||
if response.status != 200:
|
||||
error_text = await response.text()
|
||||
print(f"Error sending document: {response.status} - {error_text}")
|
||||
return None
|
||||
|
||||
try:
|
||||
return await response.json()
|
||||
except ValueError as e:
|
||||
print(f"Error decoding JSON: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#getchatadministrators
|
||||
async def get_chat_administrators(chat_id):
|
||||
url = apiBase + f"getChatAdministrators"
|
||||
params = {"chat_id": chat_id}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("Chat administrators retrieved for %s: %s", chat_id, data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#getchatmember
|
||||
async def get_member(chat_id, member_id):
|
||||
url = apiBase + f"getChatMember"
|
||||
params = {"chat_id": chat_id, "user_id": member_id}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("Chat member retrieved for %s: %s", chat_id, data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#getuserprofilephotos
|
||||
async def get_userphotos(user_id):
|
||||
url = apiBase + f"getUserProfilePhotos"
|
||||
params = {"user_id": user_id}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("User profile photos retrieved for %s: %s", user_id, data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#editmessagereplymarkup
|
||||
async def edit_replymarkup(cid, mid, reply_markup):
|
||||
url = apiBase + f"editMessageReplyMarkup"
|
||||
params = {
|
||||
"chat_id": cid,
|
||||
"message_id": mid,
|
||||
"reply_markup": json.dumps(reply_markup),
|
||||
}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("Reply markup edited for message %s in %s: %s", mid, cid, data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#getchat
|
||||
async def get_chat(cid):
|
||||
url = apiBase + f"getChat"
|
||||
params = {"chat_id": cid}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("Chat retrieved for %s: %s", cid, data)
|
||||
return data
|
||||
|
||||
|
||||
# https://core.telegram.org/bots/api#banchatmember
|
||||
async def kick_member(chat_id, member_id):
|
||||
url = apiBase + f"banChatMember"
|
||||
params = {"chat_id": chat_id, "user_id": member_id}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, params=params) as response:
|
||||
data = await response.json()
|
||||
logger.info("Member kicked from %s: %s", chat_id, data)
|
||||
logger.info("Telegram API response: %s", data)
|
||||
return data
|
||||
|
@@ -1,6 +1,4 @@
|
||||
import os
|
||||
|
||||
BOT_TOKEN = os.environ.get("BOT_TOKEN") or ""
|
||||
WEBHOOK = os.environ.get("VERCEL_URL") or "http://localhost:8000"
|
||||
REDIS_URL = os.environ.get("REDIS_URL") or "redis://localhost:6379"
|
||||
FEEDBACK_CHAT_ID = os.environ.get("FEEDBACK_CHAT_ID", "").replace("-", "-100")
|
||||
|
12
bot/state.py
12
bot/state.py
@@ -1,12 +0,0 @@
|
||||
class State:
|
||||
def __init__(self):
|
||||
self.talking = dict()
|
||||
|
||||
def is_talking(self, uid):
|
||||
return uid in self.talking
|
||||
|
||||
def make_talking(self, uid, cid):
|
||||
self.talking[uid] = cid
|
||||
|
||||
def aho(self, uid):
|
||||
del self.talking[uid]
|
16
bot/talking.py
Normal file
16
bot/talking.py
Normal file
@@ -0,0 +1,16 @@
|
||||
class TalkingStick:
|
||||
def __init__(self):
|
||||
self.chat_by_talking = dict()
|
||||
self.talking_by_chat = dict()
|
||||
|
||||
def holder(self, cid):
|
||||
return self.talking_by_chat[cid]
|
||||
|
||||
def take(self, uid, cid):
|
||||
self.chat_by_talking[uid] = cid
|
||||
self.talking_by_chat[cid] = uid
|
||||
|
||||
def aho(self, uid):
|
||||
cid = self.chat_by_talking[uid]
|
||||
del self.talking_by_chat[cid]
|
||||
del self.chat_by_talking[uid]
|
Reference in New Issue
Block a user