Source code for crossauth_backend.oauth.clientmanager

# Copyright (c) 2024 Matthew Baker.  All rights reserved.  Licenced under the Apache Licence 2.0.  See LICENSE file
from typing import TypedDict, Required, List, NamedTuple
from nulltype import NullType
import urllib.parse
from crossauth_backend.storage import OAuthClientStorage
from crossauth_backend.utils import set_parameter, ParamType
from crossauth_backend.common.error import CrossauthError, ErrorCode
from crossauth_backend.common.logger import CrossauthLogger, j
from crossauth_backend.common.interfaces import OAuthClient
from crossauth_backend.oauth.client import OAuthFlows
from crossauth_backend.crypto import Crypto

CLIENT_ID_LENGTH = 16
CLIENT_SECRET_LENGTH = 32

[docs] class UpdateClientReturn(NamedTuple): client: OAuthClient new_secret: bool
[docs] class OAuthClientManagerOptions(TypedDict, total=False): """ Options for :class:`OAuthClientManager` """ oauth_pbkdf2digest : str """ PBKDF2 HMAC for hashing client secret """ oauth_pbkdf2iterations : int """ PBKDF2 iterations for hashing client secret """ oauth_pbkdf2key_length : int """ PBKDF2 key length for hashing client secret """ client_storage : Required[OAuthClientStorage] """ Database for storage clients """
[docs] class OAuthClientManager: def __init__(self, options: OAuthClientManagerOptions): if not options["client_storage"]: raise CrossauthError(ErrorCode.Configuration, "Must specify client_storage when adding a client manager") self.__client_storage = options["client_storage"] self.__oauth_pbkdf2_digest = "sha256" self.__oauth_pbkdf2_iterations = 40000 self.__oauth_pbkdf2_key_length = 32 set_parameter("oauth_pbkdf2_digest", ParamType.String, self, options, "OAUTH_PBKDF2_DIGEST") set_parameter("oauth_pbkdf2_key_length", ParamType.String, self, options, "OAUTH_PBKDF2_KEYLENGTH") set_parameter("require_redirect_uri_registration", ParamType.Boolean, self, options, "OAUTH_REQUIRE_REDIRECT_URI_REGISTRATION")
[docs] async def create_client(self, client_name: str, redirect_uri: List[str], valid_flow: List[str]|None = None, confidential: bool = True, userid: str|int|None = None) -> OAuthClient: client_id = OAuthClientManager.random_client_id() client_secret = None plaintext = None if confidential: plaintext = OAuthClientManager.random_client_secret() client_secret = await Crypto.password_hash(plaintext, { "encode": True, "iterations": self.__oauth_pbkdf2_iterations, "key_len": self.__oauth_pbkdf2_key_length, "digest": self.__oauth_pbkdf2_digest, }) for uri in redirect_uri: OAuthClientManager.validate_uri(uri) if not valid_flow: valid_flow = OAuthFlows.all_flows() client : OAuthClient = { "client_id": client_id, "client_name": client_name, "redirect_uri": redirect_uri, "confidential": confidential, "valid_flow": valid_flow, } if (userid is not None): client["userid"] = userid if (client_secret is not None): client["client_secret"] = client_secret new_client = None for try_num in range(5): try: new_client = await self.__client_storage.create_client(client) break except Exception as e: if try_num == 4: ce = CrossauthError.as_crossauth_error(e) if ce.code != ErrorCode.ClientExists: raise e else: client["client_id"] = OAuthClientManager.random_client_id() if not new_client: raise CrossauthError(ErrorCode.ClientExists) if "client_secret" in new_client and type(new_client["client_secret"]) is not NullType and plaintext: new_client["client_secret"] = plaintext return new_client
[docs] async def update_client(self, client_id: str, client: OAuthClient, reset_secret: bool = False) -> UpdateClientReturn: old_client = await self.__client_storage.get_client_by_id(client_id) new_secret = False plaintext = None if (client.get("confidential") is True and not old_client["confidential"]) or \ (client.get("confidential") is True and reset_secret): plaintext = OAuthClientManager.random_client_secret() client["client_secret"] = await Crypto.password_hash(plaintext, { "encode": True, "iterations": self.__oauth_pbkdf2_iterations, "key_len": self.__oauth_pbkdf2_key_length, "digest": self.__oauth_pbkdf2_digest, }) new_secret = True elif client.get("confidential") is False and "client_secret" in client: del client["client_secret"] if "redirect_uri" in client: for uri in client["redirect_uri"]: OAuthClientManager.validate_uri(uri) client["client_id"] = client_id await self.__client_storage.update_client(client) new_client = await self.__client_storage.get_client_by_id(client_id) if plaintext: new_client["client_secret"] = plaintext return UpdateClientReturn(client, new_secret)
[docs] @staticmethod def random_client_id() -> str: return Crypto.random_value(CLIENT_ID_LENGTH)
[docs] @staticmethod def random_client_secret() -> str: return Crypto.random_value(CLIENT_SECRET_LENGTH)
[docs] @staticmethod def validate_uri(uri: str): valid = False try: valid_uri = urllib.parse.urlparse(uri) valid = not valid_uri.fragment except: # test if it's a valid relative url try: valid_uri = urllib.parse.urlparse(uri, scheme="http") valid = not valid_uri.fragment except Exception as e2: CrossauthLogger.logger().debug(j({"err": e2})) if not valid: raise CrossauthError.from_oauth_error("invalid_request", f"Invalid redirect Uri {uri}")