notifier-integration
Some checks failed
Deploy on push / deploy (push) Failing after 19s

This commit is contained in:
Untone 2024-03-04 10:35:33 +03:00
parent ad0dc98bc9
commit 3016a75332
8 changed files with 386 additions and 0 deletions

41
orm/notification.py Normal file
View File

@ -0,0 +1,41 @@
import time
from enum import Enum as Enumeration
from sqlalchemy import JSON, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from orm.author import Author
from services.db import Base
class NotificationEntity(Enumeration):
REACTION = 'reaction'
SHOUT = 'shout'
FOLLOWER = 'follower'
class NotificationAction(Enumeration):
CREATE = 'create'
UPDATE = 'update'
DELETE = 'delete'
SEEN = 'seen'
FOLLOW = 'follow'
UNFOLLOW = 'unfollow'
class NotificationSeen(Base):
__tablename__ = 'notification_seen'
viewer = Column(ForeignKey('author.id'))
notification = Column(ForeignKey('notification.id'))
class Notification(Base):
__tablename__ = 'notification'
created_at = Column(Integer, server_default=str(int(time.time())))
entity = Column(String, nullable=False)
action = Column(String, nullable=False)
payload = Column(JSON, nullable=True)
seen = relationship(lambda: Author, secondary='notification_seen')

View File

@ -43,6 +43,7 @@ from resolvers.topic import (
) )
__all__ = [ __all__ = [
# author # author
'get_author', 'get_author',

287
resolvers/notifier.py Normal file
View File

@ -0,0 +1,287 @@
import json
import time
from typing import List, Tuple
from sqlalchemy.exc import SQLAlchemyError
from services.auth import login_required
from services.schema import mutation, query
from sqlalchemy import and_, select
from sqlalchemy.orm import aliased
from sqlalchemy.sql import not_
from orm.notification import (
Notification,
NotificationAction,
NotificationEntity,
NotificationSeen,
)
from services.db import local_session
from services.logger import root_logger as logger
def query_notifications(author_id: int, after: int = 0) -> Tuple[int, int, List[Tuple[Notification, bool]]]:
notification_seen_alias = aliased(NotificationSeen)
q = (
select(Notification, notification_seen_alias.viewer.label("seen"))
.outerjoin(
NotificationSeen,
and_(
NotificationSeen.viewer == author_id,
NotificationSeen.notification == Notification.id,
),
)
)
if after:
q = q.filter(Notification.created_at > after)
q = q.group_by(NotificationSeen.notification, Notification.created_at)
with local_session() as session:
total = (
session.query(Notification)
.filter(
and_(
Notification.action == NotificationAction.CREATE.value,
Notification.created_at > after,
)
)
.count()
)
unread = (
session.query(Notification)
.filter(
and_(
Notification.action == NotificationAction.CREATE.value,
Notification.created_at > after,
not_(Notification.seen),
)
)
.count()
)
notifications_result = session.execute(q)
notifications = []
for n, seen in notifications_result:
notifications.append((n, seen))
return total, unread, notifications
def group_shout(shout_dict, seen: bool, action: str):
return {
"thread": f'shout-{shout_dict.get("id")}',
"entity": 'shout',
"shout": shout_dict,
"authors": shout_dict.get('authors'),
"updated_at": shout_dict.get('created_at'),
"reactions": [],
"action": action,
"seen": seen
}
def group_reaction(reaction_dict, seen: bool, action):
thread_id = reaction_dict['shout']
if reaction_dict['kind'] == "COMMENT" and reaction_dict.get('reply_to'):
thread_id += f"shout-{reaction_dict.get('shout')}::{reaction_dict.get('reply_to')}"
return {
"thread": thread_id,
"entity": 'reaction',
"updated_at": reaction_dict['created_at'],
"reactions": [reaction_dict['id']],
"shout": reaction_dict.get('shout'),
"authors": [reaction_dict.get('created_by'), ],
"action": action,
"seen": seen
}
def group_follower(follower, seen: bool):
return {
"thread": "followers",
"authors": [follower],
"updated_at": int(time.time()),
"shout": None,
"reactions": [],
"entity": "follower",
"action": "follow",
"seen": seen
}
def get_notifications_grouped(author_id: int, after: int = 0, limit: int = 10):
"""
Retrieves notifications for a given author.
Args:
author_id (int): The ID of the author for whom notifications are retrieved.
after (int, optional): If provided, selects only notifications created after this timestamp will be considered.
limit (int, optional): The maximum number of groupa to retrieve.
Returns:
Dict[str, NotificationGroup], int, int: A dictionary where keys are thread IDs
and values are NotificationGroup objects, unread and total amounts.
This function queries the database to retrieve notifications for the specified author, considering optional filters.
The result is a dictionary where each key is a thread ID, and the corresponding value is a NotificationGroup
containing information about the notifications within that thread.
NotificationGroup structure:
{
entity: str, # Type of entity (e.g., 'reaction', 'shout', 'follower').
updated_at: int, # Timestamp of the latest update in the thread.
shout: Optional[NotificationShout]
reactions: List[int], # List of reaction ids within the thread.
authors: List[NotificationAuthor], # List of authors involved in the thread.
}
"""
total, unread, notifications = query_notifications(author_id, after)
groups_by_thread = {}
groups_amount = 0
for notification, seen in notifications:
if groups_amount >= limit:
break
payload = notification.payload
if notification.entity == NotificationEntity.SHOUT.value:
group = group_shout(payload, seen, notification.action)
thread_id = group.get('thread')
groups_by_thread[thread_id] = group
groups_amount += 1
elif notification.entity == NotificationEntity.REACTION.value:
shout_id = payload.get('shout')
author_id = payload.get('created_by')
reply_id = payload.get('reply_to')
thread_id = f'shout-{shout_id}'
if reply_id and payload.get('kind', '').lower() == 'comment':
thread_id += f'{reply_id}'
existing_group = groups_by_thread.get(thread_id)
if existing_group:
existing_group['seen'] = False
existing_group['authors'].append(author_id)
existing_group['reactions'] = existing_group['reactions'] or []
existing_group['reactions'].append(payload)
groups_by_thread[thread_id] = existing_group
else:
group = group_reaction(payload, seen, notification.action)
if group:
groups_by_thread[thread_id] = group
groups_amount += 1
elif notification.entity == "follower":
thread_id = 'followers' if notification.action == 'follow' else 'unfollowers'
group = groups_by_thread.get(thread_id)
if group:
group['authors'].append(payload)
else:
group = group_follower(payload, seen)
groups_amount += 1
groups_by_thread[thread_id] = group
return groups_by_thread, unread, total
@query.field('load_notifications')
@login_required
async def load_notifications(_, info, after: int, limit: int = 50):
author_id = info.context.get("author_id")
if author_id:
groups, unread, total = get_notifications_grouped(author_id, after, limit)
notifications = sorted(groups.values(), key=lambda group: group.updated_at, reverse=True)
return {"notifications": notifications, "total": total, "unread": unread, "error": None}
return {"notifications": [], "total": 0, "unread": 0, "error": None}
@mutation.field('notification_mark_seen')
@login_required
async def notification_mark_seen(_, info, notification_id: int):
author_id = info.context.get('author_id')
if author_id:
with local_session() as session:
try:
ns = NotificationSeen(notification=notification_id, viewer=author_id)
session.add(ns)
session.commit()
except SQLAlchemyError as e:
session.rollback()
logger.error(f'seen mutation failed: {e}')
return {"error": 'cant mark as read'}
return {"error": None}
@mutation.field('notifications_seen_after')
@login_required
async def notifications_seen_after(_, info, after: int):
# TODO: use latest loaded notification_id as input offset parameter
error = None
try:
author_id = info.context.get('author_id')
if author_id:
with local_session() as session:
nnn = session.query(Notification).filter(and_(Notification.created_at > after)).all()
for n in nnn:
try:
ns = NotificationSeen(notification=n.id, viewer=author_id)
session.add(ns)
session.commit()
except SQLAlchemyError:
session.rollback()
except Exception as e:
print(e)
error = 'cant mark as read'
return {"error": error}
@mutation.field('notifications_seen_thread')
@login_required
async def notifications_seen_thread(_, info, thread: str, after: int):
error = None
author_id = info.context.get('author_id')
if author_id:
[shout_id, reply_to_id] = thread.split('::')
with local_session() as session:
# TODO: handle new follower and new shout notifications
new_reaction_notifications = (
session.query(Notification)
.filter(
Notification.action == 'create',
Notification.entity == 'reaction',
Notification.created_at > after,
)
.all()
)
removed_reaction_notifications = (
session.query(Notification)
.filter(
Notification.action == 'delete',
Notification.entity == 'reaction',
Notification.created_at > after,
)
.all()
)
exclude = set()
for nr in removed_reaction_notifications:
reaction = json.loads(nr.payload)
reaction_id = reaction.get('id')
exclude.add(reaction_id)
for n in new_reaction_notifications:
reaction = json.loads(n.payload)
reaction_id = reaction.get('id')
if (
reaction_id not in exclude
and str(reaction.get('shout')) == str(shout_id)
and str(reaction.get('reply_to')) == str(reply_to_id)
):
try:
ns = NotificationSeen(notification=n.id, viewer=author_id)
session.add(ns)
session.commit()
except Exception as e:
logger.warn(e)
session.rollback()
else:
error = 'You are not logged in'
return {"error": error}

View File

@ -79,3 +79,8 @@ input ReactionBy {
after: Int after: Int
sort: ReactionSort sort: ReactionSort
} }
input NotificationSeenInput {
notifications: [Int]
thread: Int
}

View File

@ -28,4 +28,9 @@ type Mutation {
remove_invite(invite_id: Int!): CommonResult! remove_invite(invite_id: Int!): CommonResult!
accept_invite(invite_id: Int!): CommonResult! accept_invite(invite_id: Int!): CommonResult!
reject_invite(invite_id: Int!): CommonResult! reject_invite(invite_id: Int!): CommonResult!
# notifier
notification_mark_seen(notification_id: Int!, seen: Boolean): CommonResult!
notifications_seen_after(after: Int!, seen: Boolean): CommonResult!
notifications_seen_thread(thread_id: String!, seen: Boolean): CommonResult!
} }

View File

@ -41,4 +41,7 @@ type Query {
get_topics_random(amount: Int): [Topic] get_topics_random(amount: Int): [Topic]
get_topics_by_author(slug: String, user: String, author_id: Int): [Topic] get_topics_by_author(slug: String, user: String, author_id: Int): [Topic]
get_topics_by_community(slug: String, community_id: Int): [Topic] get_topics_by_community(slug: String, community_id: Int): [Topic]
# notifier
get_notifications: NotificationsResult!
} }

View File

@ -178,3 +178,34 @@ type AuthorFollows {
# shouts: [Shout] # shouts: [Shout]
communities: [Community] communities: [Community]
} }
type Notification {
id: Int!
action: String!
entity: String!
created_at: Int!
payload: String!
seen: [Author]
}
type NotificationSeenResult {
error: String
}
type NotificationGroup {
id: Int!
authors: [Author]
updated_at: Int!
entity: String!
action: String
shout: Shout
reactions: [Reaction]
seen: Boolean
}
type NotificationsResult {
notifications: [NotificationGroup!]!
unread: Int!
total: Int!
error: String
}

View File

@ -1,12 +1,22 @@
import json import json
from orm.notification import Notification
from services.db import local_session
from services.rediscache import redis from services.rediscache import redis
def save_notification(action: str, entity: str, payload):
with local_session() as session:
n = Notification(action=action, entity=entity, payload=payload)
session.add(n)
session.commit()
async def notify_reaction(reaction, action: str = 'create'): async def notify_reaction(reaction, action: str = 'create'):
channel_name = 'reaction' channel_name = 'reaction'
data = {'payload': reaction, 'action': action} data = {'payload': reaction, 'action': action}
try: try:
save_notification(action, channel_name, data.get('payload'))
await redis.publish(channel_name, json.dumps(data)) await redis.publish(channel_name, json.dumps(data))
except Exception as e: except Exception as e:
print(f'[services.notify] Failed to publish to channel {channel_name}: {e}') print(f'[services.notify] Failed to publish to channel {channel_name}: {e}')
@ -16,6 +26,7 @@ async def notify_shout(shout, action: str = 'update'):
channel_name = 'shout' channel_name = 'shout'
data = {'payload': shout, 'action': action} data = {'payload': shout, 'action': action}
try: try:
save_notification(action, channel_name, data.get('payload'))
await redis.publish(channel_name, json.dumps(data)) await redis.publish(channel_name, json.dumps(data))
except Exception as e: except Exception as e:
print(f'[services.notify] Failed to publish to channel {channel_name}: {e}') print(f'[services.notify] Failed to publish to channel {channel_name}: {e}')
@ -36,6 +47,8 @@ async def notify_follower(follower: dict, author_id: int, action: str = 'follow'
if not json_data: if not json_data:
raise ValueError('Empty data to publish.') raise ValueError('Empty data to publish.')
save_notification(action, channel_name, data.get('payload'))
# Use the 'await' keyword when publishing # Use the 'await' keyword when publishing
await redis.publish(channel_name, json_data) await redis.publish(channel_name, json_data)