From ff6637a51e4c33a3f94cf0e669cfa898e0f3ac7a Mon Sep 17 00:00:00 2001 From: Untone Date: Mon, 22 Jan 2024 19:17:39 +0300 Subject: [PATCH] precounted-views-import --- services/viewed.py | 62 +++++++++++----------------------------------- 1 file changed, 15 insertions(+), 47 deletions(-) diff --git a/services/viewed.py b/services/viewed.py index 7d58f268..a66ab232 100644 --- a/services/viewed.py +++ b/services/viewed.py @@ -3,6 +3,7 @@ import threading from typing import Dict from logging import Logger import time +import json from datetime import datetime, timedelta, timezone from os import environ import logging @@ -102,12 +103,26 @@ class ViewedStorage: self.client = create_client({"Authorization": f"Bearer {token}"}, schema=schema_str) logger.info(" * authorized permanently by ackee.discours.io: %s" % token) + # Load pre-counted views from the JSON file + self.load_precounted_views() + views_stat_task = asyncio.create_task(self.worker()) logger.info(views_stat_task) else: logger.info(" * please set ACKEE_TOKEN") self.disabled = True + @staticmethod + def load_precounted_views(): + self = ViewedStorage + try: + with open("/dump/views.json", "r") as file: + precounted_views = json.load(file) + self.by_shouts.update(precounted_views) + logger.info(f" * {len(precounted_views)} pre-counted views loaded successfully.") + except Exception as e: + logger.error(f"Error loading pre-counted views: {e}") + @staticmethod async def update_pages(): """query all the pages from ackee sorted by views count""" @@ -225,53 +240,6 @@ class ViewedStorage: except Exception as e: logger.error(f"Error during threaded execution: {e}") - @staticmethod - async def increment_amount(shout_slug, amount): - """the migration way to change counter with batching""" - resource = ackee_site + shout_slug - self = ViewedStorage - - gql_string = "" - batch_size = 100 - if not isinstance(amount, int): - try: - amount = int(amount) - if not isinstance(amount, int): - amount = 1 - except: - pass - - self.by_shouts[shout_slug] = self.by_shouts.get(shout_slug, 0) + amount - self.update_topics(shout_slug) - logger.info(f"{int(amount/100) + 1} requests") - for i in range(amount): - alias = f"mutation{i + 1}" - gql_string += f"{alias}: {create_record_mutation_string - .replace('$domainId', f'"{domain_id}"') - .replace('$input', f'{{siteLocation: "{resource}"}}') - }\n" - # Execute the batch every 100 records - if (i + 1) % batch_size == 0 or (i + 1) == amount: - await self.exec(f"mutation {{\n{gql_string}\n}}") - gql_string = "" # Reset the gql_string for the next batch - # Throttle the requests to 3 per second - await asyncio.sleep(1 / 3) - - - logger.info(f"Incremented {amount} records for shout_slug: {shout_slug}") - - - @staticmethod - async def exec(gql_string: str): - self = ViewedStorage - async with self.lock: - if self.client: - try: - await asyncio.to_thread(self.client.execute, gql(gql_string)) - except Exception as e: - logger.error(f"Error during threaded execution: {e}") - - @staticmethod async def worker(): """async task worker"""