# Copyright (c) 2026 Matthew Baker. All rights reserved. Licenced under the Apache Licence 2.0. See LICENSE file
from crossauth_backend.auth import PasswordAuthenticator, AuthenticationOptions, AuthenticationParameters
from crossauth_backend.storageimpl.ldapstorage import LdapUserStorage
from crossauth_backend.utils import set_parameter, ParamType
from crossauth_backend.common.interfaces import UserInputFields, UserSecretsInputFields, User, UserState, Key
from crossauth_backend.common.error import CrossauthError, ErrorCode
from crossauth_backend.common.logger import CrossauthLogger, j
from typing import List, Optional, Dict, Any
[docs]
class LdapAuthenticatorOptions(AuthenticationOptions, total=False):
"""
Optional parameters for :class: LdapAuthenticator.
See :func: LdapAuthenticator__init__ for details
"""
ldap_auto_create_account : bool
"""
If true, an account will automatically be created (with factor1 taken
from `ldap_auto_create_factor1` when a user logs in with LDAP)
"""
ldap_auto_create_factor1 : str
""" See :class:crossauth_backend.LdapAuthenticatorOptions """
[docs]
class LdapAuthenticator(PasswordAuthenticator):
def __init__(self, ldap_storage: LdapUserStorage, options: LdapAuthenticatorOptions = {}):
"""
Constructor
:param the storage that defines the LDAP server and databse for storing users locally
:param options see :class:`crossauth_backend.LocalPasswordAuthenticatorOptions`
"""
super().__init__({"friendly_name": "LDAP", **options})
self.__ldap_auto_create_account : bool = False
self.__ldap_storage : LdapUserStorage = ldap_storage
self.__ldap_auto_create_factor1 = "ldap"
set_parameter("ldap_auto_create_account", ParamType.Boolean, self, options, "LDAP_AUTO_CREATE_ACCOUNT")
set_parameter("ldap_auto_create_factor1", ParamType.String, self, options, "LDAP_AUTO_CREATE_FACTOR1")
[docs]
async def authenticate_user(self, user: UserInputFields|None, secrets: UserSecretsInputFields, params: AuthenticationParameters) -> None:
"""
Authenticates the user, returning a the user as a {@link User} object.
@param user the `username` field is required and this is used for LDAP authentication.
If `ldapAutoCreateAccount` is true, these attributes as used for user creation (see {@link LdapUserStorage.createUser}).
@param _secrets Ignored as secrets are stored in LDAP
@param params the `password` field is expected to contain the LDAP password.
@throws {@link @crossauth/common!CrossauthError} with {@link @crossauth/common!ErrorCode} of `Connection`, `UsernameOrPasswordInvalid`.
"""
if ("password" not in params or params["password"] == ""):
raise CrossauthError(ErrorCode.PasswordInvalid, "Password not provided")
if (user is None):
raise CrossauthError(ErrorCode.InvalidUsername, "Must provide a user")
await self.__ldap_storage.get_ldap_user(user["username"], params["password"])
local_user: User
try:
if self.__ldap_auto_create_account:
try:
resp = await self.__ldap_storage.get_user_by_username(user["username"])
local_user = resp["user"]
local_user["factor1"] = self.__ldap_auto_create_factor1
except:
CrossauthLogger.logger().debug(j({"msg": "Creating user", "user": user["username"]}))
local_user = await self.__ldap_storage.create_user(
{"factor1": self.__ldap_auto_create_factor1, **user},
params
)
else:
resp = await self.__ldap_storage.get_user_by_username(user["username"])
local_user = resp["user"]
if local_user["state"] == UserState.awaiting_two_factor_setup:
raise CrossauthError(ErrorCode.TwoFactorIncomplete)
if local_user["state"] == UserState.awaiting_email_verification:
raise CrossauthError(ErrorCode.EmailNotVerified)
if local_user["state"] == UserState.disabled:
raise CrossauthError(ErrorCode.UserNotActive)
except Exception as e1:
CrossauthLogger.logger().debug(j({"err": e1}))
raise e1
[docs]
def validate_secrets(self, params: AuthenticationParameters) -> List[str]:
"""
Does nothing as LDAP is responsible for password format (this class doesn't create password entries)
"""
return []
[docs]
async def create_persistent_secrets(self,
username: str,
params: AuthenticationParameters,
repeat_params: AuthenticationParameters|None = None) -> UserSecretsInputFields:
"""
Does nothing in this class
"""
return {}
[docs]
async def create_one_time_secrets(self, user: User) -> UserSecretsInputFields:
"""
Does nothing in this class
"""
return {}
[docs]
def can_create_user(self) -> bool:
"""
Returns true
"""
return True
[docs]
def can_update_secrets(self) -> bool:
"""
Returns false
"""
return False
[docs]
def can_update_user(self) -> bool:
"""
Returns true
"""
return True
[docs]
def skip_email_verification_on_signup(self) -> bool:
"""
Returns false
"""
return False
[docs]
async def prepare_configuration(self, user: UserInputFields) -> Optional[Dict[str, Dict[str, Any]]]:
""" Nothing to do in this class """
return None
[docs]
async def reprepare_configuration(self, username: str, session_key: Key) -> Optional[Dict[str, Dict[str, Any] | Optional[Dict[str, Any]]]]:
""" Nothing to do in this class """
return None