welcomecenterbot/bot/api.py
2024-09-28 10:31:48 +03:00

53 lines
2.0 KiB
Python

import aiohttp
import aiofiles
import json
from urllib.parse import urlencode
from bot.config import BOT_TOKEN
import logging
# Module-level logger, registered under the literal name " [bot.api] "
# (the padding spaces and brackets are part of the logger name).
logger = logging.getLogger(" [bot.api] ")
# Base URL for Bot API method calls; the method name is appended directly.
# NOTE: this string embeds BOT_TOKEN, so printing or logging it exposes the token.
api_base = f"https://api.telegram.org/bot{BOT_TOKEN}/"
async def telegram_api(endpoint: str, json_data=None, **kwargs):
    """Call a Telegram Bot API method and return the decoded JSON reply.

    Args:
        endpoint: Bot API method name, e.g. ``"getUpdates"`` or ``"getFile"``.
        json_data: Optional JSON-serialisable payload sent as the request body.
        **kwargs: Extra parameters, URL-encoded into the query string.

    Returns:
        The parsed JSON response, or ``None`` when the request, the
        serialisation of ``json_data`` or the decoding of the reply fails.
        Failures are logged and never propagate to the caller.
    """
    # Long-polling calls are not logged to avoid flooding the log.
    is_polling = endpoint == "getUpdates"
    try:
        query = urlencode(kwargs)
        url = f"{api_base}{endpoint}?{query}"
        headers = {"Content-Type": "application/json"}
        # Only attach a body when there is a payload; json.dumps(None) would
        # otherwise send the literal string "null" with every request.
        body = json.dumps(json_data) if json_data is not None else None
        async with aiohttp.ClientSession() as session:
            if not is_polling:
                # Log the endpoint and query only: the full URL embeds the
                # bot token, which must not end up in log files.
                logger.info(" >>> %s?%s %s", endpoint, query, json_data or "")
            async with session.get(url, data=body, headers=headers) as response:
                data = await response.json()
                if not is_polling:
                    logger.info(" <<< %s", data)
                return data
    except Exception:
        # Best-effort contract: report through the module logger (with the
        # traceback) and hand back None instead of raising.
        logger.exception("Telegram API call %s failed", endpoint)
        return None
async def download_file(file_id):
    """Download a Telegram file into a temporary file and yield its path.

    This is an async generator that yields at most one value. The temporary
    file is created with ``delete=True``, so it exists only while the
    generator is suspended at the ``yield``; it is removed once the consumer
    resumes or closes the generator. Nothing is yielded when the file path
    cannot be resolved or the download does not return HTTP 200.

    Args:
        file_id: Telegram file identifier passed to the ``getFile`` method.

    Yields:
        The filesystem path of the temporary file holding the downloaded data.
    """
    # Resolve the server-side path of the file via the getFile method.
    file = await telegram_api("getFile", file_id=file_id)
    # telegram_api returns None on failure, and an error reply has no
    # "result" key, so validate the shape before indexing into it.
    result = file.get("result") if isinstance(file, dict) else None
    file_path = result.get("file_path") if isinstance(result, dict) else None
    if not file_path:
        logger.error(f"Failed to resolve file path for {file_id}: {file}")
        return

    # File contents are served from /file/bot<token>/<file_path>, which is a
    # different prefix from the method-call base URL.
    download_url = f"https://api.telegram.org/file/bot{BOT_TOKEN}/{file_path}"
    async with aiohttp.ClientSession() as session:
        async with session.get(download_url) as response:
            if response.status != 200:
                logger.error(f"Failed to download file: {response.status}")
                return
            # Save the downloaded bytes to a temporary location.
            async with aiofiles.tempfile.NamedTemporaryFile(delete=True) as temp_file:
                await temp_file.write(await response.read())
                # Flush so the consumer sees the full content on disk.
                await temp_file.flush()
                logger.info(f"download file: {temp_file.name}")
                yield temp_file.name  # Path is valid only until the generator resumes.