dcso.portal.auth.token API documentation

Module dcso.portal.auth.token

Expand source code
# Copyright (c) 2020, DCSO GmbH

import base64
import binascii
import json
from datetime import datetime, timezone, timedelta
from typing import Optional

from ..exceptions import PortalAPIResponse
from ..util.temporal import diff_utc_now, utc_now


def pad_base64(b: str) -> str:
    """Return *b* with the ``=`` padding needed to reach a multiple of four.

    JWT segments are transmitted without their trailing padding, which the
    base64 decoders expect to be present.

    :param b: base64 text, possibly lacking its trailing ``=`` characters.
    :return: *b* followed by zero to three ``=`` characters so that the
        result's length is a multiple of four.
    """
    # The number of missing characters is the distance to the *next* multiple
    # of four (-len % 4), not the remainder itself (len % 4).
    return b + '=' * (-len(b) % 4)


class Token:
    """Authentication token built from a GraphQL response.

    The response is expected to carry the keys ``token`` and
    ``isTemporaryToken``. When the token string contains a ``.`` it is
    treated as a JWT: its header and payload are decoded, the ``exp`` claim
    becomes :attr:`expires`, and the temporary flag is recomputed from the
    ``authz.groups`` claim.
    """

    def __init__(self, graphql_response: Optional[dict]):
        """Create a token, optionally populated from *graphql_response*.

        :param graphql_response: mapping with ``token`` and
            ``isTemporaryToken``; ``None`` or an empty mapping leaves the
            instance with an empty token and no expiry.
        :raises PortalAPIResponse: when the response or the JWT is malformed.
        """
        self.token: str = ''
        self._is_temporary: bool = False
        self.expires: Optional[datetime] = None

        if graphql_response:
            self._handle_graphql_response(graphql_response)

    @property
    def is_temporary(self) -> bool:
        """Whether the token is flagged as temporary."""
        return self._is_temporary

    def _handle_graphql_response(self, res: dict):
        """Copy token data out of *res* and parse the token if it is a JWT.

        :raises PortalAPIResponse: when a required key is missing or the JWT
            cannot be processed.
        """
        try:
            self.token = res['token']
            self._is_temporary = res['isTemporaryToken']
        except KeyError as exc:
            raise PortalAPIResponse(f"failed handling token information ({exc})") from exc

        if '.' in self.token:
            self._handle_jwt(self.token)

    def _handle_jwt(self, token: str):
        """Decode the JWT *token* and update expiry and temporary flag.

        The signature segment is not verified here.

        :raises PortalAPIResponse: when the token does not have three parts,
            cannot be decoded, is not typed ``JWT``, has an unusable ``exp``
            claim, is already expired, or lacks the ``authz.groups`` claim.
        """
        bad_jwt = "malformed user token ({reason})"
        try:
            base64_header, base64_payload, _signature = token.split('.')
        except ValueError as exc:
            raise PortalAPIResponse(bad_jwt.format(reason="parts check")) from exc

        try:
            # JWT segments use the URL-safe alphabet ('-' and '_'); the plain
            # decoder would silently drop those characters and corrupt the data.
            header, payload = [
                json.loads(base64.urlsafe_b64decode(pad_base64(part)).decode())
                for part in (base64_header, base64_payload)
            ]
        except ValueError as exc:
            # binascii.Error, UnicodeError and json.JSONDecodeError are all
            # subclasses of ValueError.
            raise PortalAPIResponse(bad_jwt.format(reason="decode JSON; " + str(exc))) from exc

        if not isinstance(header, dict) or header.get('typ') != 'JWT':
            raise PortalAPIResponse(bad_jwt.format(reason="not JWT"))

        try:
            self.expires = datetime.fromtimestamp(payload['exp'], tz=timezone.utc)
        except (OverflowError, OSError, KeyError, TypeError, ValueError) as exc:
            raise PortalAPIResponse(bad_jwt.format(reason="bad expire")) from exc

        if self.is_expired():
            raise PortalAPIResponse(bad_jwt.format(reason="expired"))

        try:
            groups = payload['authz']['groups']
        except (KeyError, TypeError) as exc:
            raise PortalAPIResponse(bad_jwt.format(reason="missing authz groups")) from exc

        # A token whose group list is empty is treated as temporary.
        self._is_temporary = not groups

    def is_expired(self) -> bool:
        """Return ``True`` when the expiry time is now or in the past.

        A token without a known expiry (empty or non-JWT token) is reported
        as not expired instead of failing on a ``None`` comparison.
        """
        if self.expires is None:
            return False

        return self.expires <= utc_now()

    def expires_in(self) -> timedelta:
        """Return ``diff_utc_now(self.expires)``, or a zero delta without expiry.

        NOTE(review): presumably the time remaining until expiry; confirm
        against ``diff_utc_now``.
        """
        if self.expires:
            return diff_utc_now(self.expires)

        return timedelta(0)

Functions

def pad_base64(b: str) ‑> str
Expand source code
def pad_base64(b: str) -> str:
    """Return *b* with the ``=`` padding needed to reach a multiple of four.

    JWT segments are transmitted without their trailing padding, which the
    base64 decoders expect to be present.

    :param b: base64 text, possibly lacking its trailing ``=`` characters.
    :return: *b* followed by zero to three ``=`` characters so that the
        result's length is a multiple of four.
    """
    # The number of missing characters is the distance to the *next* multiple
    # of four (-len % 4), not the remainder itself (len % 4).
    return b + '=' * (-len(b) % 4)

Classes

class Token (graphql_response: Union[dict, NoneType])
Expand source code
class Token:
    """Authentication token built from a GraphQL response.

    The response is expected to carry the keys ``token`` and
    ``isTemporaryToken``. When the token string contains a ``.`` it is
    treated as a JWT: its header and payload are decoded, the ``exp`` claim
    becomes :attr:`expires`, and the temporary flag is recomputed from the
    ``authz.groups`` claim.
    """

    def __init__(self, graphql_response: Optional[dict]):
        """Create a token, optionally populated from *graphql_response*.

        :param graphql_response: mapping with ``token`` and
            ``isTemporaryToken``; ``None`` or an empty mapping leaves the
            instance with an empty token and no expiry.
        :raises PortalAPIResponse: when the response or the JWT is malformed.
        """
        self.token: str = ''
        self._is_temporary: bool = False
        self.expires: Optional[datetime] = None

        if graphql_response:
            self._handle_graphql_response(graphql_response)

    @property
    def is_temporary(self) -> bool:
        """Whether the token is flagged as temporary."""
        return self._is_temporary

    def _handle_graphql_response(self, res: dict):
        """Copy token data out of *res* and parse the token if it is a JWT.

        :raises PortalAPIResponse: when a required key is missing or the JWT
            cannot be processed.
        """
        try:
            self.token = res['token']
            self._is_temporary = res['isTemporaryToken']
        except KeyError as exc:
            raise PortalAPIResponse(f"failed handling token information ({exc})") from exc

        if '.' in self.token:
            self._handle_jwt(self.token)

    def _handle_jwt(self, token: str):
        """Decode the JWT *token* and update expiry and temporary flag.

        The signature segment is not verified here.

        :raises PortalAPIResponse: when the token does not have three parts,
            cannot be decoded, is not typed ``JWT``, has an unusable ``exp``
            claim, is already expired, or lacks the ``authz.groups`` claim.
        """
        bad_jwt = "malformed user token ({reason})"
        try:
            base64_header, base64_payload, _signature = token.split('.')
        except ValueError as exc:
            raise PortalAPIResponse(bad_jwt.format(reason="parts check")) from exc

        try:
            # JWT segments use the URL-safe alphabet ('-' and '_'); the plain
            # decoder would silently drop those characters and corrupt the data.
            header, payload = [
                json.loads(base64.urlsafe_b64decode(pad_base64(part)).decode())
                for part in (base64_header, base64_payload)
            ]
        except ValueError as exc:
            # binascii.Error, UnicodeError and json.JSONDecodeError are all
            # subclasses of ValueError.
            raise PortalAPIResponse(bad_jwt.format(reason="decode JSON; " + str(exc))) from exc

        if not isinstance(header, dict) or header.get('typ') != 'JWT':
            raise PortalAPIResponse(bad_jwt.format(reason="not JWT"))

        try:
            self.expires = datetime.fromtimestamp(payload['exp'], tz=timezone.utc)
        except (OverflowError, OSError, KeyError, TypeError, ValueError) as exc:
            raise PortalAPIResponse(bad_jwt.format(reason="bad expire")) from exc

        if self.is_expired():
            raise PortalAPIResponse(bad_jwt.format(reason="expired"))

        try:
            groups = payload['authz']['groups']
        except (KeyError, TypeError) as exc:
            raise PortalAPIResponse(bad_jwt.format(reason="missing authz groups")) from exc

        # A token whose group list is empty is treated as temporary.
        self._is_temporary = not groups

    def is_expired(self) -> bool:
        """Return ``True`` when the expiry time is now or in the past.

        A token without a known expiry (empty or non-JWT token) is reported
        as not expired instead of failing on a ``None`` comparison.
        """
        if self.expires is None:
            return False

        return self.expires <= utc_now()

    def expires_in(self) -> timedelta:
        """Return ``diff_utc_now(self.expires)``, or a zero delta without expiry.

        NOTE(review): presumably the time remaining until expiry; confirm
        against ``diff_utc_now``.
        """
        if self.expires:
            return diff_utc_now(self.expires)

        return timedelta(0)

Instance variables

var is_temporary : bool
Expand source code
@property
def is_temporary(self) -> bool:
    """Read-only view of the instance's ``_is_temporary`` flag."""
    temporary = self._is_temporary
    return temporary

Methods

def expires_in(self) ‑> datetime.timedelta
Expand source code
def expires_in(self) -> timedelta:
    """Return ``diff_utc_now(self.expires)``, or a zero delta without expiry.

    NOTE(review): presumably the time remaining until expiry; confirm
    against ``diff_utc_now``.
    """
    expiry = self.expires
    return diff_utc_now(expiry) if expiry else timedelta(0)
def is_expired(self) ‑> bool
Expand source code
def is_expired(self) -> bool:
    """Return ``True`` when the expiry time is now or in the past.

    A token without a known expiry (``self.expires`` is ``None``) is
    reported as not expired instead of failing on a ``None`` comparison.
    """
    if self.expires is None:
        return False

    return self.expires <= utc_now()