Merge remote-tracking branch 'origin/main'

This commit is contained in:
2021-08-19 07:48:40 +03:00
10 changed files with 257 additions and 163 deletions

View File

@@ -1,4 +1,4 @@
from orm.rbac import Operation, Permission, Role
from orm.rbac import Organization, Operation, Resource, Permission, Role
from orm.user import User
from orm.message import Message
from orm.shout import Shout
@@ -7,3 +7,5 @@ from orm.base import Base, engine
__all__ = ["User", "Role", "Operation", "Permission", "Message", "Shout"]
Base.metadata.create_all(engine)
Operation.init_table()
Resource.init_table()

View File

@@ -3,63 +3,85 @@ import warnings
from typing import Type
from sqlalchemy import String, Column, ForeignKey, types, UniqueConstraint
from sqlalchemy.orm import relationship
from orm.base import Base, REGISTRY, engine
from orm.base import Base, REGISTRY, engine, local_session
class ClassType(types.TypeDecorator):
impl = types.String
impl = types.String
@property
def python_type(self):
return NotImplemented
@property
def python_type(self):
return NotImplemented
def process_literal_param(self, value, dialect):
return NotImplemented
def process_literal_param(self, value, dialect):
return NotImplemented
def process_bind_param(self, value, dialect):
return value.__name__ if isinstance(value, type) else str(value)
def process_bind_param(self, value, dialect):
return value.__name__ if isinstance(value, type) else str(value)
def process_result_value(self, value, dialect):
class_ = REGISTRY.get(value)
if class_ is None:
warnings.warn(f"Can't find class <{value}>,find it yourself 😊", stacklevel=2)
return class_
def process_result_value(self, value, dialect):
class_ = REGISTRY.get(value)
if class_ is None:
warnings.warn(f"Can't find class <{value}>,find it yourself 😊", stacklevel=2)
return class_
class Organization(Base):
__tablename__ = 'organization'
name: str = Column(String, nullable=False, unique=True, comment="Organization Name")
class Role(Base):
__tablename__ = 'role'
name: str = Column(String, nullable=False, unique=True, comment="Role Name")
__tablename__ = 'role'
name: str = Column(String, nullable=False, unique=True, comment="Role Name")
org_id: int = Column(ForeignKey("organization.id", ondelete="CASCADE"), nullable=False, comment="Organization")
permissions = relationship("Permission")
class Operation(Base):
__tablename__ = 'operation'
name: str = Column(String, nullable=False, unique=True, comment="Operation Name")
__tablename__ = 'operation'
name: str = Column(String, nullable=False, unique=True, comment="Operation Name")
@staticmethod
def init_table():
with local_session() as session:
edit_op = session.query(Operation).filter(Operation.name == "edit").first()
if not edit_op:
edit_op = Operation.create(name = "edit")
Operation.edit_id = edit_op.id
class Resource(Base):
__tablename__ = "resource"
resource_class: Type[Base] = Column(ClassType, nullable=False, unique=True, comment="Resource class")
name: str = Column(String, nullable=False, unique=True, comment="Resource name")
__tablename__ = "resource"
resource_class: Type[Base] = Column(ClassType, nullable=False, unique=True, comment="Resource class")
name: str = Column(String, nullable=False, unique=True, comment="Resource name")
@staticmethod
def init_table():
with local_session() as session:
shout_res = session.query(Resource).filter(Resource.name == "shout").first()
if not shout_res:
shout_res = Resource.create(name = "shout", resource_class = "shout")
Resource.shout_id = shout_res.id
class Permission(Base):
__tablename__ = "permission"
__table_args__ = (UniqueConstraint("role_id", "operation_id", "resource_id"), {"extend_existing": True})
__tablename__ = "permission"
__table_args__ = (UniqueConstraint("role_id", "operation_id", "resource_id"), {"extend_existing": True})
role_id: int = Column(ForeignKey("role.id", ondelete="CASCADE"), nullable=False, comment="Role")
operation_id: int = Column(ForeignKey("operation.id", ondelete="CASCADE"), nullable=False, comment="Operation")
resource_id: int = Column(ForeignKey("operation.id", ondelete="CASCADE"), nullable=False, comment="Resource")
role_id: int = Column(ForeignKey("role.id", ondelete="CASCADE"), nullable=False, comment="Role")
operation_id: int = Column(ForeignKey("operation.id", ondelete="CASCADE"), nullable=False, comment="Operation")
resource_id: int = Column(ForeignKey("operation.id", ondelete="CASCADE"), nullable=False, comment="Resource")
if __name__ == '__main__':
Base.metadata.create_all(engine)
ops = [
Permission(role_id=1, operation_id=1, resource_id=1),
Permission(role_id=1, operation_id=2, resource_id=1),
Permission(role_id=1, operation_id=3, resource_id=1),
Permission(role_id=1, operation_id=4, resource_id=1),
Permission(role_id=2, operation_id=4, resource_id=1)
]
global_session.add_all(ops)
global_session.commit()
Base.metadata.create_all(engine)
ops = [
Permission(role_id=1, operation_id=1, resource_id=1),
Permission(role_id=1, operation_id=2, resource_id=1),
Permission(role_id=1, operation_id=3, resource_id=1),
Permission(role_id=1, operation_id=4, resource_id=1),
Permission(role_id=2, operation_id=4, resource_id=1)
]
global_session.add_all(ops)
global_session.commit()

View File

@@ -12,7 +12,7 @@ class Shout(Base):
id = None
slug: str = Column(String, primary_key=True)
org: str = Column(String, nullable=False)
org_id: str = Column(ForeignKey("organization.id"), nullable=False)
author_id: str = Column(ForeignKey("user.id"), nullable=False, comment="Author")
body: str = Column(String, nullable=False, comment="Body")
createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at")

View File

@@ -1,28 +1,41 @@
from typing import List
from sqlalchemy import Column, Integer, String, ForeignKey #, relationship
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from orm import Permission
from orm.base import Base, local_session
class UserRole(Base):
__tablename__ = 'user_role'
id = None
user_id: int = Column(ForeignKey("user.id"), primary_key = True)
role_id: int = Column(ForeignKey("role.id"), primary_key = True)
class User(Base):
__tablename__ = 'user'
email: str = Column(String, nullable=False)
email: str = Column(String, unique=True, nullable=False)
username: str = Column(String, nullable=False, comment="Name")
password: str = Column(String, nullable=True, comment="Password")
role_id: list = Column(ForeignKey("role.id"), nullable=True, comment="Role")
# roles = relationship("Role") TODO: one to many, see schema.graphql
oauth_id: str = Column(String, nullable=True)
roles = relationship("Role", secondary=UserRole.__table__)
@classmethod
def get_permission(cls, user_id):
scope = {}
with local_session() as session:
perms: List[Permission] = session.query(Permission).join(User, User.role_id == Permission.role_id).filter(
User.id == user_id).all()
return {f"{p.operation_id}-{p.resource_id}" for p in perms}
user = session.query(User).filter(User.id == user_id).first()
for role in user.roles:
for p in role.permissions:
if not p.resource_id in scope:
scope[p.resource_id] = set()
scope[p.resource_id].add(p.operation_id)
return scope
if __name__ == '__main__':