This commit is contained in:
Tony Rewin 2023-10-04 01:54:52 +03:00
parent 04c0f3d6e7
commit f8b00cc1b4
6 changed files with 17 additions and 11 deletions

View File

@ -11,6 +11,3 @@
__Redis__:
- Для каждого пользователя создаётся запись в хранилищах `chats_by_author/<chat_id>` и `chats/<chat_id>` и канал redis `chat:<chat_id>`, в котором публикуюутся обновления всех переписок.
__SSE__:
- Каждый пользователь подписывается на свой канал по урлу `/sse/<auth_token>`

View File

@ -38,6 +38,4 @@ class Author(Base):
lastSeen = Column(DateTime, nullable=False, default=datetime.now)
deletedAt = Column(DateTime, nullable=True, comment="Deleted at")
links = Column(JSONType, nullable=True, comment="Links")
ratings = relationship(
AuthorRating, foreign_keys=AuthorRating.author, nullable=True
)
ratings = relationship(AuthorRating, foreign_keys=AuthorRating.author)

View File

@ -9,7 +9,7 @@ uvicorn
httpx
itsdangerous
pydantic
psycopg2
psycopg2-binary
######## development deps
isort
brunette

View File

@ -54,9 +54,10 @@ async def create_message(_, info, chat: str, body: str, reply_to=None):
# subscribe on updates
channel_name = (
f"private:{author_id}" if not chat["title"] else f"group:{chat['id']}"
f"chat:{chat['id']}" if not chat["title"] else f"group:{chat['id']}"
)
redis.execute("PUBLISH", channel_name, json.dumps(new_message))
new_message["kind"] = "new_message"
await redis.execute_pubsub("PUBLISH", channel_name, json.dumps(new_message))
return {"message": new_message, "error": None}

View File

@ -4,6 +4,7 @@ from settings import PORT
def exception_handler(exception_type, exception, traceback, debug_hook=sys.excepthook):
print(traceback)
print("%s: %s" % (exception_type.__name__, exception))

View File

@ -28,6 +28,15 @@ class RedisCache:
except Exception:
pass
async def execute_pubsub(self, command, *args, **kwargs):
while not self._redis:
await asyncio.sleep(1)
try:
print("[redis] " + command + " " + " ".join(args))
return await self._redis.execute_pubsub(command, *args, **kwargs)
except Exception:
pass
async def subscribe(self, *channels):
if not self._redis:
await self.connect()