Module dcso.portal.auth.rbac
Expand source code
# Copyright (c) 2020, DCSO GmbH
from typing import Iterator, List, Optional, Sequence

from ..abstracts import ServiceAbstract
from ..exceptions import PortalAPIResponse, PortalException
from ..util.graphql import GraphQLRequest
class Permission:
    """A single permission, identified by ID and slug, tied to one service.

    Plain value holder: the constructor stores its arguments unchanged and
    performs no validation.
    """

    def __init__(self, id: str, slug: str, service: str):
        """Store the fields describing the permission.

        :param id: identifier of the permission
        :param slug: slug of the permission
        :param service: the service this permission belongs to
        """
        # The three fields are independent of each other.
        self.service: str = service
        self.slug: str = slug
        self.id: str = id
class ServicePermissions:
    """Collection of permissions, indexed by service, ID and slug.

    The collection is filled from a mapping that holds
    ``accessControl -> servicePermissions``. Iterating over an instance
    yields the stored permission objects in the order they were parsed.
    """

    def __init__(self, graphql_response: Optional[dict]):
        """Create the collection, optionally filling it from a response.

        :param graphql_response: mapping with the ``accessControl`` entry;
            a falsy value (``None`` or empty) yields an empty collection.
        :raises PortalAPIResponse: when the response cannot be interpreted
        """
        self._permissions: List[Permission] = []
        # Each index maps a key to position(s) within ``self._permissions``.
        self._index_services: dict = {}
        self._index_permissions_by_id: dict = {}
        self._index_permissions_by_slug: dict = {}

        if graphql_response:
            self._handle_graphql_response(graphql_response)

    def _handle_graphql_response(self, res: dict) -> None:
        """Replace the stored permissions with those found in ``res``.

        Everything is parsed into local containers first and only committed
        once parsing succeeded, so a malformed response never leaves the
        instance half-updated, and no stale index entries survive a re-parse.

        :param res: mapping with ``accessControl -> servicePermissions``,
            each entry holding ``service -> code`` and a ``permissions`` list
            of mappings with ``id`` and ``slug``.
        :raises PortalAPIResponse: when an expected key is missing or a value
            does not have the expected structure (for example ``None``)
        """
        permissions = []
        by_service = {}
        by_id = {}
        by_slug = {}

        try:
            for entry in res['accessControl']['servicePermissions']:
                service = entry['service']['code']
                for p in entry['permissions']:
                    # position the new permission will take in the list
                    idx = len(permissions)
                    permissions.append(Permission(id=p['id'], slug=p['slug'], service=service))
                    by_service.setdefault(service, []).append(idx)
                    by_id[p['id']] = idx
                    by_slug[p['slug']] = idx
        except (KeyError, TypeError) as exc:
            raise PortalAPIResponse(f"failed handling service permission ({exc})") from exc

        self._permissions = permissions
        self._index_services = by_service
        self._index_permissions_by_id = by_id
        self._index_permissions_by_slug = by_slug

    def __iter__(self) -> "Iterator[Permission]":
        """Yield the stored permissions in the order they were parsed."""
        yield from self._permissions

    def have(self, p) -> bool:
        """Return True if the permission is available.

        The ``p`` argument can be either the ID (a UUID) or the slug of the
        permission.

        Note that the list of permissions being checked might be less than the
        permissions the user actually has. This is because only a subset can
        be queried for.

        :param p: ID (UUID) or slug of the permission to check
        :return: True if permission is available
        """
        return p in self._index_permissions_by_slug or p in self._index_permissions_by_id

    def slugs(self) -> Sequence[str]:
        """Return the slug of every stored permission.

        NOTE(review): this method previously had no body and returned None,
        while its docstring spoke of "IDs and Slugs". Returning the slugs
        only follows the method name and return annotation - confirm.

        :return: list of slugs, one per stored permission, in stored order.
        """
        return [perm.slug for perm in self._permissions]

    def as_slug_dict(self) -> dict:
        """Return permission slugs for all services as dict.

        :return: dictionary with key the service code, and value the list of
            permission slugs of that service.
        """
        result = {}
        for perm in self._permissions:
            result.setdefault(perm.service, []).append(perm.slug)
        return result
class RBACMixin(ServiceAbstract):
    """Mixin providing the lookup of a user's service permissions."""

    _api = None  # mixed in

    def user_service_permissions(self, user_id: Optional[str] = None,
                                 services: Optional[Sequence[str]] = None) -> ServicePermissions:
        """Retrieve the permissions available to the user with ID `user_id`.

        `user_id` and `services` are sent as the GraphQL variables `id` and
        `services`. The `user` object of the response is wrapped in a
        `ServicePermissions` instance.

        Exceptions are not handled here: a `PortalException` raised while
        executing the request, and a `PortalAPIResponse` raised while
        interpreting the payload, reach the caller unchanged.
        NOTE(review): the earlier docstring named `PortalAPIError` (GraphQL
        endpoint returned an error) and `PortalAPIRequest` (request or JSON
        decoding problem); presumably both derive from `PortalException` -
        confirm.

        :param user_id: ID of the user, or None
        :param services: service codes used as filter, or None
        :return: the permissions found in the response
        """
        query_variables = {"id": user_id, "services": services}
        gql = GraphQLRequest(
            api_url=self._api.api_url,
            token=self._api.token,
            query=_GRAPHQL_QUERY_USER_SERVICE_PERMISSIONS,
            variables=query_variables,
        )

        # No try/except needed: errors are meant to propagate as they are.
        payload = gql.execute_dict()
        return ServicePermissions(graphql_response=payload['data']['user'])
# GraphQL query sent by RBACMixin.user_service_permissions. For the user
# selected through $id it requests, per service permission entry (filtered by
# $services), the service code and the id and slug of each permission.
_GRAPHQL_QUERY_USER_SERVICE_PERMISSIONS = """
query ($id: ID, $services: [String!]) {
user: auth_user(id: $id) {
accessControl {
servicePermissions(filter: {serviceCode: $services}) {
service { code }
permissions { id slug }
}
}
}
}
"""
Classes
class Permission (id: str, slug: str, service: str)
-
Expand source code
class Permission:
    """A single permission, identified by ID and slug, tied to one service.

    Plain value holder: the constructor stores its arguments unchanged and
    performs no validation.
    """

    def __init__(self, id: str, slug: str, service: str):
        """Store the fields describing the permission.

        :param id: identifier of the permission
        :param slug: slug of the permission
        :param service: the service this permission belongs to
        """
        # The three fields are independent of each other.
        self.service: str = service
        self.slug: str = slug
        self.id: str = id
class RBACMixin
-
Expand source code
class RBACMixin(ServiceAbstract):
    """Mixin providing the lookup of a user's service permissions."""

    _api = None  # mixed in

    def user_service_permissions(self, user_id: Optional[str] = None,
                                 services: Optional[Sequence[str]] = None) -> ServicePermissions:
        """Retrieve the permissions available to the user with ID `user_id`.

        `user_id` and `services` are sent as the GraphQL variables `id` and
        `services`. The `user` object of the response is wrapped in a
        `ServicePermissions` instance.

        Exceptions are not handled here: a `PortalException` raised while
        executing the request, and a `PortalAPIResponse` raised while
        interpreting the payload, reach the caller unchanged.
        NOTE(review): the earlier docstring named `PortalAPIError` (GraphQL
        endpoint returned an error) and `PortalAPIRequest` (request or JSON
        decoding problem); presumably both derive from `PortalException` -
        confirm.

        :param user_id: ID of the user, or None
        :param services: service codes used as filter, or None
        :return: the permissions found in the response
        """
        query_variables = {"id": user_id, "services": services}
        gql = GraphQLRequest(
            api_url=self._api.api_url,
            token=self._api.token,
            query=_GRAPHQL_QUERY_USER_SERVICE_PERMISSIONS,
            variables=query_variables,
        )

        # No try/except needed: errors are meant to propagate as they are.
        payload = gql.execute_dict()
        return ServicePermissions(graphql_response=payload['data']['user'])
Ancestors
Subclasses
Methods
def user_service_permissions(self, user_id: Union[str, NoneType] = None, services: Union[Sequence[str], NoneType] = None) ‑> ServicePermissions
-
Retrieves permissions available to the user with ID user_id. The result is an instance of ServicePermissions which holds the user's permissions for all or for selected services.
Raises PortalAPIError when the GraphQL API endpoint returned an error. When there was an issue with the request itself, or decoding JSON failed, the PortalAPIRequest exception is raised.
Expand source code
def user_service_permissions(self, user_id: Optional[str] = None,
                             services: Optional[Sequence[str]] = None) -> ServicePermissions:
    """Retrieve the permissions available to the user with ID `user_id`.

    `user_id` and `services` are sent as the GraphQL variables `id` and
    `services`. The `user` object of the response is wrapped in a
    `ServicePermissions` instance.

    Exceptions are not handled here: a `PortalException` raised while
    executing the request, and a `PortalAPIResponse` raised while
    interpreting the payload, reach the caller unchanged.
    NOTE(review): the earlier docstring named `PortalAPIError` (GraphQL
    endpoint returned an error) and `PortalAPIRequest` (request or JSON
    decoding problem); presumably both derive from `PortalException` -
    confirm.

    :param user_id: ID of the user, or None
    :param services: service codes used as filter, or None
    :return: the permissions found in the response
    """
    query_variables = {"id": user_id, "services": services}
    gql = GraphQLRequest(
        api_url=self._api.api_url,
        token=self._api.token,
        query=_GRAPHQL_QUERY_USER_SERVICE_PERMISSIONS,
        variables=query_variables,
    )

    # No try/except needed: errors are meant to propagate as they are.
    payload = gql.execute_dict()
    return ServicePermissions(graphql_response=payload['data']['user'])
class ServicePermissions (graphql_response: Union[dict, NoneType])
-
Expand source code
class ServicePermissions:
    """Collection of permissions, indexed by service, ID and slug.

    The collection is filled from a mapping that holds
    ``accessControl -> servicePermissions``. Iterating over an instance
    yields the stored permission objects in the order they were parsed.
    """

    def __init__(self, graphql_response: Optional[dict]):
        """Create the collection, optionally filling it from a response.

        :param graphql_response: mapping with the ``accessControl`` entry;
            a falsy value (``None`` or empty) yields an empty collection.
        :raises PortalAPIResponse: when the response cannot be interpreted
        """
        self._permissions: List[Permission] = []
        # Each index maps a key to position(s) within ``self._permissions``.
        self._index_services: dict = {}
        self._index_permissions_by_id: dict = {}
        self._index_permissions_by_slug: dict = {}

        if graphql_response:
            self._handle_graphql_response(graphql_response)

    def _handle_graphql_response(self, res: dict) -> None:
        """Replace the stored permissions with those found in ``res``.

        Everything is parsed into local containers first and only committed
        once parsing succeeded, so a malformed response never leaves the
        instance half-updated, and no stale index entries survive a re-parse.

        :param res: mapping with ``accessControl -> servicePermissions``,
            each entry holding ``service -> code`` and a ``permissions`` list
            of mappings with ``id`` and ``slug``.
        :raises PortalAPIResponse: when an expected key is missing or a value
            does not have the expected structure (for example ``None``)
        """
        permissions = []
        by_service = {}
        by_id = {}
        by_slug = {}

        try:
            for entry in res['accessControl']['servicePermissions']:
                service = entry['service']['code']
                for p in entry['permissions']:
                    # position the new permission will take in the list
                    idx = len(permissions)
                    permissions.append(Permission(id=p['id'], slug=p['slug'], service=service))
                    by_service.setdefault(service, []).append(idx)
                    by_id[p['id']] = idx
                    by_slug[p['slug']] = idx
        except (KeyError, TypeError) as exc:
            raise PortalAPIResponse(f"failed handling service permission ({exc})") from exc

        self._permissions = permissions
        self._index_services = by_service
        self._index_permissions_by_id = by_id
        self._index_permissions_by_slug = by_slug

    def __iter__(self) -> "Iterator[Permission]":
        """Yield the stored permissions in the order they were parsed."""
        yield from self._permissions

    def have(self, p) -> bool:
        """Return True if the permission is available.

        The ``p`` argument can be either the ID (a UUID) or the slug of the
        permission.

        Note that the list of permissions being checked might be less than the
        permissions the user actually has. This is because only a subset can
        be queried for.

        :param p: ID (UUID) or slug of the permission to check
        :return: True if permission is available
        """
        return p in self._index_permissions_by_slug or p in self._index_permissions_by_id

    def slugs(self) -> Sequence[str]:
        """Return the slug of every stored permission.

        NOTE(review): this method previously had no body and returned None,
        while its docstring spoke of "IDs and Slugs". Returning the slugs
        only follows the method name and return annotation - confirm.

        :return: list of slugs, one per stored permission, in stored order.
        """
        return [perm.slug for perm in self._permissions]

    def as_slug_dict(self) -> dict:
        """Return permission slugs for all services as dict.

        :return: dictionary with key the service code, and value the list of
            permission slugs of that service.
        """
        result = {}
        for perm in self._permissions:
            result.setdefault(perm.service, []).append(perm.slug)
        return result
Methods
def as_slug_dict(self) ‑> dict
-
Returns permission slugs for all services as dict
:return: Dictionary with key the service code, and value sequence of permission slugs.
Expand source code
def as_slug_dict(self) -> dict:
    """Group the slugs of the stored permissions by service.

    :return: dictionary with the service code as key and the list of
        permission slugs of that service as value; services appear in the
        order their first permission is stored.
    """
    slugs_by_service = {}
    for perm in self._permissions:
        # setdefault creates the list on first sight of a service
        slugs_by_service.setdefault(perm.service, []).append(perm.slug)
    return slugs_by_service
def have(self, p) ‑> bool
-
Returns True if the permission is available
The p argument can be either the ID (a UUID) or the Slug of the permission.
Note that the list of permissions being checked might be less than the permissions the user actually has. This is because only a subset can be queried for.
:param p: ID (UUID) or Slug of the permission to check
:return: True if permission is available
Expand source code
def have(self, p) -> bool:
    """Tell whether a permission is part of this collection.

    The ``p`` argument can be either the ID (a UUID) or the slug of the
    permission; the slug index is consulted first, then the ID index.

    Note that the list of permissions being checked might be less than the
    permissions the user actually has. This is because only a subset can
    be queried for.

    :param p: ID (UUID) or slug of the permission to check
    :return: True if permission is available
    """
    if p in self._index_permissions_by_slug:
        return True
    return p in self._index_permissions_by_id
def slugs(self) ‑> Sequence[str]
-
Returns sequence of permissions' ID and Slug
:return: Sequence of IDs and Slugs of all permissions for this service.
Expand source code
def slugs(self) -> Sequence[str]:
    """Return the slug of every stored permission.

    NOTE(review): this method previously had no body and returned None,
    while its docstring spoke of "IDs and Slugs". Returning the slugs only
    follows the method name and return annotation - confirm.

    :return: list of slugs, one per stored permission, in stored order.
    """
    return [perm.slug for perm in self._permissions]