This commit is contained in:
parent
8965395377
commit
9a6c995589
|
@ -21,10 +21,12 @@ class RedisService:
|
|||
self._client = FakeRedis(decode_responses=True)
|
||||
else:
|
||||
self._client = await Redis.from_url(self._uri, decode_responses=True)
|
||||
logger.info("Redis connection was established.")
|
||||
|
||||
async def disconnect(self):
|
||||
if self._client:
|
||||
if isinstance(self._client, Redis):
|
||||
await self._client.close()
|
||||
logger.info("Redis connection was closed.")
|
||||
|
||||
async def execute(self, command, *args, **kwargs):
|
||||
if self._client:
|
||||
|
|
|
@ -35,7 +35,7 @@ class ViewedStorage:
|
|||
period = 60 * 60 # каждый час
|
||||
analytics_client: BetaAnalyticsDataClient | None = None
|
||||
auth_result = None
|
||||
disabled = False
|
||||
running = False
|
||||
start_date = datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
@staticmethod
|
||||
|
@ -57,7 +57,7 @@ class ViewedStorage:
|
|||
_task = asyncio.create_task(self.worker())
|
||||
else:
|
||||
logger.warning(" * please, add Google Analytics credentials file")
|
||||
self.disabled = True
|
||||
self.running = False
|
||||
|
||||
@staticmethod
|
||||
def load_precounted_views():
|
||||
|
@ -103,7 +103,7 @@ class ViewedStorage:
|
|||
"""Запрос всех страниц от Google Analytics, отсортированных по количеству просмотров"""
|
||||
self = ViewedStorage
|
||||
logger.info(" ⎧ views update from Google Analytics ---")
|
||||
if not self.disabled:
|
||||
if self.running:
|
||||
try:
|
||||
start = time.time()
|
||||
async with self.lock:
|
||||
|
@ -142,7 +142,7 @@ class ViewedStorage:
|
|||
logger.info(" ⎪ views update time: %fs " % (end - start))
|
||||
except Exception as error:
|
||||
logger.error(error)
|
||||
self.disabled = True
|
||||
self.running = False
|
||||
|
||||
@staticmethod
|
||||
def get_shout(shout_slug="", shout_id=0) -> int:
|
||||
|
@ -178,25 +178,31 @@ class ViewedStorage:
|
|||
dictionary[key] = list(set(dictionary.get(key, []) + [value]))
|
||||
|
||||
# Обновление тем и авторов с использованием вспомогательной функции
|
||||
for [_shout_topic, topic] in (
|
||||
for [_st, topic] in (
|
||||
session.query(ShoutTopic, Topic).join(Topic).join(Shout).where(Shout.slug == shout_slug).all()
|
||||
):
|
||||
update_groups(self.shouts_by_topic, topic.slug, shout_slug)
|
||||
|
||||
for [_shout_topic, author] in (
|
||||
for [_st, author] in (
|
||||
session.query(ShoutAuthor, Author).join(Author).join(Shout).where(Shout.slug == shout_slug).all()
|
||||
):
|
||||
update_groups(self.shouts_by_author, author.slug, shout_slug)
|
||||
|
||||
@staticmethod
|
||||
async def stop():
|
||||
"""Остановка фоновой задачи"""
|
||||
self = ViewedStorage
|
||||
async with self.lock:
|
||||
self.running = False
|
||||
logger.info("ViewedStorage worker was stopped.")
|
||||
|
||||
@staticmethod
|
||||
async def worker():
|
||||
"""Асинхронная задача обновления"""
|
||||
failed = 0
|
||||
self = ViewedStorage
|
||||
if self.disabled:
|
||||
return
|
||||
|
||||
while True:
|
||||
while self.running:
|
||||
try:
|
||||
await self.update_pages()
|
||||
failed = 0
|
||||
|
@ -206,6 +212,7 @@ class ViewedStorage:
|
|||
logger.info(" - update failed #%d, wait 10 secs" % failed)
|
||||
if failed > 3:
|
||||
logger.info(" - views update failed, not trying anymore")
|
||||
self.running = False
|
||||
break
|
||||
if failed == 0:
|
||||
when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
|
||||
|
|
Loading…
Reference in New Issue
Block a user