improve rbac
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
from orm.rbac import Operation, Permission, Role
|
||||
from orm.rbac import Organization, Operation, Resource, Permission, Role
|
||||
from orm.user import User
|
||||
from orm.message import Message
|
||||
from orm.shout import Shout
|
||||
@@ -7,3 +7,5 @@ from orm.base import Base, engine
|
||||
__all__ = ["User", "Role", "Operation", "Permission", "Message", "Shout"]
|
||||
|
||||
Base.metadata.create_all(engine)
|
||||
Operation.init_table()
|
||||
Resource.init_table()
|
||||
|
94
orm/rbac.py
94
orm/rbac.py
@@ -3,63 +3,85 @@ import warnings
|
||||
from typing import Type
|
||||
|
||||
from sqlalchemy import String, Column, ForeignKey, types, UniqueConstraint
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from orm.base import Base, REGISTRY, engine
|
||||
from orm.base import Base, REGISTRY, engine, local_session
|
||||
|
||||
|
||||
class ClassType(types.TypeDecorator):
|
||||
impl = types.String
|
||||
impl = types.String
|
||||
|
||||
@property
|
||||
def python_type(self):
|
||||
return NotImplemented
|
||||
@property
|
||||
def python_type(self):
|
||||
return NotImplemented
|
||||
|
||||
def process_literal_param(self, value, dialect):
|
||||
return NotImplemented
|
||||
def process_literal_param(self, value, dialect):
|
||||
return NotImplemented
|
||||
|
||||
def process_bind_param(self, value, dialect):
|
||||
return value.__name__ if isinstance(value, type) else str(value)
|
||||
def process_bind_param(self, value, dialect):
|
||||
return value.__name__ if isinstance(value, type) else str(value)
|
||||
|
||||
def process_result_value(self, value, dialect):
|
||||
class_ = REGISTRY.get(value)
|
||||
if class_ is None:
|
||||
warnings.warn(f"Can't find class <{value}>,find it yourself 😊", stacklevel=2)
|
||||
return class_
|
||||
def process_result_value(self, value, dialect):
|
||||
class_ = REGISTRY.get(value)
|
||||
if class_ is None:
|
||||
warnings.warn(f"Can't find class <{value}>,find it yourself 😊", stacklevel=2)
|
||||
return class_
|
||||
|
||||
class Organization(Base):
|
||||
__tablename__ = 'organization'
|
||||
name: str = Column(String, nullable=False, unique=True, comment="Organization Name")
|
||||
|
||||
class Role(Base):
|
||||
__tablename__ = 'role'
|
||||
name: str = Column(String, nullable=False, unique=True, comment="Role Name")
|
||||
__tablename__ = 'role'
|
||||
name: str = Column(String, nullable=False, unique=True, comment="Role Name")
|
||||
org_id: int = Column(ForeignKey("organization.id", ondelete="CASCADE"), nullable=False, comment="Organization")
|
||||
|
||||
permissions = relationship("Permission")
|
||||
|
||||
class Operation(Base):
|
||||
__tablename__ = 'operation'
|
||||
name: str = Column(String, nullable=False, unique=True, comment="Operation Name")
|
||||
__tablename__ = 'operation'
|
||||
name: str = Column(String, nullable=False, unique=True, comment="Operation Name")
|
||||
|
||||
@staticmethod
|
||||
def init_table():
|
||||
with local_session() as session:
|
||||
edit_op = session.query(Operation).filter(Operation.name == "edit").first()
|
||||
if not edit_op:
|
||||
edit_op = Operation.create(name = "edit")
|
||||
Operation.edit_id = edit_op.id
|
||||
|
||||
|
||||
class Resource(Base):
|
||||
__tablename__ = "resource"
|
||||
resource_class: Type[Base] = Column(ClassType, nullable=False, unique=True, comment="Resource class")
|
||||
name: str = Column(String, nullable=False, unique=True, comment="Resource name")
|
||||
__tablename__ = "resource"
|
||||
resource_class: Type[Base] = Column(ClassType, nullable=False, unique=True, comment="Resource class")
|
||||
name: str = Column(String, nullable=False, unique=True, comment="Resource name")
|
||||
|
||||
@staticmethod
|
||||
def init_table():
|
||||
with local_session() as session:
|
||||
shout_res = session.query(Resource).filter(Resource.name == "shout").first()
|
||||
if not shout_res:
|
||||
shout_res = Resource.create(name = "shout", resource_class = "shout")
|
||||
Resource.shout_id = shout_res.id
|
||||
|
||||
|
||||
class Permission(Base):
|
||||
__tablename__ = "permission"
|
||||
__table_args__ = (UniqueConstraint("role_id", "operation_id", "resource_id"), {"extend_existing": True})
|
||||
__tablename__ = "permission"
|
||||
__table_args__ = (UniqueConstraint("role_id", "operation_id", "resource_id"), {"extend_existing": True})
|
||||
|
||||
role_id: int = Column(ForeignKey("role.id", ondelete="CASCADE"), nullable=False, comment="Role")
|
||||
operation_id: int = Column(ForeignKey("operation.id", ondelete="CASCADE"), nullable=False, comment="Operation")
|
||||
resource_id: int = Column(ForeignKey("operation.id", ondelete="CASCADE"), nullable=False, comment="Resource")
|
||||
role_id: int = Column(ForeignKey("role.id", ondelete="CASCADE"), nullable=False, comment="Role")
|
||||
operation_id: int = Column(ForeignKey("operation.id", ondelete="CASCADE"), nullable=False, comment="Operation")
|
||||
resource_id: int = Column(ForeignKey("operation.id", ondelete="CASCADE"), nullable=False, comment="Resource")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
Base.metadata.create_all(engine)
|
||||
ops = [
|
||||
Permission(role_id=1, operation_id=1, resource_id=1),
|
||||
Permission(role_id=1, operation_id=2, resource_id=1),
|
||||
Permission(role_id=1, operation_id=3, resource_id=1),
|
||||
Permission(role_id=1, operation_id=4, resource_id=1),
|
||||
Permission(role_id=2, operation_id=4, resource_id=1)
|
||||
]
|
||||
global_session.add_all(ops)
|
||||
global_session.commit()
|
||||
Base.metadata.create_all(engine)
|
||||
ops = [
|
||||
Permission(role_id=1, operation_id=1, resource_id=1),
|
||||
Permission(role_id=1, operation_id=2, resource_id=1),
|
||||
Permission(role_id=1, operation_id=3, resource_id=1),
|
||||
Permission(role_id=1, operation_id=4, resource_id=1),
|
||||
Permission(role_id=2, operation_id=4, resource_id=1)
|
||||
]
|
||||
global_session.add_all(ops)
|
||||
global_session.commit()
|
||||
|
@@ -12,7 +12,7 @@ class Shout(Base):
|
||||
id = None
|
||||
|
||||
slug: str = Column(String, primary_key=True)
|
||||
org: str = Column(String, nullable=False)
|
||||
org_id: str = Column(ForeignKey("organization.id"), nullable=False)
|
||||
author_id: str = Column(ForeignKey("user.id"), nullable=False, comment="Author")
|
||||
body: str = Column(String, nullable=False, comment="Body")
|
||||
createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at")
|
||||
|
23
orm/user.py
23
orm/user.py
@@ -1,11 +1,19 @@
|
||||
from typing import List
|
||||
|
||||
from sqlalchemy import Column, Integer, String, ForeignKey #, relationship
|
||||
from sqlalchemy import Column, Integer, String, ForeignKey
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from orm import Permission
|
||||
from orm.base import Base, local_session
|
||||
|
||||
|
||||
class UserRole(Base):
|
||||
__tablename__ = 'user_role'
|
||||
|
||||
id = None
|
||||
user_id: int = Column(ForeignKey("user.id"), primary_key = True)
|
||||
role_id: int = Column(ForeignKey("role.id"), primary_key = True)
|
||||
|
||||
class User(Base):
|
||||
__tablename__ = 'user'
|
||||
|
||||
@@ -13,16 +21,19 @@ class User(Base):
|
||||
username: str = Column(String, nullable=False, comment="Name")
|
||||
password: str = Column(String, nullable=True, comment="Password")
|
||||
|
||||
role_id: list = Column(ForeignKey("role.id"), nullable=True, comment="Role")
|
||||
# roles = relationship("Role") TODO: one to many, see schema.graphql
|
||||
oauth_id: str = Column(String, nullable=True)
|
||||
|
||||
roles = relationship("Role", secondary=UserRole.__table__)
|
||||
|
||||
@classmethod
|
||||
def get_permission(cls, user_id):
|
||||
scope = {}
|
||||
with local_session() as session:
|
||||
perms: List[Permission] = session.query(Permission).join(User, User.role_id == Permission.role_id).filter(
|
||||
User.id == user_id).all()
|
||||
return {f"{p.operation_id}-{p.resource_id}" for p in perms}
|
||||
user = session.query(User).filter(User.id == user_id).first()
|
||||
for role in user.roles:
|
||||
for p in role.permissions:
|
||||
scope[p.resource_id] = p.operation_id
|
||||
return scope
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
Reference in New Issue
Block a user