welcomecenterbot/tgbot/api.py

188 lines
6.2 KiB
Python
Raw Normal View History

2023-04-16 14:58:53 +00:00
import requests
2023-04-23 16:54:58 +00:00
import aiohttp
2023-04-16 14:58:53 +00:00
import json
import os
# Bot token taken from the BOT_TOKEN environment variable (None when unset).
TOKEN = os.getenv('BOT_TOKEN')
# Common prefix of every Bot API request; method names are appended to it.
apiBase = f"https://api.telegram.org/bot{TOKEN}/"
def register_webhook(url):
    """Call the `setWebhook` method with `url` as the webhook address.

    Args:
        url: Webhook URL, sent as the `url` query parameter.

    Returns:
        The decoded JSON body of the API response.
    """
    # Let requests build the query string: reserved characters inside `url`
    # ('?', '&', '#', ...) get percent-encoded instead of being parsed as
    # part of the API call's own query.
    r = requests.get(apiBase + 'setWebhook', params={'url': url})
    return r.json()
2023-04-16 14:58:53 +00:00
2023-04-16 21:54:36 +00:00
def delete_message(cid: str, mid: str):
    """Call `deleteMessage` for message `mid` in chat `cid`.

    Returns:
        The decoded JSON body of the API response.
    """
    query = f"chat_id={cid}&message_id={mid}"
    response = requests.post(f"{apiBase}deleteMessage?{query}")
    return response.json()
2023-04-16 14:58:53 +00:00
2023-04-23 16:54:58 +00:00
# https://core.telegram.org/bots/api#sendmessage
2023-04-17 21:42:25 +00:00
def send_message(cid: str, body, reply_to=None, reply_markup=None):
    """Call `sendMessage` to post `body` into chat `cid`.

    Args:
        cid: Target chat id.
        body: Message text; it is converted with `str()` and percent-encoded.
        reply_to: Optional message id sent as `reply_to_message_id`.
        reply_markup: Optional JSON-serialisable markup object.

    Returns:
        The decoded JSON body of the API response.
    """
    quote = requests.utils.quote
    # The text must be escaped like reply_markup is: a raw '&', '#', '+' or
    # '%' in the message would otherwise cut or corrupt the query string.
    # safe='' additionally escapes '/'.
    url = f"sendMessage?chat_id={cid}&text={quote(str(body), safe='')}"
    if reply_to:
        url += f'&reply_to_message_id={reply_to}'
    if reply_markup:
        reply_markup = quote(json.dumps(reply_markup))
        url += f'&reply_markup={reply_markup}'
    # Messages are always sent with HTML parse mode.
    url += '&parse_mode=html'
    print(url)
    r = requests.post(apiBase + url)
    return r.json()
# https://core.telegram.org/bots/api#sendphoto
def send_photo(cid: str, file_id: str, caption="", reply_to=None, reply_markup=None):
    """Call `sendPhoto` to post a photo into chat `cid`.

    Args:
        cid: Target chat id.
        file_id: Value sent as the `photo` parameter.
        caption: Optional caption text; converted with `str()` and
            percent-encoded.
        reply_to: Optional message id sent as `reply_to_message_id`.
        reply_markup: Optional JSON-serialisable markup object.

    Returns:
        The decoded JSON body of the API response.
    """
    quote = requests.utils.quote
    # Escape both free-form values: a caption (or a photo given as an HTTP
    # URL) containing '&', '#', '+' or '%' would otherwise break the query.
    url = (
        f"sendPhoto?chat_id={cid}"
        f"&photo={quote(str(file_id), safe='')}"
        f"&caption={quote(str(caption), safe='')}"
    )
    if reply_to:
        url += f'&reply_to_message_id={reply_to}'
    if reply_markup:
        reply_markup = quote(json.dumps(reply_markup))
        url += f'&reply_markup={reply_markup}'
    # Captions are always sent with HTML parse mode.
    url += '&parse_mode=html'
    print(url)
    r = requests.post(apiBase + url)
    return r.json()
2023-04-16 14:58:53 +00:00
# https://core.telegram.org/bots/api#banchatmember
def ban_member(chat_id, user_id, until_date=None):
    """Call `banChatMember` for `user_id` in `chat_id`.

    Args:
        chat_id: Chat to ban the user in.
        user_id: User to ban.
        until_date: Optional value forwarded as `until_date`; omitted from
            the request when falsy.

    Returns:
        The decoded JSON body of the API response.
    """
    url = apiBase + f"banChatMember?chat_id={chat_id}&user_id={user_id}"
    if until_date:
        # The API parameter is `until_date`; the previous `until_data`
        # spelling meant the value was never applied.
        url += f'&until_date={until_date}'
    r = requests.post(url)
    return r.json()
2023-04-16 14:58:53 +00:00
2023-04-21 10:06:48 +00:00
2023-04-16 14:58:53 +00:00
# https://core.telegram.org/bots/api#unbanchatmember
def unban_member(chat_id, user_id):
    """Call `unbanChatMember` for `user_id` in `chat_id` with `only_if_banned=1`.

    Returns:
        The decoded JSON body of the API response.
    """
    query = f"chat_id={chat_id}&user_id={user_id}&only_if_banned=1"
    response = requests.post(f"{apiBase}unbanChatMember?{query}")
    return response.json()
2023-04-16 14:58:53 +00:00
2023-04-21 10:09:10 +00:00
# https://core.telegram.org/bots/api#addchatmember
2023-04-22 01:17:50 +00:00
def add_member(chat_id, user_id):
    """POST an `addChatMember` request for `user_id` in `chat_id`.

    Returns:
        The decoded JSON body of the API response.
    """
    # NOTE(review): `addChatMember` does not appear among the documented
    # Bot API methods -- confirm this call can actually succeed.
    query = f"chat_id={chat_id}&user_id={user_id}"
    response = requests.post(f"{apiBase}addChatMember?{query}")
    return response.json()
2023-04-21 10:09:10 +00:00
2023-04-16 14:58:53 +00:00
def forward_message(cid, mid, to_chat_id):
    """Call `forwardMessage` to copy message `mid` from chat `cid` to `to_chat_id`.

    Returns:
        The decoded JSON body of the API response.
    """
    query = "&".join((
        f"chat_id={to_chat_id}",
        f"from_chat_id={cid}",
        f"message_id={mid}",
    ))
    response = requests.post(f"{apiBase}forwardMessage?{query}")
    return response.json()
2023-04-21 10:06:48 +00:00
2023-04-22 01:17:50 +00:00
# https://core.telegram.org/bots/api#restrictchatmember
def mute_member(chat_id, member_id):
    """Call `restrictChatMember` with `can_send_messages` set to false.

    The request is sent with `use_independent_chat_permissions=1`.

    Returns:
        The decoded JSON body of the API response.
    """
    # Permissions travel as a percent-encoded JSON object.
    permissions = requests.utils.quote(json.dumps({"can_send_messages": False}))
    endpoint = (
        f'{apiBase}restrictChatMember?chat_id={chat_id}'
        f'&user_id={member_id}&permissions={permissions}'
        '&use_independent_chat_permissions=1'
    )
    response = requests.post(endpoint)
    return response.json()
2023-04-22 01:17:50 +00:00
# https://core.telegram.org/bots/api#restrictchatmember
2023-04-29 20:37:28 +00:00
def unmute_member(chat_id, member_id, chat_permissions=None):
    """Call `restrictChatMember` to grant sending permissions to a member.

    Args:
        chat_id: Chat the member belongs to.
        member_id: User whose restrictions are updated.
        chat_permissions: Optional permissions, either as a JSON string or
            as a JSON-serialisable mapping. When falsy, a default set that
            enables every permission listed below is used.

    Returns:
        The decoded JSON body of the API response.
    """
    if not chat_permissions:
        chat_permissions = {
            "can_send_messages": True,
            "can_send_photos": True,
            "can_send_other_messages": True,
            "can_send_polls": True,
            "can_add_web_page_previews": True,
            "can_send_audios": True,
            "can_invite_users": True,
            "can_send_voice_notes": True,
            "can_send_video_notes": True,
            "can_send_videos": True,
            "can_send_documents": True,
        }
    # quote() only accepts str/bytes, so a mapping supplied by the caller
    # (or the default above) has to be serialised first; ready-made JSON
    # strings pass through untouched.
    if not isinstance(chat_permissions, (str, bytes)):
        chat_permissions = json.dumps(chat_permissions)
    chat_permissions = requests.utils.quote(chat_permissions)
    url = apiBase + f'restrictChatMember?chat_id={chat_id}' + \
        f'&user_id={member_id}&permissions={chat_permissions}' + \
        '&use_independent_chat_permissions=1'
    r = requests.post(url)
    return r.json()
2023-04-22 01:17:50 +00:00
# https://core.telegram.org/bots/api#approvechatjoinrequest
def approve_chat_join_request(chat_id, user_id):
    """Call `approveChatJoinRequest` for `user_id` in `chat_id`.

    Returns:
        The decoded JSON body of the API response.
    """
    query = f"chat_id={chat_id}&user_id={user_id}"
    response = requests.post(f"{apiBase}approveChatJoinRequest?{query}")
    return response.json()
# https://core.telegram.org/bots/api#senddocument
async def send_document(chat_id, data='', filename='chart.svg'):
    """Upload `data` to `chat_id` through the `sendDocument` method.

    Args:
        chat_id: Target chat id, sent as a query parameter.
        data: Payload placed in the `document` form field.
        filename: File name attached to the `document` field.

    Returns:
        The decoded JSON response, or None when the HTTP status is not 200
        or the body cannot be decoded (a message is printed in both cases).
    """
    endpoint = apiBase + "sendDocument"
    query = {"chat_id": chat_id}

    form = aiohttp.FormData()
    form.add_field('document', data, filename=filename)

    async with aiohttp.ClientSession() as session:
        async with session.post(endpoint, params=query, data=form) as resp:
            if resp.status == 200:
                try:
                    return await resp.json()
                except ValueError as e:
                    print(f"Error decoding JSON: {e}")
                    return None
            # Any non-200 status: report the raw body and give up.
            error_text = await resp.text()
            print(f"Error sending document: {resp.status} - {error_text}")
            return None
2023-04-22 01:21:24 +00:00
2023-04-23 16:54:58 +00:00
# https://core.telegram.org/bots/api#getchatadministrators
def get_chat_administrators(chat_id):
    """Call `getChatAdministrators` for `chat_id`.

    Returns:
        The decoded JSON body of the API response.
    """
    response = requests.get(f"{apiBase}getChatAdministrators?chat_id={chat_id}")
    return response.json()
2023-04-28 12:24:14 +00:00
# https://core.telegram.org/bots/api#getchatmember
2023-04-23 16:54:58 +00:00
def get_member(chat_id, member_id):
    """Call `getChatMember` for `member_id` in `chat_id`.

    Returns:
        The decoded JSON body of the API response.
    """
    query = f"chat_id={chat_id}&user_id={member_id}"
    response = requests.get(f"{apiBase}getChatMember?{query}")
    return response.json()
# https://core.telegram.org/bots/api#getuserprofilephotos
def get_userphotos(user_id):
    """Call `getUserProfilePhotos` for `user_id`.

    Returns:
        The decoded JSON body of the API response.
    """
    response = requests.get(f"{apiBase}getUserProfilePhotos?user_id={user_id}")
    return response.json()
# https://core.telegram.org/bots/api#editmessagereplymarkup
def edit_replymarkup(cid, mid, reply_markup):
    """Call `editMessageReplyMarkup` for message `mid` in chat `cid`.

    Args:
        cid: Chat id.
        mid: Message id.
        reply_markup: JSON-serialisable markup object; it is dumped to JSON
            and percent-encoded before being placed in the query string.

    Returns:
        The decoded JSON body of the API response.
    """
    encoded_markup = requests.utils.quote(json.dumps(reply_markup))
    method_call = (
        f"editMessageReplyMarkup?chat_id={cid}&message_id={mid}"
        f"&reply_markup={encoded_markup}"
    )
    response = requests.post(apiBase + method_call)
    return response.json()
# https://core.telegram.org/bots/api#getchat
def get_chat(cid):
    """Call `getChat` for chat `cid`.

    Returns:
        The decoded JSON body of the API response.
    """
    response = requests.get(f"{apiBase}getChat?chat_id={cid}")
    return response.json()
# https://core.telegram.org/bots/api#banchatmember
def kick_member(chat_id, member_id):
    """Ban `member_id` in `chat_id` and immediately lift the ban again.

    The ban response is printed; the unban response is returned.

    Args:
        chat_id: Chat to remove the member from.
        member_id: User to remove.

    Returns:
        The decoded JSON body of the unban response.
    """
    # Fixes: the body used the undefined name `cid` (NameError on every
    # call), and it targeted the *SenderChat methods while passing the
    # `user_id` / `only_if_banned` parameters that belong to
    # banChatMember / unbanChatMember.
    url = f"banChatMember?chat_id={chat_id}&user_id={member_id}"
    r = requests.post(apiBase + url)
    print(r.json())
    url = f"unbanChatMember?chat_id={chat_id}&user_id={member_id}&only_if_banned=1"
    r = requests.post(apiBase + url)
    return r.json()