update-redis-api
Some checks failed
Deploy to core / deploy (push) Failing after 1m27s

This commit is contained in:
Untone 2024-02-21 16:06:24 +03:00
parent 33330fb052
commit 63f5a708b7
7 changed files with 147 additions and 83 deletions

View File

@ -7,6 +7,7 @@ from ariadne.asgi import GraphQL
from starlette.applications import Starlette
from starlette.routing import Route
from resolvers.author_events import update_cache, scheduled_cache_update
from services.rediscache import redis
from services.schema import resolvers
from services.search import search_service
@ -36,6 +37,8 @@ app = Starlette(
on_startup=[
redis.connect,
ViewedStorage.init,
update_cache,
scheduled_cache_update,
search_service.info,
# start_sentry,
start,

View File

@ -1,11 +1,6 @@
import time
import asyncio
from sqlalchemy import JSON, Boolean, Column, ForeignKey, Integer, String
from sqlalchemy import event, select
from services.rediscache import redis
from orm.topic import Topic, TopicFollower
from services.db import Base
@ -43,77 +38,3 @@ class Author(Base):
last_seen = Column(Integer, nullable=False, default=lambda: int(time.time()))
updated_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
deleted_at = Column(Integer, nullable=True, comment="Deleted at")
@event.listens_for(Author, "after_insert")
@event.listens_for(Author, "after_update")
def after_author_update(mapper, connection, target):
redis_key = f"user:{target.user}:author"
asyncio.create_task(redis.execute("HSET", redis_key, **vars(target)))
async def update_follows_for_user(connection, user_id, entity_type, entity, is_insert):
redis_key = f"user:{user_id}:follows"
follows = await redis.execute("HGET", redis_key)
if not follows:
follows = {
"topics": [],
"authors": [],
"communities": [
{"slug": "discours", "name": "Дискурс", "id": 1, "desc": ""}
],
}
if is_insert:
follows[f"{entity_type}s"].append(entity)
else:
# Remove the entity from follows
follows[f"{entity_type}s"] = [
e for e in follows[f"{entity_type}s"] if e["id"] != entity.id
]
await redis.execute("HSET", redis_key, **vars(follows))
async def handle_author_follower_change(connection, author_id, follower_id, is_insert):
async with connection.begin() as conn:
author = await conn.execute(select(Author).filter(Author.id == author_id)).first()
follower = await conn.execute(select(Author).filter(Author.id == follower_id)).first()
if follower and author:
await update_follows_for_user(
connection, follower.user, "author", author, is_insert
)
async def handle_topic_follower_change(connection, topic_id, follower_id, is_insert):
topic = connection.execute(select(Topic).filter(Topic.id == topic_id)).first()
follower = connection.execute(
select(Author).filter(Author.id == follower_id)
).first()
if follower and topic:
await update_follows_for_user(
connection, follower.user, "topic", topic, is_insert
)
@event.listens_for(TopicFollower, "after_insert")
def after_topic_follower_insert(mapper, connection, target):
asyncio.create_task(handle_topic_follower_change(connection, target.topic, target.follower, True))
@event.listens_for(TopicFollower, "after_delete")
def after_topic_follower_delete(mapper, connection, target):
asyncio.create_task(handle_topic_follower_change(connection, target.topic, target.follower, False))
@event.listens_for(AuthorFollower, "after_insert")
def after_author_follower_insert(mapper, connection, target):
asyncio.create_task(handle_author_follower_change(
connection, target.author, target.follower, True
))
@event.listens_for(AuthorFollower, "after_delete")
def after_author_follower_delete(mapper, connection, target):
asyncio.create_task(handle_author_follower_change(
connection, target.author, target.follower, False
))

View File

@ -22,6 +22,7 @@ opensearch-py = "^2.4.2"
httpx = "^0.26.0"
dogpile-cache = "^1.3.1"
colorlog = "^6.8.2"
aiocron = "^1.8"
[tool.poetry.group.dev.dependencies]
ruff = "^0.2.1"

View File

@ -192,7 +192,7 @@ async def get_author(_, _info, slug="", author_id=None):
async def get_author_by_user_id(user_id: str):
redis_key = f"user:{user_id}:author"
res = await redis.execute("HGET", redis_key)
res = await redis.hget(redis_key)
if isinstance(res, dict) and res.get("id"):
logger.debug(f"got cached author: {res}")
return res
@ -200,7 +200,7 @@ async def get_author_by_user_id(user_id: str):
logger.info(f"getting author id for {user_id}")
q = select(Author).filter(Author.user == user_id)
author = await load_author_with_stats(q)
await redis.execute("HSET", redis_key, **author.dict())
await redis.hset(redis_key, **author.dict())
return author

128
resolvers/author_events.py Normal file
View File

@ -0,0 +1,128 @@
import asyncio
from aiocron import crontab
from sqlalchemy import select, event
from orm.author import Author, AuthorFollower
from orm.topic import Topic, TopicFollower
from resolvers.author import add_author_stat_columns, get_author_follows
from resolvers.topic import add_topic_stat_columns
from services.db import local_session
from services.rediscache import redis
from services.viewed import ViewedStorage
async def update_cache():
with local_session() as session:
for author in session.query(Author).all():
redis_key = f"user:{author.user}:author"
await redis.hset(redis_key, **vars(author))
follows = await get_author_follows(None, None, user=author.user)
if isinstance(follows, dict):
redis_key = f"user:{author.user}:follows"
await redis.hset(redis_key, **follows)
@crontab("*/10 * * * *", func=update_cache)
async def scheduled_cache_update():
pass
@event.listens_for(Author, "after_insert")
@event.listens_for(Author, "after_update")
def after_author_update(mapper, connection, target):
redis_key = f"user:{target.user}:author"
asyncio.create_task(redis.hset(redis_key, **vars(target)))
@event.listens_for(TopicFollower, "after_insert")
def after_topic_follower_insert(mapper, connection, target):
asyncio.create_task(
handle_topic_follower_change(connection, target.topic, target.follower, True)
)
@event.listens_for(TopicFollower, "after_delete")
def after_topic_follower_delete(mapper, connection, target):
asyncio.create_task(
handle_topic_follower_change(connection, target.topic, target.follower, False)
)
@event.listens_for(AuthorFollower, "after_insert")
def after_author_follower_insert(mapper, connection, target):
asyncio.create_task(
handle_author_follower_change(connection, target.author, target.follower, True)
)
@event.listens_for(AuthorFollower, "after_delete")
def after_author_follower_delete(mapper, connection, target):
asyncio.create_task(
handle_author_follower_change(connection, target.author, target.follower, False)
)
async def update_follows_for_user(connection, user_id, entity_type, entity, is_insert):
redis_key = f"user:{user_id}:follows"
follows = await redis.hget(redis_key)
if not follows:
follows = {
"topics": [],
"authors": [],
"communities": [
{"slug": "discours", "name": "Дискурс", "id": 1, "desc": ""}
],
}
if is_insert:
follows[f"{entity_type}s"].append(entity)
else:
# Remove the entity from follows
follows[f"{entity_type}s"] = [
e for e in follows[f"{entity_type}s"] if e["id"] != entity.id
]
await redis.hset(redis_key, **vars(follows))
async def handle_author_follower_change(connection, author_id, follower_id, is_insert):
q = select(Author).filter(Author.id == author_id)
q = add_author_stat_columns(q)
async with connection.begin() as conn:
[author, shouts_stat, followers_stat, followings_stat] = await conn.execute(
q
).first()
author.stat = {
"shouts": shouts_stat,
"viewed": await ViewedStorage.get_author(author.slug),
"followers": followers_stat,
"followings": followings_stat,
}
follower = await conn.execute(
select(Author).filter(Author.id == follower_id)
).first()
if follower and author:
await update_follows_for_user(
connection, follower.user, "author", author, is_insert
)
async def handle_topic_follower_change(connection, topic_id, follower_id, is_insert):
q = select(Topic).filter(Topic.id == topic_id)
q = add_topic_stat_columns(q)
async with connection.begin() as conn:
[topic, shouts_stat, authors_stat, followers_stat] = await conn.execute(
q
).first()
topic.stat = {
"shouts": shouts_stat,
"authors": authors_stat,
"followers": followers_stat,
"viewed": await ViewedStorage.get_topic(topic.slug),
}
follower = connection.execute(
select(Author).filter(Author.id == follower_id)
).first()
if follower and topic:
await update_follows_for_user(
connection, follower.user, "topic", topic, is_insert
)

View File

@ -131,13 +131,13 @@ def query_follows(user_id: str):
async def get_follows_by_user_id(user_id: str):
if user_id:
redis_key = f"user:{user_id}:follows"
res = await redis.execute("HGET", redis_key)
res = await redis.hget(redis_key)
if res:
return res
logger.debug(f"getting follows for {user_id}")
follows = query_follows(user_id)
await redis.execute("HSET", redis_key, **follows)
await redis.hset(redis_key, **follows)
return follows

View File

@ -21,6 +21,10 @@ class RedisCache:
if self._client:
try:
logger.debug(f"{command} {args} {kwargs}")
for arg in args:
if isinstance(arg, dict):
if arg.get('_sa_instance_state'):
del arg['_sa_instance_state']
r = await self._client.execute_command(command, *args, **kwargs)
logger.debug(type(r))
logger.debug(r)
@ -48,6 +52,13 @@ class RedisCache:
return
await self._client.publish(channel, data)
async def hset(self, hash_key: str, fields_values: dict):
return await self._client.hset(hash_key, mapping=fields_values)
async def hget(self, hash_key: str):
return await self._client.hget(hash_key)
redis = RedisCache()