This commit is contained in:
tonyrewin 2022-11-19 14:35:34 +03:00
parent 57e1460356
commit 47b285f8ac
18 changed files with 162 additions and 218 deletions

View File

@ -8,6 +8,7 @@ from starlette.middleware import Middleware
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.middleware.sessions import SessionMiddleware
from starlette.routing import Route
from orm import init_tables
from auth.authenticate import JWTAuthenticate
from auth.oauth import oauth_login, oauth_authorize
@ -30,6 +31,7 @@ middleware = [
async def start_up():
init_tables()
await redis.connect()
await storages_init()
views_stat_task = asyncio.create_task(ViewedStorage().worker())

View File

@ -7,7 +7,6 @@ import sys
from datetime import datetime
import bs4
from base.redis import redis
from migration.tables.comments import migrate as migrateComment
from migration.tables.comments import migrate_2stage as migrateComment_2stage
from migration.tables.content_items import get_shout_slug
@ -17,6 +16,7 @@ from migration.tables.users import migrate as migrateUser
from migration.tables.users import migrate_2stage as migrateUser_2stage
from orm.reaction import Reaction
from settings import DB_URL
from orm import init_tables
# from export import export_email_subscriptions
from .export import export_mdx, export_slug
@ -84,6 +84,7 @@ async def shouts_handle(storage, args):
discours_author = 0
anonymous_author = 0
pub_counter = 0
ignored = 0
topics_dataset_bodies = []
topics_dataset_tlist = []
for entry in storage["shouts"]["data"]:
@ -96,40 +97,44 @@ async def shouts_handle(storage, args):
# migrate
shout = await migrateShout(entry, storage)
storage["shouts"]["by_oid"][entry["_id"]] = shout
storage["shouts"]["by_slug"][shout["slug"]] = shout
# shouts.topics
if not shout["topics"]:
print("[migration] no topics!")
if shout:
storage["shouts"]["by_oid"][entry["_id"]] = shout
storage["shouts"]["by_slug"][shout["slug"]] = shout
# shouts.topics
if not shout["topics"]:
print("[migration] no topics!")
# with author
author: str = shout["authors"][0].dict()
if author["slug"] == "discours":
discours_author += 1
if author["slug"] == "anonymous":
anonymous_author += 1
# print('[migration] ' + shout['slug'] + ' with author ' + author)
# with author
author: str = shout["authors"][0].dict()
if author["slug"] == "discours":
discours_author += 1
if author["slug"] == "anonymous":
anonymous_author += 1
# print('[migration] ' + shout['slug'] + ' with author ' + author)
if entry.get("published"):
if "mdx" in args:
export_mdx(shout)
pub_counter += 1
if entry.get("published"):
if "mdx" in args:
export_mdx(shout)
pub_counter += 1
# print main counter
counter += 1
line = str(counter + 1) + ": " + shout["slug"] + " @" + author["slug"]
print(line)
# print main counter
counter += 1
line = str(counter + 1) + ": " + shout["slug"] + " @" + author["slug"]
print(line)
b = bs4.BeautifulSoup(shout["body"], "html.parser")
texts = [shout["title"].lower().replace(r"[^а-яА-Яa-zA-Z]", "")]
texts = texts + b.findAll(text=True)
topics_dataset_bodies.append(" ".join([x.strip().lower() for x in texts]))
topics_dataset_tlist.append(shout["topics"])
b = bs4.BeautifulSoup(shout["body"], "html.parser")
texts = [shout["title"].lower().replace(r"[^а-яА-Яa-zA-Z]", "")]
texts = texts + b.findAll(text=True)
topics_dataset_bodies.append(" ".join([x.strip().lower() for x in texts]))
topics_dataset_tlist.append(shout["topics"])
else:
ignored += 1
# np.savetxt('topics_dataset.csv', (topics_dataset_bodies, topics_dataset_tlist), delimiter=',
# ', fmt='%s')
print("[migration] " + str(counter) + " content items were migrated")
print("[migration] " + str(ignored) + " content items were ignored")
print("[migration] " + str(pub_counter) + " have been published")
print("[migration] " + str(discours_author) + " authored by @discours")
print("[migration] " + str(anonymous_author) + " authored by @anonymous")
@ -182,8 +187,6 @@ async def all_handle(storage, args):
await users_handle(storage)
await topics_handle(storage)
print("[migration] users and topics are migrated")
await redis.connect()
print("[migration] redis connected")
await shouts_handle(storage, args)
print("[migration] migrating comments")
await comments_handle(storage)
@ -314,6 +317,7 @@ async def main():
cmd = sys.argv[1]
if type(cmd) == str:
print("[migration] command: " + cmd)
init_tables()
await handle_auto()
else:
print("[migration] usage: python server.py migrate")

View File

@ -3,10 +3,8 @@ import json
from dateutil.parser import parse as date_parse
from sqlalchemy.exc import IntegrityError
from transliterate import translit
from base.orm import local_session
from migration.extract import prepare_html_body
from orm.community import Community
from orm.reaction import Reaction, ReactionKind
from orm.shout import Shout, ShoutTopic, ShoutReactionsFollower
from orm.user import User
@ -103,12 +101,8 @@ async def migrate(entry, storage):
r = {
"layout": type2layout[entry["type"]],
"title": entry["title"],
"community": Community.default_community.id,
"authors": [],
"topics": set([]),
# 'rating': 0,
# 'ratings': [],
"createdAt": [],
"topics": set([])
}
topics_by_oid = storage["topics"]["by_oid"]
users_by_oid = storage["users"]["by_oid"]
@ -177,20 +171,24 @@ async def migrate(entry, storage):
# add author as TopicFollower
with local_session() as session:
for tpc in r['topics']:
tf = session.query(
TopicFollower
).where(
TopicFollower.follower == userslug
).filter(
TopicFollower.topic == tpc
).first()
if not tf:
tf = TopicFollower.create(
topic=tpc,
follower=userslug,
auto=True
)
session.add(tf)
try:
tf = session.query(
TopicFollower
).where(
TopicFollower.follower == userslug
).filter(
TopicFollower.topic == tpc
).first()
if not tf:
tf = TopicFollower.create(
topic=tpc,
follower=userslug,
auto=True
)
session.add(tf)
except IntegrityError:
print('[migration.shout] skipped by topic ' + tpc)
return
entry["topics"] = r["topics"]
entry["cover"] = r["cover"]
@ -205,7 +203,6 @@ async def migrate(entry, storage):
user = None
del shout_dict["topics"]
with local_session() as session:
# c = session.query(Community).all().pop()
if not user and userslug:
user = session.query(User).filter(User.slug == userslug).first()
if not user and userdata:

View File

@ -200,7 +200,6 @@
"ecology": "ecology",
"economics": "economics",
"eda": "food",
"editing": "editing",
"editorial-statements": "editorial-statements",
"eduard-limonov": "eduard-limonov",
"education": "education",
@ -597,7 +596,6 @@
"r-b": "rnb",
"rasizm": "racism",
"realizm": "realism",
"redaktura": "editorial",
"refleksiya": "reflection",
"reggi": "reggae",
"religion": "religion",

View File

@ -1,6 +1,6 @@
from base.orm import local_session
from migration.extract import extract_md, html2text
from orm import Topic, Community
from orm import Topic
def migrate(entry):
@ -8,9 +8,7 @@ def migrate(entry):
topic_dict = {
"slug": entry["slug"],
"oid": entry["_id"],
"title": entry["title"].replace(" ", " "),
"children": [],
"community": Community.default_community.slug,
"title": entry["title"].replace(" ", " ")
}
topic_dict["body"] = extract_md(html2text(body_orig), entry["_id"])
with local_session() as session:

View File

@ -36,6 +36,7 @@ def migrate(entry):
)
bio = BeautifulSoup(entry.get("profile").get("bio") or "", features="lxml").text
if bio.startswith('<'):
print('[migration] bio! ' + bio)
bio = BeautifulSoup(bio, features="lxml").text
bio = bio.replace('\(', '(').replace('\)', ')')

View File

@ -6,6 +6,9 @@ from orm.reaction import Reaction
from orm.shout import Shout
from orm.topic import Topic, TopicFollower
from orm.user import User, UserRating
from orm.viewed import ViewedEntry
# NOTE: keep orm module isolated
__all__ = [
"User",
@ -19,13 +22,18 @@ __all__ = [
"Notification",
"Reaction",
"UserRating"
"ViewedEntry"
]
Base.metadata.create_all(engine)
Operation.init_table()
Resource.init_table()
User.init_table()
Community.init_table()
Role.init_table()
# NOTE: keep orm module isolated
def init_tables():
Base.metadata.create_all(engine)
Operation.init_table()
Resource.init_table()
User.init_table()
Community.init_table()
UserRating.init_table()
Shout.init_table()
Role.init_table()
ViewedEntry.init_table()
print("[orm] tables initialized")

View File

@ -32,12 +32,14 @@ class Community(Base):
@staticmethod
def init_table():
with local_session() as session:
default = (
d = (
session.query(Community).filter(Community.slug == "discours").first()
)
if not default:
default = Community.create(
name="Дискурс", slug="discours", createdBy="discours"
)
Community.default_community = default
if not d:
d = Community.create(
name="Дискурс", slug="discours", createdBy="anonymous"
)
session.add(d)
session.commit()
Community.default_community = d
print('[migration] default community: %s' % d.id)

View File

@ -50,7 +50,7 @@ class Role(Base):
default = Role.create(
name="author",
desc="Role for author",
community=Community.default_community.id,
community=1,
)
Role.default_role = default

View File

@ -1,9 +1,9 @@
from datetime import datetime
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, JSON
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, String, JSON
from sqlalchemy.orm import relationship
from base.orm import Base
from base.orm import Base, local_session
from orm.reaction import Reaction
from orm.topic import Topic
from orm.user import User
@ -43,7 +43,7 @@ class Shout(Base):
__tablename__ = "shout"
slug = Column(String, unique=True)
community = Column(Integer, ForeignKey("community.id"), nullable=False, comment="Community")
community = Column(ForeignKey("community.id"), default=1)
lang = Column(String, nullable=False, default='ru', comment="Language")
body = Column(String, nullable=False, comment="Body")
title = Column(String, nullable=True)
@ -56,7 +56,6 @@ class Shout(Base):
reactions = relationship(lambda: Reaction)
visibility = Column(String, nullable=True) # owner authors community public
versionOf = Column(ForeignKey("shout.slug"), nullable=True)
lang = Column(String, default='ru')
oid = Column(String, nullable=True)
media = Column(JSON, nullable=True)
@ -64,3 +63,16 @@ class Shout(Base):
updatedAt = Column(DateTime, nullable=True, comment="Updated at")
publishedAt = Column(DateTime, nullable=True)
deletedAt = Column(DateTime, nullable=True)
@staticmethod
def init_table():
with local_session() as session:
entry = {
"slug": "genesis-block",
"body": "",
"title": "Ничего",
"lang": "ru"
}
s = Shout.create(**entry)
session.add(s)
session.commit()

View File

@ -1,6 +1,5 @@
from datetime import datetime
from sqlalchemy import JSON as JSONType
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, String
from base.orm import Base
@ -25,10 +24,7 @@ class Topic(Base):
title = Column(String, nullable=False, comment="Title")
body = Column(String, nullable=True, comment="Body")
pic = Column(String, nullable=True, comment="Picture")
children = Column(
JSONType, nullable=True, default=[], comment="list of children topics"
)
community = Column(
ForeignKey("community.slug"), nullable=False, comment="Community"
ForeignKey("community.id"), default=1, comment="Community"
)
oid = Column(String, nullable=True, comment="Old ID")

View File

@ -25,6 +25,10 @@ class UserRating(Base):
user = Column(ForeignKey("user.slug"), primary_key=True)
value = Column(Integer)
@staticmethod
def init_table():
pass
class UserRole(Base):
__tablename__ = "user_role"
@ -48,6 +52,7 @@ class AuthorFollower(Base):
class User(Base):
__tablename__ = "user"
default_user = None
email = Column(String, unique=True, nullable=False, comment="Email")
username = Column(String, nullable=False, comment="Login")

View File

@ -1,6 +1,6 @@
from datetime import datetime
from sqlalchemy import Column, DateTime, ForeignKey
from base.orm import Base
from sqlalchemy import Column, DateTime, ForeignKey, Integer
from base.orm import Base, local_session
class ViewedEntry(Base):
@ -8,6 +8,18 @@ class ViewedEntry(Base):
viewer = Column(ForeignKey("user.slug"), default='anonymous')
shout = Column(ForeignKey("shout.slug"))
amount = Column(Integer, default=1)
createdAt = Column(
DateTime, nullable=False, default=datetime.now, comment="Created at"
)
@staticmethod
def init_table():
with local_session() as session:
entry = {
"shout": "genesis-block",
"amount": 0
}
viewed = ViewedEntry.create(**entry)
session.add(viewed)
session.commit()

View File

@ -33,7 +33,7 @@ async def get_author_stat(slug):
# TODO: implement author stat
with local_session() as session:
return {
"shouts": session.query(ShoutAuthor).where(ShoutAuthor.author == slug).count(),
"shouts": session.query(ShoutAuthor).where(ShoutAuthor.user == slug).count(),
"followers": session.query(AuthorFollower).where(AuthorFollower.author == slug).count(),
"followings": session.query(AuthorFollower).where(AuthorFollower.follower == slug).count(),
"rating": session.query(func.sum(UserRating.value)).where(UserRating.user == slug).first(),

View File

@ -37,6 +37,7 @@ type AuthorStat {
followers: Int
rating: Int
commented: Int
shouts: Int
}
@ -492,8 +493,6 @@ type Topic {
title: String
body: String
pic: String
parents: [String] # NOTE: topic can have parent topics
children: [String] # and children
community: Community!
stat: TopicStat
oid: String

View File

@ -2,9 +2,10 @@ import asyncio
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from base.orm import local_session
from sqlalchemy import func, select
from orm.viewed import ViewedEntry
from orm.shout import ShoutTopic
from services.zine.topics import TopicStorage
from ssl import create_default_context
@ -43,12 +44,13 @@ ssl = create_default_context()
class ViewedStorage:
lock = asyncio.Lock()
by_topics = {}
by_shouts = {}
period = 5 * 60 # 5 minutes
client = None
transport = None
@staticmethod
async def load_views(session):
async def update_views(session):
# TODO: when the struture of payload will be transparent
# TODO: perhaps ackee token getting here
@ -61,18 +63,53 @@ class ViewedStorage:
print(domains)
print('\n\n# TODO: something here...\n\n')
@staticmethod
async def get_shout(shout_slug):
self = ViewedStorage
async with self.lock:
r = self.by_shouts.get(shout_slug)
if r:
with local_session() as session:
shout_views = 0
shout_views_q = select(func.sum(ViewedEntry.amount)).where(
ViewedEntry.shout == shout_slug
)
shout_views = session.execute(shout_views_q)
self.by_shouts[shout_slug] = shout_views
return shout_views
else:
return r
@staticmethod
async def get_topic(topic_slug):
self = ViewedStorage
topic_views = 0
async with self.lock:
topic_views_by_shouts = self.by_topics.get(topic_slug) or {}
if len(topic_views_by_shouts.keys()) == 0:
with local_session() as session:
shoutslugs = session.query(ShoutTopic.shout).where(ShoutTopic.topic == topic_slug).all()
self.by_topics[topic_slug] = {}
for slug in shoutslugs:
self.by_topics[topic_slug][slug] = await self.get_shout(slug)
topic_views_by_shouts = self.by_topics.get(topic_slug) or {}
for shout in topic_views_by_shouts:
topic_views += shout
return topic_views
@staticmethod
async def increment(shout_slug, amount=1, viewer='anonymous'):
self = ViewedStorage
async with self.lock:
with local_session() as session:
viewed = ViewedEntry.create({
viewed = ViewedEntry.create(**{
"viewer": viewer,
"shout": shout_slug
"shout": shout_slug,
"amount": amount
})
session.add(viewed)
session.commit()
self.by_shouts[shout_slug] = self.by_shouts.get(shout_slug, 0) + amount
shout_topics = await TopicStorage.get_topics_by_slugs([shout_slug, ])
for t in shout_topics:
self.by_topics[t] = self.by_topics.get(t) or {}
@ -85,7 +122,7 @@ class ViewedStorage:
while True:
try:
with local_session() as session:
await self.load_views(session)
await self.update_views(session)
except Exception as err:
print("[stat.viewed] : %s" % (err))
print("[stat.viewed] renew period: %d minutes" % (self.period / 60))

View File

@ -1,127 +0,0 @@
import asyncio
import json
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from base.redis import redis
from services.zine.topics import TopicStorage
from ssl import create_default_context
query_ackee_views = gql(
"""
query getDomainsFacts {
domains {
statistics {
views {
id
count
}
pages {
id
count
created
}
}
facts {
activeVisitors
# averageViews
# averageDuration
viewsToday
viewsMonth
viewsYear
}
}
}
"""
)
ssl = create_default_context()
class ViewStat:
lock = asyncio.Lock()
by_slugs = {}
by_topics = {}
period = 5 * 60 # 5 minutes
transport = AIOHTTPTransport(url="https://ackee.discours.io/", ssl=ssl)
client = Client(transport=transport, fetch_schema_from_transport=True)
@staticmethod
async def load_views():
# TODO: when the struture of paylod will be transparent
# TODO: perhaps ackee token getting here
self = ViewStat
async with self.lock:
self.by_topics = await redis.execute("GET", "views_by_topics")
if self.by_topics:
self.by_topics = dict(json.loads(self.by_topics))
else:
self.by_topics = {}
self.by_slugs = await redis.execute("GET", "views_by_shouts")
if self.by_slugs:
self.by_slugs = dict(json.loads(self.by_slugs))
else:
self.by_slugs = {}
domains = await self.client.execute_async(query_ackee_views)
print("[stat.ackee] loaded domains")
print(domains)
print('\n\n# TODO: something here...\n\n')
@staticmethod
async def get_shout(shout_slug):
self = ViewStat
async with self.lock:
return self.by_slugs.get(shout_slug) or 0
@staticmethod
async def get_topic(topic_slug):
self = ViewStat
async with self.lock:
shouts = self.by_topics.get(topic_slug) or {}
topic_views = 0
for v in shouts.values():
topic_views += v
return topic_views
@staticmethod
async def increment(shout_slug, amount=1):
self = ViewStat
async with self.lock:
self.by_slugs[shout_slug] = self.by_slugs.get(shout_slug) or 0
self.by_slugs[shout_slug] += amount
await redis.execute(
"SET",
f"views_by_shouts/{shout_slug}",
str(self.by_slugs[shout_slug])
)
shout_topics = await TopicStorage.get_topics_by_slugs([shout_slug, ])
for t in shout_topics:
self.by_topics[t] = self.by_topics.get(t) or {}
self.by_topics[t][shout_slug] = self.by_topics[t].get(shout_slug) or 0
self.by_topics[t][shout_slug] += amount
await redis.execute(
"SET",
f"views_by_topics/{t}/{shout_slug}",
str(self.by_topics[t][shout_slug])
)
@staticmethod
async def reset():
self = ViewStat
self.by_topics = {}
self.by_slugs = {}
@staticmethod
async def worker():
self = ViewStat
while True:
try:
await self.load_views()
except Exception as err:
print("[stat.ackee] : %s" % (err))
print("[stat.ackee] renew period: %d minutes" % (ViewStat.period / 60))
await asyncio.sleep(self.period)