34 lines
1.0 KiB
Python
34 lines
1.0 KiB
Python
import aiohttp
|
|
import json
|
|
from urllib.parse import urlencode
|
|
from bot.config import BOT_TOKEN
|
|
import logging
|
|
|
|
# Create a logger instance
|
|
logger = logging.getLogger("bot.api")
|
|
|
|
|
|
api_base = f"https://api.telegram.org/bot{BOT_TOKEN}/"
|
|
|
|
|
|
async def telegram_api(endpoint: str, json_data=None, **kwargs):
|
|
try:
|
|
url = api_base + f"{endpoint}?{urlencode(kwargs)}"
|
|
is_polling = endpoint == "getUpdates"
|
|
headers = {"Content-Type": "application/json"}
|
|
async with aiohttp.ClientSession() as session:
|
|
url = api_base + f"{endpoint}?{urlencode(kwargs)}"
|
|
if not is_polling:
|
|
logger.info(f' >>> {url} {json_data if json_data else ""}')
|
|
async with session.get(
|
|
url, data=json.dumps(json_data), headers=headers
|
|
) as response:
|
|
data = await response.json()
|
|
if not is_polling:
|
|
logger.info(f" <<< {data}")
|
|
return data
|
|
except Exception:
|
|
import traceback
|
|
|
|
traceback.print_exc()
|