diff --git a/services/redis.py b/services/redis.py index 636aef01..8fcc3f9f 100644 --- a/services/redis.py +++ b/services/redis.py @@ -21,10 +21,12 @@ class RedisService: self._client = FakeRedis(decode_responses=True) else: self._client = await Redis.from_url(self._uri, decode_responses=True) + logger.info("Redis connection was established.") async def disconnect(self): - if self._client: + if isinstance(self._client, Redis): await self._client.close() + logger.info("Redis connection was closed.") async def execute(self, command, *args, **kwargs): if self._client: diff --git a/services/viewed.py b/services/viewed.py index c9d8f5be..af95d270 100644 --- a/services/viewed.py +++ b/services/viewed.py @@ -35,7 +35,7 @@ class ViewedStorage: period = 60 * 60 # каждый час analytics_client: BetaAnalyticsDataClient | None = None auth_result = None - disabled = False + running = False start_date = datetime.now().strftime("%Y-%m-%d") @staticmethod @@ -57,7 +57,7 @@ class ViewedStorage: _task = asyncio.create_task(self.worker()) else: logger.warning(" * please, add Google Analytics credentials file") - self.disabled = True + self.running = False @staticmethod def load_precounted_views(): @@ -103,7 +103,7 @@ class ViewedStorage: """Запрос всех страниц от Google Analytics, отсортированных по количеству просмотров""" self = ViewedStorage logger.info(" ⎧ views update from Google Analytics ---") - if not self.disabled: + if self.running: try: start = time.time() async with self.lock: @@ -142,7 +142,7 @@ class ViewedStorage: logger.info(" ⎪ views update time: %fs " % (end - start)) except Exception as error: logger.error(error) - self.disabled = True + self.running = False @staticmethod def get_shout(shout_slug="", shout_id=0) -> int: @@ -178,25 +178,31 @@ class ViewedStorage: dictionary[key] = list(set(dictionary.get(key, []) + [value])) # Обновление тем и авторов с использованием вспомогательной функции - for [_shout_topic, topic] in ( + for [_st, topic] in ( session.query(ShoutTopic, Topic).join(Topic).join(Shout).where(Shout.slug == shout_slug).all() ): update_groups(self.shouts_by_topic, topic.slug, shout_slug) - for [_shout_topic, author] in ( + for [_st, author] in ( session.query(ShoutAuthor, Author).join(Author).join(Shout).where(Shout.slug == shout_slug).all() ): update_groups(self.shouts_by_author, author.slug, shout_slug) + @staticmethod + async def stop(): + """Остановка фоновой задачи""" + self = ViewedStorage + async with self.lock: + self.running = False + logger.info("ViewedStorage worker was stopped.") + @staticmethod async def worker(): """Асинхронная задача обновления""" failed = 0 self = ViewedStorage - if self.disabled: - return - while True: + while self.running: try: await self.update_pages() failed = 0 @@ -206,6 +212,7 @@ class ViewedStorage: logger.info(" - update failed #%d, wait 10 secs" % failed) if failed > 3: logger.info(" - views update failed, not trying anymore") + self.running = False break if failed == 0: when = datetime.now(timezone.utc) + timedelta(seconds=self.period)