# Copyright (c) 2026 Matthew Baker. All rights reserved. Licenced under the Apache Licence 2.0. See LICENSE file
from crossauth_backend.auth import Authenticator, AuthenticationOptions, AuthenticationParameters
from crossauth_backend.storage import KeyStorage
from crossauth_backend.common.interfaces import UserInputFields, UserSecretsInputFields, User, Key
from crossauth_backend.common.error import CrossauthError, ErrorCode
from crossauth_backend.utils import set_parameter, ParamType
from crossauth_backend.common.logger import CrossauthLogger, j
from typing import List, Optional, Dict, Any, Callable, cast
from datetime import timedelta, datetime
import re
import secrets
from abc import abstractmethod
from jinja2 import Template
[docs]
class SMSUser(UserInputFields):
phone : str
[docs]
class SmsAuthenticatorOptions(AuthenticationOptions, total=False):
"""
Optional parameters for :class: EmailAuthenticator.
See :func: EmailAuthenticator__init__ for details
"""
views : str
""" The directory containing views (by default, Jinja2 templates) """
sms_authenticator_body: str
"""
Template file containing text for producing SMS messages. Default `smsauthenticationbody.njk`
"""
sms_authenticator_from: str
""" Sender for SMSs """
sms_authenticator_token_expires: int
""" Number of seconds before otps should expire. Default 5 minutes """
render : Callable[[str, Dict[str,Any]], str]
""" if passed, use this instead of the default jinja2 renderer """
[docs]
class SmsAuthenticator(Authenticator):
"""
Abstract base class for sending OTP by SMS
"""
def __init__(self, options: SmsAuthenticatorOptions = {}):
"""
Constructor
:param options see :class:`crossauth_backend.SmsAuthenticatorOptions`
"""
super().__init__({"friendly_name": "SMS OTP", **options})
self._views: str = "views"
self._sms_authenticator_body: str|None = "smsauthenticationbody.njk"
self._sms_authenticator_from : str = ""
self._sms_authenticator_token_expires: int = 60*5
self._render : Callable[[str, Dict[str,Any]], str]|None = None
set_parameter("views", ParamType.String, self, options, "VIEWS", False, False, True)
set_parameter("sms_authenticator_body", ParamType.String, self, options, "SMS_AUTHENTICATOR_BODY", False, False, True)
set_parameter("sms_authenticator_from", ParamType.String, self, options, "SMS_AUTHENTICATOR_FROM", True, False, True)
set_parameter("sms_authenticator_token_expires", ParamType.Integer, self, options, "SMS_AUTHENTICATOR_TOKEN_EXPIRES", False, False, True)
if ("render" in options):
self._render = options["render"]
[docs]
def mfa_type(self) -> str:
"""
Used by the OAuth password_mfa grant type.
"""
return "oob"
[docs]
def mfa_channel(self) -> str:
"""
Used by the OAuth password_mfa grant type.
"""
return "sms"
@abstractmethod
async def _send_sms(self, to: str, body: str) -> str:
"""
Send an SMS
Args:
to: number to send SMS to (starting with `+`)
body: text to send
Returns:
the send message ID
"""
pass
[docs]
async def prepare_configuration(self, user: UserInputFields) -> Optional[Dict[str, Dict[str, Any]]]:
"""
Creates and sends the one-time code
Args:
user: the user to create it for. Uses the `phone` field which
is expected to be a phone number starting with `+`
Returns:
`userData` containing `username`, `phone`, `factor2`
`sessionData` containing the same plus `otp` and `expiry` which
is a Unix time (number).
"""
if not self.factor_name:
raise CrossauthError(ErrorCode.Configuration,
"Please set factorName on SmsAuthenticator before using")
otp = SmsAuthenticator.zero_pad(secrets.randbelow(1000000), 6)
if ("phone" not in user):
raise CrossauthError(ErrorCode.InvalidPhoneNumber, "For sending SMS, phone must be present in user")
number = cast(str, user["phone"])
SmsAuthenticator.validate_phone(number)
now = datetime.now()
expiry = int((now + timedelta(seconds=self._sms_authenticator_token_expires)).timestamp() * 1000)
user_data : SMSUser = {
"username": user.get("username"),
"phone": number,
"factor2": self.factor_name
}
session_data = {
"username": user.get("username"),
"factor2": self.factor_name,
"expiry": expiry,
"phone": number,
"otp": otp
}
data = {"otp": otp}
body = ""
if self._render:
body = self._render(self._sms_authenticator_body, data)
else:
template = Template(self._views + "/" + self._email_authenticator_body)
body = template.render(data)
message_id = await self._send_sms(number, body)
CrossauthLogger.logger().info(j({
"msg": "Sent factor otp sms",
"smsMessageId": message_id,
"phone": number
}))
return {"userData": user_data, "sessionData": session_data}
[docs]
async def reprepare_configuration(self, username: str, session_key: Key) -> Optional[Dict[str, Dict[str, Any] | Optional[Dict[str, Any]]]]:
"""
Creates and sends a new one-time code.
Args:
_username: ignored
session_key: the session containing the previously created data.
Returns:
Dictionary containing userData, secrets, and newSessionData
"""
if ("data" not in session_key or "2fa" not in session_key["data"]):
raise CrossauthError(ErrorCode.InvalidKey, "2FA data not present in session")
data = KeyStorage.decode_data(session_key["data"])["2fa"]
otp = SmsAuthenticator.zero_pad(secrets.randbelow(1000000), 6)
now = datetime.now()
expiry = int((now + timedelta(seconds=self._sms_authenticator_token_expires)).timestamp() * 1000)
message_id = await self._send_sms(data["phone"], otp)
CrossauthLogger.logger().info(j({
"msg": "Sent factor otp sms",
"smsMessageId": message_id,
"phone": data["phone"]
}))
return {
"userData": {"phone": data["phone"], "factor2": data["factor2"], "otp": otp},
"secrets": {},
"newSessionData": {**data, "otp": otp, "expiry": expiry}
}
[docs]
async def authenticate_user(self, user: UserInputFields|None, secrets: UserSecretsInputFields, params: AuthenticationParameters) -> None:
"""
Authenticates the user by comparing the user-provided otp with the one
in secrets.
Validation fails if the otp is incorrect or has expired.
Args:
_user: ignored
secrets: taken from the session and should contain `otp` and `expiry`
params: user input and should contain `otp`
Raises:
CrossauthError: with ErrorCode `InvalidToken` or `Expired`.
"""
if params.get("otp") != secrets.get("otp"):
raise CrossauthError(ErrorCode.InvalidToken, "Invalid code")
now = int(datetime.now().timestamp() * 1000)
if "expiry" not in secrets or now > secrets["expiry"]:
raise CrossauthError(ErrorCode.Expired, "Token has expired")
[docs]
async def create_persistent_secrets(self,
username: str,
params: AuthenticationParameters,
repeat_params: AuthenticationParameters|None = None) -> Dict[str, Any]:
"""
Does nothing for this class
"""
return {}
[docs]
async def create_one_time_secrets(self, user: User) -> Dict[str, Any]:
"""
Creates and sends a new one-time code.
Args:
user: the user to create it for. Uses the `phone` field which
should start with `+`
Returns:
`otp` and `expiry` as a Unix time (number).
"""
otp = SmsAuthenticator.zero_pad(secrets.randbelow(1000000), 6)
now = datetime.now()
expiry = int((now + timedelta(seconds=self._sms_authenticator_token_expires)).timestamp() * 1000)
if ("phone" not in user):
raise CrossauthError(ErrorCode.InvalidPhoneNumber, "To send SMSs, phone must be present in user data")
phone = user["phone"]
message_id = await self._send_sms(phone, otp)
CrossauthLogger.logger().info(j({
"msg": "Sent factor otp sms",
"smsMessageId": message_id,
"phone": phone
}))
return {"otp": otp, "expiry": expiry}
[docs]
def can_create_user(self) -> bool:
"""
Returns:
true - this class can create users
"""
return True
[docs]
def can_update_user(self) -> bool:
"""
Returns:
true - this class can update users
"""
return True
[docs]
def can_update_secrets(self) -> bool:
"""
Returns:
false - users cannot update secrets
"""
return False
[docs]
def secret_names(self) -> List[str]:
"""
Returns:
empty - this authenticator has no persistent secrets
"""
return []
[docs]
def transient_secret_names(self) -> List[str]:
"""
Returns:
otp
"""
return ["otp"]
[docs]
def validate_secrets(self, params: AuthenticationParameters) -> List[str]:
"""
Does nothing for this class
"""
return []
[docs]
def skip_email_verification_on_signup(self) -> bool:
"""
Returns:
false - doesn't replace email verification
"""
return False
[docs]
@staticmethod
def is_phone_valid(number: str) -> bool:
"""
Returns whether or not the passed phone number has a valid form.
Args:
number: the phone number to validate
Returns:
true if it is valid, false otherwise
"""
pattern = r'^\+[1-9][0-9]{7,14}$'
return bool(re.match(pattern, str(number)))
[docs]
@staticmethod
def validate_phone(number: Optional[str]) -> None:
"""
Throws an exception if a phone number doesn't have a valid form.
It must start with a `+` and be 8 to 15 digits
Args:
number: the phone number to validate
Raises:
CrossauthError: with ErrorCode `InvalidPhoneNumber`.
"""
if number is None or not SmsAuthenticator.is_phone_valid(number):
raise CrossauthError(ErrorCode.InvalidPhoneNumber)
[docs]
@staticmethod
def zero_pad(num: int, places: int) -> str:
"""
Takes a number and turns it into a zero-padded string
Args:
num: number to pad
places: total number of required digits
Returns:
zero-padded string
"""
zero = places - len(str(num)) + 1
return ("0" * max(0, zero - 1)) + str(num) if zero > 0 else str(num)