from dateutil.parser import parse as date_parse import frontmatter import json import sqlite3 import sqlalchemy from orm import Shout, Comment, Topic, ShoutTopic, ShoutRating, ShoutViewByDay, User from bs4 import BeautifulSoup from migration.html2text import html2text from migration.tables.comments import migrate as migrateComment from transliterate import translit from datetime import datetime from sqlalchemy.exc import IntegrityError from orm.base import local_session from orm.community import Community import os DISCOURS_USER = { 'id': 9999999, 'slug': 'discours', 'name': 'Дискурс', 'userpic': 'https://discours.io/images/logo-mini.svg', 'createdAt': '2016-03-05 22:22:00.350000' } ts = datetime.now() type2layout = { 'Article': 'article', 'Literature': 'prose', 'Music': 'music', 'Video': 'video', 'Image': 'image' } def get_metadata(r): metadata = {} metadata['title'] = r.get('title') metadata['authors'] = r.get('authors') metadata['createdAt'] = r.get('createdAt', ts) metadata['layout'] = r['layout'] metadata['topics'] = [topic['slug'] for topic in r['topics']] metadata['topics'].sort() if r.get('cover', False): metadata['cover'] = r.get('cover') return metadata def migrate(entry, users_by_oid, topics_by_oid): ''' type Shout { slug: String! author: Int! body: String! createdAt: DateTime! updatedAt: DateTime! deletedAt: DateTime deletedBy: Int rating: Int ratings: [Rating] published: Bool! publishedAt: DateTime # if there is no published field - it is not published replyTo: String # another shout tags: [String] # actual values topics: [String] # topic-slugs, order has matter title: String versionOf: String visibleForRoles: [String] # role ids are strings visibleForUsers: [Int] views: Int } ''' # print(entry) content = '' r = { 'layout': type2layout[entry['type']], 'title': entry['title'], 'community': Community.default_community.id, 'authors': [], 'topics': [], 'rating': entry.get('rating', 0), 'ratings': [], 'createdAt': entry.get('createdAt', '2016-03-05 22:22:00.350000') } r['slug'] = entry.get('slug', '') if not r['slug'] and entry.get('friendlySlugs') is not None: r['slug'] = entry['friendlySlugs']['slug'][0]['slug'] if(r['slug'] is None): r['slug'] = entry['friendlySlugs'][0]['slug'] if not r['slug']: print('NO SLUG ERROR') # print(entry) raise Exception category = entry['category'] mainTopic = topics_by_oid.get(category) if mainTopic: r['mainTopic'] = mainTopic["slug"] topic_oids = [category, ] taglist = entry.get("tags", []) topic_oids.extend(taglist) for oid in topic_oids: if oid in topics_by_oid: r['topics'].append(topics_by_oid[oid]) if entry.get('image') is not None: r['cover'] = entry['image']['url'] if entry.get('thumborId') is not None: r['cover'] = 'https://assets.discours.io/unsafe/1600x/' + entry['thumborId'] if entry.get('updatedAt') is not None: r['updatedAt'] = date_parse(entry['updatedAt']) if entry.get('type') == 'Literature': media = entry.get('media', '') # print(media[0]['literatureBody']) if type(media) == list and media: body_orig = media[0].get('literatureBody', '') if body_orig == '': print('EMPTY BODY!') else: body_html = str(BeautifulSoup( body_orig, features="html.parser")) r['body'] = html2text(body_html) else: print(r['slug'] + ': literature has no media') elif entry.get('type') == 'Video': m = entry['media'][0] yt = m.get('youtubeId', '') vm = m.get('vimeoId', '') video_url = 'https://www.youtube.com/watch?v=' + yt if yt else '#' therestof = html2text(m.get('body', entry.get('body', ''))) r['body'] = 'import { YouTube } from \"solid-social\"\n' + \ '\n\n' + therestof if video_url == '#': video_url = 'https://vimeo.com/' + vm if vm else '#' r['body'] = 'import { Vimeo } from \"solid-social\"\n' + \ '\n\n' + therestof if video_url == '#': print(entry.get('media', 'UNKNOWN MEDIA PROVIDER!')) # raise Exception elif entry.get('type') == 'Music': r['body'] = '' for m in entry['media']: if m == { 'main': 'true' } or m == { 'main': True } or m == {}: continue else: # TODO: mark highlighted track isMain == True fileUrl = m.get('fileUrl', '') if not fileUrl: print(m) continue else: r['body'] = 'import MusicPlayer from \"src/components/MusicPlayer\"\n\n' r['body'] += '\n' r['body'] += html2text(entry.get('body', '')) elif entry.get('type') == 'Image': m = r.get('media') r['body'] = '' if 'cover' in r: r['body'] = '' r['body'] += entry.get('body', '') if r['body'] == '': print(entry) if r.get('body') is None: body_orig = entry.get('body', '') body_html = str(BeautifulSoup(body_orig, features="html.parser")) r['body'] = html2text(body_html) body = r.get('body', '') # get author data userdata = {} try: userdata = users_by_oid[entry['createdBy']] except KeyError: app = entry.get('application') if app: userslug = translit(app['name'], 'ru', reversed=True).replace(' ', '-').replace('\'', '').replace('.', '-').lower() userdata = { 'username': app['email'], 'email': app['email'], 'name': app['name'], 'bio': app.get('bio', ''), 'emailConfirmed': False, 'slug': userslug, 'createdAt': ts, 'wasOnlineAt': ts } if userdata == {}: userdata = { 'name': 'Дискурс', 'slug': 'discours', 'userpic': 'https://discours.io/image/logo-mini.svg' } # set author data shout_dict = r.copy() author = { # a short version for public listings 'slug': userdata.get('slug', 'discours'), 'name': userdata.get('name', 'Дискурс'), 'userpic': userdata.get('userpic', '') } shout_dict['authors'] = [ author, ] if entry['published']: metadata = get_metadata(shout_dict) content = frontmatter.dumps(frontmatter.Post(body, **metadata)) ext = 'mdx' parentDir = '/'.join(os.getcwd().split('/')[:-1]) filepath = parentDir + '/discoursio-web/content/' + r['layout'] + '/' + r['slug'] + '.' + ext # print(filepath) open(filepath, 'w').write(content) try: shout_dict['createdAt'] = date_parse(r.get('createdAt')) if entry.get('createdAt') else ts shout_dict['publishedAt'] = date_parse(entry.get('publishedAt')) if entry.get('published') else None if entry.get('deletedAt') is not None: shout_dict['deletedAt'] = date_parse(entry.get('deletedAt')) shout_dict['deletedBy'] = entry.get('deletedBy', '0') del shout_dict['topics'] # FIXME: AttributeError: 'str' object has no attribute '_sa_instance_state' del shout_dict['rating'] # FIXME: TypeError: 'rating' is an invalid keyword argument for Shout del shout_dict['ratings'] # get user user = None email = userdata.get('email') slug = userdata.get('slug') with local_session() as session: try: if email: user = session.query(User).filter(User.email == email).first() if not user and slug: user = session.query(User).filter(User.slug == slug).first() if not user and userdata: user = User.create(**userdata) except: print(userdata) assert user, 'could not get a user' shout_dict['authors'] = [ user, ] try: s = Shout.create(**shout_dict) # shout ratings shout_dict['ratings'] = [] for shout_rating_old in entry.get('ratings',[]): with local_session() as session: rater = session.query(User).\ filter(User.old_id == shout_rating_old['createdBy']).first() if rater: shout_rating_dict = { 'value': shout_rating_old['value'], 'rater': rater.slug, 'shout': s.slug } cts = shout_rating_old.get('createdAt') if cts: shout_rating_dict['ts'] = date_parse(cts) try: shout_rating = ShoutRating.create(**shout_rating_dict) except sqlalchemy.exc.IntegrityError: pass shout_dict['ratings'].append(shout_rating_dict) # shout topics shout_dict['topics'] = [] for topic in r['topics']: try: ShoutTopic.create(**{ 'shout': s.slug, 'topic': topic['slug'] }) shout_dict['topics'].append(topic['slug']) except sqlalchemy.exc.IntegrityError: pass views = entry.get('views', 1) ShoutViewByDay.create( shout = s.slug, value = views ) except Exception as e: raise e except Exception as e: if not shout_dict['body']: r['body'] = 'body moved' raise e shout_dict['old_id'] = entry.get('_id') return shout_dict # for json