unread-fixes
This commit is contained in:
@@ -1,16 +1,22 @@
|
||||
from services.rediscache import redis
|
||||
import json
|
||||
|
||||
|
||||
async def get_unread_counter(chat_id: str, author_id: int) -> int:
|
||||
unread: int = await redis.execute("LLEN", f"chats/{chat_id}/unread/{author_id}") or 0
|
||||
return unread
|
||||
|
||||
r = await redis.execute("LLEN", f"chats/{chat_id}/unread/{author_id}")
|
||||
if isinstance(r, str):
|
||||
return int(r)
|
||||
elif isinstance(r, int):
|
||||
return r
|
||||
else:
|
||||
return 0
|
||||
|
||||
async def get_total_unread_counter(author_id: int) -> int:
|
||||
chats_set = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")
|
||||
unread = 0
|
||||
if chats_set:
|
||||
for chat_id in list(chats_set):
|
||||
n = await get_unread_counter(chat_id, author_id)
|
||||
unread += n
|
||||
return unread
|
||||
s = 0
|
||||
if isinstance(chats_set, str):
|
||||
chats_set = json.loads(chats_set)
|
||||
if isinstance(chats_set, list):
|
||||
for chat_id in chats_set:
|
||||
s += await get_unread_counter(chat_id, author_id)
|
||||
return s
|
||||
|
@@ -1,77 +1,30 @@
|
||||
import asyncio
|
||||
import threading
|
||||
import os
|
||||
from typing import Dict
|
||||
from logging import Logger
|
||||
import logging
|
||||
import time
|
||||
import json
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from os import environ
|
||||
import logging
|
||||
from gql import Client, gql
|
||||
from gql.transport.aiohttp import AIOHTTPTransport
|
||||
from graphql import DocumentNode
|
||||
from orm.author import Author
|
||||
|
||||
from orm.shout import Shout, ShoutAuthor, ShoutTopic
|
||||
from orm.topic import Topic
|
||||
from services.db import local_session
|
||||
# ga
|
||||
from apiclient.discovery import build
|
||||
from google.oauth2.service_account import Credentials
|
||||
import pandas as pd
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logger = logging.getLogger("\t[services.viewed]\t")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
GOOGLE_KEYFILE_PATH = os.environ.get("GOOGLE_KEYFILE_PATH", '/dump/google-service.json')
|
||||
|
||||
load_facts = gql(
|
||||
""" query getDomains {
|
||||
domains {
|
||||
id
|
||||
title
|
||||
facts {
|
||||
activeVisitors
|
||||
viewsToday
|
||||
viewsMonth
|
||||
viewsYear
|
||||
}
|
||||
} } """
|
||||
)
|
||||
|
||||
load_pages = gql(
|
||||
""" query getDomains {
|
||||
domains {
|
||||
title
|
||||
statistics {
|
||||
pages(sorting: TOP) {
|
||||
# id
|
||||
count
|
||||
# created
|
||||
value
|
||||
}
|
||||
}
|
||||
} } """
|
||||
)
|
||||
|
||||
create_record_mutation_string = """
|
||||
createRecord(domainId: $domainId, input: $input) {
|
||||
payload {
|
||||
id
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
create_record_mutation = gql(f"mutation {{{create_record_mutation_string}}}")
|
||||
|
||||
schema_str = open("schemas/ackee.graphql").read()
|
||||
token = environ.get("ACKEE_TOKEN", "")
|
||||
domain_id = environ.get("ACKEE_DOMAIN_ID", "")
|
||||
ackee_site = environ.get("ACKEE_SITE", "https://testing.discours.io/")
|
||||
|
||||
|
||||
def create_client(headers=None, schema=None):
|
||||
transport = AIOHTTPTransport(
|
||||
url="https://ackee.discours.io/api",
|
||||
headers=headers,
|
||||
# Build Analytics Reporting API V4 service object.
|
||||
def get_service():
|
||||
SCOPES = ['https://www.googleapis.com/auth/analytics.readonly']
|
||||
credentials = Credentials.from_service_account_file(
|
||||
GOOGLE_KEYFILE_PATH, scopes=SCOPES
|
||||
)
|
||||
return Client(schema=schema, transport=transport)
|
||||
service = build(serviceName='analyticsreporting', version='v4', credentials=credentials)
|
||||
return service
|
||||
|
||||
|
||||
class ViewedStorage:
|
||||
@@ -81,21 +34,20 @@ class ViewedStorage:
|
||||
shouts_by_author = {}
|
||||
views = None
|
||||
pages = None
|
||||
domains = None
|
||||
facts = None
|
||||
period = 60 * 60 # every hour
|
||||
client: Client | None = None
|
||||
analytics_client = None
|
||||
auth_result = None
|
||||
disabled = False
|
||||
|
||||
@staticmethod
|
||||
async def init():
|
||||
"""graphql client connection using permanent token"""
|
||||
"""Google Analytics client connection using authentication"""
|
||||
self = ViewedStorage
|
||||
async with self.lock:
|
||||
if token:
|
||||
self.client = create_client({"Authorization": f"Bearer {token}"}, schema=schema_str)
|
||||
logger.info(" * authorized permanently by ackee.discours.io: %s" % token)
|
||||
if os.path.exists(GOOGLE_KEYFILE_PATH):
|
||||
self.analytics_client = get_service()
|
||||
logger.info(" * authorized permanently by Google Analytics")
|
||||
|
||||
# Load pre-counted views from the JSON file
|
||||
self.load_precounted_views()
|
||||
@@ -103,7 +55,7 @@ class ViewedStorage:
|
||||
views_stat_task = asyncio.create_task(self.worker())
|
||||
logger.info(views_stat_task)
|
||||
else:
|
||||
logger.info(" * please set ACKEE_TOKEN")
|
||||
logger.info(" * please add Google Analytics keyfile")
|
||||
self.disabled = True
|
||||
|
||||
@staticmethod
|
||||
|
Reference in New Issue
Block a user