diff --git a/src/idpyoidc/server/client_authn.py b/src/idpyoidc/server/client_authn.py index 8a0c72da..49bda42e 100755 --- a/src/idpyoidc/server/client_authn.py +++ b/src/idpyoidc/server/client_authn.py @@ -4,6 +4,7 @@ from typing import Dict from typing import Optional from typing import Union +from urllib.parse import unquote from cryptojwt.exception import BadSignature from cryptojwt.exception import Invalid @@ -92,16 +93,17 @@ def is_usable( raise NotImplementedError() -def basic_authn(authorization_token: str): - if not authorization_token.startswith("Basic "): +def basic_authn(authorization_header: str, urldecode_client_id_secret=False): + if not authorization_header.startswith("Basic "): raise ClientAuthenticationError("Wrong type of authorization token") - _tok = as_bytes(authorization_token[6:]) - # Will raise ValueError type exception if not base64 encoded - _tok = base64.b64decode(_tok) - part = as_unicode(_tok).split(":", 1) + _tok = base64.b64decode(authorization_header[6:].encode("utf-8")) + part = _tok.decode("utf-8").split(":", 1) + if len(part) != 2: raise ValueError("Illegal token") + if urldecode_client_id_secret: + part = [unquote(p) for p in part] return dict(zip(["id", "secret"], part)) @@ -168,7 +170,9 @@ def _verify( endpoint=None, # Optional[Endpoint] **kwargs, ): - client_info = basic_authn(authorization_token) + kwargs = getattr(endpoint, "kwargs", {}) or {} + enable_oauth2_1 = kwargs.get("enable_oauth2_1", False) + client_info = basic_authn(authorization_token, urldecode_client_id_secret=enable_oauth2_1) _context = self.upstream_get("context") if _context.cdb[client_info["id"]]["client_secret"] == client_info["secret"]: return {"client_id": client_info["id"]}