version-0.2.0

This commit is contained in:
2024-01-06 14:25:35 +03:00
parent bc05b93c47
commit 908529b5bb
26 changed files with 185 additions and 153 deletions

View File

@@ -1,6 +1,6 @@
import aiohttp
import json
from config import BOT_TOKEN, WEBHOOK
from bot.config import BOT_TOKEN, WEBHOOK
import logging
# Create a logger instance

View File

@@ -1,59 +0,0 @@
from api import send_message, delete_message, kick_member
from handlers.command_my import handle_command_my
from handlers.callback_vouch import update_button
from utils.mention import userdata_extract
from storage import Profile
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# remove link of callback sender
# from member vouched before
async def handle_unlink(payload, state):
logger.info("handle unlink button pressed or command, private chat only")
from_id = str(payload["from"]["id"])
linked_id = ""
if "data" in payload:
linked_id = str(payload["data"].replace("unlink", ""))
elif "text" in payload:
linked_id = str(payload["text"].replace("/unlink ", ""))
# удаляем связь с потомком
actor = Profile.get(from_id, payload)
actor["children"].remove(str(linked_id))
Profile.save(actor)
# удаляем связь с предком
linked = Profile.get(linked_id)
linked["parents"].remove(str(from_id))
Profile.save(linked)
# удаляем старое сообщение с кнопками-unlink
reply_msg_id = payload["message"]["message_id"]
r = await delete_message(from_id, reply_msg_id)
logger.debug(r)
# если ещё есть связи - посылаем новое сообщение
if len(actor["children"]) > 0:
await handle_command_my(payload, state)
lang = payload["from"].get("language_code", "ru")
for chat_id in linked["chats"]:
# если больше никто не поручился - kick out
if len(linked["parents"]) == 0:
r = await kick_member(chat_id, linked_id)
logger.debug(r)
if r["ok"]:
_, identity, username = userdata_extract(linked["result"]["user"])
body = (
"Участник %s%s был удалён"
if lang == "ru"
else "Member %s%s was deleted"
) % (identity, username)
r = await send_message(chat_id, body)
logger.debug(r)
# обновление счётчика
await update_button(linked_id, chat_id)

View File

@@ -1,61 +0,0 @@
from api import approve_chat_join_request, edit_replymarkup
from storage import Profile, storage
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
async def update_button(chat_id, member_id, text="❤️"):
button_message_id = storage.get(f"btn-{chat_id}-{member_id}")
print(f"button_message_id: {button_message_id}")
if button_message_id:
button_message_id = button_message_id.decode("utf-8")
print(f"button_message_id: {button_message_id}")
print("update reply markup")
newcomer = Profile.get(member_id)
amount = len(newcomer["parents"]) + 1
text = f"❤️ {amount}"
rm = {
"inline_keyboard": [[{"text": text, "callback_data": "vouch" + member_id}]]
}
r = await edit_replymarkup(chat_id, button_message_id, reply_markup=rm)
print(r)
async def handle_button(callback_query):
# получаем профиль нажавшего кнопку
actor_id = str(callback_query["from"]["id"])
actor = Profile.get(actor_id, callback_query)
callback_data = callback_query["data"]
if callback_data.startswith("vouch"):
print(f"button pressed by {actor_id}")
newcomer_id = callback_data[5:]
print(f"button pressed for {newcomer_id}")
newcomer = Profile.get(newcomer_id)
print(f"newcomer profile {newcomer}")
if newcomer_id == actor_id:
# нажал сам, не реагируем, прописываем данные
_newcomer = Profile.get(newcomer_id, callback_query)
else:
# нажал кто-то другой
if str(actor_id) not in newcomer["parents"]:
print(f"save parent for {newcomer_id}")
newcomer["parents"].append(str(actor_id))
Profile.save(newcomer)
if str(newcomer_id) not in actor["children"]:
print(f"save child for {actor_id}")
actor["children"].append(str(newcomer_id))
Profile.save(actor)
chat_id = str(vars(callback_query["message"]["chat"])["id"])
print("accept join request")
r = await approve_chat_join_request(chat_id, newcomer_id)
print(r)
await update_button(chat_id, newcomer_id)

View File

@@ -1,21 +0,0 @@
from storage import Profile
from handlers.send_button import show_request_msg
from api import get_member
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
async def handle_command_ask(msg):
logger.info("handling request resend")
_cmd, chat_id, member_id = msg["text"].split(" ")
chat_id = chat_id.replace("-", "-100")
r = await get_member(chat_id, member_id)
logger.debug(r)
m = {}
if "result" in r:
m["from"] = r["result"]["user"]
m["chat"] = {"id": str(chat_id)}
await show_request_msg(m)
elif "error_code" in r:
logger.error(r)

View File

@@ -1,15 +0,0 @@
from utils.graph import generate_chart
from api import send_document
from storage import storage, scan
import json
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
async def handle_command_graph(msg):
_usr_ids, members = scan(match="usr-*", count=100)
data = generate_chart(members)
if data:
r = await send_document(msg["chat"]["id"], data, "chart.svg")
logger.debug(r)

View File

@@ -1,55 +0,0 @@
from storage import Profile, scan
from api import get_member, send_message
from utils.mention import userdata_extract
import json
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
async def construct_unlink_buttons(actor):
print(f"constructing unlink buttons for {actor['children']}")
buttons = []
for vouch in actor["children"]:
for chat_id in actor["chats"]:
r = await get_member(chat_id, vouch)
logger.debug(r)
if "result" in r:
member = r["result"]["user"]
_uid, identity, username = userdata_extract(member)
buttons.append(
{"text": f"{identity} {username}", "callback_data": "unlink" + vouch}
)
return buttons
async def handle_command_my(msg, state):
logger.info("handle my command")
from_id = str(msg["from"]["id"])
sender = Profile.get(from_id, msg)
# генерируем кнопки для всех, за кого поручились
buttons = await construct_unlink_buttons(sender)
reply_markup = {
"inline_keyboard": [
buttons,
]
}
if len(buttons) == 0:
if msg["from"].get("language_code", "ru") == "ru":
body = "Вас ещё никто не узнал? Напишите, я передам нашему кругу"
else:
body = (
"Nobody recognized you? Speak, I will pass your message to the circle"
)
r = await send_message(from_id, body)
logger.debug(r)
chat_id = msg["chat"]["id"]
state.make_talking(from_id, chat_id)
else:
if msg["from"].get("language_code", "ru") == "ru":
body = "Нажмите кнопки ниже, чтобы удалить ваши связи"
else:
body = "Unlink your connections pressing the buttons below"
r = await send_message(from_id, body, reply_markup=reply_markup)
print(r)

View File

@@ -1,46 +0,0 @@
from api import send_message, delete_message, get_chat_administrators
from handlers.command_my import handle_command_my
from storage import Profile, storage
from handlers.send_button import show_request_msg
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
async def handle_default(msg, state):
logger.info(f"default handler for all messages {msg}")
chat_id = str(msg["chat"]["id"])
from_id = str(msg["from"]["id"])
sender = Profile.get(from_id, msg)
text = msg.get("text", "")
if text.startswith("/my"):
# команда в групповом чате
logger.info("remove some messages in group chat")
# удалить сообщение с командой /my
r = await delete_message(chat_id, msg["message_id"])
logger.debug(r)
# показать связи в личке
await handle_command_my(msg, state)
else:
# любое другое сообщение
if len(sender["parents"]) == 0:
# владелец чата автоматически ручается
logger.info(f"setting owner as parent for {from_id}")
r = await get_chat_administrators(chat_id)
logger.debug(r)
owner_id = ""
for admin in r["result"]:
if admin["status"] == "creator":
owner_id = admin["user"]["id"]
break
if owner_id:
sender["parents"].append(str(owner_id))
# обновляем профиль владельца
owner = Profile.get(owner_id)
owner["children"].append(str(from_id))
Profile.save(owner)
# сохранить профиль отправителя
Profile.save(sender)

View File

@@ -1,63 +0,0 @@
import json
from api import (
send_message,
forward_message,
get_chat_administrators,
)
from storage import storage
from config import FEEDBACK_CHAT_ID
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
async def handle_feedback(msg, state):
mid = msg["message_id"]
cid = msg["chat"]["id"]
uid = msg["from"]["id"]
if msg["text"] == "/start":
r = await send_message(cid, "Напишите своё сообщение для администрации чата")
logger.debug(r)
elif state.is_talking(uid):
r = await forward_message(cid, mid, state.talking[uid])
logger.debug(r)
state.aho(uid)
else:
r = await forward_message(cid, mid, FEEDBACK_CHAT_ID)
logger.debug(r)
support_msg_id = r["result"]["message_id"]
# сохранение айди сообщения в приватной переписке с ботом
storage.set(
f"fbk-{support_msg_id}",
json.dumps(
{"author_id": uid, "message_id": mid, "chat_id": cid}
),
)
async def handle_answer(msg):
logger.info("handle answering feedback")
logger.debug(msg)
answered_msg = msg.get("reply_to_message")
if answered_msg:
if "from" not in answered_msg:
answered_msg["from"] = msg.get("from_user")
r = await get_chat_administrators(msg["chat"]["id"])
logger.debug(r)
admins = []
for a in r["result"]:
admins.append(a["user"]["id"])
if answered_msg["from"]["is_bot"] and msg["from"]["id"] in admins:
support_msg_id = str(answered_msg["message_id"])
# получение сохраненного информации о сообщении для ответа
stored_feedback = storage.get(f"fbk-{support_msg_id}")
if stored_feedback:
logger.info("handle an answer from feedback group")
stored_feedback = json.loads(stored_feedback)
r = await send_message(
f'{stored_feedback["chat_id"]}',
msg["text"],
reply_to=stored_feedback["message_id"],
)
logger.debug(r)

View File

@@ -1,22 +0,0 @@
from api import approve_chat_join_request, delete_message
from handlers.send_button import show_request_msg
from storage import Profile, storage
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
async def handle_join_request(join_request):
logger.info(f"handle join request {join_request}")
chat_id = str(join_request["chat"]["id"])
from_id = str(join_request["from"]["id"])
actor = Profile.get(from_id, join_request)
if len(actor["parents"]) == 0:
# показываем сообщение с кнопкой "поручиться"
await show_request_msg(join_request)
else:
# за пользователя поручились ранее
r = await approve_chat_join_request(chat_id, from_id)
logger.debug(r)
Profile.save(actor)

View File

@@ -1,47 +0,0 @@
from handlers.send_button import show_request_msg
from api import delete_message
from storage import Profile, storage
from config import FEEDBACK_CHAT_ID
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
async def handle_join(msg):
chat_id = str(msg["chat"]["id"])
from_id = str(msg["from"]["id"])
actor = Profile.get(from_id, msg)
newcomer_id = str(msg["new_chat_member"]["id"])
if from_id == newcomer_id:
if len(actor["parents"]) == 0 and str(chat_id) != FEEDBACK_CHAT_ID:
# показываем сообщение с кнопкой "поручиться"
r = await show_request_msg(msg)
logger.debug(r)
else:
# за пользователя поручились ранее
pass
else:
# пользователи приглашены другим участником
logger.info(f'{len(msg["new_chat_members"])} members were invited by {from_id}')
for m in msg["new_chat_members"]:
newcomer = Profile.get(m["id"])
newcomer["parents"].append(str(from_id))
Profile.save(newcomer)
actor["children"].append(str(m["id"]))
# обновляем профиль пригласившего
Profile.save(actor)
async def handle_left(msg):
logger.info("handling member leaving")
member_id = msg["left_chat_member"]["id"]
chat_id = msg["chat"]["id"]
# удаление сообщения с кнопкой в этом чате
prev_msg = storage.get(f"btn-{chat_id}-{member_id}")
if prev_msg:
r = await delete_message(chat_id, prev_msg["message_id"])
logger.debug(r)
storage.remove(f"btn-{chat_id}-{member_id}")

View File

@@ -1,44 +0,0 @@
from config import FEEDBACK_CHAT_ID
from storage import scan, Profile
from api import approve_chat_join_request, kick_member, send_message
from handlers.callback_vouch import update_button
from utils.mention import userdata_extract
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
async def revalidate_storage():
# поддерживает консистентность данных
btn_ids, _btns = scan(match="btn-*", count=100)
logger.info(f"storage data revalidation for {len(btn_ids)} entries")
for btnid in btn_ids:
# для каждой ранее созданной кнопки
btnid_str = btnid.decode("utf-8").replace("btn-", "")
parts = btnid_str.split("-")
logger.debug(parts)
_, chat_id, member_id = parts
chat_id = "-" + chat_id
newcomer = Profile.get(member_id)
if len(newcomer.get("parents", [])) > 0:
# принять заявку если её нажимали
r = await approve_chat_join_request(chat_id, member_id)
logger.debug(r)
await update_button(chat_id, member_id)
elif len(newcomer.get("parents", [])) == 0:
r = await kick_member(chat_id, member_id)
logger.debug(r)
if r["ok"]:
_, identity, username = userdata_extract(newcomer["result"]["user"])
# feedback report
body = f"Участник {identity} {username} был удалён"
r = await send_message(FEEDBACK_CHAT_ID, body)
logger.debug(r)
# pm report
body = f"Вы утратили поддержку в чате и были удалены"
r = await send_message(member_id, body)
logger.debug(r)
async def handle_startup():
await revalidate_storage()

View File

@@ -1,41 +0,0 @@
from handlers.handle_feedback import handle_feedback, handle_answer
from handlers.handle_default import handle_default
from handlers.command_my import handle_command_my
from handlers.command_graph import handle_command_graph
from handlers.command_ask import handle_command_ask
from config import FEEDBACK_CHAT_ID
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
async def handle_routing(msg, state):
cid = msg["chat"]["id"]
uid = msg["from"]["id"]
if cid == uid:
# сообщения в личке с ботом
logger.info("private chat message")
text = msg.get("text")
if text:
if text.startswith("/my"):
await handle_command_my(msg, state)
elif text.startswith("/graph"):
await handle_command_graph(msg)
else:
await handle_feedback(msg, state)
elif str(cid) == FEEDBACK_CHAT_ID:
# сообщения из группы обратной связи
logger.info("feedback chat message")
logger.debug(msg)
if msg.get("reply_to_message"):
await handle_answer(msg)
elif msg.get("text", "").startswith("/ask"):
await handle_command_ask(msg)
else:
# сообщения из всех остальных групп
logger.info(f"group {cid} chat message")
text = msg.get("text", msg.get("caption"))
if text:
await handle_default(msg, state)

View File

@@ -1,53 +0,0 @@
from api import send_message, send_photo, get_userphotos, delete_message
from utils.mention import mention, userdata_extract
from storage import storage
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
async def show_request_msg(msg, auto = False):
logger.info("showing request with button")
chat_id = str(msg["chat"]["id"])
from_id = str(msg["from"]["id"])
lang = msg["from"].get("language_code", "ru")
reply_markup = {
"inline_keyboard": [[{"text": "❤️", "callback_data": "vouch" + from_id}]]
}
newcomer_message = (
"Нажмите, чтобы одобрить заявку "
if lang == "ru"
else "There is a newcomer, press the button if you are connected with "
)
r = await get_userphotos(user_id=from_id)
logger.debug(r)
if r["ok"] and r["result"]["total_count"] > 0:
logger.info("showing button with photo")
file_id = r["result"]["photos"][0][0]["file_id"]
_uid, identity, username = userdata_extract(msg["from"])
r = await send_photo(
chat_id,
file_id,
caption=newcomer_message + f"{identity}{username}",
reply_to=msg.get("message_id", ""),
reply_markup=reply_markup,
)
else:
logger.info("showing button without photo")
r = await send_message(
chat_id,
newcomer_message + mention(msg["from"]),
reply_to=msg.get("message_id", ""),
reply_markup=reply_markup,
)
logger.debug(r)
if "message_id" in r:
# удаляем предыдущее сообщение с кнопкой в этом чате
prevbtn = storage.get(f"btn-{chat_id}-{from_id}")
if prevbtn:
r = await delete_message(chat_id, prevbtn)
logger.debug(r)
# создаём новое
newbtn = r["message_id"]
logger.info(f"button message id: {newbtn}")
storage.set(f"btn-{chat_id}-{from_id}", newbtn)

View File

@@ -1,132 +0,0 @@
import asyncio
import logging
import sys
import signal # Import the signal module
from aiogram import Bot, Dispatcher, Router
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart
from aiogram.types import Message, ChatJoinRequest, CallbackQuery, ChatMemberUpdated
from aiogram.enums import ChatMemberStatus
from config import BOT_TOKEN, FEEDBACK_CHAT_ID
from handlers.routing import handle_routing
from handlers.callback_unlink import handle_unlink
from handlers.callback_vouch import handle_button
from handlers.handle_join_request import handle_join_request
from handlers.handle_startup import handle_startup
from handlers.handle_members_change import handle_join, handle_left
from state import State
from storage import Profile
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
router = Router()
bot = Bot(BOT_TOKEN, parse_mode=ParseMode.HTML)
dp = Dispatcher()
state = State()
@router.message(CommandStart())
async def command_start_handler(message: Message) -> None:
caption = "Напишите своё сообщение для нас" if message.from_user.llanguage_code == 'ru' else "Write your message for us"
await message.answer(caption)
@router.callback_query()
async def process_callback(callback_query: CallbackQuery):
cbq = vars(callback_query)
try:
cbq["from"] = vars(callback_query.from_user)
cbq["message"] = vars(callback_query.message)
data = cbq["data"]
if data.startswith("vouch"):
await handle_button(cbq)
elif data.startswith("unlink"):
await handle_unlink(cbq, state)
except Exception as e:
logger.error(f"[main.process_callback] ERROR {e}")
logger.debug(cbq)
import traceback
text = traceback.format_exc()
await bot.send_message(FEEDBACK_CHAT_ID, text)
@router.chat_join_request()
async def join_request_handler(update: ChatJoinRequest) -> None:
print("chat join request")
join_request = vars(update)
try:
join_request["from"] = vars(update.from_user)
join_request["chat"] = vars(update.chat)
await handle_join_request(join_request)
except Exception as e:
logger.error(f"[main.join_request_handler] ERROR {e}")
logger.debug(join_request)
import traceback
text = traceback.format_exc()
await bot.send_message(FEEDBACK_CHAT_ID, text)
@router.message()
async def all_handler(message: Message) -> None:
msg = vars(message)
try:
msg["from"] = vars(message.from_user)
msg["chat"] = vars(message.chat)
if message.reply_to_message:
msg["reply_to_message"] = vars(message.reply_to_message)
await handle_routing(msg, state)
await asyncio.sleep(1.0)
except Exception as e:
logger.error(f"[main.all_handler] ERROR {e}")
logger.debug(msg)
import traceback
text = traceback.format_exc()
await bot.send_message(FEEDBACK_CHAT_ID, text)
@router.my_chat_member()
async def chat_members_change(update: ChatMemberUpdated):
member_updated = vars(update)
try:
member_updated["chat"] = vars(update.chat)
member_updated["from"] = vars(update.from_user)
old_member = vars(member_updated["old_chat_member"])
new_member = vars(member_updated["new_chat_member"])
if old_member:
if old_member.status == ChatMemberStatus.KICKED:
Profile.erase(member_updated["from"]["id"])
await handle_left(member_updated)
elif new_member:
await handle_join(member_updated)
else:
logger.info("unhandled members update")
except Exception as e:
logger.error(f"[main.my_chat_member] ERROR {e}")
logger.debug(member_updated)
import traceback
text = traceback.format_exc()
await bot.send_message(FEEDBACK_CHAT_ID, text)
async def main() -> None:
# connect router
dp.include_router(router)
# storage revalidation
await handle_startup()
# Start event dispatching
await dp.start_polling(bot)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
# Define a function to handle SIGTERM
def handle_sigterm(signum, frame):
logger.info("Received SIGTERM. Shutting down gracefully...")
asyncio.get_event_loop().stop()
# Register the SIGTERM signal handler
signal.signal(signal.SIGTERM, handle_sigterm)
asyncio.get_event_loop().run_until_complete(main())

View File

@@ -1,37 +0,0 @@
from redis import Redis
from storage.profile import Profile as ProfileObj
from config import REDIS_URL
import json
# сохраняет сессии, айди кнопок в чатах для удаления и пересылаемые сообщения между перезагрузками
storage = Redis.from_url(REDIS_URL)
# хранение необходимой информации о пользователях
Profile = ProfileObj(storage)
# достаёт из хранилища jsonы по маске и количеству
def scan(match="usr-*", count=100):
cursor = 0
keys = []
r = storage
while True:
# Scan for keys starting with <match> in batches of <count>
cursor, batch_keys = r.scan(cursor=cursor, match=match, count=count)
keys += batch_keys
# If the cursor is 0, then we've reached the end of the keys
if cursor == 0:
break
# Get the values of all the keys
values = r.mget(keys)
# Parse the JSON data from each value
items = []
for value in values:
if value:
value_str = value.decode("utf-8")
i = json.loads(value_str)
items.append(i)
print(f"scan found {len(items)} items")
return keys, items

View File

@@ -1,47 +0,0 @@
import json
class Profile:
def __init__(self, storage):
self.storage = storage
def create(self, member_id, msg=None):
s = {"id": member_id, "parents": [], "children": [], "chats": []}
if msg:
if "from" in msg:
sender = msg.get("from")
s["mention"] = sender.get("username")
s[
"name"
] = f"{sender['first_name']} {sender.get('last_name', '')}".strip()
if "chat" in msg:
chat_id = str(msg["chat"]["id"])
if chat_id not in s["chats"]:
s["chats"].append(chat_id)
self.storage.set(f"usr-{member_id}", json.dumps(s))
return s
def save(self, s):
self.storage.set(f'usr-{s["id"]}', json.dumps(s))
def get(self, member_id, msg=None):
data = self.storage.get(f"usr-{member_id}")
if data is None:
r = self.create(member_id, msg)
else:
r = json.loads(data)
return r
def erase(self, member_id):
data = self.storage.get(f"usr-{member_id}")
if data:
member = json.loads(data)
for child in member["children"]:
child_member = self.storage.get(f"usr-{child}")
if child_member:
child_member = json.loads(child_member)
child_member["parents"].remove(member_id)
self.storage.set(f"usr-{child_member['id']}", json.dumps(child_member))

View File

@@ -1,79 +0,0 @@
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# Define SVG code generation function with member_id parameter
def generate_chart(members, member_id=None):
if not member_id:
member_id = members[0]["id"]
# Define some constants for layout
node_radius = 30
node_spacing = 80
node_y = 100
parent_y_offset = 50
child_y_offset = 150
# Find the specified member and its descendants
member = None
descendants = set()
for m in members:
if m["id"] == member_id:
member = m
descendants.add(member_id)
break
# calculate only if links are founded
if member:
stack = member["children"].copy()
while stack:
child_id = stack.pop()
descendants.add(child_id)
for m in members:
if m["id"] == child_id:
stack.extend(m["children"])
break
# Define the x position for each member
x_positions = {}
x_positions[member_id] = 0
for i, m in enumerate(members):
if m["id"] in descendants:
x_positions[m["id"]] = (i * node_spacing) + node_radius
# Start building the SVG code
svg_width = (len(descendants) * node_spacing) + (2 * node_radius)
svg_height = 200
svg_code = f'<svg width="{svg_width}" height="{svg_height}">'
# Generate nodes and names for each descendant
for m in members:
if m["id"] in descendants:
node_x = x_positions[m["id"]]
node_code = f'<circle cx="{node_x}" cy="{node_y}" r="{node_radius}" stroke="black" stroke-width="2" fill="white"/>'
name_code = f'<text x="{node_x}" y="{node_y}" font-size="16" text-anchor="middle">{m["name"]}</text>'
svg_code += node_code + name_code
# Generate links to parent nodes
for parent_id in m["parents"]:
if parent_id in descendants:
parent_x = x_positions[parent_id]
link_code = f'<line x1="{node_x}" y1="{node_y - parent_y_offset}" x2="{parent_x}" y2="{node_y}" stroke="black" stroke-width="2"/>'
svg_code += link_code
# Generate links to child nodes
for child_id in m["children"]:
if child_id in descendants:
child_x = x_positions[child_id]
link_code = f'<line x1="{node_x}" y1="{node_y + child_y_offset}" x2="{child_x}" y2="{node_y}" stroke="black" stroke-width="2"/>'
svg_code += link_code
# Finish the SVG code
svg_code += "</svg>"
return svg_code.encode("utf-8")
else:
logger.error(f"no connections in graph for {member_id}")
logger.debug(members)
return ""

View File

@@ -1,21 +0,0 @@
def escape_username(username):
# Replace any non-ASCII and non-alphanumeric characters with underscores
return "".join(c if c.isalnum() or c.isspace() else "-" for c in username)
# generates a mention from standard telegram web json 'from' field
# using HTML markup
def mention(user):
uid, identity, username = userdata_extract(user)
identity = escape_username(identity)
return f'<a href="tg://user?id={uid}">{identity} {username}</a>'
def userdata_extract(user):
ln = " " + user.get('last_name', "") if user.get('last_name', "") else ""
identity = f"{user['first_name']}{ln}"
uid = user["id"]
username = user.get("username", "")
if username:
username = f"(@{username})"
return uid, identity, username