# Source code for crossauth_backend.emailtoken


# Copyright (c) 2026 Matthew Baker.  All rights reserved.  Licenced under the Apache Licence 2.0.  See LICENSE file

from typing import TypedDict, NotRequired, Callable, Dict, Any, cast
from jinja2 import Environment, FileSystemLoader
import smtplib
import ssl
import re
import datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from nulltype import Null

from crossauth_backend.common.interfaces import KeyPrefix, User, UserState
from crossauth_backend.crypto import Crypto
from crossauth_backend.storage import UserStorage, KeyStorage
from crossauth_backend.utils import set_parameter, ParamType
from crossauth_backend.common.error import CrossauthError, ErrorCode
from crossauth_backend.common.logger import CrossauthLogger, j

# Size of a freshly generated token in bytes, i.e. before it is base64url encoded.
TOKEN_LENGTH = 16

[docs] class TokenEmailerOptions(TypedDict): """ Configuration options for TokenEmailer """ site_url: NotRequired[str] """ The site url, used to create a link, eg "https://mysite.com:3000". No default - required parameter """ prefix: NotRequired[str] """ The prefix between the site url and the email verification/password reset link. Default "/" """ views: NotRequired[str] """ The directory containing views (by default, Nunjucks templates) """ email_verification_text_body: NotRequired[str] """ Template file containing page for producing the text version of the email verification email body """ email_verification_html_body: NotRequired[str] """ Template file containing page for producing the HTML version of the email verification email body """ email_verification_subject: NotRequired[str] """ Subject for the the email verification email """ password_reset_text_body: NotRequired[str] """ Template file containing page for producing the text version of the password reset email body """ password_reset_html_body: NotRequired[str] """ Template file containing page for producing the HTML version of the password reset email body """ password_reset_subject: NotRequired[str] """ Subject for the the password reset email """ email_from: NotRequired[str] """ Sender for emails """ smtp_host: NotRequired[str] """ Hostname of the SMTP server. No default - required parameter """ smtp_port: NotRequired[int] """ Port the SMTP server is running on. Default 25 """ smtp_use_tls: NotRequired[bool] """ Whether or not TLS is used by the SMTP server. Default false """ smtp_username: NotRequired[str] """ Username for connecting to SMTP servger. Default undefined """ smtp_password: NotRequired[str] """ Password for connecting to SMTP servger. Default undefined """ verify_email_expires: NotRequired[int] """ Number of seconds befire email verification tokens should expire. Default 1 day """ password_reset_expires: NotRequired[int] """ Number of seconds befire password reset tokens should expire. 
Default 1 day """ render: NotRequired[Callable[[str, Dict[str, Any]], str]] """ if passed, use this instead of the default nunjucks renderer """
class TokenEmailer:
    """
    Sends password reset and email verification tokens to an email address.

    Tokens are random values; only a prefixed hash of each token is written
    to key storage, while the token itself is emailed to the user inside a
    rendered template.
    """

    def __init__(self,
                 user_storage: UserStorage,
                 key_storage: KeyStorage,
                 options: TokenEmailerOptions | None = None):
        """
        Construct a new TokenEmailer.

        :param user_storage: where to retrieve user details from
        :param key_storage: where to store the hashed tokens
        :param options: see :class:`TokenEmailerOptions`.  Values are
               resolved by `set_parameter` from this dictionary or from the
               named environment variable.
        """
        if options is None:
            options = {}

        self.user_storage = user_storage
        self.key_storage = key_storage

        # Defaults, overridden below by options / environment variables
        self.__views = "views"
        self.__site_url: str = ""
        self.__prefix = "/"
        self.__email_verification_text_body: str|None = "emailverificationtextbody.njk"
        self.__email_verification_html_body: str|None = None
        self.__email_verification_subject = "Please verify your email"
        self.__password_reset_text_body: str|None = "passwordresettextbody.njk"
        self.__password_reset_html_body: str|None = None
        self.__password_reset_subject = "Password reset"
        self.__email_from = ""
        self.__smtp_host = ""
        self.__smtp_port = 587
        self.__smtp_use_tls: bool = True
        self.__smtp_username: str|None = None
        self.__smtp_password: str|None = None
        self.__verify_email_expires = 60*60*24
        self.__password_reset_expires = 60*60*24
        self.__render: Callable[[str, Dict[str, Any]], str]|None = None

        set_parameter("site_url", ParamType.String, self, options, "SITE_URL", required=True)
        # The prefix has its own environment variable; reading SITE_URL here
        # would overwrite the prefix with the whole site URL.
        set_parameter("prefix", ParamType.String, self, options, "PREFIX")
        set_parameter("views", ParamType.String, self, options, "VIEWS")
        set_parameter("email_verification_text_body", ParamType.String, self, options, "EMAIL_VERIFICATION_TEXT_BODY")
        set_parameter("email_verification_html_body", ParamType.String, self, options, "EMAIL_VERIFICATION_HTML_BODY")
        set_parameter("email_verification_subject", ParamType.String, self, options, "EMAIL_VERIFICATION_SUBJECT")
        set_parameter("password_reset_text_body", ParamType.String, self, options, "PASSWORD_RESET_TEXT_BODY")
        set_parameter("password_reset_html_body", ParamType.String, self, options, "PASSWORD_RESET_HTML_BODY")
        set_parameter("password_reset_subject", ParamType.String, self, options, "PASSWORD_RESET_SUBJECT")
        set_parameter("email_from", ParamType.String, self, options, "EMAIL_FROM")
        set_parameter("smtp_host", ParamType.String, self, options, "SMTP_HOST")
        set_parameter("smtp_port", ParamType.Integer, self, options, "SMTP_PORT")
        set_parameter("smtp_username", ParamType.String, self, options, "SMTP_USERNAME")
        set_parameter("smtp_password", ParamType.String, self, options, "SMTP_PASSWORD")
        set_parameter("smtp_use_tls", ParamType.Boolean, self, options, "SMTP_USE_TLS")
        # Expiry values are added to a timestamp, so they must be numeric
        set_parameter("verify_email_expires", ParamType.Integer, self, options, "VERIFY_EMAIL_EXPIRES")
        set_parameter("password_reset_expires", ParamType.Integer, self, options, "PASSWORD_RESET_EXPIRES")

        if "render" in options:
            self.__render = options["render"]
        else:
            # The Jinja2 environment is only created when no custom renderer is given
            self.jinja_env = Environment(loader=FileSystemLoader(self.__views), autoescape=True)

    def create_emailer(self) -> smtplib.SMTP:
        """
        Open a connection to the configured SMTP server.

        STARTTLS is negotiated when `smtp_use_tls` is set, and a login is
        performed when both a username and a password are configured.

        :return: the connected SMTP client; the caller must `quit()` it
        """
        server = smtplib.SMTP(self.__smtp_host, self.__smtp_port)
        try:
            if self.__smtp_use_tls:
                server.starttls(context=ssl.create_default_context())
            if self.__smtp_username and self.__smtp_password:
                server.login(self.__smtp_username, self.__smtp_password)
        except Exception:
            # Don't leak the socket when the handshake or login fails
            server.close()
            raise
        return server

    @staticmethod
    def hash_email_verification_token(token: str) -> str:
        """
        Produces a hash of the given email verification token with the
        correct prefix for inserting into storage.

        :param token: the unhashed token
        :return: `KeyPrefix.email_verification_token` followed by the hash
        """
        return KeyPrefix.email_verification_token + Crypto.hash(token)

    @staticmethod
    def hash_password_reset_token(token: str) -> str:
        """
        Produces a hash of the given password reset token with the
        correct prefix for inserting into storage.

        :param token: the unhashed token
        :return: `KeyPrefix.password_reset_token` followed by the hash
        """
        return KeyPrefix.password_reset_token + Crypto.hash(token)

    async def _create_and_save_token(self,
                                     userid: str|int,
                                     hasher: Callable[[str], str],
                                     lifetime: int,
                                     *extra: Any) -> str:
        """
        Generate a random token and store its hash, retrying with a fresh
        token (up to 10 attempts) whenever key storage rejects the key.

        :param userid: user the token belongs to
        :param hasher: turns the token into the key written to storage
        :param lifetime: seconds until the stored key expires
        :param extra: additional positional arguments passed to `save_key`
        :return: the unhashed token
        :raises CrossauthError: with `ErrorCode.Connection` if every attempt failed
        """
        max_tries = 10
        now = datetime.datetime.now()
        expiry = datetime.datetime.fromtimestamp(now.timestamp() + lifetime)
        last_error: Exception|None = None
        for _ in range(max_tries):
            token = Crypto.random_value(TOKEN_LENGTH)
            try:
                await self.key_storage.save_key(userid, hasher(token), now, expiry, *extra)
                return token
            except Exception as e:
                # Deliberately not a bare except: task cancellation and
                # KeyboardInterrupt must propagate instead of being retried.
                last_error = e
        raise CrossauthError(ErrorCode.Connection, "failed creating a unique key") from last_error

    async def create_and_save_email_verification_token(self, userid: str|int, new_email: str = "") -> str:
        """
        Create an email verification token and save its hash in key storage.

        :param userid: user the token is for
        :param new_email: stored with the key as its data; empty when the
               token is not for an email change
        :return: the unhashed token
        :raises CrossauthError: with `ErrorCode.Connection` if no key could be saved
        """
        return await self._create_and_save_token(
            userid,
            TokenEmailer.hash_email_verification_token,
            self.__verify_email_expires,
            new_email)

    def _render_template(self, template_name: str, data: Dict[str, Any]) -> str:
        """Render a template with the custom renderer if one was configured, else with Jinja2."""
        if self.__render:
            return self.__render(template_name, data)
        return self.jinja_env.get_template(template_name).render(data)

    def _send_mail(self,
                   subject: str,
                   email: str,
                   text_template: str|None,
                   html_template: str|None,
                   token: str,
                   extra_data: Dict[str, Any]|None) -> str:
        """
        Build a multipart/alternative message from the given templates and
        send it through `create_emailer()`.

        NOTE: smtplib is synchronous, so the calling coroutine blocks while
        the message is being delivered to the SMTP server.

        :return: the Message-ID header value that was put on the message
        """
        from email.utils import make_msgid, parseaddr

        msg = MIMEMultipart('alternative')
        msg['From'] = self.__email_from
        msg['To'] = email
        msg['Subject'] = subject

        # SMTP.send_message returns a dict of refused recipients, which is
        # neither hashable nor an identifier, so the message ID is generated
        # here.  The sender's domain is used when available to avoid a host
        # name lookup.
        sender_domain = parseaddr(self.__email_from)[1].rpartition("@")[2]
        message_id = make_msgid(domain=sender_domain or None)
        msg['Message-ID'] = message_id

        data: Dict[str, Any] = {'token': token, 'siteUrl': self.__site_url, 'prefix': self.__prefix}
        if extra_data:
            data = {**data, **extra_data}

        # Plain text first, HTML second: in multipart/alternative the last
        # part is the preferred one.
        if text_template:
            msg.attach(MIMEText(self._render_template(text_template, data), 'plain'))
        if html_template:
            msg.attach(MIMEText(self._render_template(html_template, data), 'html'))

        server = self.create_emailer()
        try:
            server.send_message(msg)
        finally:
            server.quit()
        return message_id

    async def _send_email_verification_token(self, token: str, email: str, extra_data: Dict[str, Any]) -> str:
        """
        Render and send the email verification email.

        Separated out for unit testing/mocking purposes.

        :param token: the unhashed token, passed to the templates as `token`
        :param email: recipient address
        :param extra_data: extra variables for the templates
        :return: the Message-ID of the sent email
        """
        return self._send_mail(
            self.__email_verification_subject,
            email,
            self.__email_verification_text_body,
            self.__email_verification_html_body,
            token,
            extra_data)

    @staticmethod
    def _email_for_user(user: User) -> str|None:
        """Return the user's `email` field, falling back to `username` when it is missing or empty."""
        email = user["email"] if "email" in user else None
        return email or user["username"]

    async def send_email_verification_token(self,
                                            userid: str|int,
                                            new_email: str = "",
                                            extra_data: Dict[str, Any]|None = None) -> None:
        """
        Send an email verification email using the configured templates.

        The address is `new_email` if given, otherwise the user's email
        (or username if there is no email) from user storage.  It is
        validated before anything is stored or sent.

        :param userid: userid to send it for
        :param new_email: if this is a token to verify email for account
               activation, leave this empty.  If it is for changing an
               email, this is the address it is being changed to.
        :param extra_data: these extra variables will be passed to the templates
        :raises CrossauthError: `Configuration` if no body template is set,
                `InvalidEmail` if the address is not valid
        """
        if not self.__email_verification_text_body and not self.__email_verification_html_body:
            raise CrossauthError(ErrorCode.Configuration,
                "Either emailVerificationTextBody or emailVerificationHtmlBody must be set to send email verification emails")

        result = await self.user_storage.get_user_by_id(userid, {'skip_email_verified_check': True})
        user = result['user']

        email = new_email if new_email != "" else TokenEmailer._email_for_user(user)
        TokenEmailer.validate_email(email)

        token = await self.create_and_save_email_verification_token(userid, new_email)
        message_id = await self._send_email_verification_token(token, cast(str, email), extra_data or {})
        CrossauthLogger.logger().info(j({'msg': "Sent email verification email", 'emailMessageId': message_id, 'email': email}))

    async def verify_email_verification_token(self, token: str) -> Dict[str, str|int]:
        """
        Validates an email verification token.

        Looks the hashed token up in key storage and checks that

        * the stored key has a userid and an expiry,
        * the user exists and their email (or username, when there is no
          email) has a valid format,
        * the expiry time has not passed.

        The key is not deleted here; use
        :meth:`delete_email_verification_token` for that.

        :param token: the token to validate
        :return: a dict with `userid` (the user the token is for) and
                 `newEmail` (the data stored with the key, or an empty string)
        :raises CrossauthError: `InvalidKey`, `InvalidEmail` or `Expired`
        """
        hash_key = TokenEmailer.hash_email_verification_token(token)
        stored_token = await self.key_storage.get_key(hash_key)

        if ("userid" not in stored_token or stored_token["userid"] == Null
                or "expires" not in stored_token or stored_token["expires"] == Null):
            raise CrossauthError(ErrorCode.InvalidKey, "userid or expires missing from token")
        userid = cast(str|int, stored_token["userid"])
        expires = cast(datetime.datetime, stored_token["expires"])

        result = await self.user_storage.get_user_by_id(userid, {'skip_email_verified_check': True})
        email = TokenEmailer._email_for_user(result['user'])
        # validate_email rejects None, so only lower-case a real string
        TokenEmailer.validate_email(email.lower() if email else email)

        if datetime.datetime.now().timestamp() > expires.timestamp():
            raise CrossauthError(ErrorCode.Expired)

        return {'userid': userid, 'newEmail': stored_token["data"] if "data" in stored_token else ""}

    async def delete_email_verification_token(self, token: str):
        """
        Delete the stored key for an email verification token.

        Best effort: failures are logged at debug level and not raised.

        :param token: the unhashed token
        """
        try:
            hash_key = TokenEmailer.hash_email_verification_token(token)
            await self.key_storage.delete_key(hash_key)
        except Exception as e:
            ce = CrossauthError.as_crossauth_error(e)
            CrossauthLogger.logger().debug(j({'err': ce}))

    async def create_and_save_password_reset_token(self, userid: str|int) -> str:
        """
        Create a password reset token and save its hash in key storage.

        :param userid: user the token is for
        :return: the unhashed token
        :raises CrossauthError: with `ErrorCode.Connection` if no key could be saved
        """
        return await self._create_and_save_token(
            userid,
            TokenEmailer.hash_password_reset_token,
            self.__password_reset_expires)

    async def verify_password_reset_token(self, token: str) -> User:
        """
        Validates a password reset token.

        Looks the hashed token up in key storage and checks that

        * the stored key has a userid and an expiry,
        * the user exists and is in state `active`,
          `password_reset_needed` or `password_and_factor2_reset_needed`,
        * the expiry time has not passed.

        :param token: the token to validate
        :return: the user that the token is for
        :raises CrossauthError: `InvalidKey`, `UserNotActive` or `Expired`
        """
        hash_key = TokenEmailer.hash_password_reset_token(token)
        # Only the storage key is logged; the token itself is a credential
        CrossauthLogger.logger().debug("verifyPasswordResetToken " + hash_key)
        stored_token = await self.key_storage.get_key(hash_key)

        if "userid" not in stored_token or stored_token["userid"] == Null:
            raise CrossauthError(ErrorCode.InvalidKey, "userid not present in session key")
        if "expires" not in stored_token or stored_token["expires"] == Null:
            raise CrossauthError(ErrorCode.InvalidKey, "expires not present in session key")
        userid = cast(int|str, stored_token["userid"])
        expires = cast(datetime.datetime, stored_token["expires"])

        result = await self.user_storage.get_user_by_id(userid, {'skip_active_check': True})
        user = result['user']
        if user["state"] not in (UserState.active,
                                 UserState.password_reset_needed,
                                 UserState.password_and_factor2_reset_needed):
            raise CrossauthError(ErrorCode.UserNotActive)

        if datetime.datetime.now().timestamp() > expires.timestamp():
            raise CrossauthError(ErrorCode.Expired)
        return user

    async def _send_password_reset_token(self, token: str, email: str, extra_data: Dict[str, Any]) -> str:
        """
        Render and send the password reset email.

        Separated out for unit testing/mocking purposes.

        :param token: the unhashed token, passed to the templates as `token`
        :param email: recipient address
        :param extra_data: extra variables for the templates
        :return: the Message-ID of the sent email
        :raises CrossauthError: `Configuration` if no password reset body template is set
        """
        if not self.__password_reset_text_body and not self.__password_reset_html_body:
            raise CrossauthError(ErrorCode.Configuration,
                "Either passwordResetTextBody or passwordResetHtmlBody must be set to send password reset emails")
        return self._send_mail(
            self.__password_reset_subject,
            email,
            self.__password_reset_text_body,
            self.__password_reset_html_body,
            token,
            extra_data)

    async def send_password_reset_token(self,
                                        userid: int|str,
                                        extra_data: Dict[str, Any]|None = None,
                                        as_admin: bool = False) -> None:
        """
        Send a password reset token email using the configured templates.

        The address is the user's email (or username if there is no
        email), lower-cased and validated before anything is stored or sent.

        :param userid: userid to send it for
        :param extra_data: these extra variables will be passed to the templates
        :param as_admin: whether this is being sent by an admin, in which
               case the user's state is not checked
        :raises CrossauthError: `Configuration` if no body template is set,
                `UserNotActive` if the user's state does not allow a reset,
                `InvalidEmail` if the address is not valid
        """
        if not self.__password_reset_text_body and not self.__password_reset_html_body:
            raise CrossauthError(ErrorCode.Configuration,
                "Either passwordResetTextBody or passwordResetHtmlBody must be set to send password reset emails")

        result = await self.user_storage.get_user_by_id(userid, {'skip_active_check': True})
        user = result['user']
        if not as_admin and user["state"] not in (UserState.active,
                                                  UserState.password_reset_needed,
                                                  UserState.password_and_factor2_reset_needed):
            raise CrossauthError(ErrorCode.UserNotActive)

        email = TokenEmailer._email_for_user(user)
        if email:
            email = email.lower()
        TokenEmailer.validate_email(email)

        token = await self.create_and_save_password_reset_token(userid)
        message_id = await self._send_password_reset_token(token, cast(str, email), extra_data or {})
        CrossauthLogger.logger().info(j({'msg': "Sent password reset email", 'emailMessageId': message_id, 'email': email}))

    @staticmethod
    def is_email_valid(email: str) -> bool:
        """
        Returns true if the given email has a valid format, false otherwise.

        The local part is either dot-separated atoms or a quoted string; the
        domain is either a bracketed IPv4 literal or a dotted host name.
        The whole string must match, so trailing newlines are rejected.

        :param email: the email to validate
        :return: true or false
        """
        # https://stackoverflow.com/questions/46155/how-can-i-validate-an-email-address-in-javascript
        pattern = r'^(([^<>()[\]\.,;:\s@"]+(\.[^<>()[\]\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$'
        return re.fullmatch(pattern, email.lower()) is not None

    @staticmethod
    def validate_email(email: str|None) -> None:
        """
        Returns if the given email has a valid format.  Throws a
        CrossauthError with ErrorCode `InvalidEmail` otherwise.

        :param email: the email to validate; None is treated as invalid
        :raises CrossauthError: with `ErrorCode.InvalidEmail`
        """
        if email is not None and TokenEmailer.is_email_valid(email):
            return
        raise CrossauthError(ErrorCode.InvalidEmail)