# Copyright (c) 2026 Matthew Baker. All rights reserved. Licenced under the Apache Licence 2.0. See LICENSE file
from crossauth_backend.auth import Authenticator, AuthenticationOptions, AuthenticationParameters
from crossauth_backend.storage import KeyStorage
from crossauth_backend.common.interfaces import UserInputFields, UserSecretsInputFields, User, Key
from crossauth_backend.common.error import CrossauthError, ErrorCode
from crossauth_backend.common.logger import CrossauthLogger, j
from typing import List, Optional, Dict, Any
import secrets
import base64
import pyotp
import qrcode
import io
def random_int(max_val: int) -> int:
    """Return a cryptographically strong random integer in ``[0, max_val]``.

    Args:
        max_val: inclusive upper bound of the range; must not be negative.

    Returns:
        An integer ``n`` such that ``0 <= n <= max_val``.

    Raises:
        ValueError: if ``max_val`` is negative.
    """
    # ``secrets.randbelow`` also rejects a non-positive bound, but its message
    # talks about an "upper bound" the caller never passed; fail early with a
    # message phrased in terms of this function's own parameter.
    if max_val < 0:
        raise ValueError("max_val must be non-negative")
    # ``randbelow`` excludes its argument, so add one to make max_val inclusive.
    return secrets.randbelow(max_val + 1)
class TotpAuthenticator(Authenticator):
    """
    Second-factor authenticator based on time-based one-time passwords (TOTP).

    A shared secret is generated with ``pyotp`` and handed to the user as a
    QR code (a PNG encoded as a data URL) to scan into an authenticator app.
    Codes the user submits afterwards are verified against that secret.
    """

    def __init__(
        self,
        app_name: str,
        options: Optional[AuthenticationOptions] = None,
    ):
        """
        Constructor

        :param app_name this forms part of the QR code that users scan into
               their authenticator app.  The name will appear in their app
        :param options see :class:`crossauth_backend.AuthenticationOptions`.
               Entries given here override the default ``friendly_name``.
        """
        # ``None`` (not ``{}``) is the default so that no mutable object is
        # shared between calls.  The default friendly name previously read
        # "Email OTP", which described a different authenticator.
        super().__init__(
            {"friendly_name": "Google Authenticator", **(options or {})}
        )
        self._app_name = app_name

    def mfa_type(self) -> str:
        """
        Used by the OAuth password_mfa grant type.

        Returns:
            The string ``"otp"``.
        """
        return "otp"

    def mfa_channel(self) -> str:
        """
        Used by the OAuth password_mfa grant type.

        Returns:
            The string ``"none"``.
        """
        return "none"

    async def _create_secret(
        self,
        username: str,
        secret: Optional[str] = None,
    ) -> Dict[str, str]:
        """
        Creates a TOTP secret (unless one is supplied) and renders the
        matching provisioning URI as a QR code.

        Args:
            username: the account name placed in the provisioning URI
            secret: optional existing secret; if ``None`` or empty a new one
                is generated with ``pyotp.random_base32()``

        Returns:
            Dictionary containing `qr_url` (the QR code as a PNG data URL)
            and `secret`.

        Raises:
            CrossauthError: with ``ErrorCode.UnknownError`` if the
                provisioning URI or the QR image cannot be produced.
        """
        if not secret:
            secret = pyotp.random_base32()

        try:
            totp = pyotp.TOTP(secret)
            provisioning_uri = totp.provisioning_uri(
                name=username,
                issuer_name=self._app_name,
            )

            qr = qrcode.QRCode(version=1, box_size=10, border=5)
            qr.add_data(provisioning_uri)
            qr.make(fit=True)
            img = qr.make_image(fill_color="black", back_color="white")

            # Serialise the image to PNG in memory and base64-encode it.
            with io.BytesIO() as buffered:
                img.get_image().save(buffered, format="PNG")
                encoded = base64.b64encode(buffered.getvalue()).decode("utf-8")
        except Exception as err:
            CrossauthLogger.logger().debug(j({"err": str(err)}))
            # Chain the cause so the underlying failure is kept in tracebacks.
            raise CrossauthError(
                ErrorCode.UnknownError,
                "Couldn't generate 2FA URL",
            ) from err

        # NOTE(review): the prefix contains a space after the comma.  It is
        # kept byte-for-byte so the emitted URL does not change.
        qr_url = "data:image/png;base64, " + encoded
        return {"qr_url": qr_url, "secret": secret}

    async def _get_secret_from_session(
        self,
        username: str,
        session_key: Key,
    ) -> Dict[str, str]:
        """
        Retrieves the TOTP secret stored in the session and regenerates the
        QR code for it.

        Args:
            username: the account name to place in the regenerated QR code
            session_key: the session key whose `data` field holds the TOTP
                data, either at the top level or under a `2fa` entry

        Returns:
            Dictionary containing `qr_url`, `secret` and `factor2`.

        Raises:
            CrossauthError: with ``ErrorCode.InvalidKey`` if the key has no
                `data` field, or ``ErrorCode.Unauthorized`` if `totpsecret`
                or `factor2` is missing from the decoded data.
        """
        if "data" not in session_key:
            raise CrossauthError(ErrorCode.InvalidKey, "No data found in session")

        # Fall back to an empty dict so that a falsy decode result (or a
        # falsy "2fa" entry) ends in the Unauthorized error below instead of
        # a TypeError from the membership tests.
        data = KeyStorage.decode_data(session_key["data"]) or {}
        if "2fa" in data:
            data = data["2fa"] or {}

        if "totpsecret" not in data:
            raise CrossauthError(ErrorCode.Unauthorized,
                                 "TOTP data not in session")
        if "factor2" not in data:
            raise CrossauthError(ErrorCode.Unauthorized,
                                 "TOTP factor name not in session")

        # Reuse the saved secret; only the QR image is rebuilt.
        secret_data = await self._create_secret(username, data["totpsecret"])
        return {
            "qr_url": secret_data["qr_url"],
            "secret": secret_data["secret"],
            "factor2": data["factor2"],
        }

    async def prepare_configuration(
        self,
        user: UserInputFields,
    ) -> Optional[Dict[str, Dict[str, Any]]]:
        """
        Creates a shared secret and returns it, along with image data for the
        QR code to display.

        Args:
            user: the `username` is expected to be present.  All other fields
                are ignored.

        Returns:
            Dictionary containing:
                - `userData` containing `username`, `totpsecret`, `factor2`
                  and `qr`.
                - `sessionData` containing the same except `qr`.

        Raises:
            CrossauthError: with ``ErrorCode.Configuration`` if
                `factor_name` has not been set on this authenticator.
        """
        if not self.factor_name:
            raise CrossauthError(
                ErrorCode.Configuration,
                "Please set factor_name on TotpAuthenticator before using")

        secret_data = await self._create_secret(user["username"])
        secret = secret_data["secret"]

        session_data = {
            "username": user["username"],
            "totpsecret": secret,
            "factor2": self.factor_name,
        }
        # The user-facing data is the session data plus the QR image.
        user_data = {
            "username": user["username"],
            "qr": secret_data["qr_url"],
            "totpsecret": secret,
            "factor2": self.factor_name,
        }
        return {"userData": user_data, "sessionData": session_data}

    async def reprepare_configuration(
        self,
        username: str,
        session_key: Key,
    ) -> Optional[Dict[str, Any]]:
        """
        For cases when the 2FA page was closed without completing.  Returns
        the secret already stored in the session together with a freshly
        rendered QR code, without generating a new secret.

        Args:
            username: user to return this for
            session_key: the session key, which should contain the
                `sessionData` from `prepare_configuration`

        Returns:
            Dictionary containing:
                - `userData` containing `totpsecret`, `factor2` and `qr`.
                - `secrets` containing `totpsecret`.
                - `newSessionData`, which is always ``None`` here.
        """
        session_data = await self._get_secret_from_session(username, session_key)
        secret = session_data["secret"]
        return {
            "userData": {
                "qr": session_data["qr_url"],
                "totpsecret": secret,
                "factor2": session_data["factor2"],
            },
            "secrets": {"totpsecret": secret},
            "newSessionData": None,
        }

    async def authenticate_user(
        self,
        user: UserInputFields|None,
        secrets: UserSecretsInputFields,
        params: AuthenticationParameters,
    ) -> None:
        """
        Authenticates the user using the saved TOTP parameters and the passed
        code.

        Args:
            user: ignored
            secrets: should contain `totpsecret` (note that inside this
                method the name shadows the ``secrets`` module)
            params: should contain `otp`

        Raises:
            CrossauthError: with ``ErrorCode.InvalidToken`` if the secret or
                the code is missing, or if the code does not verify.
        """
        if "totpsecret" not in secrets or "otp" not in params:
            raise CrossauthError(ErrorCode.InvalidToken,
                                 "TOTP secret or code not given")

        totp = pyotp.TOTP(secrets["totpsecret"])
        if not totp.verify(params["otp"]):
            raise CrossauthError(ErrorCode.InvalidToken,
                                 "Invalid TOTP code")

    async def create_persistent_secrets(
        self,
        username: str,
        params: AuthenticationParameters,
        repeat_params: AuthenticationParameters|None = None,
    ) -> UserSecretsInputFields:
        """
        Creates and returns a `totpsecret`

        Args:
            username: the user to create these for
            params: ignored
            repeat_params: ignored

        Returns:
            Dictionary where the `totpsecret` field will be populated.
        """
        secret_data = await self._create_secret(username)
        return {"totpsecret": secret_data["secret"]}

    async def create_one_time_secrets(self, user: User) -> UserSecretsInputFields:
        """
        Does nothing for this class

        Returns:
            An empty dictionary.
        """
        return {}

    def can_create_user(self) -> bool:
        """
        Returns:
            true - this class can create users
        """
        return True

    def can_update_user(self) -> bool:
        """
        Returns:
            true - this class can update users
        """
        return True

    def can_update_secrets(self) -> bool:
        """
        Returns:
            false - users cannot update secrets
        """
        return False

    def secret_names(self) -> List[str]:
        """
        Returns:
            List containing `totpsecret`
        """
        return ["totpsecret"]

    def transient_secret_names(self) -> List[str]:
        """
        Returns:
            List containing `otp`
        """
        return ["otp"]

    def validate_secrets(self, params: AuthenticationParameters) -> List[str]:
        """
        Does nothing for this class

        Returns:
            An empty list.
        """
        return []

    def skip_email_verification_on_signup(self) -> bool:
        """
        Returns:
            false - if email verification is enabled, it should be used
            for this class
        """
        return False