import os from importlib import import_module from os.path import exists from ariadne import load_schema_from_path, make_executable_schema from ariadne.asgi import GraphQL from granian import Granian from granian.server import Interfaces from sentry_sdk.integrations.aiohttp import AioHttpIntegration from sentry_sdk.integrations.ariadne import AriadneIntegration from sentry_sdk.integrations.redis import RedisIntegration from starlette.applications import Starlette from services.core import CacheStorage from services.rediscache import redis from services.schema import resolvers from settings import DEV_SERVER_PID_FILE_NAME, MODE, SENTRY_DSN import_module('resolvers') schema = make_executable_schema(load_schema_from_path('inbox.graphql'), resolvers) # type: ignore async def start_up(): try: await redis.connect() await CacheStorage.init() if MODE == 'dev': if not exists(DEV_SERVER_PID_FILE_NAME): with open(DEV_SERVER_PID_FILE_NAME, 'w', encoding='utf-8') as f: f.write(str(os.getpid())) else: # startup sentry monitoring services import sentry_sdk sentry_sdk.init( SENTRY_DSN, enable_tracing=True, integrations=[ AriadneIntegration(), RedisIntegration(), AioHttpIntegration(), ], ) except Exception: print('STARTUP FAILED') import traceback traceback.print_exc() async def shutdown(): await redis.disconnect() app = Starlette(debug=True, on_startup=[start_up], on_shutdown=[shutdown]) app.mount('/', GraphQL(schema, debug=True)) if __name__ == '__main__': Granian(target='main:app', port=8888, interface=Interfaces.ASGI, reload=True).serve()