# Copyright (c) 2024 Matthew Baker. All rights reserved. Licenced under the Apache Licence 2.0. See LICENSE file
from crossauth_backend.common.error import CrossauthError, ErrorCode
from crossauth_backend.utils import set_parameter, ParamType, MapGetter
from crossauth_backend.common.logger import CrossauthLogger, j
from crossauth_backend.oauth.wellknown import OpenIdConfiguration, TokenBodyType
from crossauth_backend.oauth.tokenconsumer import OAuthTokenConsumer, OAuthTokenConsumerOptions
from crossauth_backend.crypto import Crypto
from typing import Dict, List, Optional, TypedDict, Any, Literal, Mapping, cast
import json
import urllib.parse
from abc import abstractmethod
import requests
from urllib.parse import urlparse
from jwt import JWT
import aiohttp
[docs]
class OAuthFlows:
"""
Crossauth allows you to define which flows are valid for a given client.
"""
All = "all"
""" All flows are allowed """
AuthorizationCode = "authorizationCode"
""" OAuth authorization code flow (without PKCE) """
AuthorizationCodeWithPKCE = "authorizationCodeWithPKCE"
""" OAuth authorization code flow with PKCE """
ClientCredentials = "clientCredentials"
""" Auth client credentials flow """
RefreshToken = "refresh_token"
""" OAuth refresh token flow """
DeviceCode = "device_code"
""" OAuth device code flow """
Password = "password"
""" OAuth password flow """
PasswordMfa = "passwordMfa"
""" The Auth0 password MFA extension to the password flow """
OidcAuthorizationCode = "oidcAuthorizationCode"
""" The OpenID Connect authorization code flow, with or without PKCE """
flow_name = {
AuthorizationCode: "Authorization Code",
AuthorizationCodeWithPKCE: "Authorization Code with PKCE",
ClientCredentials: "Client Credentials",
RefreshToken: "Refresh Token",
DeviceCode: "Device Code",
Password: "Password",
PasswordMfa: "Password MFA",
OidcAuthorizationCode: "OIDC Authorization Code",
}
""" A user friendly name for the given flow ID """
[docs]
@staticmethod
def flow_names(flows: List[str]) -> Dict[str, str]:
"""
Returns a user-friendly name for the given flow strs.
The value returned is the one in `flow_name`.
:param List[str] flows: the flows to return the names of
:return: a dictionary of strs
"""
return {flow: OAuthFlows.flow_name[flow] for flow in flows if flow in OAuthFlows.flow_name}
[docs]
@staticmethod
def is_valid_flow(flow: str) -> bool:
"""
Returns true if the given str is a valid flow name.
:param str flow: the flow to check
:return: true or false.
"""
return flow in OAuthFlows.all_flows()
[docs]
@staticmethod
def are_valid_flows(flows: List[str]) -> bool:
"""
Returns true only if all given strs are valid flows
:param List[str] flows: the flows to check
:return: true or false.
"""
return all(OAuthFlows.is_valid_flow(flow) for flow in flows)
[docs]
@staticmethod
def all_flows() -> List[str]:
""" Returns a lsit of all possible OAuth flows """
return [
OAuthFlows.AuthorizationCode,
OAuthFlows.AuthorizationCodeWithPKCE,
OAuthFlows.ClientCredentials,
OAuthFlows.RefreshToken,
OAuthFlows.DeviceCode,
OAuthFlows.Password,
OAuthFlows.PasswordMfa,
OAuthFlows.OidcAuthorizationCode,
]
[docs]
@staticmethod
def grant_type(oauthFlow: str) -> Optional[List[str]]:
"""
Returns the OAuth grant types that are valid for a given flow, or
`None` if it is not a valid flow.
:param str oauthFlow: the flow to get the grant type for.
:return: a list of grant type strs or None
"""
match oauthFlow:
case OAuthFlows.AuthorizationCode:
return ["authorization_code"]
case OAuthFlows.AuthorizationCodeWithPKCE:
return ["authorization_code"]
case OAuthFlows.OidcAuthorizationCode:
return ["authorization_code"]
case OAuthFlows.ClientCredentials:
return ["client_credentials"]
case OAuthFlows.RefreshToken:
return ["refresh_token"]
case OAuthFlows.Password:
return ["password"]
case OAuthFlows.PasswordMfa:
return ["http://auth0.com/oauth/grant-type/mfa-otp", "http://auth0.com/oauth/grant-type/mfa-oob"]
case OAuthFlows.DeviceCode:
return ["urn:ietf:params:oauth:grant-type:device_code"]
case _:
raise CrossauthError(ErrorCode.BadRequest, "Invalid OAuth flow " + oauthFlow)
[docs]
class IdTokenReturn(TypedDict, total=False):
id_payload: Mapping[str, Any]
error: str
error_description: str
[docs]
class OAuthTokenResponse(TypedDict, total=False):
"""
These are the fields that can be returned in the JSON from an OAuth call.
"""
access_token : str
refresh_token : str
id_token : str
id_payload : Mapping[str, Any]
token_type : str
expires_in : int
error : str
error_description : str
scope : str
mfa_token : str
oob_channel : str
oob_code : str
challenge_type : str
binding_method : str
name : str
[docs]
class OAuthMfaAuthenticator(TypedDict, total=False):
authenticator_type: str
id : str
active: bool
oob_channel : str
name: str
error: str
error_description: str
[docs]
class OAuthMfaAuthenticatorsResponse(TypedDict, total=False):
authenticators: List[OAuthMfaAuthenticator]
error : str
error_description: str
[docs]
class OAuthMfaAuthenticatorsOrTokenResponse(OAuthMfaAuthenticatorsResponse, OAuthTokenResponse, total=False):
pass
[docs]
class OAuthMfaChallengeResponse(TypedDict, total=False):
challenge_type: str
oob_code: str
binding_method: str
error : str
error_description: str
[docs]
class OAuthDeviceAuthorizationResponse(TypedDict, total=False):
"""
These are the fields that can be returned in the device_authorization
device code flow endpoint.
"""
device_code : str
user_code : str
verification_uri : str
verification_uri_complete : str
expires_in : str
interval : str
error : str
error_description : str
[docs]
class OAuthDeviceResponse(TypedDict, total=False):
"""
These are the fields that can be returned in the device
device code flow endpoint.
"""
client_id : str
scope_authorization_needed : bool
scope : str
error : str
error_description : str
[docs]
class OAuthClientOptions(OAuthTokenConsumerOptions, total=False):
""" Options for :class: OAuthClientBase """
state_length : int
""" Length of random state variable for passing to `authorize` endpoint
(before bsae64-url-encoding)
"""
verifier_length : int
""" Length of random code verifier to generate
(before bsae64-url-encoding)
"""
client_id : str
"""
Client ID for this client
"""
client_secret : str
"""
Client secret for this client (can be undefined for no secret)
"""
redirect_uri : str
"""
Redirect URI to send in `authorize` requests
"""
code_challenge_method : Literal["plain", "S256"]
"""
Type of code challenge for PKCE
"""
device_authorization_url : str
"""
URL to call for the device_authorization endpoint, relative to
the `auth_server_base_url`.
Default `device_authorization`
"""
oauth_post_type : Literal["json", "form"]
"""
If set to JSON, make calls to the token endpoint as JSON, otherwise
as x-www-form-urlencoded.
"""
oauth_use_user_info_endpoint : bool
"""
If your authorization server only returns certain claims in the userinfo
endpoint, rather than in the id token, set this to true
"""
oauth_authorize_redirect : str|None
"""
In the special case where you are running this in Docker on a private
machine, the client cannot redirect to the authorization endpoint given
in the OIDC configuration. You will typically set the auth_server_base_url
to the name of the docker host in this case, and set
oauth_authorize_redirect to localhost.
Default None
"""
[docs]
class OAuthClient:
"""
Base class for OAuth clients.
Flows supported are Authorization Code Flow with and without PKCE,
Client Credentials, Refresh Token, Password and Password MFA. The
latter is defined at
[auth0.com](https://auth0.com/docs/secure/multi-factor-authentication/multi-factor-authentication-factors).
It also supports the OpenID Connect Authorization Code Flow, with and
without PKCE.
"""
def __init__(self, auth_server_base_url : str, options : OAuthClientOptions):
"""
Constructor.
Args:
:param str auth_server_base_url: bsae URL for the authorization server
expected to issue access tokens. If the `iss` field in a JWT
does not match this, it is rejected.
:param crossauth_backend.OAuthClientOptions options: see :class: OAuthClientOptions
"""
self._verifier_length = 32
self._state_length = 32
self._client_id : str = ""
self._client_secret : str|None = None
self._redirect_uri : str|None = None
self._code_challenge_method : Literal["plain", "S256"] = "S256"
self._auth_server_credentials : Literal["include", "omit", "same-origin" ] | None = None
self._auth_server_mode : Literal["no-cors", "cors", "same-origin" ] | None = None
self._auth_server_headers : Dict[str, str] = {}
self._authz_code = ""
self._oidc_config : OpenIdConfiguration | None = None
self._device_authorization_url : str = "device_authorization"
self._oauth_post_type = "json"
self._oauth_use_user_info_endpoint = False
self._oauth_authorize_redirect : str|None = None
self.auth_server_base_url = auth_server_base_url
set_parameter("client_id", ParamType.String, self, options, "OAUTH_CLIENT_ID", required=True, protected=True)
set_parameter("client_secret", ParamType.String, self, options, "OAUTH_CLIENT_SECRET", protected=True)
set_parameter("redirect_uri", ParamType.String, self, options, "OAUTH_REDIRECT_URI", protected=True)
self._token_consumer = OAuthTokenConsumer(self._client_id, {"auth_server_base_url": auth_server_base_url, **options})
set_parameter("state_length", ParamType.String, self, options, "OAUTH_STATE_LENGTH", protected=True)
set_parameter("verifier_length", ParamType.String, self, options, "OAUTH_VERIFIER_LENGTH", protected=True)
set_parameter("client_secret", ParamType.String, self, options, "OAUTH_CLIENT_SECRET", protected=True)
set_parameter("code_challenge_method", ParamType.String, self, options, "OAUTH_CODE_CHALLENGE_METHOD", protected=True)
set_parameter("device_authorization_url", ParamType.String, self, options, "OAUTH_DEVICE_AUTHORIZATION_URL", protected=True)
set_parameter("auth_server_credentials", ParamType.String, self, options, "OAUTH_AUTH_SERVER_CREDENTIALS", protected=True)
set_parameter("auth_server_mode", ParamType.String, self, options, "OAUTH_AUTH_SERVER_MODE", protected=True)
set_parameter("auth_server_headers", ParamType.Json, self, options, "OAUTH_AUTH_SERVER_HEADERS", protected=True)
if (self._device_authorization_url[0:1] == "/"): self._device_authorization_url = self._device_authorization_url[1:]
set_parameter("oauth_post_type", ParamType.Json, self, options, "OAUTH_POST_TYPE", protected=True)
if (self._oauth_post_type != "json" and self._oauth_post_type != "form"):
raise CrossauthError(ErrorCode.Configuration, "oauth_post_type must be json or form")
set_parameter("oauth_use_user_info_endpoint", ParamType.Json, self, options, "OAUTH_USE_USER_INFO_ENDPOINT", protected=True)
set_parameter("oauth_authorize_redirect", ParamType.String, self, options, "OAUTH_AUTHORIZE_REDIRECT", protected=True)
[docs]
async def load_config(self, oidc_config : OpenIdConfiguration|None=None):
"""
Loads OpenID Connect configuration so that the client can determine
the URLs it can call and the features the authorization server provides.
:param oidc_config if defined, loadsa the config from this object.
Otherwise, performs a fetch by appending
`/.well-known/openid-configuration` to the
`auth_server_base_url`.
:throws :class: crossauth_backend.CrossauthError} with the following
:attr: crossauth_backend.ErrorCode.Connection if data from the URL
could not be fetched or parsed.
"""
if oidc_config:
CrossauthLogger.logger().debug(j({"msg": "Reading OIDC config locally"}))
self._oidc_config = oidc_config
return
url = f"{self.auth_server_base_url}/.well-known/openid-configuration"
urlparse(url)
CrossauthLogger.logger().debug(j({"msg": f"Fetching OIDC config from {url}"}))
headers = self._auth_server_headers
options : Dict[str, Any] = {"headers": headers}
if self._auth_server_mode is not None:
options["mode"] = self._auth_server_mode
if self._auth_server_credentials:
options["credentials"] = self._auth_server_credentials
try:
async with aiohttp.ClientSession() as session:
response = await session.get(url, **options)
response.raise_for_status()
except requests.RequestException as e:
CrossauthLogger.logger().error(j({"err": str(e)}))
raise Exception("Couldn't get OIDC configuration from URL")
self._oidc_config : OpenIdConfiguration | None= None
try:
body : Mapping[str,str] = await response.json()
self._oidc_config = {**(cast(OpenIdConfiguration, body))}
except json.JSONDecodeError:
raise Exception("Unrecognized response from OIDC configuration endpoint")
[docs]
def get_oidc_config(self):
return self._oidc_config
[docs]
@abstractmethod
def random_value(self, length : int) -> str:
"""
Produce a random Base64-url-encoded str, whose length before
base64-url-encoding is the given length,
@param length the length of the random array before base64-url-encoding.
@returns the random value as a Base64-url-encoded srting
"""
return Crypto.random_value(length);
[docs]
@abstractmethod
async def sha256(self, plaintext : str) -> str:
"""
SHA256 and Base64-url-encodes the given test
@param plaintext the text to encode
@returns the SHA256 hash, Base64-url-encode
"""
return Crypto.sha256(plaintext)
[docs]
async def code_challenge_and_verifier(self):
code_verifier = self.random_value(self._verifier_length)
code_challenge = await self.sha256(code_verifier) if self._code_challenge_method == "S256" else code_verifier
return {"code_verifier": code_verifier, "code_challenge": code_challenge}
[docs]
async def start_authorization_code_flow(self, state: str, scope : str | None = None, code_challenge: str|None = None, pkce : bool = False):
"""
Initiates the authorization code flow
:param str|None scope:, which can be None
:param bool pkce: if True, start the flow with PKCE (for public clients). Default False
"""
CrossauthLogger.logger().debug(j({"msg": "Starting authorization code flow"}))
if self._oidc_config is None:
await self.load_config()
if (self._oidc_config is None):
raise CrossauthError(ErrorCode.Connection, "Couldn't load OIDC Configuration")
if "code" not in self._oidc_config["response_types_supported"] or not "query" in self._oidc_config["response_modes_supported"]:
return {
"error": "invalid_request",
"error_description": "Server does not support authorization code flow"
}
if not self._oidc_config.get("authorization_endpoint"):
return {
"error": "server_error",
"error_description": "Cannot get authorize endpoint"
}
if not self._client_id:
return {
"error": "invalid_request",
"error_description": "Cannot make authorization code flow without client id"
}
if not self._redirect_uri:
return {
"error": "invalid_request",
"error_description": "Cannot make authorization code flow without Redirect Uri"
}
base = self._oidc_config["authorization_endpoint"]
if (self._oauth_authorize_redirect):
base = self._oauth_authorize_redirect
url = f"{base}?response_type=code&client_id={urllib.parse.quote(self._client_id)}&state={urllib.parse.quote(state)}&redirect_uri={urllib.parse.quote(self._redirect_uri)}"
if scope:
url += f"&scope={urllib.parse.quote(scope)}"
if pkce:
url += f"&code_challenge={code_challenge}"
return {"url": url}
[docs]
async def redirect_endpoint(self, code : str|None = None, state : str|None = None, code_verifier: str|None = None, error : str|None =None, error_description : str|None=None) -> OAuthTokenResponse:
"""
For calling in a Redirect Uri endpoint
:param str|None code: the authorization code
:param str|None the state, if one is used by the authorization server
:param str|None any error: error message returned by the authorization server. It is passed through in the returned value
:param str|None any error_description: error description returned by the authorization server. It is passed through in the returned value
:return: an OAuth token endpoint response
"""
if not self._oidc_config:
await self.load_config()
if (not self._oidc_config):
return {"error": "server_error", "error_description": "Couldn't load OIDC configuration"}
if error is not None or not code:
if error is None:
error = "server_error"
if error_description is None:
error_description = "Unknown error"
return {"error": error, "error_description": error_description}
self.authzCode = code
if "authorization_code" not in self._oidc_config["grant_types_supported"]:
return {
"error": "invalid_request",
"error_description": "Server does not support authorization code grant"
}
if not self._oidc_config.get("token_endpoint"):
return {
"error": "server_error",
"error_description": "Cannot get token endpoint"
}
url = self._oidc_config["token_endpoint"]
grant_type = "authorization_code"
client_secret = self._client_secret
params : Dict[str, Any]= {
"grant_type": grant_type,
"client_id": self._client_id,
"code": self.authzCode,
}
if client_secret:
params["client_secret"] = client_secret
params["code_verifier"] = code_verifier
try:
ret = cast(OAuthTokenResponse, await self._post(url, params, self._auth_server_headers))
if ("id_token" in ret):
access_token : str|None = None
if "access_token" in ret:
access_token = ret["access_token"]
user_info = await self._get_id_payload(ret["id_token"], access_token)
error1 : str|None = user_info["error"] if "error" in user_info else None
error_description1 : str|None = user_info["error_description"] if "error_description" in user_info else ""
if (error1 is not None):
return {
"error": error1,
"error_description": error_description1
}
if ("id_payload" in user_info):
ret["id_payload"] = user_info["id_payload"]
return ret
except Exception as e:
CrossauthLogger.logger().error(j({"cerr": e}))
CrossauthLogger.logger().debug(j({"err": e}))
return {
"error": "server_error",
"error_description": "Unable to get access token from server"
}
[docs]
async def client_credentials_flow(self, scope : str|None = None) -> OAuthTokenResponse:
"""
Start the client credentials flow
:param str|None scope:, which can be None
:return: an OAuth token endpoint response
"""
CrossauthLogger.logger().debug(j({"msg": "Starting client credentials flow"}))
if not self._oidc_config:
await self.load_config()
if self._oidc_config is None or self._oidc_config["token_endpoint"] == "":
return {"error": "server_error", "error_description": "Cannot get token endpoint"}
if "client_credentials" not in self._oidc_config["grant_types_supported"]:
return {
"error": "invalid_request",
"error_description": "Server does not support client credentials grant"
}
if self._client_id == "":
return {
"error": "invalid_request",
"error_description": "Cannot make client credentials flow without client id"
}
url = self._oidc_config["token_endpoint"]
params : TokenBodyType = {
"grant_type": "client_credentials",
"client_id": self._client_id,
}
if self._client_secret is not None:
params["client_secret"] = self._client_secret
if scope:
params["scope"] = scope
try:
ret = cast(OAuthTokenResponse, await self._post(url, params, self._auth_server_headers))
if ("id_token" in ret):
access_token : str|None = None
if "access_token" in ret:
access_token = ret["access_token"]
user_info = await self._get_id_payload(ret["id_token"], access_token)
error1 : str|None = user_info["error"] if "error" in user_info else None
error_description1 : str|None = user_info["error_description"] if "error_description" in user_info else ""
if (error1 is not None):
return {
"error": error1,
"error_description": error_description1
}
if ("id_payload" in user_info):
ret["id_payload"] = user_info["id_payload"]
return ret
except Exception as e:
CrossauthLogger.logger().error(j({"err": str(e)}))
return {
"error": "server_error",
"error_description": "Error connecting to authorization server"
}
[docs]
async def password_flow(self, username : str, password : str, scope : str|None = None) -> OAuthTokenResponse:
"""
Start the password flow
:param str username:, user's username
:param str password:, user's plaintext password
:param str|None scope:, which can be None
:return: an OAuth token endpoint response
"""
CrossauthLogger.logger().debug(j({"msg": "Starting password flow"}))
if not self._oidc_config:
await self.load_config()
if (self._oidc_config is None):
raise CrossauthError(ErrorCode.Connection, "Couldn't fet OIDC configuration")
if "password" not in self._oidc_config["grant_types_supported"]:
return {
"error": "invalid_request",
"error_description": "Server does not support password grant"
}
if not self._oidc_config.get("token_endpoint"):
return {
"error": "server_error",
"error_description": "Cannot get token endpoint"
}
url = self._oidc_config["token_endpoint"]
params : TokenBodyType = {
"grant_type": "password",
"client_id": self._client_id,
"username": username,
"password": password,
}
if (self._client_secret is not None):
params["client_secret"] = self._client_secret
if scope:
params["scope"] = scope
try:
ret = cast(OAuthTokenResponse, await self._post(url, params, self._auth_server_headers))
if ("id_token" in ret):
access_token : str|None = None
if "access_token" in ret:
access_token = ret["access_token"]
user_info = await self._get_id_payload(ret["id_token"], access_token)
error1 : str|None = user_info["error"] if "error" in user_info else None
error_description1 : str|None = user_info["error_description"] if "error_description" in user_info else ""
if (error1 is not None):
return {
"error": error1,
"error_description": error_description1
}
if ("id_payload" in user_info):
ret["id_payload"] = user_info["id_payload"]
return ret
except Exception as e:
CrossauthLogger.logger().error(j({"err": str(e)}))
return {
"error": "server_error",
"error_description": "Error connecting to authorization server"
}
[docs]
async def mfa_authenticators(self, mfa_token: str) -> OAuthMfaAuthenticatorsResponse :
"""
Fields that canb be returned by the `mfaAuthenticators` function call
See Auth0's documentation for the password MFA flow.
:param str mfa_token:, The MFA token returned when the flow was initiated
:return: See :class:`OAuthMfaAuthenticatorsResponse`
"""
CrossauthLogger.logger().debug(j({"msg": "Getting valid MFA authenticators"}))
if self._oidc_config is None:
await self.load_config()
if (self._oidc_config is None):
raise CrossauthError(ErrorCode.Connection, "Couldn't fet OIDC configuration")
if "http://auth0.com/oauth/grant-type/mfa-otp" not in self._oidc_config["grant_types_supported"] and \
"http://auth0.com/oauth/grant-type/mfa-oob" not in self._oidc_config["grant_types_supported"]:
return {
"error": "invalid_request",
"error_description": "Server does not support password_mfa grant"
}
if not self._oidc_config.get("issuer"):
return {"error": "server_error", "error_description": "Cannot get issuer"}
url = f"{self._oidc_config['issuer']}/mfa/authenticators" if self._oidc_config['issuer'].endswith("/") else f"{self._oidc_config['issuer']}/mfa/authenticators"
resp = await self._get(url, {'authorization': f'Bearer {mfa_token}', **self._auth_server_headers})
if not isinstance(resp, list):
return {
"error": "server_error",
"error_description": "Expected array of authenticators in mfa/authenticators response"
}
authenticators : List[OAuthMfaAuthenticator] = []
for authenticator in resp:
if not authenticator.get("id") or not authenticator.get("authenticator_type") or not authenticator.get("active"):
return {
"error": "server_error",
"error_description": "Invalid mfa/authenticators response"
}
authenticators.append({
"id": authenticator["id"],
"authenticator_type": authenticator["authenticator_type"],
"active": authenticator["active"],
"name": authenticator.get("name"),
"oob_channel": authenticator.get("oob_channel"),
})
return {"authenticators": authenticators}
[docs]
async def mfa_otp_request(self, mfa_token: str, authenticator_id: str) -> OAuthMfaChallengeResponse:
"""
This is part of the Auth0 Password MFA flow. Once the client has
received a list of valid authenticators, if it wishes to initiate
OTP, call this function
Does not throw exceptions.
:param str mfa_token: the MFA token that was returned by the authorization
server in the response from the Password Flow.
:param str authenticator_id: the authenticator ID, as returned in the response
from the :func:`mfaAuthenticators` request.
"""
CrossauthLogger.logger().debug(j({"msg": "Making MFA OTB request"}))
if not self._oidc_config:
await self.load_config()
if (self._oidc_config is None):
raise CrossauthError(ErrorCode.Connection, "Couldn't fet OIDC configuration")
if "http://auth0.com/oauth/grant-type/mfa-otp" not in self._oidc_config["grant_types_supported"]:
return {
"error": "invalid_request",
"error_description": "Server does not support password_mfa grant"
}
if not self._oidc_config.get("issuer"):
return {"error": "server_error", "error_description": "Cannot get issuer"}
url = f"{self._oidc_config['issuer']}/mfa/challenge" if self._oidc_config['issuer'].endswith("/") else f"{self._oidc_config['issuer']}/mfa/challenge"
resp = await self._post(url, {
"client_id": self._client_id,
"client_secret": self._client_secret,
"challenge_type": "otp",
"mfa_token": mfa_token,
"authenticator_id": authenticator_id,
}, self._auth_server_headers)
if resp.get("challenge_type") != "otp":
return {
"error": resp.get("error", "server_error"),
"error_description": resp.get("error_description", "Invalid OTP challenge response")
}
return cast(OAuthMfaChallengeResponse, resp)
[docs]
async def mfa_otp_complete(self, mfa_token: str, otp: str, scope: Optional[str] = None) -> OAuthTokenResponse:
"""
Completes the Password MFA OTP flow.
:param str mfa_token: the MFA token that was returned by the authorization
server in the response from the Password Flow.
:param str otp: the OTP entered by the user
:return: an object with some of the following fields, depending on
authorization server configuration and whether there were
errors:
- `access_token` an OAuth access token
- `refresh_token` an OAuth access token
- `id_token` an OpenID Connect ID token
- `expires_in` number of seconds when the access token expires
- `scope` the scopes the user authorized
- `token_type` the OAuth token type
- `error` as per Auth0 Password MFA documentation
- `error_description` friendly error message
"""
CrossauthLogger.logger().debug(j({"msg": "Completing MFA OTP request"}))
if self._oidc_config is None:
await self.load_config()
if (self._oidc_config is None):
raise CrossauthError(ErrorCode.Connection, "Couldn't fet OIDC configuration")
if "http://auth0.com/oauth/grant-type/mfa-otp" not in self._oidc_config["grant_types_supported"]:
return {
"error": "invalid_request",
"error_description": "Server does not support password_mfa grant"
}
if not self._oidc_config.get("issuer"):
return {"error": "server_error", "error_description": "Cannot get issuer"}
otpUrl = self._oidc_config["token_endpoint"]
otpResp = await self._post(otpUrl, {
"grant_type": "http://auth0.com/oauth/grant-type/mfa-otp",
"client_id": self._client_id,
"client_secret": self._client_secret,
"challenge_type": "otp",
"mfa_token": mfa_token,
"otp": otp,
"scope": scope,
}, self._auth_server_headers)
id_token : Mapping[str,Any]|None = None
if ("id_token" in otpResp):
access_token : str|None = None
if "access_token" in otpResp:
access_token = otpResp["access_token"]
user_info = await self._get_id_payload(otpResp["id_token"], access_token)
error1 : str|None = user_info["error"] if "error" in user_info else None
error_description1 : str|None = user_info["error_description"] if "error_description" in user_info else ""
if (error1 is not None):
return {
"error": error1,
"error_description": error_description1
}
if ("id_payload" in user_info):
id_token = user_info["id_payload"]
ret = cast(OAuthTokenResponse, {
"id_token": otpResp.get("id_token"),
"access_token": otpResp.get("access_token"),
"refresh_token": otpResp.get("refresh_token"),
"expires_in": int(otpResp.get("expires_in", 0)),
"scope": otpResp.get("scope"),
"token_type": otpResp.get("token_type"),
"error": otpResp.get("error"),
"error_description": otpResp.get("error_description"),
})
if (id_token is not None):
ret["id_payload"] = id_token
return ret
[docs]
async def mfa_oob_request(self, mfa_token: str, authenticator_id: str) -> OAuthMfaAuthenticatorsResponse:
"""
This is part of the Auth0 Password MFA flow. Once the client has
received a list of valid authenticators, if it wishes to initiate
OOB (out of band) login, call this function
Does not throw exceptions.
:param str mfa_token: the MFA token that was returned by the authorization
server in the response from the Password Flow.
:param str authenticator_id: the authenticator ID, as returned in the response
from the :func:`mfa_authenticators` request.
:return: an object with one or more of the following defined:
- `challenge_type` as per the Auth0 MFA documentation
- `oob_code` as per the Auth0 MFA documentation
- `binding_method` as per the Auth0 MFA documentation
- `error` as per Auth0 Password MFA documentation
- `error_description` friendly error message
"""
CrossauthLogger.logger().debug(j({"msg": "Making MFA OOB request"}))
if self._oidc_config is None:
await self.load_config()
if (self._oidc_config is None):
raise CrossauthError(ErrorCode.Connection, "Couldn't fet OIDC configuration")
if "http://auth0.com/oauth/grant-type/mfa-otp" not in self._oidc_config["grant_types_supported"]:
return {
"error": "invalid_request",
"error_description": "Server does not support password_mfa grant"
}
if not self._oidc_config.get("issuer"):
return {"error": "server_error", "error_description": "Cannot get issuer"}
url = f"{self._oidc_config['issuer']}/mfa/challenge" if self._oidc_config['issuer'].endswith("/") else f"{self._oidc_config['issuer']}/mfa/challenge"
resp = await self._post(url, {
"client_id": self._client_id,
"client_secret": self._client_secret,
"challenge_type": "oob",
"mfa_token": mfa_token,
"authenticator_id": authenticator_id,
}, self._auth_server_headers)
if resp.get("challenge_type") != "oob" or not resp.get("oob_code") or not resp.get("binding_method"):
return {
"error": resp.get("error", "server_error"),
"error_description": resp.get("error_description", "Invalid OOB challenge response")
}
return cast(OAuthMfaAuthenticatorsResponse, {
"challenge_type": resp.get("challenge_type"),
"oob_code": resp.get("oob_code"),
"binding_method": resp.get("binding_method"),
"error": resp.get("error"),
"error_description": resp.get("error_description"),
})
[docs]
async def mfa_oob_complete(self, mfa_token: str, oobCode: str, bindingCode: str, scope: Optional[str] = None) -> OAuthTokenResponse:
"""
Completes the Password MFA OTP flow.
Does not throw exceptions.
:param str mfa_token: the MFA token that was returned by the authorization
server in the response from the Password Flow.
:param oob_code: the code entered by the user
:return: an :class:`OAuthTokenResponse` object, which may contain
an error instead of the response fields.
"""
CrossauthLogger.logger().debug(j({"msg": "Completing MFA OOB request"}))
if self._oidc_config is None:
await self.load_config()
if (self._oidc_config is None):
raise CrossauthError(ErrorCode.Connection, "Couldn't fet OIDC configuration")
if "http://auth0.com/oauth/grant-type/mfa-oob" not in self._oidc_config["grant_types_supported"]:
return {
"error": "invalid_request",
"error_description": "Server does not support password_mfa grant"
}
if not self._oidc_config.get("issuer"):
return {"error": "server_error", "error_description": "Cannot get issuer"}
url = self._oidc_config["token_endpoint"]
resp = await self._post(url, {
"grant_type": "http://auth0.com/oauth/grant-type/mfa-oob",
"client_id": self._client_id,
"client_secret": self._client_secret,
"challenge_type": "otp",
"mfa_token": mfa_token,
"oob_code": oobCode,
"binding_code": bindingCode,
"scope": scope,
}, self._auth_server_headers)
if "error" in resp and "error_description" in resp:
return {
"error": MapGetter[str].get(resp, "error", ""),
"error_description": MapGetter[str].get(resp, "error_description", ""),
}
id_token : Mapping[str,Any]|None = None
if ("id_token" in resp):
access_token : str|None = None
if "access_token" in resp:
access_token = resp["access_token"]
user_info = await self._get_id_payload(resp["id_token"], access_token)
error1 : str|None = user_info["error"] if "error" in user_info else None
error_description1 : str|None = user_info["error_description"] if "error_description" in user_info else ""
if (error1 is not None):
return {
"error": error1,
"error_description": error_description1
}
if ("id_payload" in user_info):
id_token = user_info["id_payload"]
ret = cast(OAuthTokenResponse, {
"id_token": resp.get("id_token"),
"access_token": resp.get("access_token"),
"refresh_token": resp.get("refresh_token"),
"expires_in": int(resp.get("expires_in", 0)),
"scope": resp.get("scope"),
"token_type": resp.get("token_type"),
})
if (id_token is not None):
ret["id_payload"] = id_token
return ret
[docs]
async def refresh_token_flow(self, refresh_token: str) -> OAuthTokenResponse:
"""
Starts the refresh token flow
:param str refresh_token: the refresh token to exchange
:return: a :class:`OAuthTokenResponse? response
"""
CrossauthLogger.logger().debug(j({"msg": "Starting refresh token flow"}))
if self._oidc_config is None:
await self.load_config()
if (self._oidc_config is None):
raise CrossauthError(ErrorCode.Connection, "Couldn't fet OIDC configuration")
if "refresh_token" not in self._oidc_config["grant_types_supported"]:
return {
"error": "invalid_request",
"error_description": "Server does not support refresh_token grant"
}
if not self._oidc_config.get("token_endpoint"):
return {
"error": "server_error",
"error_description": "Cannot get token endpoint"
}
url = self._oidc_config["token_endpoint"]
client_secret = self._client_secret
params = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": self._client_id,
}
if client_secret:
params["client_secret"] = client_secret
try:
ret = cast(OAuthTokenResponse, await self._post(url, params, self._auth_server_headers))
if ("id_token" in ret):
access_token : str|None = None
if "access_token" in ret:
access_token = ret["access_token"]
user_info = await self._get_id_payload(ret["id_token"], access_token)
error1 : str|None = user_info["error"] if "error" in user_info else None
error_description1 : str|None = user_info["error_description"] if "error_description" in user_info else ""
if (error1 is not None):
return {
"error": error1,
"error_description": error_description1
}
if ("id_payload" in user_info):
ret["id_payload"] = user_info["id_payload"]
return ret
except Exception as e:
CrossauthLogger.logger().error(j({"err": str(e)}))
return {
"error": "server_error",
"error_description": "Error connecting to authorization server"
}
[docs]
async def start_device_code_flow(self, url: str, scope: Optional[str] = None) -> OAuthDeviceAuthorizationResponse:
"""
Starts the Device Code Flow on the primary device (the one wanting an access token)
:param str url: The URl for the device_authorization endpoint, as it is not defined in the OIDC configuration
:param str|None scope: optional scope to request authorization for
:return: See :class:`OAuthDeviceAuthorizationResponse`
"""
CrossauthLogger.logger().debug(j({"msg": "Starting device code flow"}))
if self._oidc_config is None:
await self.load_config()
if (self._oidc_config is None):
raise CrossauthError(ErrorCode.Connection, "Couldn't fet OIDC configuration")
if "urn:ietf:params:oauth:grant-type:device_code" not in self._oidc_config["grant_types_supported"]:
return {
"error": "invalid_request",
"error_description": "Server does not support device code grant"
}
params : TokenBodyType = {
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"client_id": self._client_id,
}
if self._client_secret is not None:
params["client_secret"] = self._client_secret
if scope:
params["scope"] = scope
try:
ret = cast(OAuthDeviceAuthorizationResponse, await self._post(url, params, self._auth_server_headers))
if ("id_token" in ret):
access_token : str|None = None
if "access_token" in ret:
access_token = ret["access_token"]
user_info = await self._get_id_payload(ret["id_token"], access_token)
error1 : str|None = user_info["error"] if "error" in user_info else None
error_description1 : str|None = user_info["error_description"] if "error_description" in user_info else ""
if (error1 is not None):
return {
"error": error1,
"error_description": error_description1
}
if ("id_payload" in user_info):
ret["id_payload"] = user_info["id_payload"]
return ret
except Exception as e:
CrossauthLogger.logger().error(j({"err": str(e)}))
return {
"error": "server_error",
"error_description": "Error connecting to authorization server"
}
[docs]
async def poll_device_code_flow(self, device_code: str) -> OAuthDeviceResponse:
"""
Polls the device endpoint to check if the device code flow has been
authorized by the user.
:param str device_code: the device code to poll
:return: See :class:`OAuthDeviceResponse`
"""
CrossauthLogger.logger().debug(j({"msg": "Starting device code flow"}))
if self._oidc_config is None:
await self.load_config()
if (self._oidc_config is None):
raise CrossauthError(ErrorCode.Connection, "Couldn't fet OIDC configuration")
if "urn:ietf:params:oauth:grant-type:device_code" not in self._oidc_config["grant_types_supported"]:
return {
"error": "invalid_request",
"error_description": "Server does not support device code grant"
}
if not self._oidc_config.get("token_endpoint"):
return {
"error": "server_error",
"error_description": "Cannot get token endpoint"
}
params : TokenBodyType = {
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"client_id": self._client_id,
"device_code": device_code,
}
if self._client_secret is not None:
params["client_secret"] = self._client_secret
try:
resp = await self._post(self._oidc_config["token_endpoint"], params, self._auth_server_headers)
ret = cast(OAuthDeviceResponse, resp)
if ("id_token" in ret):
access_token : str|None = None
if "access_token" in ret:
access_token = ret["access_token"]
user_info = await self._get_id_payload(ret["id_token"], access_token)
error1 : str|None = user_info["error"] if "error" in user_info else None
error_description1 : str|None = user_info["error_description"] if "error_description" in user_info else ""
if (error1 is not None):
return {
"error": error1,
"error_description": error_description1
}
if ("id_payload" in user_info):
ret["id_payload"] = user_info["id_payload"]
return ret
except Exception as e:
CrossauthLogger.logger().error(j({"err": str(e)}))
return {
"error": "server_error",
"error_description": "Error connecting to authorization server"
}
#################################################################3
## UserInfo
[docs]
async def user_info_endpoint(self, access_token : str) -> Mapping[str, Any]:
if (not self._oidc_config or "token_endpoint" not in self._oidc_config):
CrossauthLogger.logger().warn(j({"msg": "Not fetching user info as the endpoint is not defined in the OIDC Config"}))
return {
"error": "server_error",
"error_description": "Cannot get token endpoint"
}
url = self._oidc_config["token_endpoint"]
resp = await self._post(url, {}, {"authorization": "Bearer " + access_token})
return resp
async def _get_id_payload(self, id_token: str, access_token : str|None) -> IdTokenReturn:
ret : IdTokenReturn = {}
try:
payload = await self.validate_id_token(id_token)
if (not payload):
ret["error"] = "access_denied"
ret["error_description"] = "Invalid ID token received"
return ret
ret["id_payload"] = payload
if (access_token):
if (self._oauth_use_user_info_endpoint):
user_info = await self.user_info_endpoint(access_token)
if ("error" in user_info):
ret["error"] = user_info["error"]
ret["error_description"] = "Failed getting user info: "
if ("error_description" in user_info):
ret["error_description"] += user_info["error_description"]
else:
ret["error_description"] += "Unknown error"
payload = {**payload, **user_info}
ret["id_payload"] = payload
return ret
except Exception as e:
ce = CrossauthError.as_crossauth_error(e)
CrossauthLogger.logger().debug(j({"err": ce}))
CrossauthLogger.logger().error(j({"msg": "Couldn't get user info", "cerr": ce}));
ret["error"] = ce.oauthErrorCode
ret["error_description"] = "Couldn't get user info: " + ce.message
return ret
async def _post(self, url: str, params: Mapping[str, Any], headers: Dict[str, Any] = {}) -> Mapping[str, Any]:
CrossauthLogger.logger().debug(j({
"msg": "Fetch POST",
"url": url,
"params": list(params.keys())
}))
options = {}
if self._auth_server_credentials:
options["credentials"] = self._auth_server_credentials
if self._auth_server_mode:
options["mode"] = self._auth_server_mode
async with aiohttp.ClientSession() as session:
if (self._oauth_post_type == "json"):
resp = await session.post(url, json=params, headers={
'Accept': 'application/json',
'Content-Type': 'application/json',
**headers,
})
return await resp.json()
else:
resp = await session.post(url, data=params, headers={
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencodedc',
**headers,
})
return await resp.json()
async def _get(self, url: str, headers: Mapping[str, Any] = {}) -> Mapping[str, Any] | List[Any]:
CrossauthLogger.logger().debug(j({"msg": "Fetch GET", "url": url}))
options = {}
if self._auth_server_credentials:
options["credentials"] = self._auth_server_credentials
if self._auth_server_mode:
options["mode"] = self._auth_server_mode
async with aiohttp.ClientSession() as session:
resp = await session.get(url, headers={
'Accept': 'application/json',
'Content-Type': 'application/json',
**headers,
})
return await resp.json()
[docs]
async def validate_id_token(self, token: str) -> Optional[Dict[str, Any]]:
"""
Validates an OpenID ID token, returning None if it is invalid.
Does not raise exceptions.
:param token: the token to validate. To be valid, the signature must
be valid and the `type` claim in the payload must be set to `id`.
:returns
the parsed payload or None if the token is invalid.
"""
try:
return await self._token_consumer.token_authorized(token, "id")
except Exception:
return None
[docs]
async def id_token_authorized(self, id_token: str) -> Optional[Dict[str, Any]]:
"""
Validates a token using the token consumer.
:param id_token (str): the token to validate
:returns the parsed JSON of the payload, or None if it is not valid.
"""
try:
return await self._token_consumer.token_authorized(id_token, "id")
except Exception as e:
CrossauthLogger.logger().warn(j({"err": e}))
return None
[docs]
def get_token_payload(self, token: str) -> Dict[str, Any]:
"""
Validates a token and, if valid, returns the payload
"""
instance = JWT()
return instance.decode(token, None, do_verify=False, do_time_check=False)