viewed-service-fixes
All checks were successful
deploy / deploy (push) Successful in 1m23s

This commit is contained in:
2023-12-17 23:30:20 +03:00
parent 2c6b872acb
commit a6c5243c06
20 changed files with 110 additions and 109 deletions

View File

@@ -1,4 +1,5 @@
from functools import wraps
import aiohttp
from aiohttp.web import HTTPUnauthorized

View File

@@ -1,4 +1,5 @@
import json
from services.rediscache import redis

View File

@@ -1,4 +1,5 @@
import redis.asyncio as aredis
from settings import REDIS_URL
@@ -45,16 +46,6 @@ class RedisCache:
return
await self._client.publish(channel, data)
async def lrange(self, key, start, stop):
if self._client:
print(f"[redis] LRANGE {key} {start} {stop}")
return await self._client.lrange(key, start, stop)
async def mget(self, key, *keys):
if self._client:
print(f"[redis] MGET {key} {keys}")
return await self._client.mget(key, *keys)
redis = RedisCache()

View File

@@ -1,5 +1,4 @@
from ariadne import QueryType, MutationType # , ScalarType
from ariadne import MutationType, QueryType # , ScalarType
# datetime_scalar = ScalarType("DateTime")
query = QueryType()

View File

@@ -1,8 +1,11 @@
import asyncio
import json
from typing import List
import aiohttp
from services.rediscache import redis
from orm.shout import Shout
from services.rediscache import redis
class SearchService:
@@ -16,11 +19,12 @@ class SearchService:
SearchService.cache = {}
@staticmethod
async def search(text, limit, offset) -> [Shout]:
async def search(text, limit: int = 50, offset: int = 0) -> List[Shout]:
cached = await redis.execute("GET", text)
if not cached:
async with SearchService.lock:
# Use aiohttp to send a request to ElasticSearch
# TODO: add limit offset usage
async with aiohttp.ClientSession() as session:
search_url = f"https://search.discours.io/search?q={text}"
async with session.get(search_url) as response:

View File

@@ -2,14 +2,15 @@ from services.rediscache import redis
async def get_unread_counter(chat_id: str, author_id: int) -> int:
unread = await redis.execute("LLEN", f"chats/{chat_id}/unread/{author_id}")
return unread or 0
unread: int = await redis.execute("LLEN", f"chats/{chat_id}/unread/{author_id}") or 0
return unread
async def get_total_unread_counter(author_id: int) -> int:
chats_set = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")
unread = 0
for chat_id in list(chats_set):
n = await get_unread_counter(chat_id, author_id)
unread += n
if chats_set:
for chat_id in list(chats_set):
n = await get_unread_counter(chat_id, author_id)
unread += n
return unread

View File

@@ -1,14 +1,14 @@
import asyncio
import time
from datetime import timedelta, timezone, datetime
from datetime import datetime, timedelta, timezone
from os import environ
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from services.db import local_session
from orm.shout import Shout, ShoutTopic
from orm.topic import Topic
from orm.shout import ShoutTopic, Shout
from services.db import local_session
load_facts = gql(
""" query getDomains {
@@ -60,7 +60,7 @@ class ViewedStorage:
pages = None
domains = None
period = 60 * 60 # every hour
client = None
client: Client | None = None
auth_result = None
disabled = False
@@ -70,7 +70,7 @@ class ViewedStorage:
self = ViewedStorage
async with self.lock:
if token:
self.client = create_client({"Authorization": "Bearer %s" % str(token)}, schema=schema_str)
self.client = create_client({"Authorization": f"Bearer {token}"}, schema=schema_str)
print("[services.viewed] * authorized permanently by ackee.discours.io: %s" % token)
else:
print("[services.viewed] * please set ACKEE_TOKEN")
@@ -83,22 +83,19 @@ class ViewedStorage:
start = time.time()
self = ViewedStorage
try:
async with self.client as session:
self.pages = await session.execute(load_pages)
if self.client:
self.pages = self.client.execute(load_pages)
self.pages = self.pages["domains"][0]["statistics"]["pages"]
shouts = {}
try:
for page in self.pages:
p = page["value"].split("?")[0]
slug = p.split("discours.io/")[-1]
shouts[slug] = page["count"]
for slug in shouts.keys():
await ViewedStorage.increment(slug, shouts[slug])
except Exception:
pass
for page in self.pages:
p = page["value"].split("?")[0]
slug = p.split("discours.io/")[-1]
shouts[slug] = page["count"]
for slug in shouts.keys():
await ViewedStorage.increment(slug, shouts[slug])
print("[services.viewed] ⎪ %d pages collected " % len(shouts.keys()))
except Exception as e:
raise e
raise Exception(e)
end = time.time()
print("[services.viewed] ⎪ update_pages took %fs " % (end - start))
@@ -106,8 +103,14 @@ class ViewedStorage:
@staticmethod
async def get_facts():
self = ViewedStorage
async with self.lock:
return await self.client.execute(load_facts)
facts = []
try:
if self.client:
async with self.lock:
facts = self.client.execute(load_facts)
except Exception as er:
print(f"[services.viewed] get_facts error: {er}")
return facts or []
@staticmethod
async def get_shout(shout_slug):
@@ -138,7 +141,7 @@ class ViewedStorage:
"""updates topics counters by shout slug"""
self = ViewedStorage
with local_session() as session:
for [shout_topic, topic] in (
for [_shout_topic, topic] in (
session.query(ShoutTopic, Topic).join(Topic).join(Shout).where(Shout.slug == shout_slug).all()
):
if not self.by_topics.get(topic.slug):