diff --git a/migration/__init__.py b/migration/__init__.py index c59e3ec4..ef540896 100644 --- a/migration/__init__.py +++ b/migration/__init__.py @@ -1,22 +1,23 @@ """ cmd managed migration """ import asyncio +import gc import json import sys from datetime import datetime, timezone -import gc + import bs4 + +from migration.export import export_mdx from migration.tables.comments import migrate as migrateComment from migration.tables.comments import migrate_2stage as migrateComment_2stage from migration.tables.content_items import get_shout_slug from migration.tables.content_items import migrate as migrateShout -from migration.tables.topics import migrate as migrateTopic -from migration.tables.users import migrate as migrateUser from migration.tables.remarks import migrate as migrateRemark +from migration.tables.topics import migrate as migrateTopic +from migration.tables.users import migrate as migrateUser, post_migrate as users_post_migrate from migration.tables.users import migrate_2stage as migrateUser_2stage -from orm.reaction import Reaction from orm import init_tables -from migration.export import export_mdx - +from orm.reaction import Reaction TODAY = datetime.strftime(datetime.now(tz=timezone.utc), "%Y%m%d") OLD_DATE = "2016-03-05 22:22:00.350000" @@ -42,6 +43,7 @@ async def users_handle(storage): ce = 0 for entry in storage["users"]["data"]: ce += migrateUser_2stage(entry, id_map) + users_post_migrate() async def topics_handle(storage): @@ -180,8 +182,8 @@ async def all_handle(storage, args): await topics_handle(storage) print("[migration] users and topics are migrated") await shouts_handle(storage, args) - print("[migration] remarks...") - await remarks_handle(storage) + # print("[migration] remarks...") + # await remarks_handle(storage) print("[migration] migrating comments") await comments_handle(storage) # export_email_subscriptions() diff --git a/migration/tables/content_items.py b/migration/tables/content_items.py index f386e484..d44b19dc 100644 --- a/migration/tables/content_items.py +++ b/migration/tables/content_items.py @@ -176,7 +176,7 @@ async def migrate(entry, storage): await content_ratings_to_reactions(entry, shout_dict["slug"]) # shout views - await ViewedStorage.increment(shout_dict["slug"], amount=entry.get("views", 1)) + await ViewedStorage.increment(shout_dict["slug"], amount=entry.get("views", 1), viewer='old-discours') # del shout_dict['ratings'] storage["shouts"]["by_oid"][entry["_id"]] = shout_dict diff --git a/migration/tables/users.py b/migration/tables/users.py index 9dc45518..3404dd7c 100644 --- a/migration/tables/users.py +++ b/migration/tables/users.py @@ -1,7 +1,9 @@ +import re + +from bs4 import BeautifulSoup from dateutil.parser import parse from sqlalchemy.exc import IntegrityError -from bs4 import BeautifulSoup -import re + from base.orm import local_session from orm.user import AuthorFollower, User, UserRating @@ -108,6 +110,20 @@ def migrate(entry): return user_dict +def post_migrate(): + old_discours_dict = { + "slug": "old-discours", + "username": "old-discours", + "email": "old@discours.io", + "name": "Просмотры на старой версии сайта" + } + + with local_session() as session: + old_discours_user = User.create(**old_discours_dict) + session.add(old_discours_user) + session.commit() + + def migrate_2stage(entry, id_map): ce = 0 for rating_entry in entry.get("ratings", []): diff --git a/services/stat/viewed.py b/services/stat/viewed.py index e2168942..e1ef0a58 100644 --- a/services/stat/viewed.py +++ b/services/stat/viewed.py @@ -1,16 +1,17 @@ import asyncio import time from datetime import timedelta, timezone, datetime +from os import environ, path +from ssl import create_default_context + from gql import Client, gql from gql.transport.aiohttp import AIOHTTPTransport -from base.orm import local_session from sqlalchemy import func +from base.orm import local_session from orm import User, Topic from orm.shout import ShoutTopic, Shout from orm.viewed import ViewedEntry -from ssl import create_default_context -from os import environ, path load_facts = gql(""" query getDomains { @@ -64,7 +65,7 @@ class ViewedStorage: views = None pages = None domains = None - period = 24 * 60 * 60 # one time a day + period = 60 * 60 # every hour client = None auth_result = None disabled = False @@ -98,8 +99,8 @@ class ViewedStorage: p = page["value"].split("?")[0] slug = p.split('discours.io/')[-1] shouts[slug] = page["count"] - for slug, v in shouts: - await ViewedStorage.increment(slug, v) + for slug in shouts.keys(): + await ViewedStorage.increment(slug, shouts[slug]) except Exception: pass print("[stat.viewed] ⎪ %d pages collected " % len(shouts.keys())) @@ -164,15 +165,31 @@ class ViewedStorage: self = ViewedStorage async with self.lock: with local_session() as session: - shout = session.query(Shout).where(Shout.slug == shout_slug).one() - viewer = session.query(User).where(User.slug == viewer).one() + # TODO: user slug -> id + viewed = session.query( + ViewedEntry + ).join( + Shout + ).join( + User + ).filter( + User.slug == viewer, + Shout.slug == shout_slug + ).first() + + if viewed: + viewed.amount = amount + print("amount: %d" % amount) + else: + shout = session.query(Shout).where(Shout.slug == shout_slug).one() + viewer = session.query(User).where(User.slug == viewer).one() + new_viewed = ViewedEntry.create(**{ + "viewer": viewer.id, + "shout": shout.id, + "amount": amount + }) + session.add(new_viewed) - viewed = ViewedEntry.create(**{ - "viewer": viewer.id, - "shout": shout.id, - "amount": amount - }) - session.add(viewed) session.commit() self.by_shouts[shout_slug] = self.by_shouts.get(shout_slug, 0) + amount self.update_topics(session, shout_slug) @@ -184,25 +201,25 @@ class ViewedStorage: self = ViewedStorage if self.disabled: return - async with self.lock: - while True: - try: - print("[stat.viewed] - updating views...") - await self.update_pages() - failed = 0 - except Exception: - failed += 1 - print("[stat.viewed] - update failed #%d, wait 10 seconds" % failed) - if failed > 3: - print("[stat.viewed] - not trying to update anymore") - break - if failed == 0: - when = datetime.now(timezone.utc) + timedelta(seconds=self.period) - t = format(when.astimezone().isoformat()) - print("[stat.viewed] ⎩ next update: %s" % ( - t.split("T")[0] + " " + t.split("T")[1].split(".")[0] - )) - await asyncio.sleep(self.period) - else: - await asyncio.sleep(10) - print("[stat.viewed] - trying to update data again") + + while True: + try: + print("[stat.viewed] - updating views...") + await self.update_pages() + failed = 0 + except Exception: + failed += 1 + print("[stat.viewed] - update failed #%d, wait 10 seconds" % failed) + if failed > 3: + print("[stat.viewed] - not trying to update anymore") + break + if failed == 0: + when = datetime.now(timezone.utc) + timedelta(seconds=self.period) + t = format(when.astimezone().isoformat()) + print("[stat.viewed] ⎩ next update: %s" % ( + t.split("T")[0] + " " + t.split("T")[1].split(".")[0] + )) + await asyncio.sleep(self.period) + else: + await asyncio.sleep(10) + print("[stat.viewed] - trying to update data again")