Source code for crossauth_backend.oauth.resserver
# Copyright (c) 2024 Matthew Baker. All rights reserved. Licenced under the Apache Licence 2.0. See LICENSE file
from crossauth_backend.common.error import CrossauthError, ErrorCode
from crossauth_backend.common.logger import CrossauthLogger, j
from crossauth_backend.oauth.tokenconsumer import OAuthTokenConsumer
from typing import TypedDict, Dict, Optional, Any, List
from jwt import JWT
[docs]
class OAuthResourceServerOptions(TypedDict):
pass
[docs]
class OAuthResourceServer:
@property
def token_consumers(self):
return self._token_consumers
def __init__(self, token_consumers : list[OAuthTokenConsumer], options : OAuthResourceServerOptions = {}):
self._token_consumers : List[OAuthTokenConsumer]
self._token_consumers = [*token_consumers]
[docs]
async def access_token_authorized(self, access_token : str) -> Optional[Dict[str, Any]]:
try:
instance = JWT()
payload = instance.decode(access_token, None, do_verify=False, do_time_check=False)
for consumer in self._token_consumers:
if (payload.get('iss') == consumer.auth_server_base_url and \
((payload.get('aud') == consumer.audience) or \
('aud' not in payload and consumer.audience == ""))):
return await consumer.token_authorized(access_token, "access")
iss = payload.get('iss')
if iss is None:
iss = ""
aud = payload.get('aud')
if aud is None:
aud = ""
CrossauthLogger.logger().warn(j({"msg": "Access token's iss " + iss + " or aud" + aud + " are not accepted"}))
raise CrossauthError(ErrorCode.Unauthorized, "Invalid issuer in access token")
except Exception as e:
CrossauthLogger.logger().warn(j({"err": str(e)}))
return None