Source code for crossauth_backend.authenticators.emailauth

from crossauth_backend.auth import Authenticator, AuthenticationOptions, AuthenticationParameters
from crossauth_backend.storage import KeyStorage
from crossauth_backend.common.interfaces import UserInputFields, UserSecretsInputFields, User, Key
from crossauth_backend.common.error import CrossauthError, ErrorCode
from crossauth_backend.utils import set_parameter, ParamType
from crossauth_backend.common.logger import CrossauthLogger, j

from typing import List, Optional, Dict, Any, Callable, Literal
from datetime import timedelta, datetime
import re
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from jinja2 import Environment , FileSystemLoader
import secrets

[docs] class EmailAuthenticatorOptions(AuthenticationOptions, total=False): """ Optional parameters for :class: EmailAuthenticator. See :func: EmailAuthenticator__init__ for details """ views : str """ The directory containing views (by default, Jinja2 templates) """ email_authenticator_text_bod: str """ Template file containing page for producing the text version of the email verification email body """ email_authenticator_html_body: str """ Template file containing page for producing the HTML version of the email verification email body """ email_authenticator_subject: str """ Subject for the the email verification email """ email_from: str """ Sender for emails """ smtp_host: str """ Hostname of the SMTP server. No default - required parameter """ smtp_port: int """ Port the SMTP server is running on. Default 25 """ smtp_use_tls: bool """ Whether or not TLS is used by the SMTP server. Default false """ smtp_username: str """ Username for connecting to SMTP servger. Default undefined """ smtp_password: str """ Password for connecting to SMTP servger. Default undefined """ email_authenticator_token_expires: int """ Number of seconds before otps should expire. Default 5 minutes """ render : Callable[[str, Dict[str,Any]], str] """ if passed, use this instead of the default nunjucks renderer """
def random_int(max_val: int) -> int: """Generate a random integer between 0 and max_val (inclusive)""" return secrets.randbelow(max_val + 1)
[docs] class EmailAuthenticator(Authenticator): """ This authenticator sends a one-time code by email """ def __init__(self, options: EmailAuthenticatorOptions = {}): """ Constructor :param options see :class:`crossauth_backend.EmailAuthenticatorOptions` """ super().__init__({"friendly_name": "Email OTP", **options}) self.__views: str = "views" self.__email_authenticator_text_body: str|None = "emailauthenticationtextbody.njk" self.__email_authenticator_html_body: str|None = None self.__email_authenticator_subject: str = "Login code" self.__email_from : str = "" self.__smtp_host: str = "" self.__smtp_port: int = 587 self.__smtp_use_tls: bool = True self.__smtp_username: str|None = None self.__smtp_password: str|None = None self.__email_authenticator_token_expires: int = 60*5 self.__render : Callable[[str, Dict[str,Any]], str]|None = None set_parameter("views", ParamType.String, self, options, "VIEWS") set_parameter("email_authenticator_text_body", ParamType.String, self, options, "EMAIL_AUTHENTICATOR_TEXT_BODY") set_parameter("email_authenticator_html_body", ParamType.String, self, options, "EMAIL_AUTHENTICATOR_HTML_BODY") set_parameter("email_authenticator_subject", ParamType.String, self, options, "EMAIL_AUTHENTICATOR_SUBJECT") set_parameter("email_from", ParamType.String, self, options, "EMAIL_FROM", True) set_parameter("smtp_host", ParamType.String, self, options, "SMTP_HOST", True) set_parameter("smtp_port", ParamType.Integer, self, options, "SMTP_PORT") set_parameter("smtp_username", ParamType.String, self, options, "SMTP_USERNAME") set_parameter("smtp_password", ParamType.String, self, options, "SMTP_PASSWORD") set_parameter("smtp_use_tls", ParamType.Boolean, self, options, "SMTP_USE_TLS") set_parameter("email_authenticator_token_expires", ParamType.Integer, self, options, "EMAIL_AUTHENTICATOR_TOKEN_EXPIRES") if "render" in options: self.__render = options["render"] else: self.jinja_env = Environment(loader=FileSystemLoader(self.__views), autoescape=True)
[docs] def mfa_type(self) -> Literal["none", "oob", "otp"]: """Used by the OAuth password_mfa grant type.""" return "oob"
[docs] def mfa_channel(self) -> Literal["none", "email", "sms"]: """Used by the OAuth password_mfa grant type.""" return "email"
def _create_emailer(self) -> smtplib.SMTP: """Create SMTP connection for sending emails""" if self.__smtp_use_tls: server = smtplib.SMTP_SSL(self.__smtp_host, self.__smtp_port) else: server = smtplib.SMTP(self.__smtp_host, self.__smtp_port) if self.__smtp_username is not None and self.__smtp_password is not None and self.__smtp_username != "" and self.__smtp_password != "": server.login(self.__smtp_username, self.__smtp_password) return server async def _send_token(self, to: str, otp: str) -> str: """Send OTP token via email""" EmailAuthenticator.validate_email(to) msg = MIMEMultipart('alternative') msg['From'] = self.__email_from msg['To'] = to msg['Subject'] = self.__email_authenticator_subject data = {"otp": otp} if self.__email_authenticator_text_body is not None and self.__email_authenticator_text_body != "": if self.__render: text_body = self.__render(self.__views + "/" + self.__email_authenticator_text_body, data) else: template = self.jinja_env.get_template(self.__email_authenticator_text_body) text_body = template.render(data) text_part = MIMEText(text_body, 'plain') msg.attach(text_part) if self.__email_authenticator_html_body is not None and self.__email_authenticator_html_body != "": if self.__render is not None: html_body = self.__render(self.__views + "/" + self.__email_authenticator_html_body, data) else: template = self.jinja_env.get_template(self.__email_authenticator_html_body) html_body = template.render(data) html_part = MIMEText(html_body, 'html') msg.attach(html_part) server = self._create_emailer() try: server.sendmail(self.__email_from, to, msg.as_string()) # Generate a message ID similar to nodemailer message_id = f"<{secrets.token_hex(16)}@{self.__smtp_host}>" return message_id finally: server.quit()
[docs] async def prepare_configuration(self, user: UserInputFields) -> Optional[Dict[str, Dict[str, Any]]]: """ Creates and emails the one-time code @param user the user to create it for. Uses the `email` field if present, `username` otherwise (which in this case is expected to contain an email address) :return `userData` containing `username`, `email`, `factor2` `sessionData` containing the same plus `otp` and `expiry` which is a Unix time (number). """ if not self.factor_name: raise CrossauthError(ErrorCode.Configuration, "Please set factorName on EmailAuthenticator before using") otp = EmailAuthenticator.zero_pad(random_int(999999), 6) email = user["email"] if "email" in user else user["username"] EmailAuthenticator.validate_email(email) now = datetime.now() expiry = int((now + timedelta(seconds=self.__email_authenticator_token_expires)).timestamp() * 1000) user_data = { "username": user["username"], "email": email, "factor2": self.factor_name } session_data : Dict[str,int|str] = { "username": user["username"], "factor2": self.factor_name, "expiry": expiry, "email": email, "otp": otp, } message_id = await self._send_token(email, otp) CrossauthLogger.logger().info(j({ "msg": "Sent factor otp email", "emailMessageId": message_id, "email": email })) return {"userData": user_data, "sessionData": session_data}
[docs] async def reprepare_configuration(self, username: str, session_key: Key) -> Optional[Dict[str, Dict[str, Any] | Optional[Dict[str, Any]]]]: """ Creates and emails a new one-time code. :param _username ignored :param sessionKey the session containing the previously created data. :return dict """ if ("data" not in session_key): raise CrossauthError(ErrorCode.InvalidKey, "2FA not found in session") data = KeyStorage.decode_data(session_key["data"])["2fa"] otp = EmailAuthenticator.zero_pad(random_int(999999), 6) now = datetime.now() expiry = int((now + timedelta(seconds=self.__email_authenticator_token_expires)).timestamp() * 1000) message_id = await self._send_token(data["email"], otp) CrossauthLogger.logger().info(j({ "msg": "Sent factor otp email", "emailMessageId": message_id, "email": data["email"] })) return { "userData": {"email": data["email"], "factor2": data["factor2"], "otp": otp}, "secrets": {}, "newSessionData": {**data, "otp": otp, "expiry": expiry} }
[docs] async def authenticate_user(self, user: UserInputFields|None, secrets: UserSecretsInputFields, params: AuthenticationParameters) -> None: """ Authenticates the user by comparing the user-provided otp with the one in secrets. Validation fails if the otp is incorrect or has expired. :param _user ignored :param secrets taken from the session and should contain `otp` and `expiry` :param params user input and should contain `otp` :raise CrossauthError with ErrorCode `InvalidToken` or `Expired`. """ if "otp" not in params or "otp" not in secrets: raise CrossauthError(ErrorCode.InvalidToken, "Code not given or stored") if params["otp"] != secrets["otp"]: raise CrossauthError(ErrorCode.InvalidToken, "Invalid code") now = int(datetime.now().timestamp() * 1000) if "expiry" not in secrets or now > secrets["expiry"]: raise CrossauthError(ErrorCode.Expired, "Token has expired")
[docs] async def create_persistent_secrets(self, username: str, params: AuthenticationParameters, repeat_params: AuthenticationParameters|None = None) -> UserSecretsInputFields: """Does nothing for this class""" return {}
[docs] async def create_one_time_secrets(self, user: User) -> UserSecretsInputFields: """ Creates and emails a new one-time code. @param user the user to create it for. Uses the `email` field if present, `username` otherwise (which in this case is expected to contain an email address) :return `otp` and `expiry` as a Unix time (number). """ otp = EmailAuthenticator.zero_pad(random_int(999999), 6) now = datetime.now() expiry = int((now + timedelta(seconds=self.__email_authenticator_token_expires)).timestamp() * 1000) email = user["email"] if "email" in user else user["username"] message_id = await self._send_token(email, otp) CrossauthLogger.logger().info(j({ "msg": "Sent factor otp email", "emailMessageId": message_id, "email": email })) return {"otp": otp, "expiry": expiry}
[docs] def can_create_user(self) -> bool: """ :return true - this class can create users""" return True
[docs] def can_update_user(self) -> bool: """ :return true - this class can update users""" return True
[docs] def can_update_secrets(self) -> bool: """ :return false - users cannot update secrets""" return False
[docs] def secret_names(self) -> List[str]: """ :return empty - this authenticator has no persistent secrets""" return []
[docs] def transient_secret_names(self) -> List[str]: """ :return otp""" return ["otp"]
[docs] def validate_secrets(self, params: AuthenticationParameters) -> List[str]: """Does nothing for this class""" return []
[docs] def skip_email_verification_on_signup(self) -> bool: """ :return true - as a code is sent to the registers email address, no additional email verification is needed """ return True
[docs] @staticmethod def is_email_valid(email: str) -> bool: """ Returns whether or not the passed email has a valid form. @param email the email address to validate :return true if it is valid. false otherwise """ # https://stackoverflow.com/questions/46155/how-can-i-validate-an-email-address-in-javascript email = str(email).lower() pattern = r'^(([^<>()[\]\.,;:\s@"]+(\.[^<>()[\]\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$' return re.match(pattern, email) is not None
[docs] @staticmethod def validate_email(email: Optional[str]) -> None: """ Throws an exception if an email address doesn't have a valid form. @param email the email address to validate @throws CrossauthError with ErrorCode `InvalidEmail`. """ if email is None or not EmailAuthenticator.is_email_valid(email): raise CrossauthError(ErrorCode.InvalidEmail)
[docs] @staticmethod def zero_pad(num: int, places: int) -> str: """ Takes a number and turns it into a zero-padded string @param num number to pad @param places total number of required digits :return zero-padded string """ zero = places - len(str(num)) + 1 return "0" * max(0, zero - 1) + str(num) if zero > 0 else str(num)