Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/north_mcp_python_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,22 @@ def __init__(
name: str | None = None,
instructions: str | None = None,
server_secret: str | None = None,
trusted_issuer_urls: list[str] | None = None,
auth_server_provider: OAuthAuthorizationServerProvider[Any, Any, Any]
| None = None,
debug: bool | None = None,
**settings: Any,
):
super().__init__(name, instructions, auth_server_provider, **settings)
self._server_secret = server_secret

self._trusted_issuer_urls = trusted_issuer_urls

# Auto-enable debug mode from environment variable if not explicitly set
if debug is None:
self._debug = is_debug_mode()
else:
self._debug = debug

# Configure logging for debug mode
if self._debug:
logging.basicConfig(
Expand Down
103 changes: 80 additions & 23 deletions src/north_mcp_python_sdk/auth.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import base64
import contextvars
import json
import logging
import urllib.request

import jwt
from jwt import PyJWKClient
from pydantic import BaseModel, Field, ValidationError
from starlette.authentication import (
AuthCredentials,
Expand Down Expand Up @@ -71,10 +74,17 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send):

user = scope.get("user")
if not isinstance(user, AuthenticatedNorthUser):
self.logger.debug("Authentication failed: user not found in context. User type: %s", type(user))
self.logger.debug(
"Authentication failed: user not found in context. User type: %s",
type(user),
)
raise AuthenticationError("user not found in context")

self.logger.debug("Setting authenticated user in context: email=%s, connectors=%s", user.email, list(user.connector_access_tokens.keys()))
self.logger.debug(
"Setting authenticated user in context: email=%s, connectors=%s",
user.email,
list(user.connector_access_tokens.keys()),
)

token = auth_context_var.set(user)
try:
Expand All @@ -88,8 +98,14 @@ class NorthAuthBackend(AuthenticationBackend):
Authentication backend that validates Bearer tokens.
"""

def __init__(self, server_secret: str | None = None, debug: bool = False):
def __init__(
self,
server_secret: str | None = None,
trusted_issuer_urls: list[str] | None = None,
debug: bool = False,
):
self._server_secret = server_secret
self._trusted_issuer_urls = trusted_issuer_urls
self.debug = debug
self.logger = logging.getLogger("NorthMCP.Auth")
if debug:
Expand Down Expand Up @@ -122,8 +138,15 @@ async def authenticate(

try:
tokens = AuthHeaderTokens.model_validate_json(decoded_auth_header)
self.logger.debug("Successfully parsed auth tokens. Has server_secret: %s, Has user_id_token: %s, Connector count: %d", tokens.server_secret is not None, tokens.user_id_token is not None, len(tokens.connector_access_tokens))
self.logger.debug("Available connectors: %s", list(tokens.connector_access_tokens.keys()))
self.logger.debug(
"Successfully parsed auth tokens. Has server_secret: %s, Has user_id_token: %s, Connector count: %d",
tokens.server_secret is not None,
tokens.user_id_token is not None,
len(tokens.connector_access_tokens),
)
self.logger.debug(
"Available connectors: %s", list(tokens.connector_access_tokens.keys())
)
except ValidationError as e:
self.logger.debug("Failed to validate auth tokens: %s", e)
raise AuthenticationError("unable to decode bearer token")
Expand All @@ -132,27 +155,61 @@ async def authenticate(
self.logger.debug("Server secret mismatch - access denied")
raise AuthenticationError("access denied")

if tokens.user_id_token:
try:
user_id_token = jwt.decode(
jwt=tokens.user_id_token,
verify=False,
options={"verify_signature": False},
if not tokens.user_id_token:
self.logger.debug("Authentication successful without user ID token")
return AuthCredentials(), AuthenticatedNorthUser(
connector_access_tokens=tokens.connector_access_tokens,
)

try:
decoded_token = jwt.decode(
jwt=tokens.user_id_token,
options={"verify_signature": False},
)

if self._trusted_issuer_urls:
self._verify_token_signature(
raw_token=tokens.user_id_token,
decoded_token=decoded_token,
)

email = user_id_token.get("email")

self.logger.debug("Successfully decoded user ID token. Email: %s", email)
email = decoded_token.get("email")
self.logger.debug("Successfully decoded user ID token. Email: %s", email)
return AuthCredentials(), AuthenticatedNorthUser(
connector_access_tokens=tokens.connector_access_tokens, email=email
)
except Exception as e:
self.logger.debug("Failed to decode user ID token: %s", e)
raise AuthenticationError("invalid user id token")

return AuthCredentials(), AuthenticatedNorthUser(
connector_access_tokens=tokens.connector_access_tokens, email=email
)
except Exception as e:
self.logger.debug("Failed to decode user ID token: %s", e)
raise AuthenticationError("invalid user id token")
def _verify_token_signature(self, raw_token: str, decoded_token: dict) -> None:
self.logger.debug("Verifying user ID token signature against trusted issuers")
issuer = decoded_token.get("iss")
if not issuer:
raise Exception("user id token issuer not found in token")

self.logger.debug("Authentication successful without user ID token")
if issuer not in self._trusted_issuer_urls:
raise Exception("user id token issuer not trusted: %s" % issuer)

return AuthCredentials(), AuthenticatedNorthUser(
connector_access_tokens=tokens.connector_access_tokens,
openid_config_req = urllib.request.Request(
url=issuer.rstrip("/") + "/.well-known/openid-configuration"
)
with urllib.request.urlopen(openid_config_req) as response:
openid_config = json.load(response)

unverified_header = jwt.get_unverified_header(jwt=raw_token)
jwks_client = PyJWKClient(openid_config["jwks_uri"], cache_keys=True)
kid, algorithm = unverified_header.get("kid"), unverified_header.get(
"alg", "RS256"
)
if not kid:
raise Exception("user id token header 'kid' not found")

# This will raise an exception if the signature is invalid
jwt.decode(
jwt=raw_token,
key=jwks_client.get_signing_key(kid).key,
algorithms=[algorithm],
issuer=self._trusted_issuer_urls,
options={"verify_signature": True, "verify_aud": False},
)