global session to local session

This commit is contained in:
knst-kotov
2021-08-05 16:49:08 +00:00
parent a0e99e5ba9
commit 93c6f88435
6 changed files with 88 additions and 79 deletions

View File

@@ -2,7 +2,7 @@ from typing import TypeVar, Any, Dict, Generic, Callable
from sqlalchemy import create_engine, Column, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import Session
from sqlalchemy.sql.schema import Table
from settings import DB_URL
@@ -10,48 +10,48 @@ from orm._retry import RetryingQuery
# engine = create_engine(DB_URL, convert_unicode=True, echo=False)
engine = create_engine(DB_URL,
convert_unicode=True,
echo=False,
#pool_size=10,
#max_overflow=2,
#pool_recycle=300,
pool_pre_ping=True,
#pool_use_lifo=True
)
Session = sessionmaker(autocommit=False, autoflush=False, bind=engine, query_cls=RetryingQuery)
#Session = sessionmaker(autocommit=False, autoflush=False, bind=engine)
global_session = Session()
convert_unicode=True,
echo=False,
#pool_size=10,
#max_overflow=2,
#pool_recycle=300,
pool_pre_ping=True,
#pool_use_lifo=True
future=True
)
T = TypeVar("T")
REGISTRY: Dict[str, type] = {}
def local_session():
return Session(bind=engine, expire_on_commit=False)
class Base(declarative_base()):
__table__: Table
__tablename__: str
__new__: Callable
__init__: Callable
__table__: Table
__tablename__: str
__new__: Callable
__init__: Callable
__abstract__: bool = True
__table_args__ = {"extend_existing": True}
id: int = Column(Integer, primary_key=True)
session = global_session
__abstract__: bool = True
__table_args__ = {"extend_existing": True}
id: int = Column(Integer, primary_key=True)
def __init_subclass__(cls, **kwargs):
REGISTRY[cls.__name__] = cls
def __init_subclass__(cls, **kwargs):
REGISTRY[cls.__name__] = cls
@classmethod
def create(cls: Generic[T], **kwargs) -> Generic[T]:
instance = cls(**kwargs)
return instance.save()
@classmethod
def create(cls: Generic[T], **kwargs) -> Generic[T]:
instance = cls(**kwargs)
return instance.save()
def save(self) -> Generic[T]:
self.session.add(self)
self.session.commit()
return self
def save(self) -> Generic[T]:
with local_session() as session:
session.add(self)
session.commit()
return self
def dict(self) -> Dict[str, Any]:
column_names = self.__table__.columns.keys()
return {c: getattr(self, c) for c in column_names}
def dict(self) -> Dict[str, Any]:
column_names = self.__table__.columns.keys()
return {c: getattr(self, c) for c in column_names}

View File

@@ -4,7 +4,7 @@ from typing import Type
from sqlalchemy import String, Column, ForeignKey, types, UniqueConstraint
from orm.base import Base, REGISTRY, engine, global_session
from orm.base import Base, REGISTRY, engine
class ClassType(types.TypeDecorator):

View File

@@ -3,26 +3,27 @@ from typing import List
from sqlalchemy import Column, Integer, String, ForeignKey #, relationship
from orm import Permission
from orm.base import Base
from orm.base import Base, local_session
class User(Base):
__tablename__ = 'user'
__tablename__ = 'user'
email: str = Column(String, nullable=False)
username: str = Column(String, nullable=False, comment="Name")
password: str = Column(String, nullable=True, comment="Password")
email: str = Column(String, nullable=False)
username: str = Column(String, nullable=False, comment="Name")
password: str = Column(String, nullable=True, comment="Password")
role_id: list = Column(ForeignKey("role.id"), nullable=True, comment="Role")
# roles = relationship("Role") TODO: one to many, see schema.graphql
oauth_id: str = Column(String, nullable=True)
role_id: list = Column(ForeignKey("role.id"), nullable=True, comment="Role")
# roles = relationship("Role") TODO: one to many, see schema.graphql
oauth_id: str = Column(String, nullable=True)
@classmethod
def get_permission(cls, user_id):
perms: List[Permission] = cls.session.query(Permission).join(User, User.role_id == Permission.role_id).filter(
User.id == user_id).all()
return {f"{p.operation_id}-{p.resource_id}" for p in perms}
@classmethod
def get_permission(cls, user_id):
with local_session() as session:
perms: List[Permission] = session.query(Permission).join(User, User.role_id == Permission.role_id).filter(
User.id == user_id).all()
return {f"{p.operation_id}-{p.resource_id}" for p in perms}
if __name__ == '__main__':
print(User.get_permission(user_id=1))
print(User.get_permission(user_id=1))