crossauth_backend package¶
Subpackages¶
- crossauth_backend.common package
- Submodules
- crossauth_backend.common.error module
CrossauthErrorErrorCodeErrorCode.AuthorizationPendingErrorCode.BadRequestErrorCode.ClientExistsErrorCode.ConfigurationErrorCode.ConnectionErrorCode.ConstraintViolationErrorCode.DataFormatErrorCode.EmailNotExistErrorCode.EmailNotVerifiedErrorCode.ExpiredErrorCode.ExpiredTokenErrorCode.Factor2ResetNeededErrorCode.FetchErrorErrorCode.ForbiddenErrorCode.FormEntryErrorCode.InsufficientPriviledgesErrorCode.InsufficientScopeErrorCode.InvalidClientIdErrorCode.InvalidClientIdOrSecretErrorCode.InvalidClientSecretErrorCode.InvalidCsrfErrorCode.InvalidEmailErrorCode.InvalidHashErrorCode.InvalidKeyErrorCode.InvalidOAuthFlowErrorCode.InvalidPhoneNumberErrorCode.InvalidRedirectUriErrorCode.InvalidScopeErrorCode.InvalidSessionErrorCode.InvalidTokenErrorCode.InvalidUsernameErrorCode.KeyExistsErrorCode.MfaRequiredErrorCode.NotImplementedErrorCode.PasswordChangeNeededErrorCode.PasswordFormatErrorCode.PasswordInvalidErrorCode.PasswordMatchErrorCode.PasswordResetNeededErrorCode.SlowDownErrorCode.TwoFactorIncompleteErrorCode.UnauthorizedErrorCode.UnauthorizedClientErrorCode.UnknownErrorErrorCode.UnsupportedAlgorithmErrorCode.UserExistsErrorCode.UserNotActiveErrorCode.UserNotExistErrorCode.UsernameOrPasswordInvalidErrorCode.ValueError
- crossauth_backend.common.interfaces module
ApiKeyKeyKeyPrefixLdapUserOAuthClientPartialKeyPartialOAuthClientPartialUserPartialUserInputFieldsPartialUserSecretsUserUserInputFieldsUserSecretsUserSecretsInputFieldsUserStateUserState.activeUserState.awaiting_email_verificationUserState.awaiting_two_factor_setupUserState.awaiting_two_factor_setup_and_email_verificationUserState.disabledUserState.factor2_reset_neededUserState.password_and_factor2_reset_neededUserState.password_change_neededUserState.password_reset_needed
get_json_data()optional_equals()optional_not_equals()
- crossauth_backend.common.jwt module
- crossauth_backend.common.logger module
CrossauthLoggerCrossauthLoggerInterfaceCrossauthLoggerInterface.DebugCrossauthLoggerInterface.ErrorCrossauthLoggerInterface.InfoCrossauthLoggerInterface.NoLoggingCrossauthLoggerInterface.WarnCrossauthLoggerInterface.debug()CrossauthLoggerInterface.error()CrossauthLoggerInterface.info()CrossauthLoggerInterface.levelCrossauthLoggerInterface.set_level()CrossauthLoggerInterface.warn()
j()
- Module contents
- crossauth_backend.oauth package
- Submodules
- crossauth_backend.oauth.client module
IdTokenReturnOAuthClientOAuthClient.client_credentials_flow()OAuthClient.code_challenge_and_verifier()OAuthClient.get_oidc_config()OAuthClient.get_token_payload()OAuthClient.id_token_authorized()OAuthClient.load_config()OAuthClient.mfa_authenticators()OAuthClient.mfa_oob_complete()OAuthClient.mfa_oob_request()OAuthClient.mfa_otp_complete()OAuthClient.mfa_otp_request()OAuthClient.password_flow()OAuthClient.poll_device_code_flow()OAuthClient.random_value()OAuthClient.redirect_endpoint()OAuthClient.refresh_token_flow()OAuthClient.sha256()OAuthClient.start_authorization_code_flow()OAuthClient.start_device_code_flow()OAuthClient.user_info_endpoint()OAuthClient.validate_id_token()
OAuthClientOptionsOAuthClientOptions.audienceOAuthClientOptions.auth_server_base_urlOAuthClientOptions.client_idOAuthClientOptions.client_secretOAuthClientOptions.clock_toleranceOAuthClientOptions.code_challenge_methodOAuthClientOptions.device_authorization_urlOAuthClientOptions.jwt_key_typeOAuthClientOptions.jwt_public_keyOAuthClientOptions.jwt_public_key_fileOAuthClientOptions.jwt_secret_keyOAuthClientOptions.jwt_secret_key_fileOAuthClientOptions.key_storageOAuthClientOptions.oauth_authorize_redirectOAuthClientOptions.oauth_post_typeOAuthClientOptions.oauth_use_user_info_endpointOAuthClientOptions.oidc_configOAuthClientOptions.persist_access_tokenOAuthClientOptions.redirect_uriOAuthClientOptions.state_lengthOAuthClientOptions.verifier_length
OAuthDeviceAuthorizationResponseOAuthDeviceAuthorizationResponse.device_codeOAuthDeviceAuthorizationResponse.errorOAuthDeviceAuthorizationResponse.error_descriptionOAuthDeviceAuthorizationResponse.expires_inOAuthDeviceAuthorizationResponse.intervalOAuthDeviceAuthorizationResponse.user_codeOAuthDeviceAuthorizationResponse.verification_uriOAuthDeviceAuthorizationResponse.verification_uri_complete
OAuthDeviceResponseOAuthFlowsOAuthFlows.AllOAuthFlows.AuthorizationCodeOAuthFlows.AuthorizationCodeWithPKCEOAuthFlows.ClientCredentialsOAuthFlows.DeviceCodeOAuthFlows.OidcAuthorizationCodeOAuthFlows.PasswordOAuthFlows.PasswordMfaOAuthFlows.RefreshTokenOAuthFlows.all_flows()OAuthFlows.are_valid_flows()OAuthFlows.flow_nameOAuthFlows.flow_names()OAuthFlows.grant_type()OAuthFlows.is_valid_flow()
OAuthMfaAuthenticatorOAuthMfaAuthenticatorsOrTokenResponseOAuthMfaAuthenticatorsOrTokenResponse.access_tokenOAuthMfaAuthenticatorsOrTokenResponse.authenticatorsOAuthMfaAuthenticatorsOrTokenResponse.binding_methodOAuthMfaAuthenticatorsOrTokenResponse.challenge_typeOAuthMfaAuthenticatorsOrTokenResponse.errorOAuthMfaAuthenticatorsOrTokenResponse.error_descriptionOAuthMfaAuthenticatorsOrTokenResponse.expires_inOAuthMfaAuthenticatorsOrTokenResponse.id_payloadOAuthMfaAuthenticatorsOrTokenResponse.id_tokenOAuthMfaAuthenticatorsOrTokenResponse.mfa_tokenOAuthMfaAuthenticatorsOrTokenResponse.nameOAuthMfaAuthenticatorsOrTokenResponse.oob_channelOAuthMfaAuthenticatorsOrTokenResponse.oob_codeOAuthMfaAuthenticatorsOrTokenResponse.refresh_tokenOAuthMfaAuthenticatorsOrTokenResponse.scopeOAuthMfaAuthenticatorsOrTokenResponse.token_type
OAuthMfaAuthenticatorsResponseOAuthMfaChallengeResponseOAuthTokenResponseOAuthTokenResponse.access_tokenOAuthTokenResponse.binding_methodOAuthTokenResponse.challenge_typeOAuthTokenResponse.errorOAuthTokenResponse.error_descriptionOAuthTokenResponse.expires_inOAuthTokenResponse.id_payloadOAuthTokenResponse.id_tokenOAuthTokenResponse.mfa_tokenOAuthTokenResponse.nameOAuthTokenResponse.oob_channelOAuthTokenResponse.oob_codeOAuthTokenResponse.refresh_tokenOAuthTokenResponse.scopeOAuthTokenResponse.token_type
- crossauth_backend.oauth.clientmanager module
- crossauth_backend.oauth.resserver module
- crossauth_backend.oauth.tokenconsumer module
KeysOAuthTokenConsumerOAuthTokenConsumer.audienceOAuthTokenConsumer.auth_server_base_urlOAuthTokenConsumer.decode_header()OAuthTokenConsumer.hash()OAuthTokenConsumer.keysOAuthTokenConsumer.load_config()OAuthTokenConsumer.load_jwks()OAuthTokenConsumer.load_keys()OAuthTokenConsumer.oidc_configOAuthTokenConsumer.token_authorized()
OAuthTokenConsumerOptionsOAuthTokenConsumerOptions.audienceOAuthTokenConsumerOptions.auth_server_base_urlOAuthTokenConsumerOptions.clock_toleranceOAuthTokenConsumerOptions.jwt_key_typeOAuthTokenConsumerOptions.jwt_public_keyOAuthTokenConsumerOptions.jwt_public_key_fileOAuthTokenConsumerOptions.jwt_secret_keyOAuthTokenConsumerOptions.jwt_secret_key_fileOAuthTokenConsumerOptions.key_storageOAuthTokenConsumerOptions.oidc_configOAuthTokenConsumerOptions.persist_access_token
- crossauth_backend.oauth.wellknown module
AuthorizeQueryTypeDEFAULT_OIDCCONFIGJwksOpenIdConfigurationOpenIdConfiguration.acr_values_supportedOpenIdConfiguration.authorization_endpointOpenIdConfiguration.check_session_iframeOpenIdConfiguration.claim_types_supportedOpenIdConfiguration.claims_locales_supportedOpenIdConfiguration.claims_parameter_supportedOpenIdConfiguration.claims_supportedOpenIdConfiguration.display_values_supportedOpenIdConfiguration.end_session_endpointOpenIdConfiguration.grant_types_supportedOpenIdConfiguration.id_token_encryption_alg_values_supportedOpenIdConfiguration.id_token_encryption_enc_values_supportedOpenIdConfiguration.id_token_signing_alg_values_supportedOpenIdConfiguration.issuerOpenIdConfiguration.jwks_uriOpenIdConfiguration.op_policy_uriOpenIdConfiguration.op_tos_uriOpenIdConfiguration.registration_endpointOpenIdConfiguration.request_object_encryption_alg_values_supportedOpenIdConfiguration.request_object_encryption_enc_values_supportedOpenIdConfiguration.request_object_signing_alg_values_supportedOpenIdConfiguration.request_parameter_supportedOpenIdConfiguration.request_uri_parameter_supportedOpenIdConfiguration.require_request_uri_registrationOpenIdConfiguration.response_modes_supportedOpenIdConfiguration.response_types_supportedOpenIdConfiguration.scopes_supportedOpenIdConfiguration.service_documentationOpenIdConfiguration.subject_types_supportedOpenIdConfiguration.token_endpointOpenIdConfiguration.token_endpoint_auth_methods_supportedOpenIdConfiguration.token_endpoint_auth_signing_alg_values_supportedOpenIdConfiguration.ui_locales_supportedOpenIdConfiguration.userinfo_encryption_alg_values_supportedOpenIdConfiguration.userinfo_encryption_enc_values_supportedOpenIdConfiguration.userinfo_endpointOpenIdConfiguration.userinfo_signing_alg_values_supported
TokenBodyTypeTokenBodyType.binding_codeTokenBodyType.client_idTokenBodyType.client_secretTokenBodyType.codeTokenBodyType.code_verifierTokenBodyType.device_codeTokenBodyType.grant_typeTokenBodyType.mfa_tokenTokenBodyType.oobCodeTokenBodyType.otpTokenBodyType.passwordTokenBodyType.refresh_tokenTokenBodyType.scopeTokenBodyType.username
- Module contents
- crossauth_backend.storageimpl package
- Submodules
- crossauth_backend.storageimpl.inmemorystorage module
InMemoryKeyStorageInMemoryKeyStorage.delete_all_for_user()InMemoryKeyStorage.delete_data()InMemoryKeyStorage.delete_key()InMemoryKeyStorage.delete_matching()InMemoryKeyStorage.get_all_for_user()InMemoryKeyStorage.get_key()InMemoryKeyStorage.print()InMemoryKeyStorage.save_key()InMemoryKeyStorage.update_data()InMemoryKeyStorage.update_key()InMemoryKeyStorage.update_many_data()
InMemoryOAuthAuthorizationStorageInMemoryOAuthClientStorageInMemoryUserStorageInMemoryUserStorage.create_user()InMemoryUserStorage.delete_user_by_id()InMemoryUserStorage.delete_user_by_username()InMemoryUserStorage.get_user_by()InMemoryUserStorage.get_user_by_email()InMemoryUserStorage.get_user_by_id()InMemoryUserStorage.get_user_by_username()InMemoryUserStorage.get_users()InMemoryUserStorage.update_user()
InMemoryUserStorageOptions
- crossauth_backend.storageimpl.ldapstorage module
LdapUserStorageLdapUserStorage.create_user()LdapUserStorage.delete_user_by_id()LdapUserStorage.delete_user_by_username()LdapUserStorage.get_ldap_user()LdapUserStorage.get_user_by()LdapUserStorage.get_user_by_email()LdapUserStorage.get_user_by_id()LdapUserStorage.get_user_by_username()LdapUserStorage.get_users()LdapUserStorage.ldap_bind()LdapUserStorage.require_user_entry()LdapUserStorage.search_user()LdapUserStorage.update_user()
LdapUserStorageOptionsLdapUserStorageOptions.admin_editable_fieldsLdapUserStorageOptions.create_user_fnLdapUserStorageOptions.ldap_urlsLdapUserStorageOptions.ldap_user_search_baseLdapUserStorageOptions.ldap_username_attributeLdapUserStorageOptions.normalize_emailLdapUserStorageOptions.normalize_usernameLdapUserStorageOptions.user_editable_fields
LdapUserWithStatedefault_create_user_dn()
- crossauth_backend.storageimpl.sqlalchemystorage module
SqlAlchemyKeyStorageSqlAlchemyKeyStorage.delete_all_for_user()SqlAlchemyKeyStorage.delete_data()SqlAlchemyKeyStorage.delete_key()SqlAlchemyKeyStorage.delete_matching()SqlAlchemyKeyStorage.delete_with_prefix()SqlAlchemyKeyStorage.get_all_for_user()SqlAlchemyKeyStorage.get_key()SqlAlchemyKeyStorage.get_key_in_transaction()SqlAlchemyKeyStorage.save_key()SqlAlchemyKeyStorage.to_dict()SqlAlchemyKeyStorage.update_data()SqlAlchemyKeyStorage.update_key()SqlAlchemyKeyStorage.update_key_in_transaction()SqlAlchemyKeyStorage.update_many_data()
SqlAlchemyKeyStorageOptionsSqlAlchemyOAuthAuthorizationStorageSqlAlchemyOAuthAuthorizationStorageOptionsSqlAlchemyOAuthClientStorageSqlAlchemyOAuthClientStorage.create_client()SqlAlchemyOAuthClientStorage.delete_client()SqlAlchemyOAuthClientStorage.get_client_by_id()SqlAlchemyOAuthClientStorage.get_client_by_name()SqlAlchemyOAuthClientStorage.get_client_in_transaction()SqlAlchemyOAuthClientStorage.get_clients()SqlAlchemyOAuthClientStorage.update_client()
SqlAlchemyOAuthClientStorageOptionsSqlAlchemyUserStorageSqlAlchemyUserStorage.create_user()SqlAlchemyUserStorage.delete_user_by_id()SqlAlchemyUserStorage.delete_user_by_username()SqlAlchemyUserStorage.get_user_by()SqlAlchemyUserStorage.get_user_by_email()SqlAlchemyUserStorage.get_user_by_id()SqlAlchemyUserStorage.get_user_by_in_transaction()SqlAlchemyUserStorage.get_user_by_username()SqlAlchemyUserStorage.get_users()SqlAlchemyUserStorage.to_dict()SqlAlchemyUserStorage.update_user()
SqlAlchemyUserStorageOptionsSqlAlchemyUserStorageOptions.admin_editable_fieldsSqlAlchemyUserStorageOptions.force_id_to_numberSqlAlchemyUserStorageOptions.id_columnSqlAlchemyUserStorageOptions.joinsSqlAlchemyUserStorageOptions.normalize_emailSqlAlchemyUserStorageOptions.normalize_usernameSqlAlchemyUserStorageOptions.user_editable_fieldsSqlAlchemyUserStorageOptions.user_secrets_tableSqlAlchemyUserStorageOptions.user_tableSqlAlchemyUserStorageOptions.userid_foreign_key_column
adapt_date_iso_int()adapt_date_iso_real()adapt_datetime_epoch_int()adapt_datetime_epoch_real()adapt_datetime_iso_int()adapt_datetime_iso_real()convert_date_int()convert_date_real()convert_datetime_int()convert_datetime_real()convert_timestamp_int()convert_timestamp_real()register_sqlite_datetime()
- Module contents
Submodules¶
crossauth_backend.apikey module¶
- class crossauth_backend.apikey.ApiKeyManager(key_storage: KeyStorage, options: ApiKeyManagerOptions = {})[source]¶
Bases:
objectManager API keys.
The caller must pass a {@link KeyStorage} object. This must provide a string field called name in the returned {@link @crossauth/common!Key} objects (in other words, the databsae table behind it must have a name field).
Api keys have three forms in their value. The {@link @crossauth/common!Key} object’s value field is a base64-url-encoded random number. When the key is in a header, it is expected to be folled by a dot and a signature to protect against injection attacks. When stored in the key storage, only the unsigned part is used (before the dot), it is hashed and preceded by prefix. The signature part is dropped for storage economy. This does not compromise security so long as the signature is always validated before comparing with the database.
- async create_key(name: str, userid: str | int | None = None, data: Dict[str, Any] | None = None, expiry: int | None = None, extra_fields: Dict[str, Any] = {}) KeyReturn[source]¶
Creates a new random key and returns it, unsigned. It is also persisted in the key storage as a hash of the unsigned part prefixed with prefix().
- Parameters:
name –
- a name for the key. This is for the user to refer to it
(eg, for showing the keys the user has created or deleting a key)
- userid: id for the user who owns this key, which may be None
for keys not associated with a user
data – any application-specific extra data. If it contains an array called scope and this array contains editUser, the api key can be used for user manipulation functions (eg change password)
expiry – expiry as a number of seconds from now
extra_fields – any extra fields to save in key storage, and pass back in the Key object.
- Returns:
key: the new key as an ApiKey object
token: the token for the Authorization header (with the signature appended.)
- Return type:
Dictionary containing
- async get_key(signed_value: str) NamedKey[source]¶
Get API key from signed value.
- Parameters:
signedValue – The signed API key value
:return Dict containing the API key data
- Raises:
CrossauthError – If the key is invalid
- static hashSignedApiKeyValue(unsigned_value: str) str[source]¶
Returns the hash of the bearer value from the Authorization header.
This has little practical value other than for reporting. Unhashed tokens are never reported. @param unsignedValue the part of the Authorization header after “Berear “. @returns a hash of the value (without the prefix).
- async validate_token(header_value: str) ApiKey[source]¶
Returns the ApiKey if the token is valid, throws an exception otherwise.
- Parameters:
headerValue – the token from the Authorization header (after the “Bearer “).
:return The ApiKey object
- Raises:
CrossauthError – with code InvalidKey
- class crossauth_backend.apikey.ApiKeyManagerOptions[source]¶
Bases:
TypedDictConfiguration options for TokenEmailer
- auth_scheme: str¶
The token type in the Authorization header. Defaults to “ApiKey”
- key_length: int¶
Length in bytes of the randomly-created key (before Base64 encoding and signature)
- prefix: str¶
Prefix.api_key
- Type:
The prefix to add to the hashed key in storage. Defaults to
- Type:
class
- secret: str¶
Server secret. Needed for emailing tokens and for csrf tokens
crossauth_backend.auth module¶
- class crossauth_backend.auth.AuthenticationOptions[source]¶
Bases:
TypedDictOptions to pass to the constructor.
- friendly_name: str¶
If passed, this is what will be displayed to the user when selecting an authentication method.
- class crossauth_backend.auth.AuthenticationParameters[source]¶
Bases:
UserSecretsInputFieldsParameters needed for this this class to authenticator a user (besides username) An example is password
- expiry: int¶
- otp: str¶
- password: str¶
- totpsecret: str¶
- class crossauth_backend.auth.Authenticator(options: AuthenticationOptions = {})[source]¶
Bases:
ABCBase class for username/password authentication.
Subclass this if you want something other than PBKDF2 password hashing.
- abstractmethod async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]¶
- capabilities() AuthenticatorCapabilities[source]¶
- abstractmethod async create_one_time_secrets(user: User) UserSecretsInputFields[source]¶
- abstractmethod async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]¶
- factor_name: str = ''¶
- friendly_name: str¶
- abstractmethod async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]¶
- abstractmethod async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]¶
- require_user_entry() bool[source]¶
If your authenticator doesn’t need a user to be in the table (because it can create one), override this and return false. Default is true
- abstractmethod validate_secrets(params: AuthenticationParameters) List[str][source]¶
- class crossauth_backend.auth.AuthenticatorCapabilities[source]¶
Bases:
TypedDict- can_create_user: bool¶
- can_update_secrets: bool¶
- can_update_user: bool¶
- class crossauth_backend.auth.PasswordAuthenticator(options: AuthenticationOptions = {})[source]¶
Bases:
Authenticatorbase class for authenticators that validate passwords
crossauth_backend.crypto module¶
- class crossauth_backend.crypto.Crypto[source]¶
Bases:
objectProvides cryptographic functions
- Base32 = 'ABCDEFGHJKLMNPQRSTUVWXYZ23456789'¶
- static base64_decode(encoded: str) str[source]¶
Decodes a string from base64 to UTF-8 :param str encoded: base64-encoded text
- Returns:
URF-8 text
- static base64_encode(text: str) str[source]¶
Base64-encodes UTF-8 text :param str: text UTF-8 text
- Returns:
Base64 text
- static decode_password_hash(hash: str) PasswordHash[source]¶
Splits a hashed password into its component parts. Return it as a
PasswordHash.The format of the hash should be
` digest:int key_len:iterations:useSecret:salt:hashedPassword `The hashed password part is the Base64 encoding of the PBKDF2 password. :param str hash: the hassed password to decode. See above for format- Returns:
PasswordHashobject containing the deecoded hash components
- static encode_password_hash(hashed_password: str, salt: str, use_secret: bool, iterations: int, key_len: int, digest: str) str[source]¶
Encodes a hashed password into the string format it is stored as.
See
decodePasswordHash()for the format it is stored in.- Parameters:
hashed_password (str) – the Base64-encoded PBKDF2 hash of the password
salt (str) – the salt used for the password.
use_secret (bool) – whether or not to use the application secret as part of the hash.
iterations (int) – the number of PBKDF2 iterations
key_len (int) – the key length PBKDF2 parameter - results in a hashed password this length, before Base64,
digest (str) – The digest algorithm, eg pbkdf2
- Returns:
a string encode the above parameters.
- static hash(plaintext: str) str[source]¶
Standard hash using SHA256 (not PBKDF2 or HMAC)
- Parameters:
plaintext (:str) – text to hash
- Returns:
the string containing the hash
- async static password_hash(plaintext: str, options: HashOptions = {}) str[source]¶
Hashes a password and returns it as a base64 or base64url encoded string :param str plaintext: password to hash :param HashOptions options:
salt: salt to use. Make a random one if not passed
secret: optional application secret password to apply as a pepper
encode: if true, returns the full string as it should be stored in the database.
- Returns:
the string containing the hash and the values to decode it
- async static passwords_equal(plaintext: str, encoded_hash: str, secret: str | None = None) bool[source]¶
Returns true if the plaintext password, when hashed, equals the one in the hash, using it’s hasher settings :param str plaintext: the plaintext password :param str encoded_hash: the previously-hashed version :param str|None secret: if useHash`in `encodedHash is true, uses as a pepper for the hasher
- Returns:
true if they are equal, false otherwise
- static random_base32(length: int, dash_every: int | None = None) str[source]¶
Creates a random base-23 string :param int length: length of the string to create
- Returns:
the random value as a string. Number of bytes will be greater as it is base64 encoded.
- static random_salt() str[source]¶
Creates a random salt
- Returns:
random salt as a base64 encoded string
- static random_value(length: int) str[source]¶
Creates a random string encoded as in base64url :param int length: length of the string to create
- Returns:
the random value as a string. Number of bytes will be greater as it is base64 encoded.
- static sha256(plaintext: str) str[source]¶
Standard hash using SHA256 (not PBKDF2 or HMAC)
- Parameters:
plaintext (str) – text to hash
- Returns:
the string containing the hash
- static sign(payload: Dict[str, Any], secret: str, salt: str | None = None, timestamp: int | None = None) str[source]¶
Signs a JSON payload by creating a hash, using a secret and optionally also a salt and timestamp
- Parameters:
payload (Dict[str, Any]) – object to sign (will be stringified as a JSON)
secret (str) – secret key, which must be a string
salt (str|None) – optionally, a salt to concatenate with the payload (must be a string)
- Paramint|None timestamp:
optionally, a timestamp to include in the signed date as a Unix date
- Returns:
Base64-url encoded hash
- static sign_secure_token(payload: str, secret: str) str[source]¶
This can be called for a string payload that is a cryptographically secure random string. No salt is added and the token is assumed to be Base64Url already
- Parameters:
payload (str) – string to sign
secret (str) – the secret to sign with
- Returns:
Base64-url encoded hash
- static signable_token(payload: Dict[str, Any], salt: str | None = None, timestamp: int | None = None) str[source]¶
hash is of a JSON containing the payload, timestamp and optionally a salt. :param Dict[str, Any]: payload the payload to hash :param str|None salt: optional salt (use if the payload is small) :param int|None timestamp: time the token will expire
- Returns:
a Base64-URL-encoded string that can be hashed.
- static symmetric_decrypt(ciphertext: str, key_string: str) str[source]¶
Symmetric decryption using a key that must be a string
- Parameters:
ciphertext (str) – Base64-url encoded ciphertext
key_string (str) – the symmetric key
- Returns:
Decrypted text
- static symmetric_encrypt(plaintext: str, key_string: str, iv: bytes | None = None) str[source]¶
Symmetric encryption using a key that must be a string
- Parameters:
plaintext (str) – Text to encrypt
key_string (str) – the symmetric key
bytes|None (iv) – the iv value. In None, a random one is created
- Returns:
Encrypted text Base64-url encoded.
- static unsign(signed_message: str, secret: str, expiry: int | None = None) Dict[str, Any][source]¶
Validates a signature and, if valid, return the unstringified payload :param str signed_message: signed message (base64-url encoded) :param str secret: secret key, which must be a string :param int|None expiry: if set, validation will fail if the timestamp in the payload is after this date
- Returns:
if signature is valid, the payload as an object
- :raises
crossauth_backend.CrossauthError: with ErrorCodeof InvalidKey if signature is invalid or has expired.
- static unsign_secure_token(signed_message: str, secret: str, expiry: int | None = None) str[source]¶
Validates a signature signed with signSecureToken and, if valid, return the unstringified payload :param str signed_message: signed message (base64-url encoded) :param str secret: secret key, which must be a string
- Returns:
if signature is valid, the payload as a string
- :raises
crossauth_backend.CrossauthError: with {
ErrorCodeof InvalidKey if signature is invalid or has expired.
- class crossauth_backend.crypto.HashOptions[source]¶
Bases:
TypedDictOption parameters for
Crypto.passwordHash- digest: str¶
PBKDF2 digest method
- encode: bool¶
Whether to Base64-URL-encode the result
- iterations: int¶
Number of PBKDF2 iterations
- key_len: int¶
Length (before Base64-encoding) of the PBKDF2 key being generated
- salt: str¶
A salt to prepend to the message before hashing
- secret: str¶
A secret to append to the salt when hashing, or undefined for no secret
- class crossauth_backend.crypto.PasswordHash[source]¶
Bases:
TypedDictAn object that contains all components of a hashed password. Hashing is done with PBKDF2
- digest: str¶
The digest algorithm to use, eg sha512
- hashed_password: str¶
The actual hashed password in Base64 format
- iterations: int¶
Number of iterations for PBKDF2
- key_len: int¶
The key length parameter passed to PBKDF2 - hash will be this number of characters long
- salt: str¶
The random salt used to create the hashed password
- use_secret: bool¶
If true, secret (application secret) is also used to hash the password
crossauth_backend.emailtoken module¶
- class crossauth_backend.emailtoken.TokenEmailer(user_storage: UserStorage, key_storage: KeyStorage, options: TokenEmailerOptions = {})[source]¶
Bases:
objectSends password reset and email verification tokens to an email address
- async create_and_save_email_verification_token(userid: str | int, new_email: str = '') str[source]¶
Create and save email verification token
- async create_and_save_password_reset_token(userid: str | int) str[source]¶
Create and save password reset token
- static hash_email_verification_token(token: str) str[source]¶
Produces a hash of the given email verification token with the correct prefix for inserting into storage.
- static hash_password_reset_token(token: str) str[source]¶
Produces a hash of the given password reset token with the correct prefix for inserting into storage.
- static is_email_valid(email: str) bool[source]¶
Returns true if the given email has a valid format, false otherwise.
- Parameters:
email – the email to validate
- Returns:
true or false
- async send_email_verification_token(userid: str | int, new_email: str = '', extra_data: Dict[str, Any] = {}) None[source]¶
Send an email verification email using the Jinja2 templates.
The email address to send it to will be taken from the user’s record in user storage. It will first be validated, throwing a CrossauthError with ErrorCode of InvalidEmail if it is not valid.
- Parameters:
userid – userid to send it for
new_email – if this is a token to verify email for account activation, leave this empty. If it is for changing an email, this will be the field it is being changed do.
extra_data – these extra variables will be passed to the Jinja2 templates
- async send_password_reset_token(userid: int | str, extra_data: Dict[str, Any] = {}, as_admin: bool = False) None[source]¶
Send a password reset token email using the Jinja2 templates
- Parameters:
userid – userid to send it for
extra_data – these extra variables will be passed to the Jinja2 templates
as_admin – whether this is being sent by an admin
- static validate_email(email: str | None) None[source]¶
Returns if the given email has a valid format. Throws a CrossauthError with ErrorCode InvalidEmail otherwise.
- Parameters:
email – the email to validate
- async verify_email_verification_token(token: str) Dict[str, str | int][source]¶
Validates an email verification token.
- The following must match:
expiry date in the key storage record must be less than current time
userid in the token must match the userid in the key storage
email address in user storage must match the email in the key. If there is no email address, the username field is set if it is in email format.
expiry time in the key storage must match the expiry time in the key
Looks the token up in key storage and verifies it matches and has not expired.
- Parameters:
token – the token to validate
- Retu:returns:
the userid of the user the token is for and the email address the user is validating
- async verify_password_reset_token(token: str) User[source]¶
Validates a password reset token
- The following must match:
expiry date in the key storage record must be less than current time
userid in the token must match the userid in the key storage
the email in the token matches either the email or username field in user storage
the password in user storage must match the password in the key
expiry time in the key storage must match the expiry time in the key
Looks the token up in key storage and verifies it matches and has not expired. Also verifies the user exists and password has not changed in the meantime.
- Parameters:
token – the token to validate
- :return
the user that the token is for
- class crossauth_backend.emailtoken.TokenEmailerOptions[source]¶
Bases:
TypedDictConfiguration options for TokenEmailer
- email_from: NotRequired[str]¶
Sender for emails
- email_verification_html_body: NotRequired[str]¶
Template file containing page for producing the HTML version of the email verification email body
- email_verification_subject: NotRequired[str]¶
Subject for the the email verification email
- email_verification_text_body: NotRequired[str]¶
Template file containing page for producing the text version of the email verification email body
- password_reset_expires: NotRequired[int]¶
Number of seconds befire password reset tokens should expire. Default 1 day
- password_reset_html_body: NotRequired[str]¶
Template file containing page for producing the HTML version of the password reset email body
- password_reset_subject: NotRequired[str]¶
Subject for the the password reset email
- password_reset_text_body: NotRequired[str]¶
Template file containing page for producing the text version of the password reset email body
- prefix: NotRequired[str]¶
The prefix between the site url and the email verification/password reset link. Default “/”
- render: NotRequired[Callable[[str, Dict[str, Any]], str]]¶
if passed, use this instead of the default nunjucks renderer
- site_url: NotRequired[str]¶
3000”. No default - required parameter
- Type:
The site url, used to create a link, eg “https
- Type:
//mysite.com
- smtp_host: NotRequired[str]¶
Hostname of the SMTP server. No default - required parameter
- smtp_password: NotRequired[str]¶
Password for connecting to SMTP servger. Default undefined
- smtp_port: NotRequired[int]¶
Port the SMTP server is running on. Default 25
- smtp_use_tls: NotRequired[bool]¶
Whether or not TLS is used by the SMTP server. Default false
- smtp_username: NotRequired[str]¶
Username for connecting to SMTP servger. Default undefined
- verify_email_expires: NotRequired[int]¶
Number of seconds befire email verification tokens should expire. Default 1 day
- views: NotRequired[str]¶
The directory containing views (by default, Nunjucks templates)
crossauth_backend.session module¶
- class crossauth_backend.session.Csrf(csrf_cookie, csrf_form_or_header_value)[source]¶
Bases:
NamedTuple- csrf_form_or_header_value: str¶
Alias for field number 1
- class crossauth_backend.session.SessionManager(key_storage: KeyStorage, authenticators: Mapping[str, Authenticator], options: SessionManagerOptions = {})[source]¶
Bases:
objectClass for managing sessions.
- property allowed_factor2¶
- async apply_email_verification_token(token: str) User[source]¶
Takes an email verification token as input and applies it to the user storage.
The state is reset to active. If the token was for changing the password, the new password is saved to the user in user storage.
- Parameters:
token – the token to apply
- Returns:
the new user record
- property authenticators¶
- async cancel_two_factor_page_visit(session_id: str) Dict[str, Any][source]¶
Cancels the 2FA that was previously initiated but not completed..
If successful, returns. Otherwise an exception is thrown.
- Parameters:
session_id – the session id (unsigned)
- :return
Dict[str, Any]: the 2FA data that was created on initiation
- :raise
CrossauthError: of Unauthorized if 2FA was not initiated.
- async change_secrets(username: str, factor_number: int, new_params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None, old_params: AuthenticationParameters | None = None) User[source]¶
- async complete_two_factor_login(params: AuthenticationParameters, session_id: str, extra_fields: Mapping[str, Any] = {}, persist: bool = False) SessionTokens[source]¶
Performs the second factor authentication as the second step of the login process
If authentication is successful, the user’s state will be set to active and a new session will be created, bound to the user. The anonymous session will be deleted.
- Parameters:
params – the user-provided parameters to authenticate with (eg TOTP code).
session_id – the user’s anonymous session
extra_fields – extra fields to add to the user-bound new session table entry
persist – if true, the cookie will be perstisted (with an expiry value); otberwise it will be a session-only cookie.
- Return AuthResult containing:
session_cookie: the new session cookie csrf_cookie: the new CSRF cookie csrf_form_or_header_value: the new CSRF token corresponding to the cookie user: the newly-logged in user.
- async complete_two_factor_page_visit(params: AuthenticationParameters, session_id: str)[source]¶
Completes 2FA when visiting a protected page.
If successful, returns. Otherwise an exception is thrown.
- Parameters:
params – the parameters from user input needed to authenticate (eg TOTP code). Passed to the authenticator
session_id – the session cookie value (ie still signed)
- :raise
CrossauthError: if authentication fails.
- async complete_two_factor_setup(params: AuthenticationParameters, session_id: str) User[source]¶
Authenticates with the second factor.
If successful, the new user object is returned. Otherwise an exception is thrown, :param params the parameters from user input needed to authenticate (eg TOTP code) :param session_id the session cookie value (ie still signed) :return the user object :raise CrossauthError if authentication fails.
- async create_anonymous_session(extra_fields: Mapping[str, Any] | None = None) SessionTokens[source]¶
- async create_csrf_form_or_header_value(csrf_cookie_value: str)[source]¶
Validates the signature on the CSRF cookie value and returns a value that can be put in the form or CSRF header value.
- Parameters:
csrf_cookie_value (str) – the value from the CSRF cookie
- Returns:
the value to put in the form or CSRF header
- async create_csrf_token() Csrf[source]¶
Creates and returns a signed CSRF token based on the session ID
- Returns:
a CSRF cookie and value to put in the form or CSRF header
- async create_user(user: UserInputFields, params: UserSecrets, repeat_params: UserSecrets | None = None, skip_email_verification: bool = False, empty_password: bool = False) User[source]¶
- property csrf_cookie_name¶
Returns the name used for CSRF token cookies.
- property csrf_cookie_path¶
Returns the name used for CSRF token cookies.
- property csrf_header_name¶
Returns the name used for CSRF token cookies
- property csrf_tokens¶
- async data_for_session_id(session_id: str) Dict[str, Any] | None[source]¶
Returns the data object for a session id, or undefined, as an object.
If the user is undefined, or the key has expired, returns undefined.
- Parameters:
session_id (str) – the session key to look up in session storage
- Returns:
a string from the data field
- :raise
crossauth_backend.CrossauthError: with ErrorCodeof Connection, InvalidSessionId UserNotExist or Expired.
- async data_string_for_session_id(session_id: str) str | None[source]¶
Returns the data object for a session key, or undefined, as a JSON string (which is how it is stored in the session table)
If the user is undefined, or the key has expired, returns undefined.
- Parameters:
session_id (str) – the session id to look up in session storage
- Returns:
a string from the data field
- :raise
crossauth_backend.CrossauthError: with ErrorCodeof Connection, InvalidSessionId UserNotExist or Expired.
- async delete_session(session_id: str) None[source]¶
Deletes the given session ID from the key storage (not the cookie)
- Parameters:
session_id (str) – the session Id to delete
- async delete_session_data(session_id: str, name: str) None[source]¶
Deletes a field from the session data.
The data field in the session entry is assumed to be a JSON string. The field with the given name is updated or set if not already set. :param str session_id; the session Id to update.
- property email_token_storage¶
- get_session_id(session_cookie_value: str)[source]¶
Returns the session ID from the signed session cookie value
- Parameters:
session_cookie_value (str) – value from the session ID cookie
- Returns:
the usigned cookie value.
- :raises
crossauth_backend.CrossauthErrorwith InvalidKey if the signature is invalid.
- async initiate_two_factor_login(user: User) SessionTokens[source]¶
Initiates the two factor login process.
Creates an anonymous session and corresponding CSRF token
- Parameters:
user – the user, which should already have been authenticated with factor1
- Returns:
a new anonymous session cookie and corresponding CSRF cookie and token.
- async initiate_two_factor_page_visit(user: User, session_id: str, request_body: Mapping[str, Any], url: str | None = None, content_type: str | None = None) SessionTokens[source]¶
Initiates the two factor process when visiting a protected page.
Creates an anonymous session and corresponding CSRF token
- Parameters:
user – the user, which should already have been authenticated with factor1
session_id – the logged in session associated with the user
request_body – the parameters from the request made before being redirected to factor2 authentication
url – the requested url, including path and query parameters content_type: optional content type from the request
- :return
If a token was passed a new anonymous session cookie and corresponding CSRF cookie and token.
- async initiate_two_factor_setup(user: User, new_factor2: str | None, session_id: str) Mapping[str, Any][source]¶
Begins the process of setting up 2FA for a user which has already been created and activated. Called when changing 2FA or changing its parameters.
- Parameters:
user – the logged in user
new_factor2 – new second factor to change user to
session_id – the session cookie for the user
- :return
the 2FA data that can be displayed to the user in the configure 2FA step (such as the secret and QR code for TOTP).
- async initiate_two_factor_signup(user: UserInputFields, params: UserSecrets, session_id: str, repeat_params: UserSecrets | None) UserIdAndData[source]¶
Creates a user with 2FA enabled.
The user storage entry will be createed, with the state set to awaitingtwofactorsetup or awaitingtwofactorsetupandemailverification. The passed session key will be updated to include the username and details needed by 2FA during the configure step.
- Parameters:
user – details to save in the user table
params – params the parameters needed to authenticate with factor1 (eg password)
session_id – the anonymous session cookie
repeat_params – if passed, these will be compared with params and if they don’t match, PasswordMatch is thrown.
- Returns:
- Dict containing:
userid: the id of the created user. userData: data that can be displayed to the user in the page to
complete 2FA set up (eg the secret key and QR codee for TOTP),
- property key_storage¶
- async login(username: str, params: AuthenticationParameters, extra_fields: Mapping[str, Any] = {}, persist: bool = False, user: User | None = None, bypass_2fa: bool = False) SessionTokens[source]¶
Performs a user login
Authenticates the username and password
Creates a session key - if 2FA is enabled, this is an anonymous session, otherwise it is bound to the user
Returns the user (without the password hash) and the session cookie.
If the user object is defined, authentication (and 2FA) is bypassed
- Parameters:
username – the username to validate
params – user-provided credentials (eg password) to authenticate with
extra_fields – add these extra fields to the session key if authentication is successful
persist – if passed, overrides the persistSessionId setting.
user –
- if this is defined, the username and password are ignored and the given user is logged in.
The 2FA step is also skipped
bypass2FA: if true, the 2FA step will be skipped
- :return
Dict containing the user, user secrets, and session cookie and CSRF cookie and token. if a 2fa step is needed, it will be an anonymouos session, otherwise bound to the user
- Raise:
- CrossauthError: with ErrorCode of Connection, UserNotValid,
PasswordNotMatch or UserNotExist.
- async repeat_two_factor_signup(session_id: str) UserIdAndData[source]¶
This can be called if the user has finished signing up with factor1 but closed the browser before completing factor2 setup. Call it if the user signs up again with the same factor1 credentials.
- Parameters:
session_id – the anonymous session ID for the user
- Returns:
- Dict containing:
userid: the id of the created user userData: data that can be displayed to the user in the page to
complete 2FA set up (eg the secret key and QR code for TOTP),
- secrets: data that is saved in the session for factor2. In the
case of TOTP, both userData and secrets contain the shared secret but only userData has the QR code, since it can be generated from the shared secret.
- async request_password_reset(email: str) None[source]¶
Sends a password reset token :param email: the user’s email (where the token will be sent)
- async reset_secret(token: str, factror_number: int, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) User[source]¶
Resets the secret for factor1 or 2 (eg reset password)
- Parameters:
token – the reset password token that was emailed
factror_number – which factor to reset (1 or 2)
params – the new secrets entered by the user (eg new password)
repeat_params – optionally, repeat of the secrets. If passed, an exception will be thrown if they do not match
:return the user object
- Raises:
CrossauthError – if the repeat_params don’t match params, the token is invalid or the user storage cannot be updated.
- property session¶
- property session_cookie_name¶
Returns the name used for session ID cookies.
- property session_cookie_path¶
Returns the name used for session ID cookies
- async update_many_session_data(session_id: str, data_array: List[KeyDataEntry]) None[source]¶
Update field sin the session data.
The data field in the session entry is assumed to be a JSON string. The field with the given name is updated or set if not already set. :param str session_id: the session Id to update. :param List[crossauth_backend.KeyDataEntry] data_array: names and values.
- async update_session_activity(session_id: str)[source]¶
If session_idle_timeout is set, update the last activcity time in key storage to current time.
- Parameters:
session_id (str) – the session Id to update.
- async update_session_data(session_id: str, name: str, value: Mapping[str, Any]) None[source]¶
Update a field in the session data.
The data field in the session entry is assumed to be a JSON string. The field with the given name is updated or set if not already set. :param str session_id: the session Id to update. :param str name: of the field. :param Mapping[str, Any] value: new value to store
- async update_user(current_user: User, new_user: User, skip_email_verification: bool = False, as_admin: bool = False) TokensSent[source]¶
Updates a user entry in storage :param current_user the current user details :param new_user the new user details :return dict with emailVerificationTokenSent and passwordResetTokenSent booleans
- async user_for_password_reset_token(token: str) User[source]¶
Returns the user associated with a password reset token
- Parameters:
token – the token that was emailed
- Returns:
the user
- Raises:
CrossauthError – if the token is not valid.
- property user_storage¶
- validate_csrf_cookie(csrf_cookie_value: str)[source]¶
Throws
crossauth_backend.CrossauthErrorwith InvalidKey if the passed CSRF cookie value is not valid (ie invalid signature) :param str csrf_cookie_value: the CSRF cookie value
- validate_double_submit_csrf_token(csrf_cookie_value: str, csrf_form_or_header_value: str)[source]¶
Throws
crossauth_backend.CrossauthErrorwith InvalidKey if the passed CSRF token is not valid for the given session ID. Otherwise returns without error- Parameters:
strcsrf_cookie_value – the CSRF cookie value
csrf_form_or_header_value (str) – the value from the form field or CSRF header
- class crossauth_backend.session.SessionManagerOptions[source]¶
Bases:
TokenEmailerOptionsOptions for SessionManager
- allowed_factor2: List[str]¶
Set of 2FA factor names a user is allowed to set.
The name corresponds to the key you give when adding authenticators. See authenticators in SessionManager.constructor.
- double_submit_cookie_options: DoubleSubmitCsrfTokenOptions¶
Options for csrf cookie manager
- email_from: NotRequired[str]¶
- email_token_storage: KeyStorage¶
Store for password reset and email verification tokens. If not passed, the same store as for sessions is used.
- email_verification_html_body: NotRequired[str]¶
- email_verification_subject: NotRequired[str]¶
- email_verification_text_body: NotRequired[str]¶
- enable_email_verification: bool¶
If true, users will have to verify their email address before account is created or when changing their email address. See class description for details. Default True
- enable_password_reset: bool¶
If true, allow password reset by email token. See class description for details. Default True
- password_reset_expires: NotRequired[int]¶
- password_reset_html_body: NotRequired[str]¶
- password_reset_subject: NotRequired[str]¶
- password_reset_text_body: NotRequired[str]¶
- prefix: NotRequired[str]¶
- render: NotRequired[Callable[[str, Dict[str, Any]], str]]¶
- secret: str¶
Server secret. Needed for emailing tokens and for csrf tokens
- session_cookie_options: SessionCookieOptions¶
options for session cookie manager
- site_url: str¶
Base URL for the site.
This is used when constructing URLs, eg for sending password reset tokens.
- smtp_host: NotRequired[str]¶
- smtp_password: NotRequired[str]¶
- smtp_port: NotRequired[int]¶
- smtp_use_tls: NotRequired[bool]¶
- smtp_username: NotRequired[str]¶
- user_storage: UserStorage¶
If user login is enabled, you must provide the object where users are stored.
- verify_email_expires: NotRequired[int]¶
- views: NotRequired[str]¶
- class crossauth_backend.session.SessionTokens(session_cookie, csrf_cookie, csrf_form_or_header_value, user, secrets)[source]¶
Bases:
NamedTuple- csrf_form_or_header_value: str | None¶
Alias for field number 2
- secrets: UserSecrets | None¶
Alias for field number 4
crossauth_backend.sessionendpoints module¶
When not overriding which endpoints to enable, all of these will be.
- crossauth_backend.sessionendpoints.AllEndpointsMinusOAuth = ['signup', 'api/signup', 'login', 'logout', 'changepassword', 'updateuser', 'deleteuser', 'api/login', 'api/logout', 'api/changepassword', 'api/userforsessionkey', 'api/getcsrftoken', 'api/updateuser', 'api/deleteuser', 'admin/createuser', 'admin/changepassword', 'admin/selectuser', 'admin/updateuser', 'admin/changepassword', 'admin/deleteuser', 'admin/api/createuser', 'admin/api/changepassword', 'admin/api/updateuser', 'admin/api/changepassword', 'admin/api/deleteuser', 'verifyemail', 'emailverified', 'api/verifyemail', 'requestpasswordreset', 'resetpassword', 'api/requestpasswordreset', 'api/resetpassword', 'configurefactor2', 'loginfactor2', 'changefactor2', 'factor2', 'api/configurefactor2', 'api/loginfactor2', 'api/changefactor2', 'api/factor2', 'api/cancelfactor2']¶
These are all the endpoints
- crossauth_backend.sessionendpoints.EmailVerificationApiEndpoints = ['api/verifyemail']¶
Page endpoints that depend on password reset being enabled
If not overriding which endpoints to enable with endpoints, and if password reset is enabled, then this endpoints will be added.
- crossauth_backend.sessionendpoints.EmailVerificationPageEndpoints = ['verifyemail', 'emailverified']¶
API (JSON) endpoints that depend on email verification.
If not overriding which endpoints to enable with endpoints, and if email verification is enabled, then this endpoints will be added.
- crossauth_backend.sessionendpoints.Factor2ApiEndpoints = ['api/configurefactor2', 'api/loginfactor2', 'api/changefactor2', 'api/factor2', 'api/cancelfactor2']¶
Page endpoints that depend on email verification being enabled.
If not overriding which endpoints to enable with endpoints, and if email verification is enabled, then this endpoints will be added.
- crossauth_backend.sessionendpoints.Factor2PageEndpoints = ['configurefactor2', 'loginfactor2', 'changefactor2', 'factor2']¶
These are the endpoints created ig endpoints is set to allMinusOAuth
- crossauth_backend.sessionendpoints.PasswordResetApiEndpoints = ['api/requestpasswordreset', 'api/resetpassword']¶
Endpoints for signing a user up that display HTML
When not overriding which endpoints to enable, all of these will be.
- crossauth_backend.sessionendpoints.PasswordResetPageEndpoints = ['requestpasswordreset', 'resetpassword']¶
API (JSON) endpoints that depend on password reset being enabled
If not overriding which endpoints to enable with endpoints, and if password reset is enabled, then this endpoints will be added.
- crossauth_backend.sessionendpoints.SessionAdminApiEndpoints = ['admin/api/createuser', 'admin/api/changepassword', 'admin/api/updateuser', 'admin/api/changepassword', 'admin/api/deleteuser']¶
When not overriding which endpoints to enable, and with both enableAdminEndpoints and enableAdminClientManagement enabled, all these will be.
- crossauth_backend.sessionendpoints.SessionAdminClientApiEndpoints = ['admin/api/createclient', 'admin/api/deleteclient', 'admin/api/updateclient']¶
When not overriding which endpoints to enable, and with enableAdminClientManagement enabled, all these will be.
- crossauth_backend.sessionendpoints.SessionAdminClientPageEndpoints = ['admin/selectclient', 'admin/createclient', 'admin/deleteclient', 'admin/updateclient']¶
When not overriding which endpoints to enable, and with enableAdminClientManagement enabled, all these will be.
- crossauth_backend.sessionendpoints.SessionAdminPageEndpoints = ['admin/createuser', 'admin/changepassword', 'admin/selectuser', 'admin/updateuser', 'admin/changepassword', 'admin/deleteuser']¶
When not overriding which endpoints to enable, and with both enableAdminEndpoints and enableAdminClientManagement enabled, all these will be.
- crossauth_backend.sessionendpoints.SessionApiEndpoints = ['api/login', 'api/logout', 'api/changepassword', 'api/userforsessionkey', 'api/getcsrftoken', 'api/updateuser', 'api/deleteuser']¶
When not overriding which endpoints to enable, and with enableAdminEndpoints enabled, all of these will be.
- crossauth_backend.sessionendpoints.SessionClientApiEndpoints = ['api/deleteclient', 'api/updateclient', 'api/createclient']¶
API (JSON) endpoints that depend on 2FA being enabled.
If not overriding which endpoints to enable with endpoints, and if any 2FA factors are enabled, then this endpoints will be added.
- crossauth_backend.sessionendpoints.SessionClientPageEndpoints = ['selectclient', 'createclient', 'updateclient', 'deleteclient']¶
When not overriding which endpoints to enable, all of these will be.
- crossauth_backend.sessionendpoints.SessionPageEndpoints = ['login', 'logout', 'changepassword', 'updateuser', 'deleteuser']¶
When not overriding which endpoints to enable, and with enableAdminEndpoints enabled, all of these will be.
- crossauth_backend.sessionendpoints.SignupApiEndpoints = ['api/signup']¶
Endpoints for signing a user up that display HTML
- crossauth_backend.sessionendpoints.SignupPageEndpoints = ['signup']¶
API (JSON) endpoints for signing a user up that display HTML
When not overriding which endpoints to enable, all of these will be.
crossauth_backend.storage module¶
- class crossauth_backend.storage.KeyDataEntry[source]¶
Bases:
TypedDict- data_name: str¶
- value: NotRequired[Any]¶
- class crossauth_backend.storage.KeyStorage[source]¶
Bases:
ABCBase class for storing session and API keys.
This class is subclassed for various types of session key storage. For example, PrismaKeyStorage is for storing sessions in a database table, managed by the Prisma ORM.
- static decode_data(data: str | None) Dict[str, Any][source]¶
Returns an object decoded from the data field as a JSON string
- Parameters:
data (str|None) – The JSON string to decode
- Returns:
The parsed JSON object
- Raises:
json.JSONDecodeError – If data is not a valid JSON string
- abstractmethod async delete_all_for_user(userid: str | int | None, prefix: str, except_key: str | None = None) None[source]¶
Deletes all keys from storage for the given user ID
- Parameters:
userid (int|str|None) – User ID to delete keys for
prefix (str) – Only keys starting with this prefix will be deleted
except_key (str|None) – If defined, the key with this value will not be deleted
- abstractmethod async delete_data(key_name: str, data_name: str) None[source]¶
The ‘data’ field in a key entry is a JSON string. This method should atomically delete a field in it.
- Parameters:
key_name (str) – The name of the key to update, as it appears in the table.
data_name (str) – The field name to delete. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is deleted.
- abstractmethod async delete_key(value: str) None[source]¶
Deletes a key from storage (e.g., the database).
- Parameters:
value (str) – The key to delete
- abstractmethod async delete_matching(key: PartialKey) None[source]¶
Deletes all matching the given specs
- Parameters:
key (crossauth_backend.PartialKey) – Any key matching all defined values in this object will be deleted
- static encode_data(data: Dict[str, Any] | None = None) str[source]¶
Returns a JSON string encoded from the given object
- Parameters:
data (Dict[str, Any]|None) – The object to encode
- Returns:
A JSON string
- abstractmethod async get_all_for_user(userid: str | int | None = None) List[Key][source]¶
Return all keys matching the given user ID
- Parameters:
userid (str|int|None) – User to return keys for
- Returns:
List[Key]: An array of keys
- abstractmethod async get_key(key: str) Key[source]¶
Returns the matching key in the session storage or raises an exception if it doesn’t exist.
- Parameters:
key (ster) – The key to look up, as it will appear in this storage (typically unsigned, hashed)
- Returns:
The matching Key record.
- abstractmethod async save_key(userid: str | int | None, value: str, date_created: datetime, expires: datetime | None = None, data: str | None = None, extra_fields: Mapping[str, Any] | None = None) None[source]¶
Saves a session key in the session storage (e.g., database).
- Parameters:
userid (int|str|None) – The ID of the user. This matches the primary key in the UserStorage implementation.
value (str) – The key value to store.
date_created (datetime) – The date/time the key was created.
expires (datetime|None) – The date/time the key expires.
extra_fields (Mapping[str, Any]|None) – These will also be saved in the key record
- Padam str|None data:
An optional value, specific to the type of key, e.g., new email for email change tokens
- abstractmethod async update_data(key_name: str, data_name: str, value: Any | None) None[source]¶
The ‘data’ field in a key entry is a JSON string. This method should atomically update a field in it.
- Parameters:
key_name (str) – The name of the key to update, as it appears in the table.
data_name (str) – The field name to update. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is updated.
value (Any|None) – The new value.
- abstractmethod async update_key(key: PartialKey) None[source]¶
If the given session key exists in the database, update it with the passed values. If it doesn’t exist, raise a CrossauthError with ErrorCode ‘InvalidKey’.
- Parameters:
key (crossauth_backend.PartialKey) – The fields defined in this will be updated. ‘id’ must be present and it will not be updated.
- abstractmethod async update_many_data(key_name: str, data_array: List[KeyDataEntry]) None[source]¶
Same as ‘update_data’ but updates several keys.
Ensure it is done as a single transaction.
- Parameters:
key_name (str) – The key to update
data_array (List[crossauth_backend.KeyDataEntry) – dataName and value pairs
- class crossauth_backend.storage.OAuthAuthorizationStorage(options: OAuthAuthorizationStorageOptions = {})[source]¶
Bases:
ABCBase class for storing scopes that have been authorized by a user (or for client credentials, for a client).
This class is subclassed for various types of storage. For example, PrismaOAuthAuthorizationStorage is for storing in a database table, managed by the Prisma ORM.
- abstractmethod async get_authorizations(client_id: str, userid: str | int | None = None) List[str | None][source]¶
Returns the matching all scopes authorized for the given client and optionally user.
- Parameters:
client_id (str) – the client_id to look up
userid (int|str|None) – the userid to look up, None for a client authorization not user authorization
- Returns:
The authorized scopes as a list.
- abstractmethod async update_authorizations(client_id: str, userid: str | int | None, authorizations: List[str | None]) None[source]¶
Saves a new set of authorizations for the given client and optionally user.
Deletes the old ones.
- Parameters:
client_id (str) – the client_id to look up
userid (str|int|None) – the userid to look up, None for a client authorization not user authorization
List[str|None]authorizations – new set of authorized scopes, which may be empty
- class crossauth_backend.storage.OAuthClientStorage(options: OAuthClientStorageOptions = {})[source]¶
Bases:
ABCBase class for storing OAuth clients.
This class is subclassed for various types of client storage. For example, PrismaOAuthClientStorage is for storing clients in a database table, managed by the Prisma ORM.
- abstractmethod async create_client(client: OAuthClient) OAuthClient[source]¶
Creates and returns a new client with random ID and optionally secret.
Saves in the database.
- Parameters:
client (crossauth_backend.OAuthClient) – the client to save.
- Returns:
The new client.
- abstractmethod async delete_client(client_id: str) None[source]¶
Deletes a key from storage.
- Parameters:
client_id (str) – the client to delete
- abstractmethod async get_client_by_id(client_id: str) OAuthClient[source]¶
Returns the matching client by its auto-generated id in the storage or throws an exception if it doesn’t exist.
- Parameters:
client_id (str) – the client_id to look up
- Returns:
he matching OAuthClient object.
- abstractmethod async get_client_by_name(name: str, userid: str | int | None | NullType = None) List[OAuthClient][source]¶
Returns the matching client in the storage by friendly name or throws an exception if it doesn’t exist.
- Parameters:
name (str) – the client name to look up
userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients with this name.
- Returns:
A list of OAuthClient objects.
:raises
crossauth_backend.CrossauthError: withErrorCodeof ‘InvalidSessionId’ if a match was not found in session storage.
- abstractmethod async get_clients(skip: int | None = None, take: int | None = None, userid: str | int | None | NullType = None) List[OAuthClient][source]¶
Returns all clients in alphabetical order of client name.
- Parameters:
skip (int|None) – skip this number of records from the start in alphabetical order
take (int|None) – return at most this number of records
userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients.
- Returns:
A list of OAuthClient objects.
- abstractmethod async update_client(client: PartialOAuthClient) None[source]¶
If the given session key exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ‘InvalidClient’.
- Parameters:
client (crossauth_backend.OAuthClient) – all fields to update (client_id must be set but will not be updated)
:raises
crossauth_backend.CrossauthError: with ‘InvalidClient’ if the client doesn’t exist.
- class crossauth_backend.storage.UserAndSecrets[source]¶
Bases:
TypedDict- secrets: UserSecrets¶
- class crossauth_backend.storage.UserStorage(options: UserStorageOptions = {})[source]¶
Bases:
ABCBase class for place where user details are stored.
This class is subclassed for various types of user storage, e.g. PrismaUserStorage is for storing username and password in a database table, managed by the Prisma ORM.
Username and email searches should be case insensitive, as should their unique constraints. ID searches need not be case insensitive.
- property admin_editable_fields¶
- async create_user(user: UserInputFields, secrets: UserSecretsInputFields | None = None) User[source]¶
Creates a user with the given details and secrets.
- Parameters:
user (crossauth_backend.UserInputFields) – will be put in the User table
secrets (crossauth_backend.UserSecretsInputFields|None) – will be put in the UserSecrets table
- Returns:
the new user as a User object
:raises
crossauth_backend.CrossauthErrorwithErrorCodeConfiguration
- abstractmethod async delete_user_by_id(id: str | int) None[source]¶
If the storage supports this, delete the user with the given ID from storage.
- Parameters:
id (str|int) – id of user to delete
- abstractmethod async delete_user_by_username(username: str) None[source]¶
If the storage supports this, delete the named user from storage.
- Parameters:
username (str) – username to delete
- abstractmethod async get_user_by(field: str, value: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given field, or throws an exception.
This does no normalisation. Currently it is only used for the OAuth client if you set user_creation_type to “merge”, “embed” or “custom”.
- Parameters:
field (str) – the field to match
value (str) – the value to match
options (UserStorageGetOptions) – optionally turn off checks. Used internally
:raises
crossauth_backend.CrossauthError: withErrorCodeeither UserNotExist or Connection
- abstractmethod async get_user_by_email(email: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given email address, or throws an exception.
If normalize_email is true, email should be matched normalized and lowercased (using normalize()) If the email field doesn’t exist, username is assumed to be the email column
- Parameters:
email – the email address to return the user of
options – optionally turn off checks. Used internally
- Raises:
crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection
- abstractmethod async get_user_by_id(id: str | int, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given user id, or throws an exception.
Note that implementations are free to define what the user ID is. It can be a number or string, or can simply be username.
- Parameters:
id (str|int) – the user id to return the user of
options (UserStorageGetOptions) – optionally turn off checks. Used internally
:raises
crossauth_backend.CrossauthErrorwithErrorCodeeither UserNotExist or Connection
- abstractmethod async get_user_by_username(username: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given username, or throws an exception.
If normalize_username is true, the username should be matched normalized and lowercased (using normalize())
- Parameters:
username (str) – the username to return the user of
options (UserStorageGetOptions) – optionally turn off checks. Used internally
:raises
crossauth_backend.CrossauthError: withErrorCodeeither UserNotExist or Connection
- abstractmethod async get_users(skip: int | None = None, take: int | None = None) List[User][source]¶
Returns all users in the storage, in a fixed order defined by the storage (e.g. alphabetical by username)
- Parameters:
skip (int|None) – skip this number of records from the start of the set
take (int|None) – only return at most this number of records
- Returns:
an array of User objects
- static normalize(string: str) str[source]¶
By default, usernames and emails are stored in lowercase, normalized format. This function returns that normalization.
- Parameters:
string (str) – the string to normalize
- Returns:
the normalized string, in lowercase with diacritics removed
- property normalize_email¶
- property normalize_username¶
- abstractmethod async update_user(user: PartialUser, secrets: PartialUserSecrets | None = None) None[source]¶
Updates an existing user with the given details and secrets.
If the given user exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ErrorCode InvalidKey.
- Parameters:
user (crossauth_backend.PartialUser) – The ‘id’ field must be set, but all others are optional. Any parameter not set (or None) will not be updated. If you want to set something to None in the database, pass the value as None, not undefined.
secrets (crossauth_backend.PartialUserSecrets|None) – Optional secrets to update
- property user_editable_fields¶
- class crossauth_backend.storage.UserStorageGetOptions[source]¶
Bases:
TypedDictPassed to get methods :class: UserStorage.
- skip_active_check: bool¶
If true, a valid user will be returned even if state is not set to active
- skip_email_verified_check: bool¶
If true, a valid user will be returned even if state is set to awaitingemailverification
- class crossauth_backend.storage.UserStorageOptions[source]¶
Bases:
TypedDictOptions passed to :class: UserStorage constructor
- admin_editable_fields: List[str]¶
Fields that admins are allowed to edit (in addition to userEditableFields)
- normalize_email: bool¶
If true, email addresses (in the email column not in the username column) will be matched as lowercase and with diacritics removed. Default true.
- normalize_username: bool¶
If true, usernames will be matched as lowercase and with diacritics removed. Default true,
Note: this doesn’t apply to the ID column
- user_editable_fields: List[str]¶
Fields that users are allowed to edit. Any fields passed to a create or update call that are not in this list will be ignored.
crossauth_backend.utils module¶
- class crossauth_backend.utils.ParamType(*values)[source]¶
Bases:
EnumType of parameter that can be parsed from an option value or environment variable
- Boolean = 4¶
- Integer = 3¶
- Json = 5¶
- JsonArray = 6¶
- Number = 2¶
- String = 1¶
- crossauth_backend.utils.attr_name(instance: Any, param: str, public: bool = False, protected: bool = False)[source]¶
- crossauth_backend.utils.set_from_env(instance: Any, param: str, param_type: ParamType, name_in_env_file: str, public: bool = False, protected: bool = False) None[source]¶
- crossauth_backend.utils.set_from_option(instance: Any, param: str, options: Mapping[str, Any], public: bool = False, protected: bool = False) None[source]¶
- crossauth_backend.utils.set_parameter(param: str, param_type: ParamType, instance: Any, options: Mapping[str, Any], env_name: str | None = None, required: bool = False, public: bool = False, protected: bool = False) None[source]¶
Sets an instance variable in the passed object from the passed options object and environment variable.
If the named parameter exists in the options object, then the instance variable is set to that value. Otherwise, if the named environment variable exists, it is set from that. Otherwise, the instance variable is not updated.
- Parameters:
param (str) – The name of the parameter in the options variable and the name of the variable in the instance.
param_type (ParamType) – The type of variable. If the value is JsonArray or Json, both the option and the environment variable value should be a string, which will be parsed.
instance (Any) – Options present in the options or environment variables will be set on a corresponding instance variable in this class or object.
options (Dict[str, Any]) – Object containing options as key/value pairs.
env_name (str) – Name of environment variable.
required (bool) – If true, an exception will be thrown if the variable is not present in options or the environment variable.
public (bool) – If false, _ will be prepended to the field nam,e in the target. Default False.
protected (bool) – If true, __ will be prepended to the field nam,e in the target. Default False. Don’t use this and public together.
- Raises:
crossauth_backend.CrossauthError: withErrorCodeConfiguration if required is set but the option was not present, or if there was a parsing error.
Module contents¶
- class crossauth_backend.ApiKey[source]¶
Bases:
KeyAn API key is a str that can be used in place of a username and password. These are not automatically created, like OAuth access tokens.
- created: datetime¶
- data: NotRequired[str]¶
- expires: datetime | NullType¶
- lastactive: NotRequired[datetime]¶
- name: str¶
A name for the key, unique to the user
- userid: NotRequired[str | int | NullType]¶
- value: str¶
- class crossauth_backend.AuthenticationOptions[source]¶
Bases:
TypedDictOptions to pass to the constructor.
- friendly_name: str¶
If passed, this is what will be displayed to the user when selecting an authentication method.
- class crossauth_backend.AuthenticationParameters[source]¶
Bases:
UserSecretsInputFieldsParameters needed for this this class to authenticator a user (besides username) An example is password
- expiry: int¶
- otp: str¶
- password: str¶
- totpsecret: str¶
- class crossauth_backend.Authenticator(options: AuthenticationOptions = {})[source]¶
Bases:
ABCBase class for username/password authentication.
Subclass this if you want something other than PBKDF2 password hashing.
- abstractmethod async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]¶
- capabilities() AuthenticatorCapabilities[source]¶
- abstractmethod async create_one_time_secrets(user: User) UserSecretsInputFields[source]¶
- abstractmethod async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]¶
- factor_name: str = ''¶
- friendly_name: str¶
- abstractmethod async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]¶
- abstractmethod async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]¶
- require_user_entry() bool[source]¶
If your authenticator doesn’t need a user to be in the table (because it can create one), override this and return false. Default is true
- abstractmethod validate_secrets(params: AuthenticationParameters) List[str][source]¶
- class crossauth_backend.AuthenticatorCapabilities[source]¶
Bases:
TypedDict- can_create_user: bool¶
- can_update_secrets: bool¶
- can_update_user: bool¶
- class crossauth_backend.AuthorizeQueryType[source]¶
Bases:
TypedDict- client_id: str¶
- code_challenge: str¶
- code_challenge_method: str¶
- redirect_uri: str¶
- response_type: Required[str]¶
- scope: str¶
- state: str¶
- exception crossauth_backend.CrossauthError(code: ErrorCode, message: str | list[str] | None = None)[source]¶
Bases:
ExceptionThrown by Crossauth functions whenever it encounters an error.
- static as_crossauth_error(e: Any, default_message: str | None = None) CrossauthError[source]¶
If the passed object is a crossauth_backend.CrossauthError instance, simply returns it. If not and it is an object with errorCode in it, creates a CrossauthError from that and errorMessage, if present. Otherwise creates a crossauth_backend.CrossauthError object with
ErrorCodeof Unknown from it, setting the message if possible.- Parameters:
e (Any) – the error to convert.
default_message (str|None) – message to use if there was none in the original exception.
- Returns:
a
crossauth_backend.CrossauthErrorinstance.
- property code_name¶
Return the name of the error code
- static from_oauth_error(error: str, error_description: str | None = None) CrossauthError[source]¶
OAuth defines certain error types. To convert the error in an OAuth response into a CrossauthError object, call this function.
- Parameters:
error (str) – as returned by an OAuth call (converted to an
ErrorCode).
- :param str error_description as returned by an OAuth call (put in the
message)
:return a crossauth_backend.CrossauthError instance.
- static http_status_name(status: str | int) str[source]¶
Returns the friendly name for an HTTP response code.
If it is not a recognized one, returns the friendly name for 500. @param status the HTTP response code, which, while being numeric,
can be in a string or number.
@returns the string version of the response code.
- property oauthErrorCode: str¶
Return the OAuth name of an error code (eg “server_error”)
- class crossauth_backend.CrossauthLogger(level: int | None = None)[source]¶
Bases:
CrossauthLoggerInterfaceA very simple logging class with no dependencies.
Logs to console.
The logging API is designed so that you can replace this with other common loggers, eg Pino. To change it, use the global
CrossauthLogger.set_logger()function. This has a parameter to tell Crossauth whether your logger accepts JSON input or not.When writing logs, we use the helper function
j()to send JSON to the logger if it is supprted, and a stringified JSON otherwise.Crossauth logs
All Crossauth log messages are JSON (or stringified JSON, depending on whether the logger supports JSON input - this one does). The following fields may be present depending on context (msg is always present):
msg : main contents of the log
- erran error object. If a subclass of Error, it wil contain at least message and
a stack trace in stack. If the error is of type
crossauth_backend.CrossauthErrorit also will also contain code and http_status.
- hashedSessionCookiefor security reasons, session cookies are not included in logs.
However, so that errors can be correlated with each other, a hash of it is included in errors originating from a session.
- hashedCsrfCookiefor security reasons, csrf cookies are not included in logs.
However, so that errors can be correlated with each other, a hash of it is included in errors originating from a session.
user : username
emailMessageId : internal id of any email that is sent
email : email address
userid : sometimes provided in addition to username, or when username not available
- hahedApiKeya hash of an API key. The unhashed version is not logged for security,
but a hash of it is logged for correlation purposes.
- headeran HTTP header that relates to an error (eg Authorization), only if
it is non-secret or invalid
- accessTokenHashhash of the JTI of an access token. For security reasons, the
unhashed version is not logged.
method: request method (GET, PUT etc)
url : relevant URL
ip : relevant IP address
scope : OAuth scope
error_code : Crossauth error code
error_code_name : String version of Crossauth error code
http_status : HTTP status that will be returned
port port service is running on (only for starting a service)
prefix prefix for endpoints (only when starting a service)
authorized whether or not a valid OAuth access token was provided
- levelName = ['NONE', 'ERROR', 'WARN', 'INFO', 'DEBUG']¶
- static logger() CrossauthLoggerInterface[source]¶
Returns the static logger instance
- rossauth_logger_accepts_json = True¶
- static set_logger(logger: CrossauthLoggerInterface, accepts_json: bool) None[source]¶
Set the static logger instance
- class crossauth_backend.Crypto[source]¶
Bases:
objectProvides cryptographic functions
- Base32 = 'ABCDEFGHJKLMNPQRSTUVWXYZ23456789'¶
- static base64_decode(encoded: str) str[source]¶
Decodes a string from base64 to UTF-8 :param str encoded: base64-encoded text
- Returns:
URF-8 text
- static base64_encode(text: str) str[source]¶
Base64-encodes UTF-8 text :param str: text UTF-8 text
- Returns:
Base64 text
- static decode_password_hash(hash: str) PasswordHash[source]¶
Splits a hashed password into its component parts. Return it as a
PasswordHash.The format of the hash should be
` digest:int key_len:iterations:useSecret:salt:hashedPassword `The hashed password part is the Base64 encoding of the PBKDF2 password. :param str hash: the hassed password to decode. See above for format- Returns:
PasswordHashobject containing the deecoded hash components
- static encode_password_hash(hashed_password: str, salt: str, use_secret: bool, iterations: int, key_len: int, digest: str) str[source]¶
Encodes a hashed password into the string format it is stored as.
See
decodePasswordHash()for the format it is stored in.- Parameters:
hashed_password (str) – the Base64-encoded PBKDF2 hash of the password
salt (str) – the salt used for the password.
use_secret (bool) – whether or not to use the application secret as part of the hash.
iterations (int) – the number of PBKDF2 iterations
key_len (int) – the key length PBKDF2 parameter - results in a hashed password this length, before Base64,
digest (str) – The digest algorithm, eg pbkdf2
- Returns:
a string encode the above parameters.
- static hash(plaintext: str) str[source]¶
Standard hash using SHA256 (not PBKDF2 or HMAC)
- Parameters:
plaintext (:str) – text to hash
- Returns:
the string containing the hash
- async static password_hash(plaintext: str, options: HashOptions = {}) str[source]¶
Hashes a password and returns it as a base64 or base64url encoded string :param str plaintext: password to hash :param HashOptions options:
salt: salt to use. Make a random one if not passed
secret: optional application secret password to apply as a pepper
encode: if true, returns the full string as it should be stored in the database.
- Returns:
the string containing the hash and the values to decode it
- async static passwords_equal(plaintext: str, encoded_hash: str, secret: str | None = None) bool[source]¶
Returns true if the plaintext password, when hashed, equals the one in the hash, using it’s hasher settings :param str plaintext: the plaintext password :param str encoded_hash: the previously-hashed version :param str|None secret: if useHash`in `encodedHash is true, uses as a pepper for the hasher
- Returns:
true if they are equal, false otherwise
- static random_base32(length: int, dash_every: int | None = None) str[source]¶
Creates a random base-23 string :param int length: length of the string to create
- Returns:
the random value as a string. Number of bytes will be greater as it is base64 encoded.
- static random_salt() str[source]¶
Creates a random salt
- Returns:
random salt as a base64 encoded string
- static random_value(length: int) str[source]¶
Creates a random string encoded as in base64url :param int length: length of the string to create
- Returns:
the random value as a string. Number of bytes will be greater as it is base64 encoded.
- static sha256(plaintext: str) str[source]¶
Standard hash using SHA256 (not PBKDF2 or HMAC)
- Parameters:
plaintext (str) – text to hash
- Returns:
the string containing the hash
- static sign(payload: Dict[str, Any], secret: str, salt: str | None = None, timestamp: int | None = None) str[source]¶
Signs a JSON payload by creating a hash, using a secret and optionally also a salt and timestamp
- Parameters:
payload (Dict[str, Any]) – object to sign (will be stringified as a JSON)
secret (str) – secret key, which must be a string
salt (str|None) – optionally, a salt to concatenate with the payload (must be a string)
- Paramint|None timestamp:
optionally, a timestamp to include in the signed date as a Unix date
- Returns:
Base64-url encoded hash
- static sign_secure_token(payload: str, secret: str) str[source]¶
This can be called for a string payload that is a cryptographically secure random string. No salt is added and the token is assumed to be Base64Url already
- Parameters:
payload (str) – string to sign
secret (str) – the secret to sign with
- Returns:
Base64-url encoded hash
- static signable_token(payload: Dict[str, Any], salt: str | None = None, timestamp: int | None = None) str[source]¶
hash is of a JSON containing the payload, timestamp and optionally a salt. :param Dict[str, Any]: payload the payload to hash :param str|None salt: optional salt (use if the payload is small) :param int|None timestamp: time the token will expire
- Returns:
a Base64-URL-encoded string that can be hashed.
- static symmetric_decrypt(ciphertext: str, key_string: str) str[source]¶
Symmetric decryption using a key that must be a string
- Parameters:
ciphertext (str) – Base64-url encoded ciphertext
key_string (str) – the symmetric key
- Returns:
Decrypted text
- static symmetric_encrypt(plaintext: str, key_string: str, iv: bytes | None = None) str[source]¶
Symmetric encryption using a key that must be a string
- Parameters:
plaintext (str) – Text to encrypt
key_string (str) – the symmetric key
bytes|None (iv) – the iv value. In None, a random one is created
- Returns:
Encrypted text Base64-url encoded.
- static unsign(signed_message: str, secret: str, expiry: int | None = None) Dict[str, Any][source]¶
Validates a signature and, if valid, return the unstringified payload :param str signed_message: signed message (base64-url encoded) :param str secret: secret key, which must be a string :param int|None expiry: if set, validation will fail if the timestamp in the payload is after this date
- Returns:
if signature is valid, the payload as an object
- :raises
crossauth_backend.CrossauthError: with ErrorCodeof InvalidKey if signature is invalid or has expired.
- static unsign_secure_token(signed_message: str, secret: str, expiry: int | None = None) str[source]¶
Validates a signature signed with signSecureToken and, if valid, return the unstringified payload :param str signed_message: signed message (base64-url encoded) :param str secret: secret key, which must be a string
- Returns:
if signature is valid, the payload as a string
- :raises
crossauth_backend.CrossauthError: with {
ErrorCodeof InvalidKey if signature is invalid or has expired.
- class crossauth_backend.DoubleSubmitCsrfToken(options: DoubleSubmitCsrfTokenOptions = {})[source]¶
Bases:
objectClass for creating and validating CSRF tokens according to the double-submit cookie pattern.
CSRF token is send as a cookie plus either a header or a hidden form field.
- property cookie_name¶
- create_csrf_token() str[source]¶
Creates a session key and saves in storage
Date created is the current date/time on the server.
- Returns:
a random CSRF token.
- property domain¶
- property header_name¶
- property httpOnly¶
- make_csrf_cookie(token: str) Cookie[source]¶
Returns a
Cookieobject with the given session key.- Parameters:
token (str) – the value of the csrf token, with signature
:return a
Cookieobject,
- make_csrf_cookie_string(cookie_value: str) str[source]¶
Takes a session ID and creates a string representation of the cookie (value of the HTTP Cookie header).
:param str cookie_value the value to put in the cookie
- Returns:
a string representation of the cookie and options.
- property path¶
- property sameSite¶
- property secure¶
- validate_csrf_cookie(cookie_value: str) str[source]¶
Validates the passed CSRF cookie (doesn’t check it matches the token, just that the cookie is valid).
- To be valid:
The signature in the cookie must match the token in the cookie
The token in the cookie must matched the value in the form or header after unmasking
- Parameters:
cookie_value (str) – the CSRF cookie value to validate.
:raises
crossauth_backend.CrossauthErrorwithErrorCodeof InvalidKey
- validate_double_submit_csrf_token(cookie_value: str, form_or_header_name: str) None[source]¶
Validates the passed CSRF token.
- To be valid:
The signature in the cookie must match the token in the cookie
The token in the cookie must matched the value in the form or header after unmasking
- Parameters:
cookie_value (str) – the CSRF cookie value to validate.
:param str form_or_header_name the value from the csrf_token form header or the X-CROSSAUTH-CSRF header.
:raises
crossauth_backend.CrossauthErrorwithErrorCodeof InvalidKey
- class crossauth_backend.DoubleSubmitCsrfTokenOptions[source]¶
Bases:
CookieOptionsOptions for double-submit csrf tokens
- cookie_name: NotRequired[str]¶
- domain: str¶
- expires: datetime¶
- header_name: NotRequired[str]¶
- httpOnly: bool¶
- maxAge: int¶
- path: str¶
- sameSite: bool | Literal['lax', 'strict', 'none']¶
- secret: NotRequired[str]¶
- secure: bool¶
- class crossauth_backend.DummyFactor2Authenticator(code: str, options: DummyFactor2AuthenticatorOptions = {})[source]¶
Bases:
PasswordAuthenticatorThis authenticator creates fixed one-time code
- async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]¶
Authenticates the user by comparing the user-provided otp with the one in secrets.
Validation fails if the otp is incorrect or has expired.
- Parameters:
user – ignored
secrets – taken from the session and should contain otp and expiry
params – user input and should contain otp
- Raises:
CrossauthError – with ErrorCode InvalidToken or Expired.
- property code¶
- async create_one_time_secrets(user: User) UserSecretsInputFields[source]¶
Creates and emails a new one-time code.
- Parameters:
user – ignored
:return Dictionary with ‘otp’ and ‘expiry’ as a Unix time (number).
- async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]¶
Does nothing for this class
- async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]¶
- async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]¶
Reprepare configuration for 2FA authentication
- Parameters:
username – Username (unused parameter indicated by underscore prefix)
session_key – Key object containing session data
:return Dictionary with user_data, secrets, and new_session_data, or None
- validate_secrets(params: AuthenticationParameters) List[str][source]¶
Nothing to do for this class. Returns empty set
- class crossauth_backend.DummyFactor2AuthenticatorOptions[source]¶
Bases:
AuthenticationOptionsOptional parameters for :class: DummyFactor2Authenticator.
See :func: DummyFactor2Authenticator__init__ for details
- friendly_name: str¶
- class crossauth_backend.DummySmsAuthenticator(options: SmsAuthenticatorOptions = {})[source]¶
Bases:
SmsAuthenticatorThis authenticator mocks sending an SMS OTP
- class crossauth_backend.EmailAuthenticator(options: EmailAuthenticatorOptions = {})[source]¶
Bases:
AuthenticatorThis authenticator sends a one-time code by email
- async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]¶
Authenticates the user by comparing the user-provided otp with the one in secrets.
Validation fails if the otp is incorrect or has expired.
:param _user ignored :param secrets taken from the session and should contain otp and
expiry
:param params user input and should contain otp :raise CrossauthError with ErrorCode InvalidToken or Expired.
- async create_one_time_secrets(user: User) UserSecretsInputFields[source]¶
Creates and emails a new one-time code. @param user the user to create it for. Uses the email field if
present, username otherwise (which in this case is expected to contain an email address)
:return otp and expiry as a Unix time (number).
- async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]¶
Does nothing for this class
- static is_email_valid(email: str) bool[source]¶
Returns whether or not the passed email has a valid form. @param email the email address to validate
:return true if it is valid. false otherwise
- async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]¶
Creates and emails the one-time code @param user the user to create it for. Uses the email field if
present, username otherwise (which in this case is expected to contain an email address)
- :return userData containing username, email, factor2
- sessionData containing the same plus otp and expiry which
is a Unix time (number).
- async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]¶
Creates and emails a new one-time code. :param _username ignored :param sessionKey the session containing the previously created data. :return dict
- skip_email_verification_on_signup() bool[source]¶
- :return true - as a code is sent to the registers email address, no
additional email verification is needed
- static validate_email(email: str | None) None[source]¶
Throws an exception if an email address doesn’t have a valid form. @param email the email address to validate @throws CrossauthError with ErrorCode InvalidEmail.
- validate_secrets(params: AuthenticationParameters) List[str][source]¶
Does nothing for this class
- class crossauth_backend.EmailAuthenticatorOptions[source]¶
Bases:
AuthenticationOptionsOptional parameters for :class: EmailAuthenticator.
See :func: EmailAuthenticator__init__ for details
- email_authenticator_html_body: str¶
Template file containing page for producing the HTML version of the email verification email body
- email_authenticator_subject: str¶
Subject for the the email verification email
- email_authenticator_text_bod: str¶
Template file containing page for producing the text version of the email verification email body
- email_authenticator_token_expires: int¶
Number of seconds before otps should expire. Default 5 minutes
- email_from: str¶
Sender for emails
- friendly_name: str¶
- render: Callable[[str, Dict[str, Any]], str]¶
if passed, use this instead of the default nunjucks renderer
- smtp_host: str¶
Hostname of the SMTP server. No default - required parameter
- smtp_password: str¶
Password for connecting to SMTP servger. Default undefined
- smtp_port: int¶
Port the SMTP server is running on. Default 25
- smtp_use_tls: bool¶
Whether or not TLS is used by the SMTP server. Default false
- smtp_username: str¶
Username for connecting to SMTP servger. Default undefined
- views: str¶
The directory containing views (by default, Jinja2 templates)
- class crossauth_backend.ErrorCode(*values)[source]¶
Bases:
EnumIndicates the type of error reported by
crossauth_backend.CrossauthError- AuthorizationPending = 44¶
Thrown in the OAuth device code flow
- BadRequest = 43¶
Thrown when an invalid request is made, eg configure 2FA when 2FA is switched off for user
- ClientExists = 6¶
This is returned if attempting to make a client which already exists (client_id or name/userid)
- Configuration = 31¶
Thrown when something is missing or inconsistent in configuration
- Connection = 24¶
Thrown when there is a connection error, eg to a database
- ConstraintViolation = 47¶
Thrown in database handlers where an insert causes a constraint violation
- DataFormat = 39¶
Thrown when a the data field of key storage is not valid json
- EmailNotExist = 3¶
Thrown when a a password reset is requested and the email does not exist
- EmailNotVerified = 12¶
Thrown on login attempt with a user account marked not having had the email address validated
- Expired = 23¶
Thrown when a session or API key has expired
- ExpiredToken = 46¶
Thrown in the OAuth device code flow
- Factor2ResetNeeded = 30¶
Thrown if the user needs to reset factor2 before logging in
- FetchError = 40¶
Thrown if a fetch failed
- Forbidden = 19¶
Returned with an HTTP 403 response
- FormEntry = 42¶
Thrown by user-supplied validation functions if a user details form was incorrectly filled out
- InsufficientPriviledges = 18¶
Returned if user is valid but doesn’t have permission to access resource
- InsufficientScope = 17¶
Thrown for the OAuth insufficient_scope error
- InvalidClientId = 5¶
This is returned if an OAuth2 client id is invalid
- InvalidClientIdOrSecret = 8¶
Server endpoints in this package will return this instead of InvalidClientId or InvalidClientSecret for security purposes
- InvalidClientSecret = 7¶
This is returned if an OAuth2 client secret is invalid
- InvalidCsrf = 21¶
Thrown if the CSRF token is invalid
- InvalidEmail = 32¶
Thrown if an email address in invalid
- InvalidHash = 25¶
Thrown when a hash, eg password, is not in the given format
- InvalidKey = 20¶
Thrown when a session or API key was provided that is not in the key table. For CSRF and sesison key, an InvalidCsrf or InvalidSession will be thrown instead
- InvalidOAuthFlow = 10¶
This is returned a request is made with a an oauth flow name that is not recognized
- InvalidPhoneNumber = 33¶
Thrown if a phone number in invalid
- InvalidRedirectUri = 9¶
This is returned a request is made with a redirect Uri that is not registered
- InvalidScope = 16¶
Thrown for the OAuth invalid_scope error
- InvalidSession = 22¶
Thrown if the session cookie is invalid
- InvalidToken = 36¶
Thrown when a token (eg TOTP or OTP) is invalid
- InvalidUsername = 34¶
Thrown if an email address in invalid
- KeyExists = 27¶
Thrown if you try to create a key which already exists in key storage
- MfaRequired = 37¶
Thrown during OAuth password flow if an MFA step is needed
- NotImplemented = 48¶
Thrown if a method is unimplemented, typically when a feature is not yet supported in this language.
- PasswordChangeNeeded = 28¶
Thrown if the user needs to reset his or her password
- PasswordFormat = 38¶
Thrown when a password does not match rules (length, uppercase/lowercase/digits)
- PasswordInvalid = 2¶
Thrown when a password does not match, eg during login or signup
- PasswordMatch = 35¶
Thrown when two passwords do not match each other (eg signup)
- PasswordResetNeeded = 29¶
Thrown if the user needs to reset his or her password
- SlowDown = 45¶
Thrown in the OAuth device code flow
- TwoFactorIncomplete = 13¶
Thrown on login attempt with a user account marked not having completed 2FA setup
- Unauthorized = 14¶
Thrown when a resource expecting user authorization was access and authorization not provided or wrong
- UnauthorizedClient = 15¶
Thrown for the OAuth unauthorized_client error (when the client is unauthorized as opposed to the user)
- UnknownError = 50¶
Thrown for an condition not convered above.
- UnsupportedAlgorithm = 26¶
Thrown when an algorithm is requested but not supported, eg hashing algorithm
- UserExists = 41¶
Thrown when attempting to create a user that already exists
- UserNotActive = 11¶
Thrown on login attempt with a user account marked inactive
- UserNotExist = 1¶
Thrown when a given username does not exist, eg during login
- UsernameOrPasswordInvalid = 4¶
For endpoints provided by servers in this package, this is returned instead of UserNotExist or PasswordNotMatch, for security reasons
- ValueError = 49¶
Thrown a dict field is unexpectedly missing or wrong type
- class crossauth_backend.HashOptions[source]¶
Bases:
TypedDictOption parameters for
Crypto.passwordHash- digest: str¶
PBKDF2 digest method
- encode: bool¶
Whether to Base64-URL-encode the result
- iterations: int¶
Number of PBKDF2 iterations
- key_len: int¶
Length (before Base64-encoding) of the PBKDF2 key being generated
- salt: str¶
A salt to prepend to the message before hashing
- secret: str¶
A secret to append to the salt when hashing, or undefined for no secret
- class crossauth_backend.InMemoryKeyStorage[source]¶
Bases:
KeyStorageImplementation of
KeyStoragewhere keys stored in memory. Intended for testing.- async delete_all_for_user(userid: str | int | None, prefix: str, except_key: str | None = None) None[source]¶
Deletes all keys from storage for the given user ID
- Parameters:
userid (int|str|None) – User ID to delete keys for
prefix (str) – Only keys starting with this prefix will be deleted
except_key (str|None) – If defined, the key with this value will not be deleted
- async delete_data(key_name: str, data_name: str) None[source]¶
The ‘data’ field in a key entry is a JSON string. This method should atomically delete a field in it.
- Parameters:
key_name (str) – The name of the key to update, as it appears in the table.
data_name (str) – The field name to delete. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is deleted.
- async delete_key(value: str) None[source]¶
Deletes a key from storage (e.g., the database).
- Parameters:
value (str) – The key to delete
- async delete_matching(key: PartialKey) None[source]¶
Deletes all matching the given specs
- Parameters:
key (crossauth_backend.PartialKey) – Any key matching all defined values in this object will be deleted
- async get_all_for_user(userid: str | int | None = None) List[Key][source]¶
Return all keys matching the given user ID
- Parameters:
userid (str|int|None) – User to return keys for
- Returns:
List[Key]: An array of keys
- async get_key(key: str) Key[source]¶
Returns the matching key in the session storage or raises an exception if it doesn’t exist.
- Parameters:
key (ster) – The key to look up, as it will appear in this storage (typically unsigned, hashed)
- Returns:
The matching Key record.
- async save_key(userid: str | int | None, value: str, date_created: datetime, expires: datetime | None = None, data: str | None = None, extra_fields: Mapping[str, Any] | None = None) None[source]¶
Saves a session key in the session storage (e.g., database).
- Parameters:
userid (int|str|None) – The ID of the user. This matches the primary key in the UserStorage implementation.
value (str) – The key value to store.
date_created (datetime) – The date/time the key was created.
expires (datetime|None) – The date/time the key expires.
extra_fields (Mapping[str, Any]|None) – These will also be saved in the key record
- Padam str|None data:
An optional value, specific to the type of key, e.g., new email for email change tokens
- async update_data(key_name: str, data_name: str, value: Any | None) None[source]¶
The ‘data’ field in a key entry is a JSON string. This method should atomically update a field in it.
- Parameters:
key_name (str) – The name of the key to update, as it appears in the table.
data_name (str) – The field name to update. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is updated.
value (Any|None) – The new value.
- async update_key(key: PartialKey) None[source]¶
If the given session key exists in the database, update it with the passed values. If it doesn’t exist, raise a CrossauthError with ErrorCode ‘InvalidKey’.
- Parameters:
key (crossauth_backend.PartialKey) – The fields defined in this will be updated. ‘id’ must be present and it will not be updated.
- async update_many_data(key_name: str, data_array: List[KeyDataEntry]) None[source]¶
Same as ‘update_data’ but updates several keys.
Ensure it is done as a single transaction.
- Parameters:
key_name (str) – The key to update
data_array (List[crossauth_backend.KeyDataEntry) – dataName and value pairs
- class crossauth_backend.InMemoryOAuthAuthorizationStorage(options: OAuthAuthorizationStorageOptions = {})[source]¶
Bases:
OAuthAuthorizationStorageImplementation of
KeyStoragewhere keys stored in memory. Intended for testing.- async get_authorizations(client_id: str, userid: str | int | None = None) List[str | None][source]¶
Returns the matching all scopes authorized for the given client and optionally user.
- Parameters:
client_id (str) – the client_id to look up
userid (int|str|None) – the userid to look up, None for a client authorization not user authorization
- Returns:
The authorized scopes as a list.
- async update_authorizations(client_id: str, userid: str | int | None, authorizations: List[str | None]) None[source]¶
Saves a new set of authorizations for the given client and optionally user.
Deletes the old ones.
- Parameters:
client_id (str) – the client_id to look up
userid (str|int|None) – the userid to look up, None for a client authorization not user authorization
List[str|None]authorizations – new set of authorized scopes, which may be empty
- class crossauth_backend.InMemoryOAuthClientStorage(options: OAuthClientStorageOptions = {})[source]¶
Bases:
OAuthClientStorageImplementation of
KeyStoragewhere keys stored in memory. Intended for testing.- async create_client(client: OAuthClient) OAuthClient[source]¶
Creates and returns a new client with random ID and optionally secret.
Saves in the database.
- Parameters:
client (crossauth_backend.OAuthClient) – the client to save.
- Returns:
The new client.
- async delete_client(client_id: str) None[source]¶
Deletes a key from storage.
- Parameters:
client_id (str) – the client to delete
- async get_client_by_id(client_id: str) OAuthClient[source]¶
Returns the matching client by its auto-generated id in the storage or throws an exception if it doesn’t exist.
- Parameters:
client_id (str) – the client_id to look up
- Returns:
he matching OAuthClient object.
- async get_client_by_name(name: str, userid: str | int | None | NullType = None) List[OAuthClient][source]¶
Returns the matching client in the storage by friendly name or throws an exception if it doesn’t exist.
- Parameters:
name (str) – the client name to look up
userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients with this name.
- Returns:
A list of OAuthClient objects.
:raises
crossauth_backend.CrossauthError: withErrorCodeof ‘InvalidSessionId’ if a match was not found in session storage.
- async get_clients(skip: int | None = None, take: int | None = None, userid: str | int | None | NullType = None) List[OAuthClient][source]¶
Returns all clients in alphabetical order of client name.
- Parameters:
skip (int|None) – skip this number of records from the start in alphabetical order
take (int|None) – return at most this number of records
userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients.
- Returns:
A list of OAuthClient objects.
- async update_client(client: PartialOAuthClient) None[source]¶
If the given session key exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ‘InvalidClient’.
- Parameters:
client (crossauth_backend.OAuthClient) – all fields to update (client_id must be set but will not be updated)
:raises
crossauth_backend.CrossauthError: with ‘InvalidClient’ if the client doesn’t exist.
- class crossauth_backend.InMemoryUserStorage(options: InMemoryUserStorageOptions = {})[source]¶
Bases:
UserStorageImplementation of
KeyStoragewhere keys stored in memory. Intended for testing.- async create_user(user: UserInputFields, secrets: UserSecretsInputFields | None = None) User[source]¶
Creates a user with the given details and secrets.
- Parameters:
user (crossauth_backend.UserInputFields) – will be put in the User table
secrets (crossauth_backend.UserSecretsInputFields|None) – will be put in the UserSecrets table
- Returns:
the new user as a User object
:raises
crossauth_backend.CrossauthErrorwithErrorCodeConfiguration
- async delete_user_by_id(id: str | int) None[source]¶
If the storage supports this, delete the user with the given ID from storage.
- Parameters:
id (str|int) – id of user to delete
- async delete_user_by_username(username: str) None[source]¶
If the storage supports this, delete the named user from storage.
- Parameters:
username (str) – username to delete
- async get_user_by(field: str, value: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given field, or throws an exception.
This does no normalisation. Currently it is only used for the OAuth client if you set user_creation_type to “merge”, “embed” or “custom”.
- Parameters:
field (str) – the field to match
value (str) – the value to match
options (UserStorageGetOptions) – optionally turn off checks. Used internally
:raises
crossauth_backend.CrossauthError: withErrorCodeeither UserNotExist or Connection
- async get_user_by_email(email: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given email address, or throws an exception.
If normalize_email is true, email should be matched normalized and lowercased (using normalize()) If the email field doesn’t exist, username is assumed to be the email column
- Parameters:
email – the email address to return the user of
options – optionally turn off checks. Used internally
- Raises:
crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection
- async get_user_by_id(id: str | int, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given user id, or throws an exception.
Note that implementations are free to define what the user ID is. It can be a number or string, or can simply be username.
- Parameters:
id (str|int) – the user id to return the user of
options (UserStorageGetOptions) – optionally turn off checks. Used internally
:raises
crossauth_backend.CrossauthErrorwithErrorCodeeither UserNotExist or Connection
- async get_user_by_username(username: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given username, or throws an exception.
If normalize_username is true, the username should be matched normalized and lowercased (using normalize())
- Parameters:
username (str) – the username to return the user of
options (UserStorageGetOptions) – optionally turn off checks. Used internally
:raises
crossauth_backend.CrossauthError: withErrorCodeeither UserNotExist or Connection
- async get_users(skip: int | None = None, take: int | None = None) List[User][source]¶
Returns all users in the storage, in a fixed order defined by the storage (e.g. alphabetical by username)
- Parameters:
skip (int|None) – skip this number of records from the start of the set
take (int|None) – only return at most this number of records
- Returns:
an array of User objects
- async update_user(user: PartialUser, secrets: PartialUserSecrets | None = None) None[source]¶
Updates an existing user with the given details and secrets.
If the given user exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ErrorCode InvalidKey.
- Parameters:
user (crossauth_backend.PartialUser) – The ‘id’ field must be set, but all others are optional. Any parameter not set (or None) will not be updated. If you want to set something to None in the database, pass the value as None, not undefined.
secrets (crossauth_backend.PartialUserSecrets|None) – Optional secrets to update
- class crossauth_backend.JWT(token: str | None = None, payload: Dict[str, Any] = {})[source]¶
Bases:
objectEncapsulates the payload of a JWT, with both the token and decoded JSON payload.
- class crossauth_backend.Key[source]¶
Bases:
TypedDictA key (eg session ID, email reset token) as stored in a database table.
The fields defined here are the ones used by Crossauth. You may add others.
- created: datetime¶
The datetime/time the key was created, in local time on the server
- data: NotRequired[str]¶
Additional key-specific data (eg new email address for email change).
While application specific, any data Crossauth puts in this field is a strified JSON, with its own key so it can co-exist with other data.
- expires: datetime | NullType¶
The datetime/time the key expires
- lastactive: NotRequired[datetime]¶
The datetime/time key was last used (eg last time a request was made with this value as a session ID)
- userid: NotRequired[str | int | NullType]¶
the user this key is for (or undefined for an anonymous session ID)
It accepts the value null as usually this is the value stored in the database, rather than undefined. Some functions need to differentiate between a null value as opposed to the value not being defined (eg for a partial updatetime).
- value: str¶
The value of the keykey. In a cookie, the value part of cookiename=value; options…
- class crossauth_backend.KeyDataEntry[source]¶
Bases:
TypedDict- data_name: str¶
- value: NotRequired[Any]¶
- class crossauth_backend.KeyPrefix[source]¶
Bases:
objectThese are used prefixes for keye in a key storage entry
- access_token = 'access:'¶
- api_key = 'api:'¶
- authorization_code = 'authz:'¶
- device_code = 'dc:'¶
- email_verification_token = 'e:'¶
- mfa_token = 'omfa:'¶
- password_reset_token = 'p:'¶
- refresh_token = 'refresh:'¶
- session = 's:'¶
- user_code = 'uc:'¶
- class crossauth_backend.KeyStorage[source]¶
Bases:
ABCBase class for storing session and API keys.
This class is subclassed for various types of session key storage. For example, PrismaKeyStorage is for storing sessions in a database table, managed by the Prisma ORM.
- static decode_data(data: str | None) Dict[str, Any][source]¶
Returns an object decoded from the data field as a JSON string
- Parameters:
data (str|None) – The JSON string to decode
- Returns:
The parsed JSON object
- Raises:
json.JSONDecodeError – If data is not a valid JSON string
- abstractmethod async delete_all_for_user(userid: str | int | None, prefix: str, except_key: str | None = None) None[source]¶
Deletes all keys from storage for the given user ID
- Parameters:
userid (int|str|None) – User ID to delete keys for
prefix (str) – Only keys starting with this prefix will be deleted
except_key (str|None) – If defined, the key with this value will not be deleted
- abstractmethod async delete_data(key_name: str, data_name: str) None[source]¶
The ‘data’ field in a key entry is a JSON string. This method should atomically delete a field in it.
- Parameters:
key_name (str) – The name of the key to update, as it appears in the table.
data_name (str) – The field name to delete. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is deleted.
- abstractmethod async delete_key(value: str) None[source]¶
Deletes a key from storage (e.g., the database).
- Parameters:
value (str) – The key to delete
- abstractmethod async delete_matching(key: PartialKey) None[source]¶
Deletes all matching the given specs
- Parameters:
key (crossauth_backend.PartialKey) – Any key matching all defined values in this object will be deleted
- static encode_data(data: Dict[str, Any] | None = None) str[source]¶
Returns a JSON string encoded from the given object
- Parameters:
data (Dict[str, Any]|None) – The object to encode
- Returns:
A JSON string
- abstractmethod async get_all_for_user(userid: str | int | None = None) List[Key][source]¶
Return all keys matching the given user ID
- Parameters:
userid (str|int|None) – User to return keys for
- Returns:
List[Key]: An array of keys
- abstractmethod async get_key(key: str) Key[source]¶
Returns the matching key in the session storage or raises an exception if it doesn’t exist.
- Parameters:
key (ster) – The key to look up, as it will appear in this storage (typically unsigned, hashed)
- Returns:
The matching Key record.
- abstractmethod async save_key(userid: str | int | None, value: str, date_created: datetime, expires: datetime | None = None, data: str | None = None, extra_fields: Mapping[str, Any] | None = None) None[source]¶
Saves a session key in the session storage (e.g., database).
- Parameters:
userid (int|str|None) – The ID of the user. This matches the primary key in the UserStorage implementation.
value (str) – The key value to store.
date_created (datetime) – The date/time the key was created.
expires (datetime|None) – The date/time the key expires.
extra_fields (Mapping[str, Any]|None) – These will also be saved in the key record
- Padam str|None data:
An optional value, specific to the type of key, e.g., new email for email change tokens
- abstractmethod async update_data(key_name: str, data_name: str, value: Any | None) None[source]¶
The ‘data’ field in a key entry is a JSON string. This method should atomically update a field in it.
- Parameters:
key_name (str) – The name of the key to update, as it appears in the table.
data_name (str) – The field name to update. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is updated.
value (Any|None) – The new value.
- abstractmethod async update_key(key: PartialKey) None[source]¶
If the given session key exists in the database, update it with the passed values. If it doesn’t exist, raise a CrossauthError with ErrorCode ‘InvalidKey’.
- Parameters:
key (crossauth_backend.PartialKey) – The fields defined in this will be updated. ‘id’ must be present and it will not be updated.
- abstractmethod async update_many_data(key_name: str, data_array: List[KeyDataEntry]) None[source]¶
Same as ‘update_data’ but updates several keys.
Ensure it is done as a single transaction.
- Parameters:
key_name (str) – The key to update
data_array (List[crossauth_backend.KeyDataEntry) – dataName and value pairs
- class crossauth_backend.LdapAuthenticator(ldap_storage: LdapUserStorage, options: LdapAuthenticatorOptions = {})[source]¶
Bases:
PasswordAuthenticator- async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]¶
Authenticates the user, returning a the user as a {@link User} object.
- @param user the username field is required and this is used for LDAP authentication.
If ldapAutoCreateAccount is true, these attributes as used for user creation (see {@link LdapUserStorage.createUser}).
@param _secrets Ignored as secrets are stored in LDAP @param params the password field is expected to contain the LDAP password. @throws {@link @crossauth/common!CrossauthError} with {@link @crossauth/common!ErrorCode} of Connection, UsernameOrPasswordInvalid.
- async create_one_time_secrets(user: User) UserSecretsInputFields[source]¶
Does nothing in this class
- async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]¶
Does nothing in this class
- async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]¶
Nothing to do in this class
- async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]¶
Nothing to do in this class
- validate_secrets(params: AuthenticationParameters) List[str][source]¶
Does nothing as LDAP is responsible for password format (this class doesn’t create password entries)
- class crossauth_backend.LdapAuthenticatorOptions[source]¶
Bases:
AuthenticationOptionsOptional parameters for :class: LdapAuthenticator.
See :func: LdapAuthenticator__init__ for details
- friendly_name: str¶
- ldap_auto_create_account: bool¶
If true, an account will automatically be created (with factor1 taken from ldap_auto_create_factor1 when a user logs in with LDAP)
- ldap_auto_create_factor1: str¶
crossauth_backend.LdapAuthenticatorOptions
- Type:
See
- Type:
class
- class crossauth_backend.LdapUser[source]¶
Bases:
TypedDict- dn: str¶
The user’s dn in LDAP
- uid: NotRequired[str | List[str]]¶
- class crossauth_backend.LdapUserStorage(local_storage: UserStorage, options: LdapUserStorageOptions = {})[source]¶
Bases:
UserStorageWraps another user storage but with the authentication done in LDAP.
This class still needs a user to be created in another database, with for example a user id that can be referenced in key storage, and a state variable.
An admin account is not used. Searches are done as the user, with the user’s password.
- async create_user(user: UserInputFields, secrets: UserSecretsInputFields | None = None) User[source]¶
Creates a user with the given details and secrets.
- Parameters:
user (crossauth_backend.UserInputFields) – will be put in the User table
secrets (crossauth_backend.UserSecretsInputFields|None) – will be put in the UserSecrets table
- Returns:
the new user as a User object
:raises
crossauth_backend.CrossauthErrorwithErrorCodeConfiguration
- async delete_user_by_id(id: str | int) None[source]¶
If the storage supports this, delete the user with the given ID from storage.
- Parameters:
id (str|int) – id of user to delete
- async delete_user_by_username(username: str) None[source]¶
If the storage supports this, delete the named user from storage.
- Parameters:
username (str) – username to delete
- async get_ldap_user(username: str, password: str) LdapUser[source]¶
Gets the user from LDAP. Does not check local storage.
If the user doesn’t exist or authentication fails, an exception is thrown :param username: the username to fetch :param password: the LDAP password :returns: the matching LdapUser :raises: CrossauthError with ErrorCode UsernameOrPasswordInvalid or Connection
- async get_user_by(field: str, value: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given field, or throws an exception.
This does no normalisation. Currently it is only used for the OAuth client if you set user_creation_type to “merge”, “embed” or “custom”.
- Parameters:
field (str) – the field to match
value (str) – the value to match
options (UserStorageGetOptions) – optionally turn off checks. Used internally
:raises
crossauth_backend.CrossauthError: withErrorCodeeither UserNotExist or Connection
- async get_user_by_email(email: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given email address, or throws an exception.
If normalize_email is true, email should be matched normalized and lowercased (using normalize()) If the email field doesn’t exist, username is assumed to be the email column
- Parameters:
email – the email address to return the user of
options – optionally turn off checks. Used internally
- Raises:
crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection
- async get_user_by_id(id: str | int, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given user id, or throws an exception.
Note that implementations are free to define what the user ID is. It can be a number or string, or can simply be username.
- Parameters:
id (str|int) – the user id to return the user of
options (UserStorageGetOptions) – optionally turn off checks. Used internally
:raises
crossauth_backend.CrossauthErrorwithErrorCodeeither UserNotExist or Connection
- async get_user_by_username(username: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given username, or throws an exception.
If normalize_username is true, the username should be matched normalized and lowercased (using normalize())
- Parameters:
username (str) – the username to return the user of
options (UserStorageGetOptions) – optionally turn off checks. Used internally
:raises
crossauth_backend.CrossauthError: withErrorCodeeither UserNotExist or Connection
- async get_users(skip: int | None = None, take: int | None = None) List[User][source]¶
Returns all users in the storage, in a fixed order defined by the storage (e.g. alphabetical by username)
- Parameters:
skip (int|None) – skip this number of records from the start of the set
take (int|None) – only return at most this number of records
- Returns:
an array of User objects
- async ldap_bind(dn: str, password: str) Connection[source]¶
bind and return the ldap client from https://github.com/shaozi/ldap-authentication/blob/master/index.js
- async search_user(ldap_client: Connection, user_dn: str, attributes: List[str] | None = None) LdapUser[source]¶
Search for user in LDAP
- async update_user(user: PartialUser, secrets: PartialUserSecrets | None = None) None[source]¶
Updates an existing user with the given details and secrets.
If the given user exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ErrorCode InvalidKey.
- Parameters:
user (crossauth_backend.PartialUser) – The ‘id’ field must be set, but all others are optional. Any parameter not set (or None) will not be updated. If you want to set something to None in the database, pass the value as None, not undefined.
secrets (crossauth_backend.PartialUserSecrets|None) – Optional secrets to update
- class crossauth_backend.LdapUserStorageOptions[source]¶
Bases:
UserStorageOptionsOptional parameters for {@link LdapUserStorage}.
- admin_editable_fields: List[str]¶
- create_user_fn: Callable[[UserInputFields, LdapUser], UserInputFields]¶
A function to create a user object given the entry in LDAP and additional fields. The additional fields might be useful for attributes that aren’t in LDAP and the user needs to be prompted for, for example email address. The default function sets username to uid from ldapUser, state to active and takes every field for user (overriding status and username if present).
- ldap_urls: List[str]¶
//ldap,example.com:1636 No default (required)
- Type:
Utl running LDAP server. eg ldap
- Type:
//ldap.example.com or ldaps
- ldap_user_search_base: str¶
Search base, for user queries, eg ou=users,dc=example,dc=com. Default empty
- ldap_username_attribute: str¶
Username attribute for searches. Default “cn”.
- normalize_email: bool¶
- normalize_username: bool¶
- user_editable_fields: List[str]¶
- class crossauth_backend.LocalPasswordAuthenticator(user_storage: UserStorage, options: LocalPasswordAuthenticatorOptions = {})[source]¶
Bases:
PasswordAuthenticator- NoPassword = '********'¶
- async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]¶
Authenticates the user, returning a the user as a {@link User} object.
If you set extraFields when constructing the {@link UserStorage} instance passed to the constructor, these will be included in the returned User object. hashedPassword, if present in the User object, will be removed.
:param user the username field should contain the username :param secrets from the UserSecrets table. password is expected to be present :param params the user input. password is expected to be present :raises
crossauth_backend.CrossauthErrorwithcrossauth_backend.ErrorCodeof Connection, UserNotExist`or `PasswordInvalid, TwoFactorIncomplete, EmailNotVerified or UserNotActive.
- async create_one_time_secrets(user: User) UserSecretsInputFields[source]¶
Does nothing for this class.
- async create_password_for_storage(password: str) str[source]¶
Just calls createPasswordHash with encode set to true @param password the password to hash @returns a string for storing in storage
- async create_password_hash(password: str, salt: str | None = None, encode: bool = True) str[source]¶
Creates and returns a hash of the passed password, with the hashing parameters encoded ready for storage.
If salt is not provided, a random one is created. If secret was passed to the constructor or in the .env, and enableSecretInPasswords was set to true, it is used as the pepper. used as the pepper.
- Parameters:
password – the password to hash
salt – the salt to use. If None, a random one will be generated.
encode – fi true, a hash suitable for DB storage is created (algorithm etc prefixed to the hash)
:return the encoded hash string.
- async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]¶
- async password_matches_hash(password: str, passwordHash: str, secret: str | None = None)[source]¶
A static version of the password hasher, provided for convenience
:param password : unhashed password :param passwordHash : hashed password :param secret secret, if used when hashing passwords, or undefined if not :return true if match, false otherwise
- async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]¶
Does nothing for this class
- async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]¶
Does nothing for this class
- skip_email_verification_on_signup() bool[source]¶
false, if email verification is enabled, it should be for this authenticator too
- validate_secrets(params: AuthenticationParameters) List[str][source]¶
Calls the implementor-provided validatePasswordFn
This function is called to apply local password policy (password length, uppercase/lowercase etc)
:param params the password should be in password
:return an array of errors
- class crossauth_backend.LocalPasswordAuthenticatorOptions[source]¶
Bases:
AuthenticationOptionsOptional parameters for :class: LocalPasswordAuthenticator.
See :func: LocalPasswordAuthenticator__init__ for details
- enable_secret_for_password_hash: bool¶
If true, the secret will be concatenated to the salt when generating a hash for storing the password
- friendly_name: str¶
- pbkdf2_digest: str¶
Digest method for PBKDF2 hasher.. Default sha256
- pbkdf2_iterations: int¶
Number of PBKDF2 iterations. Default 600_000
- pbkdf2_key_length: int¶
Length the PBKDF2 key to generate, before bsae64-url encoding. Default 32
- pbkdf2_salt_length: int¶
Number of characters for salt, before base64-enoding. Default 16
- secret: str¶
Application secret. If defined, it is used as the secret in PBKDF2 to hash passwords
- validate_password_fn: Callable[[AuthenticationParameters], List[str]]¶
Function that throws a {@link @crossauth/common!CrossauthError} with
crossauth_backend.CrossauthErrorwithcrossauth_backend.ErrorCodePasswordFormat if the password
doesn’t confirm to local rules (eg number of charafters) */
- class crossauth_backend.OAuthAuthorizationStorage(options: OAuthAuthorizationStorageOptions = {})[source]¶
Bases:
ABCBase class for storing scopes that have been authorized by a user (or for client credentials, for a client).
This class is subclassed for various types of storage. For example, PrismaOAuthAuthorizationStorage is for storing in a database table, managed by the Prisma ORM.
- abstractmethod async get_authorizations(client_id: str, userid: str | int | None = None) List[str | None][source]¶
Returns the matching all scopes authorized for the given client and optionally user.
- Parameters:
client_id (str) – the client_id to look up
userid (int|str|None) – the userid to look up, None for a client authorization not user authorization
- Returns:
The authorized scopes as a list.
- abstractmethod async update_authorizations(client_id: str, userid: str | int | None, authorizations: List[str | None]) None[source]¶
Saves a new set of authorizations for the given client and optionally user.
Deletes the old ones.
- Parameters:
client_id (str) – the client_id to look up
userid (str|int|None) – the userid to look up, None for a client authorization not user authorization
List[str|None]authorizations – new set of authorized scopes, which may be empty
- class crossauth_backend.OAuthClient[source]¶
Bases:
TypedDictOAuth client data as stored in a database table
- client_id: str¶
The client_id, which is auto-generated and immutable
- client_name: str¶
A user-friendly name for the client (not used as part of the OAuth API).
- client_secret: NotRequired[str | None]¶
Client secret, which is autogenerated.
If there is no client secret, it should be set to undefined.
This field allows null as well as undefined this is used, for example, when partially updating a client and you specifically want to set the secret to undefined, as opposed to just not wishing to change the value. Other than that, this value is always either a str or undefined.
- confidential: bool¶
Whether or not the client is confidential (and can therefore keep the client secret secret)
- redirect_uri: List[str]¶
An array of value redirect URIs for the client.
- userid: NotRequired[str | int | None]¶
ID of the user who owns this client, which may be undefined for not being owned by a specific user.
This field allows null as well as undefined this is used, for example, when partially updating a client and you specifically want to set the user ID to undefined, as opposed to just not wishing to change the value. Other than that, this value is always either a str or number (depending on the ID type in your user storage) or undefined.
- valid_flow: List[str]¶
An array of OAuth flows allowed for this client.
See
OAuthFlows.
- class crossauth_backend.OAuthClientStorage(options: OAuthClientStorageOptions = {})[source]¶
Bases:
ABCBase class for storing OAuth clients.
This class is subclassed for various types of client storage. For example, PrismaOAuthClientStorage is for storing clients in a database table, managed by the Prisma ORM.
- abstractmethod async create_client(client: OAuthClient) OAuthClient[source]¶
Creates and returns a new client with random ID and optionally secret.
Saves in the database.
- Parameters:
client (crossauth_backend.OAuthClient) – the client to save.
- Returns:
The new client.
- abstractmethod async delete_client(client_id: str) None[source]¶
Deletes a key from storage.
- Parameters:
client_id (str) – the client to delete
- abstractmethod async get_client_by_id(client_id: str) OAuthClient[source]¶
Returns the matching client by its auto-generated id in the storage or throws an exception if it doesn’t exist.
- Parameters:
client_id (str) – the client_id to look up
- Returns:
he matching OAuthClient object.
- abstractmethod async get_client_by_name(name: str, userid: str | int | None | NullType = None) List[OAuthClient][source]¶
Returns the matching client in the storage by friendly name or throws an exception if it doesn’t exist.
- Parameters:
name (str) – the client name to look up
userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients with this name.
- Returns:
A list of OAuthClient objects.
:raises
crossauth_backend.CrossauthError: withErrorCodeof ‘InvalidSessionId’ if a match was not found in session storage.
- abstractmethod async get_clients(skip: int | None = None, take: int | None = None, userid: str | int | None | NullType = None) List[OAuthClient][source]¶
Returns all clients in alphabetical order of client name.
- Parameters:
skip (int|None) – skip this number of records from the start in alphabetical order
take (int|None) – return at most this number of records
userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients.
- Returns:
A list of OAuthClient objects.
- abstractmethod async update_client(client: PartialOAuthClient) None[source]¶
If the given session key exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ‘InvalidClient’.
- Parameters:
client (crossauth_backend.OAuthClient) – all fields to update (client_id must be set but will not be updated)
:raises
crossauth_backend.CrossauthError: with ‘InvalidClient’ if the client doesn’t exist.
- class crossauth_backend.OAuthTokenConsumer(audience: str, options: OAuthTokenConsumerOptions = {})[source]¶
Bases:
objectThis abstract class is for validating OAuth JWTs.
- property audience¶
- property auth_server_base_url¶
- property keys¶
- async load_config(oidc_config: Dict[str, Any] | None = None)[source]¶
Loads OpenID Connect configuration, or fetches it from the authorization server (using the well-known enpoint appended to authServerBaseUrl ) :param Dict[str, Any]|None oidcConfig: the configuration, or undefined to load it from
the authorization server
- :raises
common!CrossauthError: object withErrorCodeof Connection if the fetch to the authorization server failed.
- :raises
- async load_jwks(jwks: Keys | None = None)[source]¶
Loads the JWT signature validation keys, or fetches them from the authorization server (using the URL in the OIDC configuration). :param Dict[str, Any]|None jwks: the keys to load, or undefined to fetch them from
the authorization server.
- :raises
crossauth_backend.CrossauthError: object withErrorCodeof Connection if the fetch to the authorization server failed, the OIDC configuration wasn’t set or the keys could not be parsed.
- :raises
- async load_keys()[source]¶
The RSA public keys or symmetric keys for the authorization server, either passed to the constructor or fetched from the authorization server.
- property oidc_config¶
- async token_authorized(token: str, tokenType: Literal['access', 'refresh', 'id']) Dict[str, Any] | None[source]¶
If the given token is valid, the payload is returned. Otherwise undefined is returned.
The signature must be valid, the expiry must not have passed and, if tokenType is defined,. the type claim in the payload must match it.
Doesn’t throw exceptions.
- Parameters:
token (str) – The token to validate
token_type (Literal["access", "refresh", "id"]) – If defined, the type claim in the payload must match this value
- class crossauth_backend.OAuthTokenConsumerOptions[source]¶
Bases:
TypedDictOptions that can be passed to:
OAuthTokenConsumerBase.- audience: str¶
The aud claim needs to match self value. No default (required)
- auth_server_base_url: str¶
The value to expect in the iss claim. If the iss does not match self, the token is rejected. No default (required)
- clock_tolerance: int¶
Number of seconds tolerance when checking expiration. Default 10
- jwt_key_type: str¶
Secret key if using a symmetric cipher for signing the JWT. Either this or jwt_secret_key_file is required when using self kind of cipher
- jwt_public_key: str¶
The public key if using a public key cipher for signing the JWT. Either this or jwt_public_key_file is required when using self kind of cipher. privateKey or privateKeyFile is also required.
- jwt_public_key_file: str¶
Filename for the public key if using a public key cipher for signing the JWT. Either self or jwt_public_key is required when using self kind of cipher. privateKey or privateKeyFile is also required.
- jwt_secret_key: str¶
Secret key if using a symmetric cipher for signing the JWT. Either this or jwt_secret_key_file is required when using self kind of cipher
- jwt_secret_key_file: str¶
Filename with secret key if using a symmetric cipher for signing the JWT. Either self or jwt_secret_key is required when using self kind of cipher
- key_storage: KeyStorage | None¶
If persisting tokens, you need to provide a storage to persist them to
- oidc_config: OpenIdConfiguration | Dict[str, Any] | None¶
For initializing the token consumer with a static OpenID Connect configuration.
- persist_access_token: bool¶
Whether to persist access tokens in key storage. Default false.
If you set self to True, you must also set key_storage.
- class crossauth_backend.OidcAuthenticator(options: OidcAuthenticatorOptions = {})[source]¶
Bases:
AuthenticatorOIDC AUthenticator class
- async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]¶
Authenticates the user, returning a the user as a {@link User} object.
If you set extraFields when constructing the {@link UserStorage} instance passed to the constructor, these will be included in the returned User object. hashedPassword, if present in the User object, will be removed.
:param user the username field should contain the username :param secrets from the UserSecrets table. password is expected to be present :param params the user input. password is expected to be present :raise :class: croassuth_backend.CrossauthError with
- class:
crossauth_backend.ErrorCode of`Connection`,
UserNotExist`or `PasswordInvalid, TwoFactorIncomplete, EmailNotVerified or UserNotActive.
- async create_one_time_secrets(user: User) UserSecretsInputFields[source]¶
Does nothing for this class.
- async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]¶
This will return p hash of the passed password. :param _username ignored :param params expected to contain password :param repeat_params if defined, this is expected to also contain
password and is checked to match the one in params
:return the newly created password in the password field.
- async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]¶
Does nothing for this class.
- async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]¶
Does nothing for this class.
- skip_email_verification_on_signup() bool[source]¶
@returns false, if email verification is enabled, it should be for this authenticator too
- validate_secrets(params: AuthenticationParameters) List[str][source]¶
Does nothing for this class
- class crossauth_backend.OidcAuthenticatorOptions[source]¶
Bases:
AuthenticationOptions- friendly_name: str¶
- class crossauth_backend.OpenIdConfiguration[source]¶
Bases:
TypedDictThis class encapsulate the data returned by the oidc-configuration well-known endpoint. For further details, see the OpenID Connect specification.
- acr_values_supported: NotRequired[list[str]]¶
- authorization_endpoint: str¶
- check_session_iframe: NotRequired[str]¶
- claim_types_supported: NotRequired[list[ClaimType]]¶
- claims_locales_supported: NotRequired[list[str]]¶
- claims_parameter_supported: NotRequired[bool]¶
- claims_supported: NotRequired[list[str]]¶
- display_values_supported: NotRequired[list[str]]¶
- end_session_endpoint: NotRequired[str]¶
- grant_types_supported: list[GrantType]¶
- id_token_encryption_alg_values_supported: NotRequired[list[str]]¶
- id_token_encryption_enc_values_supported: NotRequired[list[str]]¶
- id_token_signing_alg_values_supported: list[str]¶
- issuer: str¶
- jwks_uri: str¶
- op_policy_uri: NotRequired[str]¶
- op_tos_uri: NotRequired[str]¶
- registration_endpoint: NotRequired[str]¶
- request_object_encryption_alg_values_supported: NotRequired[list[str]]¶
- request_object_encryption_enc_values_supported: NotRequired[list[str]]¶
- request_object_signing_alg_values_supported: NotRequired[list[str]]¶
- request_parameter_supported: NotRequired[bool]¶
- request_uri_parameter_supported: NotRequired[bool]¶
- require_request_uri_registration: NotRequired[bool]¶
- response_modes_supported: list[ResponseMode]¶
- response_types_supported: list[str]¶
- scopes_supported: NotRequired[list[str]]¶
- service_documentation: NotRequired[str]¶
- subject_types_supported: list[SubjectType]¶
- token_endpoint: str¶
- token_endpoint_auth_methods_supported: NotRequired[list[TokenEndpointAuthMethod]]¶
- token_endpoint_auth_signing_alg_values_supported: NotRequired[list[str]]¶
- ui_locales_supported: NotRequired[list[str]]¶
- userinfo_encryption_alg_values_supported: NotRequired[list[str]]¶
- userinfo_encryption_enc_values_supported: NotRequired[list[str]]¶
- userinfo_endpoint: NotRequired[str]¶
- userinfo_signing_alg_values_supported: NotRequired[list[str]]¶
- class crossauth_backend.ParamType(*values)[source]¶
Bases:
EnumType of parameter that can be parsed from an option value or environment variable
- Boolean = 4¶
- Integer = 3¶
- Json = 5¶
- JsonArray = 6¶
- Number = 2¶
- String = 1¶
- class crossauth_backend.PartialKey[source]¶
Bases:
TypedDictSame as
crossauth_backend.Keybut all fields are NotRequired- created: datetime¶
- data: str | None¶
- expires: datetime | NullType¶
- lastactive: datetime | None¶
- userid: str | int | NullType | None¶
- value: str¶
- class crossauth_backend.PartialOAuthClient[source]¶
Bases:
TypedDictSame as OAuthClient but for partial updates
- client_id: str¶
The client_id, which is auto-generated and immutable
- client_name: str¶
A user-friendly name for the client (not used as part of the OAuth API).
- client_secret: NotRequired[str | NullType]¶
Client secret, which is autogenerated.
If there is no client secret, it should be set to undefined.
This field allows null as well as undefined this is used, for example, when partially updating a client and you specifically want to set the secret to undefined, as opposed to just not wishing to change the value. Other than that, this value is always either a str or undefined.
- confidential: bool¶
Whether or not the client is confidential (and can therefore keep the client secret secret)
- redirect_uri: List[str]¶
An array of value redirect URIs for the client.
- userid: NotRequired[str | int | NullType]¶
ID of the user who owns this client, which may be undefined for not being owned by a specific user.
This field allows null as well as undefined this is used, for example, when partially updating a client and you specifically want to set the user ID to undefined, as opposed to just not wishing to change the value. Other than that, this value is always either a str or number (depending on the ID type in your user storage) or undefined.
- valid_flow: List[str]¶
An array of OAuth flows allowed for this client.
See
OAuthFlows.
- class crossauth_backend.PartialUser[source]¶
Bases:
PartialUserInputFieldsSame as User but all fields are not required
- admin: NotRequired[bool]¶
- email: NotRequired[str]¶
- email_normalized: NotRequired[str]¶
Email lowercased and non language-normalized
- factor1: NotRequired[str]¶
- factor2: NotRequired[str]¶
- id: str | int¶
- state: str¶
- username: str¶
- username_normalized: str¶
Username lowercased and non language-normalized
- class crossauth_backend.PartialUserInputFields[source]¶
Bases:
TypedDictSame as UserInputFields but all fields are not required
- admin: NotRequired[bool]¶
- email: NotRequired[str]¶
- factor1: NotRequired[str]¶
- factor2: NotRequired[str]¶
- state: str¶
- username: str¶
- class crossauth_backend.PartialUserSecrets[source]¶
Bases:
UserSecretsInputFieldsSame as UserSecrets except all fields are NotRequired
- expiry: int¶
- otp: str¶
- password: str¶
- totpsecret: str¶
- userid: str | int¶
- class crossauth_backend.PasswordAuthenticator(options: AuthenticationOptions = {})[source]¶
Bases:
Authenticatorbase class for authenticators that validate passwords
- class crossauth_backend.SMSUser[source]¶
Bases:
UserInputFields- admin: NotRequired[bool]¶
- email: NotRequired[str]¶
- factor1: str¶
- factor2: NotRequired[str]¶
- phone: str¶
- state: str¶
- username: str¶
- class crossauth_backend.SessionCookie(key_storage: KeyStorage, options: SessionCookieOptions = {})[source]¶
Bases:
objectClass for session management using a session id cookie.
- property cookie_name¶
- async create_session_key(userid: str | int | None, extra_fields: Mapping[str, Any] = {}) Key[source]¶
Creates a session key and saves in storage
Date created is the current date/time on the server.
In the unlikely event of the key already existing, it is retried up to 10 times before throwing an error with ErrorCode.KeyExists
- Parameters:
userid (str | int | None) – the user ID to store with the session key.
extra_fields (Dict[str, Any]|None) – Any fields in here will also be added to the session record
- Returns:
the new session key
- :raises
crossauth_backend.CrossauthError: with ErrorCodeKeyExists if maximumattempts exceeded trying to create a unique session id
- async delete_all_for_user(userid: str | int, except_key: str | None = None) None[source]¶
Deletes all keys for the given user :param str|int userid: the user to delete keys for :param str|None except_key: if defined, don’t delete this key
- property domain¶
- async get_session_key(session_id: str) Key[source]¶
Returns the user matching the given session key in session storage, or throws an exception.
Looks the user up in the
UserStorageinstance passed to the constructor.Undefined will also fail is CookieAuthOptions.filterFunction is defined and returns false,
- Parameters:
session_id (str) – the unsigned value of the session cookie
- Returns:
a
crossauth_backend.Userobject, with the password hash removed.
:raises
crossauth_backend.CrossauthError: withErrorCodeset to InvalidSessionId, Expired or UserNotExist.
- async get_user_for_session_id(session_id: str, options: UserStorageGetOptions = {}) UserAndKey[source]¶
Returns the user matching the given session key in session storage, or throws an exception.
Looks the user up in the
crossauth_backend.UserStorageinstance passed to the constructor.Undefined will also fail is CookieAuthOptions.filterFunction is defined and returns false,
- Parameters:
session_id (str) – the value in the session cookie
options (crossauth_backend.UserStorageGetOptions) – See
crossauth_backend.UserStorageGetOptions
- Returns:
a
crossauth_backend.Userobject, with the password hash removed, and the:class:crossauth_backend.Key with the unhashed session_id
:raises
crossauth_backend.CrossauthError: withErrorCodeset to InvalidSessionId or Expired.
- static hash_session_id(session_id: str) str[source]¶
Returns a hash of a session ID, with the session ID prefix for storing in the storage table. :param str session_id the session ID to hash
- Returns:
a base64-url-encoded string that can go into the storage
- property httpOnly¶
- property idle_timeout¶
- make_cookie(session_key: Key, persist: bool | None = None) Cookie[source]¶
Returns a
Cookieobject with the given session key.This class is compatible, for example, with Express.
- Parameters:
session_key (crossauth_backend.Key) – the value of the session key
persist (bool|None) – if passed, overrides the persistSessionId setting
- Returns:
a
Cookieobject,
- make_cookie_string(cookie: Cookie) str[source]¶
Takes a session ID and creates a string representation of the cookie (value of the HTTP Cookie header).
- Parameters:
cookie (Cookie) – the cookie vlaues to make a string from
- Returns:
a string representation of the cookie and options.
- property maxAge¶
- property path¶
- property sameSite¶
- property secure¶
- unsign_cookie(cookie_value: str) str[source]¶
Unsigns a cookie and returns the original value. :param str cookie_value: the signed cookie value
- Returns:
the unsigned value
:raises
crossauth_backend.CrossauthError: if the signature is invalid.
- async update_session_key(session_key: PartialKey) None[source]¶
Updates a session record in storage :param crossauth_backend.PartialKey session_key: the fields to update. value must be set, and will not be updated. All other defined fields will be updated.
:raises
crossauth_backend.CrossauthError: if the session does not exist.
- class crossauth_backend.SessionCookieOptions[source]¶
Bases:
CookieOptions,TokenEmailerOptionsOptions for double-submit csrf tokens
- cookie_name: str¶
Name of cookie. Defaults to “CSRFTOKEN”
- domain: str¶
- email_from: NotRequired[str]¶
- email_verification_html_body: NotRequired[str]¶
- email_verification_subject: NotRequired[str]¶
- email_verification_text_body: NotRequired[str]¶
- expires: datetime¶
- filter_function: Callable[[Key], bool]¶
self will be called with the session key to filter sessions before returning. Function should return true if the session is valid or false otherwise.
- hash_session_id: bool¶
If true, session IDs are stored in hashed form in the key storage. Default False.
- httpOnly: bool¶
- idle_timeout: int¶
If non zero, sessions will time out after self number of seconds have elapsed without activity. Default 0 (no timeout)
- maxAge: int¶
- password_reset_expires: NotRequired[int]¶
- password_reset_html_body: NotRequired[str]¶
- password_reset_subject: NotRequired[str]¶
- password_reset_text_body: NotRequired[str]¶
- path: str¶
- persist: bool¶
If true, sessions cookies will be persisted between browser sessions. Default True
- prefix: NotRequired[str]¶
- render: NotRequired[Callable[[str, Dict[str, Any]], str]]¶
- sameSite: bool | Literal['lax', 'strict', 'none']¶
- secret: str¶
App secret
- secure: bool¶
- site_url: NotRequired[str]¶
- smtp_host: NotRequired[str]¶
- smtp_password: NotRequired[str]¶
- smtp_port: NotRequired[int]¶
- smtp_use_tls: NotRequired[bool]¶
- smtp_username: NotRequired[str]¶
- user_storage: UserStorage¶
If user login is enabled, you must provide the user storage class
- verify_email_expires: NotRequired[int]¶
- views: NotRequired[str]¶
- class crossauth_backend.SessionManager(key_storage: KeyStorage, authenticators: Mapping[str, Authenticator], options: SessionManagerOptions = {})[source]¶
Bases:
objectClass for managing sessions.
- property allowed_factor2¶
- async apply_email_verification_token(token: str) User[source]¶
Takes an email verification token as input and applies it to the user storage.
The state is reset to active. If the token was for changing the password, the new password is saved to the user in user storage.
- Parameters:
token – the token to apply
- Returns:
the new user record
- property authenticators¶
- async cancel_two_factor_page_visit(session_id: str) Dict[str, Any][source]¶
Cancels the 2FA that was previously initiated but not completed..
If successful, returns. Otherwise an exception is thrown.
- Parameters:
session_id – the session id (unsigned)
- :return
Dict[str, Any]: the 2FA data that was created on initiation
- :raise
CrossauthError: of Unauthorized if 2FA was not initiated.
- async change_secrets(username: str, factor_number: int, new_params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None, old_params: AuthenticationParameters | None = None) User[source]¶
- async complete_two_factor_login(params: AuthenticationParameters, session_id: str, extra_fields: Mapping[str, Any] = {}, persist: bool = False) SessionTokens[source]¶
Performs the second factor authentication as the second step of the login process
If authentication is successful, the user’s state will be set to active and a new session will be created, bound to the user. The anonymous session will be deleted.
- Parameters:
params – the user-provided parameters to authenticate with (eg TOTP code).
session_id – the user’s anonymous session
extra_fields – extra fields to add to the user-bound new session table entry
persist – if true, the cookie will be perstisted (with an expiry value); otberwise it will be a session-only cookie.
- Return AuthResult containing:
session_cookie: the new session cookie csrf_cookie: the new CSRF cookie csrf_form_or_header_value: the new CSRF token corresponding to the cookie user: the newly-logged in user.
- async complete_two_factor_page_visit(params: AuthenticationParameters, session_id: str)[source]¶
Completes 2FA when visiting a protected page.
If successful, returns. Otherwise an exception is thrown.
- Parameters:
params – the parameters from user input needed to authenticate (eg TOTP code). Passed to the authenticator
session_id – the session cookie value (ie still signed)
- :raise
CrossauthError: if authentication fails.
- async complete_two_factor_setup(params: AuthenticationParameters, session_id: str) User[source]¶
Authenticates with the second factor.
If successful, the new user object is returned. Otherwise an exception is thrown, :param params the parameters from user input needed to authenticate (eg TOTP code) :param session_id the session cookie value (ie still signed) :return the user object :raise CrossauthError if authentication fails.
- async create_anonymous_session(extra_fields: Mapping[str, Any] | None = None) SessionTokens[source]¶
- async create_csrf_form_or_header_value(csrf_cookie_value: str)[source]¶
Validates the signature on the CSRF cookie value and returns a value that can be put in the form or CSRF header value.
- Parameters:
csrf_cookie_value (str) – the value from the CSRF cookie
- Returns:
the value to put in the form or CSRF header
- async create_csrf_token() Csrf[source]¶
Creates and returns a signed CSRF token based on the session ID
- Returns:
a CSRF cookie and value to put in the form or CSRF header
- async create_user(user: UserInputFields, params: UserSecrets, repeat_params: UserSecrets | None = None, skip_email_verification: bool = False, empty_password: bool = False) User[source]¶
- property csrf_cookie_name¶
Returns the name used for CSRF token cookies.
- property csrf_cookie_path¶
Returns the name used for CSRF token cookies.
- property csrf_header_name¶
Returns the name used for CSRF token cookies
- property csrf_tokens¶
- async data_for_session_id(session_id: str) Dict[str, Any] | None[source]¶
Returns the data object for a session id, or undefined, as an object.
If the user is undefined, or the key has expired, returns undefined.
- Parameters:
session_id (str) – the session key to look up in session storage
- Returns:
a string from the data field
- :raise
crossauth_backend.CrossauthError: with ErrorCodeof Connection, InvalidSessionId UserNotExist or Expired.
- async data_string_for_session_id(session_id: str) str | None[source]¶
Returns the data object for a session key, or undefined, as a JSON string (which is how it is stored in the session table)
If the user is undefined, or the key has expired, returns undefined.
- Parameters:
session_id (str) – the session id to look up in session storage
- Returns:
a string from the data field
- :raise
crossauth_backend.CrossauthError: with ErrorCodeof Connection, InvalidSessionId UserNotExist or Expired.
- async delete_session(session_id: str) None[source]¶
Deletes the given session ID from the key storage (not the cookie)
- Parameters:
session_id (str) – the session Id to delete
- async delete_session_data(session_id: str, name: str) None[source]¶
Deletes a field from the session data.
The data field in the session entry is assumed to be a JSON string. The field with the given name is updated or set if not already set. :param str session_id; the session Id to update.
- property email_token_storage¶
- get_session_id(session_cookie_value: str)[source]¶
Returns the session ID from the signed session cookie value
- Parameters:
session_cookie_value (str) – value from the session ID cookie
- Returns:
the usigned cookie value.
- :raises
crossauth_backend.CrossauthErrorwith InvalidKey if the signature is invalid.
- async initiate_two_factor_login(user: User) SessionTokens[source]¶
Initiates the two factor login process.
Creates an anonymous session and corresponding CSRF token
- Parameters:
user – the user, which should already have been authenticated with factor1
- Returns:
a new anonymous session cookie and corresponding CSRF cookie and token.
- async initiate_two_factor_page_visit(user: User, session_id: str, request_body: Mapping[str, Any], url: str | None = None, content_type: str | None = None) SessionTokens[source]¶
Initiates the two factor process when visiting a protected page.
Creates an anonymous session and corresponding CSRF token
- Parameters:
user – the user, which should already have been authenticated with factor1
session_id – the logged in session associated with the user
request_body – the parameters from the request made before being redirected to factor2 authentication
url – the requested url, including path and query parameters content_type: optional content type from the request
- :return
If a token was passed a new anonymous session cookie and corresponding CSRF cookie and token.
- async initiate_two_factor_setup(user: User, new_factor2: str | None, session_id: str) Mapping[str, Any][source]¶
Begins the process of setting up 2FA for a user which has already been created and activated. Called when changing 2FA or changing its parameters.
- Parameters:
user – the logged in user
new_factor2 – new second factor to change user to
session_id – the session cookie for the user
- :return
the 2FA data that can be displayed to the user in the configure 2FA step (such as the secret and QR code for TOTP).
- async initiate_two_factor_signup(user: UserInputFields, params: UserSecrets, session_id: str, repeat_params: UserSecrets | None) UserIdAndData[source]¶
Creates a user with 2FA enabled.
The user storage entry will be createed, with the state set to awaitingtwofactorsetup or awaitingtwofactorsetupandemailverification. The passed session key will be updated to include the username and details needed by 2FA during the configure step.
- Parameters:
user – details to save in the user table
params – params the parameters needed to authenticate with factor1 (eg password)
session_id – the anonymous session cookie
repeat_params – if passed, these will be compared with params and if they don’t match, PasswordMatch is thrown.
- Returns:
- Dict containing:
userid: the id of the created user. userData: data that can be displayed to the user in the page to
complete 2FA set up (eg the secret key and QR codee for TOTP),
- property key_storage¶
- async login(username: str, params: AuthenticationParameters, extra_fields: Mapping[str, Any] = {}, persist: bool = False, user: User | None = None, bypass_2fa: bool = False) SessionTokens[source]¶
Performs a user login
Authenticates the username and password
Creates a session key - if 2FA is enabled, this is an anonymous session, otherwise it is bound to the user
Returns the user (without the password hash) and the session cookie.
If the user object is defined, authentication (and 2FA) is bypassed
- Parameters:
username – the username to validate
params – user-provided credentials (eg password) to authenticate with
extra_fields – add these extra fields to the session key if authentication is successful
persist – if passed, overrides the persistSessionId setting.
user –
- if this is defined, the username and password are ignored and the given user is logged in.
The 2FA step is also skipped
bypass2FA: if true, the 2FA step will be skipped
- :return
Dict containing the user, user secrets, and session cookie and CSRF cookie and token. if a 2fa step is needed, it will be an anonymouos session, otherwise bound to the user
- Raise:
- CrossauthError: with ErrorCode of Connection, UserNotValid,
PasswordNotMatch or UserNotExist.
- async repeat_two_factor_signup(session_id: str) UserIdAndData[source]¶
This can be called if the user has finished signing up with factor1 but closed the browser before completing factor2 setup. Call it if the user signs up again with the same factor1 credentials.
- Parameters:
session_id – the anonymous session ID for the user
- Returns:
- Dict containing:
userid: the id of the created user userData: data that can be displayed to the user in the page to
complete 2FA set up (eg the secret key and QR code for TOTP),
- secrets: data that is saved in the session for factor2. In the
case of TOTP, both userData and secrets contain the shared secret but only userData has the QR code, since it can be generated from the shared secret.
- async request_password_reset(email: str) None[source]¶
Sends a password reset token :param email: the user’s email (where the token will be sent)
- async reset_secret(token: str, factror_number: int, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) User[source]¶
Resets the secret for factor1 or 2 (eg reset password)
- Parameters:
token – the reset password token that was emailed
factror_number – which factor to reset (1 or 2)
params – the new secrets entered by the user (eg new password)
repeat_params – optionally, repeat of the secrets. If passed, an exception will be thrown if they do not match
:return the user object
- Raises:
CrossauthError – if the repeat_params don’t match params, the token is invalid or the user storage cannot be updated.
- property session¶
- property session_cookie_name¶
Returns the name used for session ID cookies.
- property session_cookie_path¶
Returns the name used for session ID cookies
- async update_many_session_data(session_id: str, data_array: List[KeyDataEntry]) None[source]¶
Update field sin the session data.
The data field in the session entry is assumed to be a JSON string. The field with the given name is updated or set if not already set. :param str session_id: the session Id to update. :param List[crossauth_backend.KeyDataEntry] data_array: names and values.
- async update_session_activity(session_id: str)[source]¶
If session_idle_timeout is set, update the last activcity time in key storage to current time.
- Parameters:
session_id (str) – the session Id to update.
- async update_session_data(session_id: str, name: str, value: Mapping[str, Any]) None[source]¶
Update a field in the session data.
The data field in the session entry is assumed to be a JSON string. The field with the given name is updated or set if not already set. :param str session_id: the session Id to update. :param str name: of the field. :param Mapping[str, Any] value: new value to store
- async update_user(current_user: User, new_user: User, skip_email_verification: bool = False, as_admin: bool = False) TokensSent[source]¶
Updates a user entry in storage :param current_user the current user details :param new_user the new user details :return dict with emailVerificationTokenSent and passwordResetTokenSent booleans
- async user_for_password_reset_token(token: str) User[source]¶
Returns the user associated with a password reset token
- Parameters:
token – the token that was emailed
- Returns:
the user
- Raises:
CrossauthError – if the token is not valid.
- property user_storage¶
- validate_csrf_cookie(csrf_cookie_value: str)[source]¶
Throws
crossauth_backend.CrossauthErrorwith InvalidKey if the passed CSRF cookie value is not valid (ie invalid signature) :param str csrf_cookie_value: the CSRF cookie value
- validate_double_submit_csrf_token(csrf_cookie_value: str, csrf_form_or_header_value: str)[source]¶
Throws
crossauth_backend.CrossauthErrorwith InvalidKey if the passed CSRF token is not valid for the given session ID. Otherwise returns without error- Parameters:
strcsrf_cookie_value – the CSRF cookie value
csrf_form_or_header_value (str) – the value from the form field or CSRF header
- class crossauth_backend.SessionManagerOptions[source]¶
Bases:
TokenEmailerOptionsOptions for SessionManager
- allowed_factor2: List[str]¶
Set of 2FA factor names a user is allowed to set.
The name corresponds to the key you give when adding authenticators. See authenticators in SessionManager.constructor.
- double_submit_cookie_options: DoubleSubmitCsrfTokenOptions¶
Options for csrf cookie manager
- email_from: NotRequired[str]¶
- email_token_storage: KeyStorage¶
Store for password reset and email verification tokens. If not passed, the same store as for sessions is used.
- email_verification_html_body: NotRequired[str]¶
- email_verification_subject: NotRequired[str]¶
- email_verification_text_body: NotRequired[str]¶
- enable_email_verification: bool¶
If true, users will have to verify their email address before account is created or when changing their email address. See class description for details. Default True
- enable_password_reset: bool¶
If true, allow password reset by email token. See class description for details. Default True
- password_reset_expires: NotRequired[int]¶
- password_reset_html_body: NotRequired[str]¶
- password_reset_subject: NotRequired[str]¶
- password_reset_text_body: NotRequired[str]¶
- prefix: NotRequired[str]¶
- render: NotRequired[Callable[[str, Dict[str, Any]], str]]¶
- secret: str¶
Server secret. Needed for emailing tokens and for csrf tokens
- session_cookie_options: SessionCookieOptions¶
options for session cookie manager
- site_url: str¶
Base URL for the site.
This is used when constructing URLs, eg for sending password reset tokens.
- smtp_host: NotRequired[str]¶
- smtp_password: NotRequired[str]¶
- smtp_port: NotRequired[int]¶
- smtp_use_tls: NotRequired[bool]¶
- smtp_username: NotRequired[str]¶
- user_storage: UserStorage¶
If user login is enabled, you must provide the object where users are stored.
- verify_email_expires: NotRequired[int]¶
- views: NotRequired[str]¶
- class crossauth_backend.SmsAuthenticator(options: SmsAuthenticatorOptions = {})[source]¶
Bases:
AuthenticatorAbstract base class for sending OTP by SMS
- async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]¶
Authenticates the user by comparing the user-provided otp with the one in secrets.
Validation fails if the otp is incorrect or has expired.
- Parameters:
_user – ignored
secrets – taken from the session and should contain otp and expiry
params – user input and should contain otp
- Raises:
CrossauthError – with ErrorCode InvalidToken or Expired.
- async create_one_time_secrets(user: User) Dict[str, Any][source]¶
Creates and sends a new one-time code.
- Parameters:
user – the user to create it for. Uses the phone field which should start with +
- Returns:
otp and expiry as a Unix time (number).
- async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) Dict[str, Any][source]¶
Does nothing for this class
- static is_phone_valid(number: str) bool[source]¶
Returns whether or not the passed phone number has a valid form.
- Parameters:
number – the phone number to validate
- Returns:
true if it is valid, false otherwise
- async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]¶
Creates and sends the one-time code
- Parameters:
user – the user to create it for. Uses the phone field which is expected to be a phone number starting with +
- Returns:
userData containing username, phone, factor2 sessionData containing the same plus otp and expiry which
is a Unix time (number).
- async reprepare_configuration(username: str, session_key: Key) Dict[str, Dict[str, Any] | None] | None[source]¶
Creates and sends a new one-time code.
- Parameters:
_username – ignored
session_key – the session containing the previously created data.
- Returns:
Dictionary containing userData, secrets, and newSessionData
- skip_email_verification_on_signup() bool[source]¶
- Returns:
false - doesn’t replace email verification
- static validate_phone(number: str | None) None[source]¶
Throws an exception if a phone number doesn’t have a valid form.
It must start with a + and be 8 to 15 digits
- Parameters:
number – the phone number to validate
- Raises:
CrossauthError – with ErrorCode InvalidPhoneNumber.
- validate_secrets(params: AuthenticationParameters) List[str][source]¶
Does nothing for this class
- class crossauth_backend.SmsAuthenticatorOptions[source]¶
Bases:
AuthenticationOptionsOptional parameters for :class: EmailAuthenticator.
See :func: EmailAuthenticator__init__ for details
- friendly_name: str¶
- render: Callable[[str, Dict[str, Any]], str]¶
if passed, use this instead of the default jinja2 renderer
- sms_authenticator_body: str¶
Template file containing text for producing SMS messages. Default smsauthenticationbody.njk
- sms_authenticator_from: str¶
Sender for SMSs
- sms_authenticator_token_expires: int¶
Number of seconds before otps should expire. Default 5 minutes
- views: str¶
The directory containing views (by default, Jinja2 templates)
- class crossauth_backend.SqlAlchemyKeyStorage(engine: AsyncEngine, options: SqlAlchemyKeyStorageOptions = {})[source]¶
Bases:
KeyStorage- async delete_all_for_user(userid: str | int | None, prefix: str, except_key: str | None = None) None[source]¶
Deletes all keys from storage for the given user ID
- Parameters:
userid (int|str|None) – User ID to delete keys for
prefix (str) – Only keys starting with this prefix will be deleted
except_key (str|None) – If defined, the key with this value will not be deleted
- async delete_data(key_name: str, data_name: str) None[source]¶
The ‘data’ field in a key entry is a JSON string. This method should atomically delete a field in it.
- Parameters:
key_name (str) – The name of the key to update, as it appears in the table.
data_name (str) – The field name to delete. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is deleted.
- async delete_key(value: str) None[source]¶
Deletes a key from storage (e.g., the database).
- Parameters:
value (str) – The key to delete
- async delete_matching(key: PartialKey) None[source]¶
Deletes all matching the given specs
- Parameters:
key (crossauth_backend.PartialKey) – Any key matching all defined values in this object will be deleted
- async get_all_for_user(userid: str | int | None = None) List[Key][source]¶
Return all keys matching the given user ID
- Parameters:
userid (str|int|None) – User to return keys for
- Returns:
List[Key]: An array of keys
- async get_key(key: str) Key[source]¶
Returns the matching key in the session storage or raises an exception if it doesn’t exist.
- Parameters:
key (ster) – The key to look up, as it will appear in this storage (typically unsigned, hashed)
- Returns:
The matching Key record.
- async save_key(userid: str | int | None, value: str, date_created: datetime, expires: datetime | None = None, data: str | None = None, extra_fields: Mapping[str, Any] | None = None) None[source]¶
Saves a session key in the session storage (e.g., database).
- Parameters:
userid (int|str|None) – The ID of the user. This matches the primary key in the UserStorage implementation.
value (str) – The key value to store.
date_created (datetime) – The date/time the key was created.
expires (datetime|None) – The date/time the key expires.
extra_fields (Mapping[str, Any]|None) – These will also be saved in the key record
- Padam str|None data:
An optional value, specific to the type of key, e.g., new email for email change tokens
- async update_data(key_name: str, data_name: str, value: Any) None[source]¶
The ‘data’ field in a key entry is a JSON string. This method should atomically update a field in it.
- Parameters:
key_name (str) – The name of the key to update, as it appears in the table.
data_name (str) – The field name to update. This can contain dots, e.g., ‘part1.part2’, which means ‘part2’ within ‘part1’ is updated.
value (Any|None) – The new value.
- async update_key(key: PartialKey) None[source]¶
If the given session key exists in the database, update it with the passed values. If it doesn’t exist, raise a CrossauthError with ErrorCode ‘InvalidKey’.
- Parameters:
key (crossauth_backend.PartialKey) – The fields defined in this will be updated. ‘id’ must be present and it will not be updated.
- async update_key_in_transaction(conn: AsyncConnection, key: PartialKey) None[source]¶
- async update_many_data(key_name: str, data_array: List[KeyDataEntry]) None[source]¶
Same as ‘update_data’ but updates several keys.
Ensure it is done as a single transaction.
- Parameters:
key_name (str) – The key to update
data_array (List[crossauth_backend.KeyDataEntry) – dataName and value pairs
- class crossauth_backend.SqlAlchemyKeyStorageOptions[source]¶
Bases:
TypedDictOptional parameters for :class: SqlAlchemyKeyStorage.
See :func: SqlAlchemyKeyStorage__init__ for details
- key_table: str¶
- userid_foreign_key_column: str¶
- class crossauth_backend.SqlAlchemyOAuthClientStorage(engine: AsyncEngine, options: SqlAlchemyOAuthClientStorageOptions = {})[source]¶
Bases:
OAuthClientStorage- async create_client(client: OAuthClient) OAuthClient[source]¶
Creates and returns a new client with random ID and optionally secret.
Saves in the database.
- Parameters:
client (crossauth_backend.OAuthClient) – the client to save.
- Returns:
The new client.
- async delete_client(client_id: str) None[source]¶
Deletes a key from storage.
- Parameters:
client_id (str) – the client to delete
- async get_client_by_id(client_id: str) OAuthClient[source]¶
Returns the matching client by its auto-generated id in the storage or throws an exception if it doesn’t exist.
- Parameters:
client_id (str) – the client_id to look up
- Returns:
he matching OAuthClient object.
- async get_client_by_name(name: str, userid: str | int | None | NullType = None) List[OAuthClient][source]¶
Returns the matching client in the storage by friendly name or throws an exception if it doesn’t exist.
- Parameters:
name (str) – the client name to look up
userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients with this name.
- Returns:
A list of OAuthClient objects.
:raises
crossauth_backend.CrossauthError: withErrorCodeof ‘InvalidSessionId’ if a match was not found in session storage.
- async get_client_in_transaction(conn: AsyncConnection, field: str | None, value: str | None, userid: int | str | NullType | None = None, skip: int | None = None, take: int | None = None) List[OAuthClient][source]¶
- async get_clients(skip: int | None = None, take: int | None = None, userid: str | int | None | NullType = None) List[OAuthClient][source]¶
Returns all clients in alphabetical order of client name.
- Parameters:
skip (int|None) – skip this number of records from the start in alphabetical order
take (int|None) – return at most this number of records
userid (str|int|None) – if defined, only return clients belonging to this user. if None, return only clients with a null userid. if not provided, return all clients.
- Returns:
A list of OAuthClient objects.
- async update_client(client: PartialOAuthClient) None[source]¶
If the given session key exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ‘InvalidClient’.
- Parameters:
client (crossauth_backend.OAuthClient) – all fields to update (client_id must be set but will not be updated)
:raises
crossauth_backend.CrossauthError: with ‘InvalidClient’ if the client doesn’t exist.
- class crossauth_backend.SqlAlchemyOAuthClientStorageOptions[source]¶
Bases:
OAuthClientStorageOptionsOptional parameters for :class: SqlAlchemyUserStorage.
See :func: SqlAlchemyUserStorage__init__ for details
- client_table: str¶
Name of client table. Default OAuthClient
- redirect_uri_table: str¶
Name of the redirect uri table. Default OAuthClientRedirectUri.
- userid_foreign_key_column: str¶
Column name for the userid field in the client table. Default userid
- valid_flow_table: str¶
Name of the valid flows table. Default OAuthClientValidFlow
- class crossauth_backend.SqlAlchemyUserStorage(engine: AsyncEngine, options: SqlAlchemyUserStorageOptions = {})[source]¶
Bases:
UserStorage- async create_user(user: UserInputFields, secrets: UserSecretsInputFields | None = None) User[source]¶
Creates a user with the given details and secrets.
- Parameters:
user (crossauth_backend.UserInputFields) – will be put in the User table
secrets (crossauth_backend.UserSecretsInputFields|None) – will be put in the UserSecrets table
- Returns:
the new user as a User object
:raises
crossauth_backend.CrossauthErrorwithErrorCodeConfiguration
- async delete_user_by_id(id: str | int) None[source]¶
If the storage supports this, delete the user with the given ID from storage.
- Parameters:
id (str|int) – id of user to delete
- async delete_user_by_username(username: str) None[source]¶
If the storage supports this, delete the named user from storage.
- Parameters:
username (str) – username to delete
- async get_user_by(field: str, value: str | int, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given field, or throws an exception.
This does no normalisation. Currently it is only used for the OAuth client if you set user_creation_type to “merge”, “embed” or “custom”.
- Parameters:
field (str) – the field to match
value (str) – the value to match
options (UserStorageGetOptions) – optionally turn off checks. Used internally
:raises
crossauth_backend.CrossauthError: withErrorCodeeither UserNotExist or Connection
- async get_user_by_email(email: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given email address, or throws an exception.
If normalize_email is true, email should be matched normalized and lowercased (using normalize()) If the email field doesn’t exist, username is assumed to be the email column
- Parameters:
email – the email address to return the user of
options – optionally turn off checks. Used internally
- Raises:
crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection
- async get_user_by_id(id: str | int, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given user id, or throws an exception.
Note that implementations are free to define what the user ID is. It can be a number or string, or can simply be username.
- Parameters:
id (str|int) – the user id to return the user of
options (UserStorageGetOptions) – optionally turn off checks. Used internally
:raises
crossauth_backend.CrossauthErrorwithErrorCodeeither UserNotExist or Connection
- async get_user_by_in_transaction(conn: AsyncConnection, field: str, value: str | int) UserAndSecrets[source]¶
- async get_user_by_username(username: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given username, or throws an exception.
If normalize_username is true, the username should be matched normalized and lowercased (using normalize())
- Parameters:
username (str) – the username to return the user of
options (UserStorageGetOptions) – optionally turn off checks. Used internally
:raises
crossauth_backend.CrossauthError: withErrorCodeeither UserNotExist or Connection
- async get_users(skip: int | None = None, take: int | None = None) List[User][source]¶
Returns all users in the storage, in a fixed order defined by the storage (e.g. alphabetical by username)
- Parameters:
skip (int|None) – skip this number of records from the start of the set
take (int|None) – only return at most this number of records
- Returns:
an array of User objects
- async update_user(user: PartialUser, secrets: PartialUserSecrets | None = None) None[source]¶
Updates an existing user with the given details and secrets.
If the given user exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ErrorCode InvalidKey.
- Parameters:
user (crossauth_backend.PartialUser) – The ‘id’ field must be set, but all others are optional. Any parameter not set (or None) will not be updated. If you want to set something to None in the database, pass the value as None, not undefined.
secrets (crossauth_backend.PartialUserSecrets|None) – Optional secrets to update
- class crossauth_backend.SqlAlchemyUserStorageOptions[source]¶
Bases:
UserStorageOptionsOptional parameters for :class: SqlAlchemyUserStorage.
See :func: SqlAlchemyUserStorage__init__ for details
- admin_editable_fields: List[str]¶
- force_id_to_number: bool¶
This works around a Fastify and Sveltekit limitation. If the id passed to getUserById() is a string but is numeric, first try forcing it to a number before selecting. If that fails, try it as the string, Default true.
- id_column: str¶
Name of the id column in the user table. Can be set to username if that is your primary key. Default id.
- joins: List[str]¶
Other tables to join. Default is [] (UserSecrets is alwayws joined)
- normalize_email: bool¶
- normalize_username: bool¶
- user_editable_fields: List[str]¶
- user_secrets_table: str¶
Name of user secrets table (Default UserSecrets
- user_table: str¶
Name of user table Default User
- userid_foreign_key_column: str¶
Name of the user id column in the user secrets. Default userid.
- class crossauth_backend.TokenBodyType[source]¶
Bases:
TypedDict- binding_code: str¶
- client_id: Required[str]¶
- client_secret: str¶
- code: str¶
- code_verifier: str¶
- device_code: str¶
- grant_type: Required[str]¶
- mfa_token: str¶
- oobCode: str¶
- otp: str¶
- password: str¶
- refresh_token: str¶
- scope: str¶
- username: str¶
- class crossauth_backend.TotpAuthenticator(app_name: str, options: AuthenticationOptions = {})[source]¶
Bases:
AuthenticatorThis authenticator sends a one-time code by email
- async authenticate_user(user: UserInputFields | None, secrets: UserSecretsInputFields, params: AuthenticationParameters) None[source]¶
Authenticates the user using the saved TOTP parameters and the passed code.
- Parameters:
_user – ignored
secrets – should contain totpsecret that was saved in the session data
params – should contain otp
- async create_one_time_secrets(user: User) UserSecretsInputFields[source]¶
Does nothing for this class
- async create_persistent_secrets(username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters | None = None) UserSecretsInputFields[source]¶
Creates and returns a totpsecret
allow_empty_secrets is ignored.
- Parameters:
username – the user to create these for
_params – ignored
_repeat_params – ignored
- Returns:
Dictionary where the totpsecret field will be populated.
- async prepare_configuration(user: UserInputFields) Dict[str, Dict[str, Any]] | None[source]¶
Creates a shared secret and returns it, along with image data for the QR code to display.
- Parameters:
user – the username is expected to be present. All other fields are ignored.
- Returns:
userData containing username, totpsecret, factor2 and qr.
sessionData containing the same except qr.
- Return type:
Dictionary containing
- async reprepare_configuration(username: str, session_key: Key) Dict[str, Any] | None[source]¶
For cases when the 2FA page was closed without completing. Returns the same data as prepare_configuration, without generating a new secret.
- Parameters:
username – user to return this for
session_key – the session key, which should contain the sessionData from prepare_configuration
- Returns:
userData containing totpsecret, factor2 and qr.
secrets containing totpsecret.
newSessionData containing the same except qr.
- Return type:
Dictionary containing
- skip_email_verification_on_signup() bool[source]¶
- Returns:
- false - if email verification is enabled, it should be used
for this class
- validate_secrets(params: AuthenticationParameters) List[str][source]¶
Does nothing for this class
- class crossauth_backend.User[source]¶
Bases:
UserInputFieldsThis adds ID to
UserInputFields.If your username field is unique and immutable, you can omit ID (passing username anywhere an ID) is expected. However, if you want users to be able to change their username, you should include ID field and make that immutable instead.
- admin: NotRequired[bool]¶
- email: NotRequired[str]¶
- email_normalized: NotRequired[str]¶
Email lowercased and non language-normalized
- factor1: str¶
- factor2: NotRequired[str]¶
- id: str | int¶
ID fied, which may be auto-generated
- state: str¶
- username: str¶
- username_normalized: NotRequired[str]¶
Username lowercased and non language-normalized
- class crossauth_backend.UserInputFields[source]¶
Bases:
TypedDictDescribes a user as fetched from the user storage (eg, database table), excluding auto-generated fields such as an auto-generated ID
This is extendible with additional fields - provide them to the
UserStorageclass as extraFields.You may want to do this if you want to pass additional user data back to the caller, eg real name.
The fields defined here are the ones used by Crossauth. You may add others.
- admin: NotRequired[bool]¶
Whether or not the user has administrator priviledges (and can acess admin-only functions).
- email: NotRequired[str]¶
You can optionally include an email address field in your user table. If your username is an email address, you do not need a separate field.
- factor1: str¶
Factor for primary authentication.
Should match the name of an authenticator
- factor2: NotRequired[str]¶
Factor for second factor authentication.
Should match the name of an authenticator
- state: str¶
You are free to define your own states. The ones Crossauth recognises are defined in
UserState.
- username: str¶
The username. This may be an email address or anything else, application-specific.
- class crossauth_backend.UserSecrets[source]¶
Bases:
UserSecretsInputFieldsThis adds the user ID toi
UserSecretsInputFields.- expiry: int¶
- otp: str¶
- password: str¶
- totpsecret: str¶
- userid: str | int¶
- class crossauth_backend.UserSecretsInputFields[source]¶
Bases:
TypedDictSecrets, such as a password, are not in the User object to prevent them accidentally being leaked to the frontend. All functions that return secrets return them in this separate object.
The fields in this class are the ones that are not autogenerated by the database.
- expiry: int¶
- otp: str¶
- password: str¶
- totpsecret: str¶
- class crossauth_backend.UserState[source]¶
Bases:
objectThese are used valiues for state in a
crossauth_backend.Userobject- active = 'active'¶
- awaiting_email_verification = 'awaitingemailverification'¶
- awaiting_two_factor_setup = 'awaitingtwofactorsetup'¶
- awaiting_two_factor_setup_and_email_verification = 'awaitingtwofactorsetupandemailverification'¶
- disabled = 'disabled'¶
- factor2_reset_needed = 'factor2resetneeded'¶
- password_and_factor2_reset_needed = 'passwordandfactor2resetneeded'¶
- password_change_needed = 'passwordchangeneeded'¶
- password_reset_needed = 'passwordresetneeded'¶
- class crossauth_backend.UserStorage(options: UserStorageOptions = {})[source]¶
Bases:
ABCBase class for place where user details are stored.
This class is subclassed for various types of user storage, e.g. PrismaUserStorage is for storing username and password in a database table, managed by the Prisma ORM.
Username and email searches should be case insensitive, as should their unique constraints. ID searches need not be case insensitive.
- property admin_editable_fields¶
- async create_user(user: UserInputFields, secrets: UserSecretsInputFields | None = None) User[source]¶
Creates a user with the given details and secrets.
- Parameters:
user (crossauth_backend.UserInputFields) – will be put in the User table
secrets (crossauth_backend.UserSecretsInputFields|None) – will be put in the UserSecrets table
- Returns:
the new user as a User object
:raises
crossauth_backend.CrossauthErrorwithErrorCodeConfiguration
- abstractmethod async delete_user_by_id(id: str | int) None[source]¶
If the storage supports this, delete the user with the given ID from storage.
- Parameters:
id (str|int) – id of user to delete
- abstractmethod async delete_user_by_username(username: str) None[source]¶
If the storage supports this, delete the named user from storage.
- Parameters:
username (str) – username to delete
- abstractmethod async get_user_by(field: str, value: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given field, or throws an exception.
This does no normalisation. Currently it is only used for the OAuth client if you set user_creation_type to “merge”, “embed” or “custom”.
- Parameters:
field (str) – the field to match
value (str) – the value to match
options (UserStorageGetOptions) – optionally turn off checks. Used internally
:raises
crossauth_backend.CrossauthError: withErrorCodeeither UserNotExist or Connection
- abstractmethod async get_user_by_email(email: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given email address, or throws an exception.
If normalize_email is true, email should be matched normalized and lowercased (using normalize()) If the email field doesn’t exist, username is assumed to be the email column
- Parameters:
email – the email address to return the user of
options – optionally turn off checks. Used internally
- Raises:
crossauth_backend.CrossauthError – with ErrorCode either UserNotExist or Connection
- abstractmethod async get_user_by_id(id: str | int, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given user id, or throws an exception.
Note that implementations are free to define what the user ID is. It can be a number or string, or can simply be username.
- Parameters:
id (str|int) – the user id to return the user of
options (UserStorageGetOptions) – optionally turn off checks. Used internally
:raises
crossauth_backend.CrossauthErrorwithErrorCodeeither UserNotExist or Connection
- abstractmethod async get_user_by_username(username: str, options: UserStorageGetOptions = {}) UserAndSecrets[source]¶
Returns user matching the given username, or throws an exception.
If normalize_username is true, the username should be matched normalized and lowercased (using normalize())
- Parameters:
username (str) – the username to return the user of
options (UserStorageGetOptions) – optionally turn off checks. Used internally
:raises
crossauth_backend.CrossauthError: withErrorCodeeither UserNotExist or Connection
- abstractmethod async get_users(skip: int | None = None, take: int | None = None) List[User][source]¶
Returns all users in the storage, in a fixed order defined by the storage (e.g. alphabetical by username)
- Parameters:
skip (int|None) – skip this number of records from the start of the set
take (int|None) – only return at most this number of records
- Returns:
an array of User objects
- static normalize(string: str) str[source]¶
By default, usernames and emails are stored in lowercase, normalized format. This function returns that normalization.
- Parameters:
string (str) – the string to normalize
- Returns:
the normalized string, in lowercase with diacritics removed
- property normalize_email¶
- property normalize_username¶
- abstractmethod async update_user(user: PartialUser, secrets: PartialUserSecrets | None = None) None[source]¶
Updates an existing user with the given details and secrets.
If the given user exists in the database, update it with the passed values. If it doesn’t exist, throw a CrossauthError with ErrorCode InvalidKey.
- Parameters:
user (crossauth_backend.PartialUser) – The ‘id’ field must be set, but all others are optional. Any parameter not set (or None) will not be updated. If you want to set something to None in the database, pass the value as None, not undefined.
secrets (crossauth_backend.PartialUserSecrets|None) – Optional secrets to update
- property user_editable_fields¶
- class crossauth_backend.UserStorageGetOptions[source]¶
Bases:
TypedDictPassed to get methods :class: UserStorage.
- skip_active_check: bool¶
If true, a valid user will be returned even if state is not set to active
- skip_email_verified_check: bool¶
If true, a valid user will be returned even if state is set to awaitingemailverification
- class crossauth_backend.UserStorageOptions[source]¶
Bases:
TypedDictOptions passed to :class: UserStorage constructor
- admin_editable_fields: List[str]¶
Fields that admins are allowed to edit (in addition to userEditableFields)
- normalize_email: bool¶
If true, email addresses (in the email column not in the username column) will be matched as lowercase and with diacritics removed. Default true.
- normalize_username: bool¶
If true, usernames will be matched as lowercase and with diacritics removed. Default true,
Note: this doesn’t apply to the ID column
- user_editable_fields: List[str]¶
Fields that users are allowed to edit. Any fields passed to a create or update call that are not in this list will be ignored.
- crossauth_backend.default_create_user_dn(user: UserInputFields, ldap_user: LdapUser) UserInputFields[source]¶
- crossauth_backend.default_password_validator(params: AuthenticationParameters) List[str][source]¶
- crossauth_backend.j(arg: Mapping[str, Any] | str) Dict[str, Any] | str[source]¶
Helper function that returns JSON if the error log supports it, otherwise a string
- crossauth_backend.register_sqlite_datetime()[source]¶
SQLite has no date column types. As of Python 3.12, the default adapters which made this seamless don’t work.
If you don’t have your own adapters and you want to store dates etc as real, call this function to register the ones supplied with Crossauth and declare your columns as date/datetime/timestamp as normal.
- crossauth_backend.set_parameter(param: str, param_type: ParamType, instance: Any, options: Mapping[str, Any], env_name: str | None = None, required: bool = False, public: bool = False, protected: bool = False) None[source]¶
Sets an instance variable in the passed object from the passed options object and environment variable.
If the named parameter exists in the options object, then the instance variable is set to that value. Otherwise, if the named environment variable exists, it is set from that. Otherwise, the instance variable is not updated.
- Parameters:
param (str) – The name of the parameter in the options variable and the name of the variable in the instance.
param_type (ParamType) – The type of variable. If the value is JsonArray or Json, both the option and the environment variable value should be a string, which will be parsed.
instance (Any) – Options present in the options or environment variables will be set on a corresponding instance variable in this class or object.
options (Dict[str, Any]) – Object containing options as key/value pairs.
env_name (str) – Name of environment variable.
required (bool) – If true, an exception will be thrown if the variable is not present in options or the environment variable.
public (bool) – If false, _ will be prepended to the field nam,e in the target. Default False.
protected (bool) – If true, __ will be prepended to the field nam,e in the target. Default False. Don’t use this and public together.
- Raises:
crossauth_backend.CrossauthError: withErrorCodeConfiguration if required is set but the option was not present, or if there was a parsing error.