import datetime
import re
import textwrap
import uuid
import warnings
from collections.abc import Callable
from importlib import import_module
from importlib.util import find_spec
from typing import TYPE_CHECKING, Any, cast
import aiofiles
import jinja2
import jwt
import pendulum
import ujson
from passlib.context import CryptContext
from passlib.totp import TOTP
from sanic import Request, Sanic
from sanic.compat import Header
from sanic.log import logger
from sanic_beskar.constants import (
DEFAULT_CONFIRMATION_SUBJECT,
DEFAULT_CONFIRMATION_TEMPLATE,
DEFAULT_HASH_ALLOWED_SCHEMES,
DEFAULT_HASH_AUTOTEST,
DEFAULT_HASH_AUTOUPDATE,
DEFAULT_HASH_DEPRECATED_SCHEMES,
DEFAULT_HASH_SCHEME,
DEFAULT_JWT_ALGORITHM,
DEFAULT_JWT_ALLOWED_ALGORITHMS,
DEFAULT_PASETO_VERSION,
DEFAULT_PASSWORD_POLICY,
DEFAULT_RESET_SUBJECT,
DEFAULT_RESET_TEMPLATE,
DEFAULT_ROLES_DISABLED,
DEFAULT_TOKEN_ACCESS_LIFESPAN,
DEFAULT_TOKEN_COOKIE_NAME,
DEFAULT_TOKEN_HEADER_NAME,
DEFAULT_TOKEN_HEADER_TYPE,
DEFAULT_TOKEN_PLACES,
DEFAULT_TOKEN_PROVIDER,
DEFAULT_TOKEN_REFRESH_LIFESPAN,
DEFAULT_TOKEN_RESET_LIFESPAN,
DEFAULT_TOTP_ENFORCE,
DEFAULT_TOTP_SECRETS_DATA,
DEFAULT_TOTP_SECRETS_TYPE,
DEFAULT_USER_CLASS_VALIDATION_METHOD,
IS_REGISTRATION_TOKEN_CLAIM,
IS_RESET_TOKEN_CLAIM,
REFRESH_EXPIRATION_CLAIM,
RESERVED_CLAIMS,
VITAM_AETERNUM,
AccessType,
)
from sanic_beskar.exceptions import (
AuthenticationError,
BeskarError,
BlacklistedError,
ClaimCollisionError,
ConfigurationError,
EarlyRefreshError,
ExpiredAccessError,
ExpiredRefreshError,
InvalidRegistrationToken,
InvalidResetToken,
InvalidTokenHeader,
InvalidUserError,
LegacyScheme,
MissingClaimError,
MissingToken,
MissingUserError,
MisusedRegistrationToken,
MisusedResetToken,
TOTPRequired,
)
from sanic_beskar.utilities import (
JSONEncoder,
duration_from_string,
get_request,
is_valid_json,
normalize_rbac,
)
if TYPE_CHECKING:
from pyseto import KeyInterface, Paseto, Token
[docs]
class Beskar:
"""
Comprises the implementation for the :py:mod:`sanic-beskar`
:py:mod:`sanic` extension. Provides a tool that allows password
authentication and token provision for applications and designated
endpoints
"""
def __init__(
self: "Beskar",
app: Sanic | None = None,
user_class: object | None = None,
is_blacklisted: Callable | None = None,
encode_token_hook: Callable | None = None,
refresh_token_hook: Callable | None = None,
rbac_populate_hook: Callable | None = None,
) -> None:
self.app: Sanic
self.pwd_ctx: CryptContext = CryptContext()
self.totp_ctx: TOTP = TOTP(new=True)
self.totp_secrets_type: str
self.hash_scheme = None
self.salt = None
self.token_provider = "jwt" # nosec
self.paseto_ctx: Paseto
self.paseto_key: bytes | str
self.paseto_token: Token
self.rbac_definitions: dict = {}
if app is not None and user_class is not None:
self.init_app(
app,
user_class,
is_blacklisted,
encode_token_hook,
refresh_token_hook,
rbac_populate_hook,
)
[docs]
async def open_session(self, request: Request) -> None:
"""required for Sanic"""
pass
[docs]
def init_app(
self,
app: Sanic,
user_class: object,
is_blacklisted: Callable | None = None,
encode_token_hook: Callable | None = None,
refresh_token_hook: Callable | None = None,
rbac_populate_hook: Callable | None = None,
) -> Sanic:
"""
Initializes the :py:class:`Beskar` extension
Args:
app (Sanic): The :py:mod:`Sanic` app to bind this extension. Defaults to None.
user_class (object): Class used to interact with a `User`. Defaults to None.
is_blacklisted (Callable, optional): A method that may optionally be
used to check the token against a blacklist when access or refresh
is requested should take the jti for the token to check as a single
argument. Returns True if the jti is blacklisted, False otherwise.
Defaults to `False`.
encode_token_hook (Callable, optional): A method that may optionally be
called right before an encoded jwt is generated. Should take
payload_parts which contains the ingredients for the jwt.
Defaults to `None`.
refresh_token_hook (Callable, optional): A method that may optionally be called
right before an encoded jwt is refreshed. Should take payload_parts
which contains the ingredients for the jwt. Defaults to `None`.
rbac_populate_hook (Callable, optional): A method that may optionally be called
at Beskar init time, or periodcally, to populate a RBAC dictionary mapping
user Roles to RBAC rights. Defaults to `None`.
Returns:
Object: Initialized sanic-beskar object.
Raises:
ConfigurationError: Invalid/missing configuration value is detected.
"""
self.app = app
app.register_middleware(self.open_session, "request")
self.roles_disabled = app.config.get(
"BESKAR_ROLES_DISABLED",
DEFAULT_ROLES_DISABLED,
)
self.hash_autoupdate = app.config.get(
"BESKAR_HASH_AUTOUPDATE",
DEFAULT_HASH_AUTOUPDATE,
)
self.hash_autotest = app.config.get(
"BESKAR_HASH_AUTOTEST",
DEFAULT_HASH_AUTOTEST,
)
self.pwd_ctx = CryptContext(
schemes=app.config.get(
"BESKAR_HASH_ALLOWED_SCHEMES",
DEFAULT_HASH_ALLOWED_SCHEMES,
),
default=app.config.get(
"BESKAR_HASH_SCHEME",
DEFAULT_HASH_SCHEME,
),
deprecated=app.config.get(
"BESKAR_HASH_DEPRECATED_SCHEMES",
DEFAULT_HASH_DEPRECATED_SCHEMES,
),
)
if self.pwd_ctx.default_scheme().startswith("pbkdf2_"):
if not find_spec("fastpbkdf2"):
warnings.warn(
textwrap.dedent("""
You are using a `pbkdf2` hashing scheme, but didn't install
the `fastpbkdf2` module, which will give you like 40%
speed improvements. you should go do that now.
"""),
UserWarning,
)
self.user_class = self._validate_user_class(user_class)
self.is_blacklisted = is_blacklisted or (lambda t: False)
self.encode_token_hook = encode_token_hook
self.refresh_token_hook = refresh_token_hook
self.rbac_populate_hook = rbac_populate_hook
self.access_lifespan: pendulum.Duration
self.refresh_lifespan: pendulum.Duration
# Populate our config defaults
self.set_config()
# Run config security checks
self.audit()
# If the user provided a base RBAC policy, lets consume it
if app.config.get("BESKAR_RBAC_POLICY"):
try:
self.rbac_definitions = normalize_rbac(app.config.get("BESKAR_RBAC_POLICY", {}))
except Exception as e:
raise ConfigurationError(
f"Failure loading supplied BESKAR_RBAC_POLICY " f"from config: {e}"
) from e
if isinstance(self.access_lifespan, dict):
self.access_lifespan = pendulum.duration(**self.access_lifespan)
elif isinstance(self.access_lifespan, str):
self.access_lifespan = duration_from_string(self.access_lifespan)
ConfigurationError.require_condition(
isinstance(self.access_lifespan, datetime.timedelta),
"access lifespan was not configured",
)
if isinstance(self.refresh_lifespan, dict):
self.refresh_lifespan = pendulum.duration(**self.refresh_lifespan)
if isinstance(self.refresh_lifespan, str):
self.refresh_lifespan = duration_from_string(self.refresh_lifespan)
ConfigurationError.require_condition(
isinstance(self.refresh_lifespan, datetime.timedelta),
"refresh lifespan was not configured",
)
if self.token_provider == "paseto": # nosec B105
try: # pragma: no cover
from pyseto import Key, Paseto, Token # noqa
except (ImportError, ModuleNotFoundError) as e:
raise ConfigurationError(
"Trying to use PASETO, " "but you did't install the `pyseto` module"
) from e
self.paseto_parsed_keys: KeyInterface = Key.new(
version=self.paseto_version, purpose="local", key=self.paseto_key
)
self.paseto_ctx = Paseto(exp=self.access_lifespan.seconds, include_iat=False)
self.paseto_token = Token # type: ignore
# TODO: add 'issuser', at the very least
if self.totp_secrets_type:
"""
If we are saying we are using a TOTP secret protection type,
we need to ensure the type is something supported (file, string, wallet),
and that the BESKAR_TOTP_SECRETS_DATA is populated.
"""
self.totp_secrets_type = self.totp_secrets_type.lower()
ConfigurationError.require_condition(
self.totp_secrets_data,
'If "BESKAR_TOTP_SECRETS_TYPE" is set, you must also'
'provide a valid value for "BESKAR_TOTP_SECRETS_DATA"',
)
if self.totp_secrets_type == "file":
self.totp_ctx = TOTP.using(secrets_path=app.config.get("BESKAR_TOTP_SECRETS_DATA")) # type: ignore
elif self.totp_secrets_type == "string":
self.totp_ctx = TOTP.using(secrets=app.config.get("BESKAR_TOTP_SECRETS_DATA")) # type: ignore
elif self.totp_secrets_type == "wallet":
self.totp_ctx = TOTP.using(wallet=app.config.get("BESKAR_TOTP_SECRETS_DATA")) # type: ignore
else:
raise ConfigurationError(
f'If {"BESKAR_TOTP_SECRETS_TYPE"} is set, it must be one'
f'of the following schemes: {["file", "string", "wallet"]}'
)
else:
self.totp_ctx = TOTP.using() # type: ignore
self.is_testing = app.config.get("TESTING", False)
"""
If we are supporting RBAC, lets go pull the current, massage it, and store
it. Additionally, setup a listener to know when to go pull updated RBAC
info whenever the application causes or detects a change.
Application owner must manually trigger this if there is a change, by sending
a tickle to the ``beskar.rbac.update`` signal watcher.
"""
if self.rbac_populate_hook:
ConfigurationError.require_condition(
callable(self.rbac_populate_hook),
"rbac_populate_hook was configured, but doesn't appear callable",
)
@app.signal("beskar.rbac.update")
async def rbac_populate() -> None:
"""Sanic worker to popluate the RBAC policy"""
_rbac_dump = await self.rbac_populate_hook() # type: ignore
self.rbac_definitions = normalize_rbac(_rbac_dump)
logger.debug(f"RBAC definitions updated: {self.rbac_definitions}")
@app.before_server_start
async def init_rbac_populate(app: Sanic) -> None:
"""Sanic worker to call the RBAC policy updater at app init"""
logger.info("Populating initial RBAC definitions")
await app.dispatch("beskar.rbac.update")
app.add_task(init_rbac_populate(app)) # type: ignore
if not hasattr(app.ctx, "extensions"):
app.ctx.extensions = {}
app.ctx.extensions["beskar"] = self
return app
[docs]
def set_config(self) -> None:
"""
Simple helper to populate all the config settings, making `init_app()` easier to read
"""
self.encode_key = self.app.config["SECRET_KEY"]
self.allowed_algorithms = self.app.config.get(
"JWT_ALLOWED_ALGORITHMS",
DEFAULT_JWT_ALLOWED_ALGORITHMS,
)
self.encode_algorithm = self.app.config.get(
"JWT_ALGORITHM",
DEFAULT_JWT_ALGORITHM,
)
self.access_lifespan = self.app.config.get(
"TOKEN_ACCESS_LIFESPAN",
DEFAULT_TOKEN_ACCESS_LIFESPAN,
)
self.refresh_lifespan = self.app.config.get(
"TOKEN_REFRESH_LIFESPAN",
DEFAULT_TOKEN_REFRESH_LIFESPAN,
)
self.reset_lifespan = self.app.config.get(
"TOKEN_RESET_LIFESPAN",
DEFAULT_TOKEN_RESET_LIFESPAN,
)
self.token_places = self.app.config.get(
"TOKEN_PLACES",
DEFAULT_TOKEN_PLACES,
)
self.cookie_name = self.app.config.get(
"TOKEN_COOKIE_NAME",
DEFAULT_TOKEN_COOKIE_NAME,
)
self.header_name = self.app.config.get(
"TOKEN_HEADER_NAME",
DEFAULT_TOKEN_HEADER_NAME,
)
self.header_type = self.app.config.get(
"TOKEN_HEADER_TYPE",
DEFAULT_TOKEN_HEADER_TYPE,
)
self.user_class_validation_method = self.app.config.get(
"USER_CLASS_VALIDATION_METHOD",
DEFAULT_USER_CLASS_VALIDATION_METHOD,
)
self.confirmation_template = self.app.config.get(
"BESKAR_CONFIRMATION_TEMPLATE",
DEFAULT_CONFIRMATION_TEMPLATE,
)
self.confirmation_uri = self.app.config.get(
"BESKAR_CONFIRMATION_URI",
)
self.confirmation_sender = self.app.config.get(
"BESKAR_CONFIRMATION_SENDER",
)
self.confirmation_subject = self.app.config.get(
"BESKAR_CONFIRMATION_SUBJECT",
DEFAULT_CONFIRMATION_SUBJECT,
)
self.reset_template = self.app.config.get(
"BESKAR_RESET_TEMPLATE",
DEFAULT_RESET_TEMPLATE,
)
self.reset_uri = self.app.config.get(
"BESKAR_RESET_URI",
)
self.reset_sender = self.app.config.get(
"BESKAR_RESET_SENDER",
)
self.reset_subject = self.app.config.get(
"BESKAR_RESET_SUBJECT",
DEFAULT_RESET_SUBJECT,
)
self.totp_enforce = self.app.config.get(
"BESKAR_TOTP_ENFORCE",
DEFAULT_TOTP_ENFORCE,
)
self.totp_secrets_type = self.app.config.get(
"BESKAR_TOTP_SECRETS_TYPE",
DEFAULT_TOTP_SECRETS_TYPE,
)
self.totp_secrets_data = self.app.config.get(
"BESKAR_TOTP_SECRETS_DATA",
DEFAULT_TOTP_SECRETS_DATA,
)
self.token_provider = self.app.config.get(
"BESKAR_TOKEN_PROVIDER",
DEFAULT_TOKEN_PROVIDER,
)
self.token_provider = self.token_provider.lower()
self.paseto_version = self.app.config.get(
"BESKAR_PASETO_VERSION",
DEFAULT_PASETO_VERSION,
)
self.paseto_key = self.app.config.get(
"BESKAR_PASETO_KEY",
self.encode_key,
)
self.password_policy = self.app.config.get(
"BESKAR_PASSWORD_POLICY",
DEFAULT_PASSWORD_POLICY,
)
# Catch anything remaining unset and default
for setting in DEFAULT_PASSWORD_POLICY:
if setting not in self.password_policy:
self.password_policy[setting] = DEFAULT_PASSWORD_POLICY[setting]
[docs]
def audit(self) -> None:
"""
Perform some basic sanity check of settings to make sure the developer didn't
try to do some blatantly lame stuff
"""
valid_schemes = self.pwd_ctx.schemes()
ConfigurationError.require_condition(
self.hash_scheme in valid_schemes or self.hash_scheme is None,
f'If {"BESKAR_HASH_SCHEME"} is set, it must be one of the following schemes: {valid_schemes}',
)
ConfigurationError.require_condition(
self.app.config.get("SECRET_KEY") is not None,
"There must be a SECRET_KEY app config setting set",
)
ConfigurationError.require_condition(
len(self.app.config.SECRET_KEY) >= int(self.password_policy["length"])
or self.app.config.get("I_MAKE_POOR_CHOICES", False),
f"your SECRET_KEY is weak in length [{len(self.app.config.SECRET_KEY)} < "
f"{self.password_policy['length']}]! fix it, or set 'I_MAKE_POOR_CHOICES' to True.",
)
ConfigurationError.require_condition(
self.password_policy["length"] >= 8
or self.app.config.get("I_MAKE_POOR_CHOICES", False),
"your password policy secret key length is weak! fix it, or set 'I_MAKE_POOR_CHOICES' to True. "
"See https://pages.nist.gov/800-63-3/sp800-63b.html#appA for more information.",
)
ConfigurationError.require_condition(
getattr(self, f"encode_{self.token_provider}_token"),
"Invalid `token_provider` configured. Please check docs and try again.",
)
ConfigurationError.require_condition(
0 < self.paseto_version < 5,
"Invalid `paseto_version` configured. Valid are [1, 2, 3, 4] only.",
)
if self.password_policy["attempt_lockout"] in [0, None, ""]:
warnings.warn(
"The PASSWORD_POLICY['attempt_lockout'] value is insecure, "
"and should not be used. See https://pages.nist.gov/800-63-3/sp800-63b.html#throttle"
)
def _validate_user_class(self, user_class: Any) -> Any:
"""
Validates the supplied :py:data:`user_class` to make sure that it has the
class methods and attributes necessary to function correctly.
After validating class methods, will attempt to instantiate a dummy
instance of the user class to test for the requisite attributes
Requirements:
- :py:meth:`lookup` method. Accepts a string parameter, returns instance
- :py:meth:`identify` method. Accepts an identity parameter, returns instance
- :py:attribute:`identity` attribute. Provides unique id for the instance
- :py:attribute:`rolenames` attribute. Provides list of roles attached to instance
- :py:attribute:`password` attribute. Provides hashed password for instance
Args:
user_class (:py:class:`User`): `User` class to use.
Returns:
User: Validated `User` object
Raises:
:py:exc:`~sanic_beskar.exceptions.BeskarError`: Missing requirements
"""
BeskarError.require_condition(
getattr(user_class, "lookup", None) is not None,
textwrap.dedent("""
The user_class must have a lookup class method:
user_class.lookup(<str>) -> <user instance>
"""),
)
BeskarError.require_condition(
getattr(user_class, "identify", None) is not None,
textwrap.dedent("""
The user_class must have an identify class method:
user_class.identify(<identity>) -> <user instance>
"""),
)
dummy_user = None
try:
dummy_user = user_class()
except Exception:
logger.debug(
"Skipping instance validation because "
"user cannot be instantiated without arguments"
)
if dummy_user:
BeskarError.require_condition(
hasattr(dummy_user, "identity"),
textwrap.dedent("""
Instances of user_class must have an identity attribute:
user_instance.identity -> <unique id for instance>
"""),
)
BeskarError.require_condition(
self.roles_disabled or hasattr(dummy_user, "rolenames"),
textwrap.dedent("""
Instances of user_class must have a rolenames attribute:
user_instance.rolenames -> [<role1>, <role2>, ...]
"""),
)
BeskarError.require_condition(
hasattr(dummy_user, "password"),
textwrap.dedent("""
Instances of user_class must have a password attribute:
user_instance.rolenames -> <hashed password>
"""),
)
return user_class
[docs]
async def generate_user_totp(self) -> object:
"""
Generates a :py:mod:`passlib` TOTP for a user. This must be manually saved/updated to the
:py:class:`User` object.
. ..note:: The application secret(s) should be stored in a secure location, and each
secret should contain a large amount of entropy (to prevent brute-force attacks
if the encrypted keys are leaked). :py:func:`passlib.generate_secret()` is
provided as a convenience helper to generate a new application secret of suitable size.
Best practice is to load these values from a file via secrets_path, pulled in value, or
utilizing a `passlib wallet`, and then have your application give up permission
to read this file once it's running.
:returns: New :py:mod:`passlib` TOTP secret object
"""
if not self.app.config.get("BESKAR_TOTP_SECRETS_TYPE"):
warnings.warn(
textwrap.dedent("""
Sanic_Beskar is attempting to generate a new TOTP
for a user, but you haven't configured a BESKAR_TOTP_SECRETS_TYPE
value, which means you aren't properly encrypting these stored
TOTP secrets. *tsk*tsk*
"""),
UserWarning,
)
return self.totp_ctx.new()
async def _verify_totp(self, token: str, user: object) -> Any:
"""
Verifies that a plaintext password matches the hashed version of that
password using the stored :py:mod:`passlib` password context
"""
BeskarError.require_condition(
self.totp_ctx is not None,
"Beskar must be initialized before this method is available",
)
totp_factory = self.totp_ctx.new()
"""
Optionally, if a :py:class:`User` model has a :py:meth:`get_cache_verify` method,
call it, and use that response as the :py:data:`last_counter` value.
"""
_last_counter = None
if hasattr(user, "get_cache_verify") and callable(user.get_cache_verify):
_last_counter = await user.get_cache_verify()
verify = totp_factory.verify(token, user.totp, last_counter=_last_counter) # type: ignore
"""
Optionally, if our User model has a :py:func:`cache_verify` function,
call it, providing the good verification :py:data:`counter` and
:py:data:`cache_seconds` to be stored by :py:func:`cache_verify` function.
This is for security against replay attacks, and should ideally be kept
in a cache, but can be stored in the db
"""
if hasattr(verify, "counter"):
if hasattr(user, "cache_verify") and callable(user.cache_verify):
logger.debug("Updating `User` token verify cache")
await user.cache_verify(counter=verify.counter, seconds=verify.cache_seconds)
return verify
[docs]
async def authenticate_totp(
self, user: str | object, token: str, lookup: str | None = "username"
) -> Any:
"""
Verifies that a TOTP validates against the stored TOTP for that
username.
If verification passes, the matching user instance is returned.
If automatically called by :py:func:`authenticate`,
it accepts a :py:class:`User` object instead of :py:data:`username`
and skips the :py:func:`lookup` call.
Args:
username (Union[str, object]): Username, email, or `User` object to
perform TOTP authentication against.
token (str): TOTP token value to validate.
lookup (str, optional): Type of lookup to perform, either `username` or `email` based.
Defaults to 'username'.
Returns:
:py:class:`User`: Validated `User` object.
Raises:
AuthenticationError: Failed TOTP authentication attempt.
"""
BeskarError.require_condition(
self.user_class is not None,
"Beskar must be initialized before this method is available",
)
"""
If we are called from `authenticate`, we already looked up the user,
don't waste the DB call again.
"""
if isinstance(user, str):
if lookup == "username":
user = await self.user_class.lookup(username=user)
elif lookup == "email":
user = await self.user_class.lookup(email=user)
else:
raise AuthenticationError("Lookup type *must* be either `username` or `email`")
else:
user = user
AuthenticationError.require_condition(
user is not None
and hasattr(user, "totp")
and user.totp
and await is_valid_json(user.totp),
"TOTP challenge is not properly configured for this user",
)
AuthenticationError.require_condition(
user is not None
and token is not None
and await self._verify_totp(
token,
user,
),
"The credentials provided are missing or incorrect",
)
return user
[docs]
async def authenticate(
self,
user: str,
password: str,
token: str | None = None,
lookup: str | None = "username",
) -> object:
"""
Verifies that a password matches the stored password for that username or
email.
If verification passes, the matching user instance is returned
.. note:: If :py:data:`BESKAR_TOTP_ENFORCE` is set to `True`
(default), and a user has a TOTP configuration, this call
must include the `token` value, or it will raise a
:py:exc:`~sanic_beskar.exceptions.TOTPRequired` exception
and not return the user.
This means either you will need to call it again, providing
the `token` value from the user, or separately call
:py:func:`authenticate_totp`,
which only performs validation of the `token` value,
and not the users password.
**Choose your own adventure.**
Args:
user (str): Username or email to authenticate
password (str): Password to validate against
token (str, optional): TOTP Token value to validate against.
Defaults to None.
lookup (str, optional): Type of lookup to perform, either `username` or `email` based.
Defaults to 'username'.
Returns:
:py:class:`User`: Authenticated `User` object.
Raises:
AuthenticationError: Failed password, TOTP, or password+TOTP attempt.
TOTPRequired: Account is required to supply TOTP.
"""
BeskarError.require_condition(
self.user_class is not None,
"Beskar must be initialized before this method is available",
)
if lookup == "username":
user_o = await self.user_class.lookup(username=user)
elif lookup == "email":
user_o = await self.user_class.lookup(email=user)
else:
raise AuthenticationError("Lookup type *must* be either `username` or `email`")
AuthenticationError.require_condition(
user_o is not None
and self._verify_password(
password,
user_o.password,
),
"The credentials provided are missing or incorrect",
)
"""
If we provided a TOTP token in this `authenticate` call,
or if the user is required to use TOTP, instead of
as a separate call to `authenticate_totp`, then lets do it here.
Failure to provide a TOTP token, when the user is required to use
TOTP, results in a `TOTPRequired` exception, and the calling
application will be required to either re-call `authenticate`
with all 3 arguments, or call `authenticate_otp` directly.
"""
if hasattr(user_o, "totp") or token:
if token:
user_o = await self.authenticate_totp(user_o, token)
elif self.totp_enforce:
raise TOTPRequired(
"Password authentication successful -- "
f"TOTP still *required* for user '{user_o.username}'."
)
"""
If we are set to BESKAR_HASH_AUTOUPDATE then check our hash
and if needed, update the user. The developer is responsible
for using the returned user object and updating the data
storage endpoint.
Else, if we are set to BESKAR_HASH_AUTOTEST then check out hash
and return exception if our hash is using the wrong scheme,
but don't modify the user.
"""
if self.hash_autoupdate:
await self.verify_and_update(user=user_o, password=password)
elif self.hash_autotest:
await self.verify_and_update(user=user_o)
return user_o
def _verify_password(self, raw_password: str, hashed_password: str) -> bool:
"""
Verifies that a plaintext password matches the hashed version of that
password using the stored :py:mod:`passlib` password context
"""
BeskarError.require_condition(
self.pwd_ctx is not None,
"Beskar must be initialized before this method is available",
)
return self.pwd_ctx.verify(raw_password, hashed_password)
def _check_user(self, user: object) -> bool:
"""
Checks to make sure that a user is valid. First, checks that the user
is not None. If this check fails, a MissingUserError is raised. Next,
checks if the user has a validation method. If the method does not
exist, the check passes. If the method exists, it is called. If the
result of the call is not truthy, a
:py:exc:`~sanic_beskar.exceptions.InvalidUserError` is raised.
"""
MissingUserError.require_condition(
user is not None,
"Could not find the requested user",
)
user_validate_method = getattr(user, self.user_class_validation_method, None)
if user_validate_method is None:
return True
InvalidUserError.require_condition(
user_validate_method(),
"The user is not valid or has had access revoked",
)
return True
[docs]
async def encode_paseto_token(
self,
user: Any,
override_access_lifespan: pendulum.Duration | None = None,
override_refresh_lifespan: pendulum.Duration | None = None,
bypass_user_check: bool | None = False,
is_registration_token: bool | None = False,
is_reset_token: bool | None = False,
**custom_claims: dict | None,
) -> str:
"""
Encodes user data into a PASETO token that can be used for authorization
at protected endpoints
.. note:: Note that any claims supplied as `custom_claims` here must be
:py:mod:`json` compatible types.
Args:
user (:py:class:`User`): `User` to generate a token for.
override_access_lifespan (pendulum.Duration, optional): Override's the
instance's access lifespan to set a custom duration after which
the new token's accessibility will expire. May not exceed the
:py:data:`refresh_lifespan`. Defaults to `None`.
override_refresh_lifespan (pendulum.Duration, optional): Override's the
instance's refresh lifespan to set a custom duration after which
the new token's refreshability will expire. Defaults to `None`.
bypass_user_check (bool, optional): Override checking the user for
being real/active. Used for registration token generation.
Defaults to `False`.
is_registration_token (bool, optional): Indicates that the token will
be used only for email-based registration. Defaults to `False`.
is_reset_token (bool, optional): Indicates that the token will
be used only for lost password reset. Defaults to `False`.
custom_claims (dict, optional): Additional claims that should be packed
in the payload. Defaults to `None`.
Returns:
str: Encoded PASETO token string.
Raises:
ClaimCollisionError: Tried to supply a RESERVED_CLAIM in the `custom_claims`.
"""
ClaimCollisionError.require_condition(
set(custom_claims.keys()).isdisjoint(RESERVED_CLAIMS),
"The custom claims collide with required claims",
)
if not bypass_user_check:
self._check_user(user)
moment = pendulum.now("UTC")
if override_refresh_lifespan is None:
refresh_lifespan = self.refresh_lifespan
else:
refresh_lifespan = override_refresh_lifespan
refresh_expiration = (moment + refresh_lifespan).int_timestamp
if override_access_lifespan is None:
access_lifespan = self.access_lifespan
else:
access_lifespan = override_access_lifespan
access_expiration = min(
(moment + access_lifespan).int_timestamp,
refresh_expiration,
)
payload_parts = {
"iat": moment.int_timestamp,
"exp": access_expiration,
"jti": str(uuid.uuid4()),
"id": user.identity,
"rls": ",".join(user.rolenames),
REFRESH_EXPIRATION_CLAIM: refresh_expiration,
}
if is_registration_token:
payload_parts[IS_REGISTRATION_TOKEN_CLAIM] = True
if is_reset_token:
payload_parts[IS_RESET_TOKEN_CLAIM] = True
logger.debug(
f"Attaching custom claims: {custom_claims}",
)
payload_parts.update(custom_claims)
if self.encode_token_hook:
self.encode_token_hook(**payload_parts)
# PASETO stores its own EXP as seconds from now()
time_delta = access_expiration - moment.int_timestamp
return self.paseto_ctx.encode(
self.paseto_parsed_keys,
payload_parts,
serializer=ujson,
exp=time_delta,
).decode(
"utf-8"
) # bytes by default, which are ugly
[docs]
async def encode_jwt_token(
self,
user: Any,
override_access_lifespan: pendulum.Duration | None = None,
override_refresh_lifespan: pendulum.Duration | None = None,
bypass_user_check: bool | None = False,
is_registration_token: bool | None = False,
is_reset_token: bool | None = False,
**custom_claims: dict | None,
) -> str:
"""
Encodes user data into a jwt token that can be used for authorization
at protected endpoints
Args:
user (:py:class:`User`): `User` to generate a token for.
override_access_lifespan (pendulum.Duration, optional): Override's the
instance's access lifespan to set a custom duration after which
the new token's accessibility will expire. May not exceed the
:py:data:`refresh_lifespan`. Defaults to `None`.
override_refresh_lifespan (pendulum.Duration, optional): Override's the
instance's refresh lifespan to set a custom duration after which
the new token's refreshability will expire. Defaults to `None`.
bypass_user_check (bool, optional): Override checking the user for
being real/active. Used for registration token generation.
Defaults to `False`.
is_registration_token (bool, optional): Indicates that the token will
be used only for email-based registration. Defaults to `False`.
is_reset_token (bool, optional): Indicates that the token will
be used only for lost password reset. Defaults to `False`.
custom_claims (dict, optional): Additional claims that should be packed
in the payload. Defaults to `None`.
Returns:
str: Encoded JWT token string.
Raises:
ClaimCollisionError: Tried to supply a RESERVED_CLAIM in the `custom_claims`.
"""
ClaimCollisionError.require_condition(
set(custom_claims.keys()).isdisjoint(RESERVED_CLAIMS),
"The custom claims collide with required claims",
)
if not bypass_user_check:
self._check_user(user)
moment = pendulum.now("UTC")
if override_refresh_lifespan is None:
refresh_lifespan = self.refresh_lifespan
else:
refresh_lifespan = override_refresh_lifespan
refresh_expiration = (moment + refresh_lifespan).int_timestamp
if override_access_lifespan is None:
access_lifespan = self.access_lifespan
else:
access_lifespan = override_access_lifespan
access_expiration = min(
(moment + access_lifespan).int_timestamp,
refresh_expiration,
)
payload_parts = {
"iat": moment.int_timestamp,
"exp": access_expiration,
"jti": str(uuid.uuid4()),
"id": user.identity,
"rls": ",".join(user.rolenames),
REFRESH_EXPIRATION_CLAIM: refresh_expiration,
}
if is_registration_token:
payload_parts[IS_REGISTRATION_TOKEN_CLAIM] = True
if is_reset_token:
payload_parts[IS_RESET_TOKEN_CLAIM] = True
logger.debug(f"Attaching custom claims: {custom_claims}")
payload_parts.update(custom_claims)
if self.encode_token_hook:
self.encode_token_hook(**payload_parts)
return jwt.encode(
payload_parts,
self.encode_key,
self.encode_algorithm,
json_encoder=JSONEncoder,
)
[docs]
async def encode_token(
self,
user: object,
override_access_lifespan: pendulum.Duration | None = None,
override_refresh_lifespan: pendulum.Duration | None = None,
bypass_user_check: bool | None = False,
is_registration_token: bool | None = False,
is_reset_token: bool | None = False,
**custom_claims: dict | None,
) -> str:
"""
Wrapper function to encode user data into a `insert_type_here` token
that can be used for authorization at protected endpoints.
Calling this will allow your app configuration to automagically create
the appropriate token type.
Args:
user (:py:class:`User`): `User` to generate a token for.
override_access_lifespan (pendulum.Duration, optional): Override's the
instance's access lifespan to set a custom duration after which
the new token's accessibility will expire. May not exceed the
:py:data:`refresh_lifespan`. Defaults to `None`.
override_refresh_lifespan (pendulum.Duration, optional): Override's the
instance's refresh lifespan to set a custom duration after which
the new token's refreshability will expire. Defaults to `None`.
bypass_user_check (bool, optional): Override checking the user for
being real/active. Used for registration token generation.
Defaults to `False`.
is_registration_token (bool, optional): Indicates that the token will
be used only for email-based registration. Defaults to `False`.
is_reset_token (bool, optional): Indicates that the token will
be used only for lost password reset. Defaults to `False`.
custom_claims (dict, optional): Additional claims that should be packed
in the payload. Defaults to `None`.
Returns:
str: Encoded token string of application configuration type `TOKEN_PROVIDER`.
Raises:
ClaimCollisionError: Tried to supply a RESERVED_CLAIM in the `custom_claims`.
"""
_token: str = await getattr(self, f"encode_{self.token_provider}_token")(
user,
override_access_lifespan=override_access_lifespan,
override_refresh_lifespan=override_refresh_lifespan,
bypass_user_check=bypass_user_check,
is_registration_token=is_registration_token,
is_reset_token=is_reset_token,
**custom_claims,
)
return _token
[docs]
async def encode_eternal_token(self, user: object, **custom_claims: dict | None) -> str:
"""
This utility function encodes an application configuration defined
type token that never expires
.. note:: This should be used sparingly since the token could become
a security concern if it is ever lost. If you use this
method, you should be sure that your application also
implements a blacklist so that a given token can be blocked
should it be lost or become a security concern
Args:
user (:py:class:`User`): `User` to generate a token for.
custom_claims (dict, optional): Additional claims that should be packed
in the payload. Defaults to `None`.
Returns:
str: Encoded, *never expiring*, token string of application configuration
type `TOKEN_PROVIDER`.
"""
return await self.encode_token(
user,
override_access_lifespan=VITAM_AETERNUM,
override_refresh_lifespan=VITAM_AETERNUM,
custom_claims=custom_claims,
)
[docs]
async def refresh_token(
self, token: str, override_access_lifespan: pendulum.Duration | None = None
) -> str:
"""
Wrapper function to creates a new token for a user if and only if the old
token's access permission is expired but its refresh permission is not yet
expired. The new token's refresh expiration moment is the same as the old
token's, but the new token's access expiration is refreshed.
Token type is determined by application configuration, when using this
helper function.
Args:
token (str): The existing token that needs to be replaced with a new,
refreshed token.
override_access_lifespan (_type_, optional): Override's the instance's
access lifespan to set a custom duration after which the new
token's accessibility will expire. May not exceed the
:py:data:`refresh_lifespan`. Defaults to `None`.
Returns:
str: Encoded token string of application configuration type `TOKEN_PROVIDER`.
"""
_token: str = await getattr(self, f"refresh_{self.token_provider}_token")(
token=token, override_access_lifespan=override_access_lifespan
)
return _token
[docs]
async def refresh_paseto_token(
self, token: str, override_access_lifespan: pendulum.Duration | None = None
) -> bytes:
"""
Creates a new PASETO token for a user if and only if the old token's access
permission is expired but its refresh permission is not yet expired.
The new token's refresh expiration moment is the same as the old
token's, but the new token's access expiration is refreshed
Args:
token (str): The existing token that needs to be replaced with a new,
refreshed token.
override_access_lifespan (_type_, optional): Override's the instance's
access lifespan to set a custom duration after which the new
token's accessibility will expire. May not exceed the
:py:data:`refresh_lifespan`. Defaults to `None`.
Returns:
bytes: Encoded PASETO token string.
"""
moment = pendulum.now("UTC")
data = await self.extract_token(token, access_type=AccessType.refresh)
user = await self.user_class.identify(data["id"])
self._check_user(user)
if not override_access_lifespan:
access_lifespan = self.access_lifespan
else:
access_lifespan = override_access_lifespan
refresh_expiration = data[REFRESH_EXPIRATION_CLAIM]
access_expiration = min(
(moment + access_lifespan).int_timestamp,
refresh_expiration,
)
custom_claims = {k: v for (k, v) in data.items() if k not in RESERVED_CLAIMS}
payload_parts = {
"iat": moment.int_timestamp,
"exp": access_expiration,
"jti": data["jti"],
"id": data["id"],
"rls": ",".join(user.rolenames),
REFRESH_EXPIRATION_CLAIM: refresh_expiration,
}
payload_parts.update(custom_claims)
if self.refresh_token_hook:
self.refresh_token_hook(**payload_parts)
# PASETO stores its own EXP as seconds from now()
time_delta = access_expiration - moment.int_timestamp
_token: bytes = self.paseto_ctx.encode(
self.paseto_parsed_keys,
payload_parts,
serializer=ujson,
exp=time_delta,
)
return _token
[docs]
async def refresh_jwt_token(
self, token: str, override_access_lifespan: pendulum.Duration | None = None
) -> str:
"""
Creates a new JWT token for a user if and only if the old token's access
permission is expired but its refresh permission is not yet expired.
The new token's refresh expiration moment is the same as the old
token's, but the new token's access expiration is refreshed
Args:
token (str): The existing token that needs to be replaced with a new,
refreshed token.
override_access_lifespan (_type_, optional): Override's the instance's
access lifespan to set a custom duration after which the new
token's accessibility will expire. May not exceed the
:py:data:`refresh_lifespan`. Defaults to `None`.
Returns:
str: Encoded JWT token string.
"""
moment = pendulum.now("UTC")
data = await self.extract_token(token, access_type=AccessType.refresh)
user = await self.user_class.identify(data["id"])
self._check_user(user)
if not override_access_lifespan:
access_lifespan: pendulum.Duration = self.access_lifespan
else:
access_lifespan = override_access_lifespan
refresh_expiration = data[REFRESH_EXPIRATION_CLAIM]
access_expiration = min(
(moment + access_lifespan).int_timestamp,
refresh_expiration,
)
custom_claims = {k: v for (k, v) in data.items() if k not in RESERVED_CLAIMS}
payload_parts = {
"iat": moment.int_timestamp,
"exp": access_expiration,
"jti": data["jti"],
"id": data["id"],
"rls": ",".join(user.rolenames),
REFRESH_EXPIRATION_CLAIM: refresh_expiration,
}
payload_parts.update(custom_claims)
if self.refresh_token_hook:
self.refresh_token_hook(**payload_parts)
return jwt.encode(
payload_parts,
self.encode_key,
self.encode_algorithm,
)
def _validate_token_data(self, data: dict, access_type: AccessType) -> None:
"""
Validates that the data for a jwt token is valid
"""
MissingClaimError.require_condition(
"jti" in data,
"Token is missing jti claim",
)
BlacklistedError.require_condition(
not self.is_blacklisted(data["jti"]),
"Token has a blacklisted jti",
)
MissingClaimError.require_condition(
"id" in data,
"Token is missing id field",
)
MissingClaimError.require_condition(
"exp" in data,
"Token is missing exp claim",
)
MissingClaimError.require_condition(
REFRESH_EXPIRATION_CLAIM in data,
f"Token is missing {REFRESH_EXPIRATION_CLAIM} claim",
)
moment = pendulum.now("UTC").int_timestamp
if access_type == AccessType.access:
MisusedRegistrationToken.require_condition(
IS_REGISTRATION_TOKEN_CLAIM not in data,
"registration token used for access",
)
MisusedResetToken.require_condition(
IS_RESET_TOKEN_CLAIM not in data,
"password reset token used for access",
)
ExpiredAccessError.require_condition(
moment <= data["exp"],
"access permission has expired",
)
elif access_type == AccessType.refresh:
MisusedRegistrationToken.require_condition(
IS_REGISTRATION_TOKEN_CLAIM not in data,
"registration token used for refresh",
)
MisusedResetToken.require_condition(
IS_RESET_TOKEN_CLAIM not in data,
"password reset token used for refresh",
)
EarlyRefreshError.require_condition(
moment > data["exp"],
"access permission for token has not expired. may not refresh",
)
ExpiredRefreshError.require_condition(
moment <= data[REFRESH_EXPIRATION_CLAIM],
"refresh permission for token has expired",
)
elif access_type == AccessType.register:
ExpiredAccessError.require_condition(
moment <= data["exp"],
"register permission has expired",
)
InvalidRegistrationToken.require_condition(
IS_REGISTRATION_TOKEN_CLAIM in data,
"invalid registration token used for verification",
)
MisusedResetToken.require_condition(
IS_RESET_TOKEN_CLAIM not in data,
"password reset token used for registration",
)
elif access_type == AccessType.reset:
MisusedRegistrationToken.require_condition(
IS_REGISTRATION_TOKEN_CLAIM not in data,
"registration token used for reset",
)
ExpiredAccessError.require_condition(
moment <= data["exp"],
"reset permission has expired",
)
InvalidResetToken.require_condition(
IS_RESET_TOKEN_CLAIM in data,
"invalid reset token used for verification",
)
def _unpack_header(self, headers: Header) -> str | None:
"""
Unpacks a token from a request header
"""
token_header: str = headers.get(self.header_name, "")
MissingToken.require_condition(
token_header,
f"Token not found in headers under '{self.header_name}'",
)
match = re.match(self.header_type + r"\s*([\w\.-]+)", token_header)
InvalidTokenHeader.require_condition(
match is not None,
"Token header structure is invalid",
)
return match.group(1) # type: ignore
def _unpack_cookie(self, cookies: dict = {}) -> str:
"""
Unpacks a jwt token from a request cookies
"""
token_cookie: str = cookies.get(self.cookie_name, "")
MissingToken.require_condition(
token_cookie, f"Token not found in cookie under '{self.cookie_name}'"
)
return token_cookie
[docs]
def read_token_from_cookie(self, request: Request) -> str:
"""
Unpacks a token from the current sanic request
Args:
request (Request): Current Sanic `Request`.
Returns:
str: Unpacked token from cookie.
"""
_request = get_request(request)
return self._unpack_cookie(_request.cookies)
[docs]
def read_token(self, request: Request) -> str:
"""
Tries to unpack the token from the current sanic request
in the locations configured by :py:data:`TOKEN_PLACES`.
Check-Order is defined by the value order in :py:data:`TOKEN_PLACES`.
Args:
request (sanic.Request): Sanic ``request`` object
Returns:
str: Token.
Raises:
:py:exc:`~sanic_beskar.exceptions.MissingToken`: Token is not found in any
:py:data:`~sanic_beskar.constants.TOKEN_PLACES`
"""
_request = get_request(request)
for place in self.token_places:
try:
_token: str = getattr(self, f"read_token_from_{place.lower()}")(_request)
return _token
except MissingToken:
pass
except AttributeError:
warnings.warn(
textwrap.dedent(f"""
Sanic_Beskar hasn't implemented reading tokens
from location '{place.lower()}'.
Please reconfigure TOKEN_PLACES.
Values accepted in TOKEN_PLACES are:
{self.token_places}
"""),
UserWarning,
)
raise MissingToken(textwrap.dedent(f"""
Could not find token in any
of the given locations: {self.token_places}
""").replace("\n", ""))
[docs]
async def send_registration_email(
self,
email: str,
user: object,
template: str | jinja2.nodes.Template | None = None,
confirmation_sender: str | None = None,
confirmation_uri: str | None = None,
subject: str | None = None,
override_access_lifespan: pendulum.Duration | None = None,
) -> dict:
"""
Sends a registration email to a new user, containing a time expiring
token usable for validation. This requires your application
is initialized with a `mail` extension, which supports
sanic-mailing's :py:class:`Message` object and a
:py:meth:`send_message` method.
Args:
user (:py:class:`User`): The user object to tie claim to
(username, id, email, etc)
template (Optional, :py:data:`filehandle`): HTML Template for confirmation
email. If not provided, a stock entry is used.
confirmation_sender (Optional, str): The sender that should be attached to the
confirmation email. Overrides the :py:data:`BESKAR_CONFIRMATION_SENDER`
config setting.
confirmation_uri (Optional, str): The uri that should be visited to complete email
registration. Should usually be a uri to a frontend or external service
that calls a 'finalize' method in the api to complete registration. Will
override the :py:data:`BESKAR_CONFIRMATION_URI` config setting.
subject (Optional, str): The registration email subject. Will override the
:py:data:`BESKAR_CONFIRMATION_SUBJECT` config setting.
override_access_lifespan (Optional, :py:data:`pendulum`): Overrides the
:py:data:`TOKEN_ACCESS_LIFESPAN` to set an access lifespan for the
registration token.
Returns:
dict: Summary of information sent, along with the `result` from mail send. (Essentially
the response of :py:func:`send_token_email`).
"""
if subject is None:
subject = self.confirmation_subject
if confirmation_uri is None:
confirmation_uri = self.confirmation_uri
sender = confirmation_sender or self.confirmation_sender
logger.debug(f"Generating token with lifespan: {override_access_lifespan}")
custom_token = await self.encode_token(
user,
override_access_lifespan=override_access_lifespan,
bypass_user_check=True,
is_registration_token=True,
)
_return: dict = await self.send_token_email(
email,
user=user,
template=template,
action_sender=sender,
action_uri=confirmation_uri,
subject=subject,
custom_token=custom_token,
)
return _return
[docs]
async def send_reset_email(
self,
email: str,
template: str | jinja2.nodes.Template | None = None,
reset_sender: str | None = None,
reset_uri: str | None = None,
subject: str | None = None,
override_access_lifespan: pendulum.Duration | None = None,
) -> dict:
"""
Sends a password reset email to a user, containing a time expiring
token usable for validation. This requires your application
is initialized with a :py:mod:`mail` extension, which supports
sanic-mailing's :py:class:`Message` object and a
:py:meth:`send_message()` method.
Args:
email (str): The email address to attempt to send to.
template (Optional, :py:data:`filehandle`): HTML Template for reset email.
If not provided, a stock entry is used.
reset_sender (Optional, str): The sender that should be attached to the
reset email. Defaults to :py:data:`BESKAR_RESET_SENDER` config setting.
reset_uri (Optional, str): The uri that should be visited to complete password
reset. Should usually be a uri to a frontend or external service that calls
the 'validate_reset_token()' method in the api to complete reset. Defaults to
:py:data:`BESKAR_RESET_URI` config setting.
subject (Optional, str): The reset email subject. Defaults to
:py:data:`BESKAR_RESET_SUBJECT` config setting.
override_access_lifespan (Optional, :py:data:`pendulum`): Overrides the
:py:data:`TOKEN_ACCESS_LIFESPAN` to set an access lifespan for the registration token.
Defaults to :py:data:`TOKEN_ACCESS_LIFESPAN` config setting.
Returns:
dict: Summary of information sent, along with the `result` from mail send. (Essentially
the response of :py:func:`send_token_email`).
"""
if subject is None:
subject = self.reset_subject
if reset_uri is None:
reset_uri = self.reset_uri
sender = reset_sender or self.reset_sender
user = await self.user_class.lookup(email=email)
MissingUserError.require_condition(
user is not None,
"Could not find the requested user",
)
logger.debug(f"Generating token with lifespan: {override_access_lifespan}")
custom_token = await self.encode_token(
user,
override_access_lifespan=override_access_lifespan,
bypass_user_check=False,
is_reset_token=True,
)
_return: dict = await self.send_token_email(
user.email,
user=user,
template=template,
action_sender=sender,
action_uri=reset_uri,
subject=subject,
custom_token=custom_token,
)
return _return
[docs]
async def send_token_email(
self,
email: str,
user: object,
template: str | jinja2.nodes.Template | None = None,
action_sender: str | None = "",
action_uri: str | None = "",
subject: str | None = "",
override_access_lifespan: pendulum.Duration | None = None,
custom_token: str = "",
) -> dict:
"""
Sends an email to a user, containing a time expiring
token usable for several actions. This requires
your application is initialized with a `mail` extension,
which supports sanic-mailing's :py:class:`Message` object and
a :py:meth:`send_message` method.
Args:
user (:py:class:`User`): The user object to tie claim to (username, id, email, etc)
email (str): The email address to attempt to send to.
template (Optional, :py:data:`filehandle`): HTML Template for the email.
If not provided, a stock entry is used.
action_sender (str): The sender that should be attached to the email.
action_uri (str): The uri that should be visited to complete this notification
action.
subject (str): The email subject.
override_access_lifespan (Optional, :py:data:`pendulum`): Overrides the
:py:data:`TOKEN_ACCESS_LIFESPAN` to set an access lifespan for the registration token.
Defaults to :py:data:`TOKEN_ACCESS_LIFESPAN` config setting.
custom_token (str): The token to be carried as the email's payload.
Returns:
dict: Summary of information sent, along with the `result` from mail send. (Essentially
the response of :py:func:`send_token_email`).
Raises:
:py:exc:`~sanic_beskar.exceptions.BeskarError`: Missing required parameters.
"""
notification = {
"result": None,
"message": None,
"user": str(user),
"email": email,
"token": custom_token,
"subject": subject,
"confirmation_uri": action_uri, # backwards compatibility
"action_uri": action_uri,
}
BeskarError.require_condition(
self.app.ctx.mail,
"Your app must have a mail extension enabled to register by email",
)
BeskarError.require_condition(
action_sender,
"A sender is required to send confirmation email",
)
BeskarError.require_condition(
custom_token,
"A custom_token is required to send notification email",
)
if template is None:
async with aiofiles.open(self.confirmation_template) as fh:
template = await fh.read()
with BeskarError.handle_errors("fail sending email"):
jinja_tmpl = jinja2.Template(template, autoescape=True, enable_async=True)
notification["message"] = (await jinja_tmpl.render_async(notification)).strip()
_mail = import_module(self.app.ctx.mail.__module__)
msg = _mail.Message(
subject=notification["subject"],
to=[notification["email"]],
from_address=action_sender,
html=notification["message"],
reply_to=[action_sender],
)
logger.debug(f"Sending email to {email}")
notification["result"] = await self.app.ctx.mail.send_message(msg)
return notification
[docs]
async def get_user_from_registration_token(self, token: str) -> Any:
"""
Gets a user based on the registration token that is supplied. Verifies
that the token is a registration token and that the user can be properly
retrieved
Args:
token (str): Registration token to validate.
Returns:
:py:class:`User`: :py:class:`User` object of looked up user after token validation
"""
data = await self.extract_token(token, access_type=AccessType.register)
user_id = data.get("id")
BeskarError.require_condition(
user_id is not None,
"Could not fetch an id from the registration token",
)
user = await self.user_class.identify(user_id)
BeskarError.require_condition(
user is not None,
"Could not identify the user from the registration token",
)
return user
[docs]
async def validate_reset_token(self, token: str) -> Any:
"""
Validates a password reset request based on the reset token
that is supplied. Verifies that the token is a reset token
and that the user can be properly retrieved
Args:
token (str): Reset token to validate.
Returns:
:py:class:`User`: object of looked up user after token validation
Raises:
:py:exc:`~sanic_beskar.exceptions.BeskarError`: Missing required parameters
"""
data = await self.extract_token(token, access_type=AccessType.reset)
user_id = data.get("id")
BeskarError.require_condition(
user_id is not None,
"Could not fetch an id from the reset token",
)
user = await self.user_class.identify(user_id)
BeskarError.require_condition(
user is not None,
"Could not identify the user from the reset token",
)
return user
[docs]
def hash_password(self, raw_password: str) -> str:
"""
Hashes a plaintext password using the stored passlib password context
Args:
raw_password (str): cleartext password for the user
Returns:
str: Properly hashed ciphertext of supplied :py:data:`raw_password`
Raises:
:py:exc:`~sanic_beskareptions.BeskarError`: No password is provided
"""
BeskarError.require_condition(
self.pwd_ctx is not None,
"Beskar must be initialized before this method is available",
)
"""
`scheme` is now set with self.pwd_ctx.update(default=scheme) due
to the depreciation in upcoming passlib 2.0.
zillions of warnings suck.
"""
return self.pwd_ctx.hash(raw_password)
[docs]
async def verify_and_update(self, user: Any, password: str = "") -> Any:
"""
Validate a password hash contained in the user object is
hashed with the defined hash scheme (:py:data:`BESKAR_HASH_SCHEME`).
If not, raise an Exception of :py:exc:`~sanic_beskar.exceptions.LegacySchema`,
unless the :py:data:`password` argument is provided, in which case an updated
:py:class:`User` will be returned, and must be saved by the calling app. The
updated :py:class:`User` will contain the users current password updated to the
currently desired hash scheme (:py:exc:`~BESKAR_HASH_SCHEME`).
Args:
user (:py:class:`User`): The user class to tie claim to (username, id,
email, etc). *MUST* include the password field, defined as :py:attr:`password`.
password (str): The user's provide password from login. If present, this is used
to validate and then attempt to update with the new :py:data:`BESKAR_HASH_SCHEME`
scheme.
Returns:
:py:class:`User`: Authenticated :py:class:`User`
Raises:
:py:exc:`~sanic_beskar.exceptions.AuthenticationError`: Authentication failure
"""
if self.pwd_ctx.needs_update(user.password):
if password:
rv, updated = self.pwd_ctx.verify_and_update(
password,
user.password,
)
AuthenticationError.require_condition(
rv,
"Could not verify password",
)
user.password = updated
else:
used_hash = self.pwd_ctx.identify(user.password)
desired_hash = self.hash_scheme
raise LegacyScheme(
f"Hash using non-current scheme '{used_hash}'." f"Use '{desired_hash}' instead."
)
return user