Source code for sanic_beskar.utilities

import datetime as dt
import re
from collections.abc import Iterable
from types import SimpleNamespace
from typing import TYPE_CHECKING, Any

# If we are using `beanie`, we need to patch JSONEncoder to understand its objectid
try:  # pragma: no cover
    from beanie import PydanticObjectId as ObjectId
except (ImportError, ModuleNotFoundError):  # pragma: no cover
    from bson.objectid import ObjectId  # type: ignore

## If we are using `segno`, import for typing
if TYPE_CHECKING:
    from segno import QRCode

    from sanic_beskar import Beskar as BeskarType

from json import JSONEncoder as json_JSONEncoder

import pendulum
import ujson
from sanic import Request, Sanic

from sanic_beskar.constants import RESERVED_CLAIMS
from sanic_beskar.exceptions import BeskarError, ConfigurationError


[docs] class JSONEncoder(json_JSONEncoder): # pragma: no cover """JSON encoder class to facilitate serializing datetimes and ObjectId's"""
[docs] def default(self, o: Any) -> Any: """hooks for the various types""" if hasattr(o, "__json__"): return o.__json__() if isinstance(o, Iterable): return list(o) if isinstance(o, dt.datetime): return o.isoformat() if isinstance(o, ObjectId): return str(o) if hasattr(o, "__getitem__") and hasattr(o, "keys"): return dict(o) if hasattr(o, "__dict__"): return { member: getattr(o, member) for member in dir(o) if not member.startswith("_") and not callable(getattr(o, member)) } return JSONEncoder.default(self, o)
[docs] def get_request(request: Request) -> Request: """Get current Sanic Request""" try: if not request: return Request.get_current() return request except Exception: raise BeskarError("Could not identify current Sanic request")
[docs] def normalize_rbac(rbac_dump: dict) -> dict: """ Normalize an RBAC dump into something usable. Yes, I know this will produce duplicates in the role lists of a permission, but its much faster than dealing with a set, so we don't care. Example: {'rolename': ['read', 'write', 'update'],} Produces: {'read': ['rolename'], 'write': ['rolename'], 'update': ['rolename']} Args: rbac_dump (dict): RBAC dump from config/storage. Returns: dict: Normalized (for our purposes) RBAC policy. """ _inversed: dict = {} for k in rbac_dump: for v in rbac_dump[k]: _inversed.setdefault(v, []).append(k) return _inversed
[docs] async def is_valid_json(data: str) -> Any: """ Simple helper to validate if a value is valid json data :param data: Data to validate for valid JSON :type data: str :returns: ``True``, ``False`` :rtype: bool """ try: return ujson.loads(data) except (ValueError, TypeError): return False
[docs] def duration_from_string(text: str) -> pendulum.Duration: """ Parses a duration from a string. String may look like these patterns: * 1 Hour * 7 days, 45 minutes * 1y11d20m An exception will be raised if the text cannot be parsed :param text: String to parse for duration detail :type text: str :returns: Time Object :rtype: :py:mod:`pendulum` :raises: :py:exc:`~sanic_beskar.ConfigurationError` on bad strings """ text = text.replace(" ", "") text = text.replace(",", "") text = text.lower() match = re.match( r""" ((?P<years>\d+)y[a-z]*)? ((?P<months>\d+)mo[a-z]*)? ((?P<days>\d+)d[a-z]*)? ((?P<hours>\d+)h[a-z]*)? ((?P<minutes>\d+)m[a-z]*)? ((?P<seconds>\d+)s[a-z]*)? """, text, re.VERBOSE, ) ConfigurationError.require_condition( match, f"Couldn't parse {text}", ) parts = match.groupdict() # type: ignore clean = {k: int(v) for (k, v) in parts.items() if v} ConfigurationError.require_condition( clean, f"Couldn't parse {text}", ) with ConfigurationError.handle_errors(f"Couldn't parse {text}"): return pendulum.duration(**clean)
[docs] def current_guard(ctx: Sanic | SimpleNamespace | None = None) -> "BeskarType": """ Fetches the current instance of :py:class:`~sanic_beskar.Beskar` that is attached to the current sanic app :param ctx: Application Context :type ctx: Optional[:py:class:`sanic.Sanic`] :returns: Current Beskar Guard object for this app context :rtype: :py:class:`~sanic_beskar.Beskar` :raises: :py:exc:`~sanic_beskar.BeskarError` if no guard found """ if isinstance(ctx, Sanic): ctx = ctx.ctx if not ctx: ctx = Sanic.get_app().ctx guard: BeskarType = ctx.extensions.get("beskar", None) # type: ignore BeskarError.require_condition( guard is not None, "No current guard found; Beskar must be initialized first", ) return guard
[docs] def app_context_has_token_data(ctx: Sanic | None = None) -> bool: """ Checks if there is already token_data added to the app context :param ctx: Application Context :type ctx: Optional[Sanic] :returns: ``True``, ``False`` :rtype: bool """ if not ctx: ctx = Sanic.get_app().ctx return hasattr(ctx, "token_data")
[docs] def add_token_data_to_app_context(token_data: dict) -> None: """ Adds a dictionary of token data (presumably unpacked from a token) to the top of the sanic app's context :param token_data: ``dict`` of token data to add :type token_data: dict """ ctx = Sanic.get_app().ctx ctx.token_data = token_data
[docs] def get_token_data_from_app_context() -> dict: """ Fetches a dict of token data from the top of the sanic app's context :returns: Token ``dict`` found in current app context :rtype: dict :raises: :py:exc:`~sanic_beskar.BeskarError` on missing token """ ctx = Sanic.get_app().ctx token_data = getattr(ctx, "token_data", {}) BeskarError.require_condition( token_data != {}, """ No token_data found in app context. Make sure @auth_required decorator is specified *first* for route """, ) return token_data
[docs] def remove_token_data_from_app_context() -> None: """ Removes the dict of token data from the top of the sanic app's context """ ctx = Sanic.get_app().ctx if app_context_has_token_data(ctx): del ctx.token_data
[docs] def current_user_id() -> str | None: """ This method returns the user id retrieved from token data attached to the current sanic app's context :returns: ``id`` of current :py:class:`User`, if any :rtype: str :raises: :py:exc:`~sanic_beskar.BeskarError` if no user/token found """ token_data = get_token_data_from_app_context() user_id: str | None = token_data.get("id", None) BeskarError.require_condition( user_id is not None, "Could not fetch an id for the current user", ) return user_id
[docs] async def generate_totp_qr(user_totp: str) -> "QRCode": """ This is a helper utility to generate a :py:mod:`segno` QR code renderer, based upon a supplied `User` TOTP value. :param user_totp: TOTP configuration of the user :type user_totp: json :returns: ``Segno`` object based upon user's stored TOTP configuration :rtype: :py:class:`Segno` """ try: # pragma: no cover import segno except (ModuleNotFoundError, ImportError) as e: # pragma: no cover raise ConfigurationError( "Attempting to generate a TOTP QR code," "but you didn't install the necessary `segno` library!" ) from e return segno.make(user_totp)
[docs] async def current_user() -> Any: """ This method returns a user instance for token data attached to the current sanic app's context :returns: Current logged in ``User`` object :rtype: populated :py:attr:`user_class` attribute of the logged in :py:class:`~sanic_beskar.Beskar` instance :raises: :py:exc:`~sanic_beskar.BeskarError` if no user identified """ user_id = current_user_id() guard = current_guard() user = await guard.user_class.identify(user_id) BeskarError.require_condition( user is not None, "Could not identify the current user from the current id", ) return user
[docs] async def current_rolenames() -> set: """ This method returns the names of all roles associated with the current user :returns: Set of roles for currently logged in users :rtype: set """ token_data = get_token_data_from_app_context() if "rls" not in token_data: # This is necessary so our set arithmetic works correctly return set(["non-empty-but-definitely-not-matching-subset"]) return set(r.strip() for r in token_data["rls"].split(","))
[docs] def current_custom_claims() -> dict: """ This method returns any custom claims in the current token :returns: Custom claims for currently logged in user :rtype: dict """ token_data = get_token_data_from_app_context() return {k: v for (k, v) in token_data.items() if k not in RESERVED_CLAIMS}