Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 46 additions & 10 deletions python/packages/autogen-studio/autogenstudio/web/auth/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
from .models import AuthConfig, User
from .providers import AuthProvider, FirebaseAuthProvider, GithubAuthProvider, MSALAuthProvider, NoAuthProvider

# Allowed JWT algorithms — explicitly pin to HS256 only to prevent algorithm confusion
# and reject the 'none' algorithm attack vector.
_ALLOWED_JWT_ALGORITHMS = ["HS256"]

# Minimum acceptable JWT secret length to resist brute-force attacks.
_MIN_JWT_SECRET_LENGTH = 32


class AuthManager:
"""
Expand All @@ -23,6 +30,11 @@ def __init__(self, config: AuthConfig):
"""Initialize the auth manager with configuration."""
self.config = config
self.provider = self._create_provider()
if config.jwt_secret and len(config.jwt_secret) < _MIN_JWT_SECRET_LENGTH:
logger.warning(
"JWT secret is shorter than the recommended minimum of "
f"{_MIN_JWT_SECRET_LENGTH} characters; consider using a longer secret."
)
logger.info(f"Initialized auth manager with provider: {config.type}")

def _create_provider(self) -> AuthProvider:
Expand All @@ -47,17 +59,41 @@ def create_token(self, user: User) -> str:
logger.warning("JWT secret not configured, using insecure token")
return "dummy_token_" + user.id

expiry = datetime.now(timezone.utc) + timedelta(minutes=self.config.token_expiry_minutes)
now = datetime.now(timezone.utc)
expiry = now + timedelta(minutes=self.config.token_expiry_minutes)
payload = {
"sub": user.id,
"name": user.name,
"email": user.email,
"provider": user.provider,
"roles": user.roles,
"iat": now,
"exp": expiry,
}
return jwt.encode(payload, self.config.jwt_secret, algorithm="HS256")

def _decode_token(self, token: str) -> Dict[str, Any]:
"""Decode and validate a JWT token with strict options.

Raises InvalidTokenException on any validation failure.
"""
if not self.config.jwt_secret:
raise InvalidTokenException()

decode_options: Dict[str, Any] = {
"verify_signature": True,
"require": ["sub", "exp", "iat"],
"verify_exp": True,
"verify_iat": True,
}

return jwt.decode(
token,
self.config.jwt_secret,
algorithms=_ALLOWED_JWT_ALGORITHMS,
options=decode_options,
)

async def authenticate_request(self, request: Request) -> User:
"""Authenticate a request and return user information."""
# Check if path should be excluded from auth
Expand All @@ -78,12 +114,12 @@ async def authenticate_request(self, request: Request) -> User:

try:
if not self.config.jwt_secret:
# For development with no JWT secret
logger.warning("JWT secret not configured, accepting all tokens")
return User(id="guestuser@gmail.com", name="Default User", provider="none")
# JWT secret is mandatory for authenticated providers; refuse the request.
logger.error("JWT secret not configured but authentication is required")
raise InvalidTokenException()

# Decode and validate JWT
payload = jwt.decode(token, self.config.jwt_secret, algorithms=["HS256"])
# Decode and validate JWT with strict algorithm pinning and claim checks.
payload = self._decode_token(token)

# Create User object from token payload
return User(
Expand All @@ -104,13 +140,13 @@ async def authenticate_request(self, request: Request) -> User:
def is_valid_token(self, token: str) -> bool:
"""Check if a JWT token is valid."""
if not self.config.jwt_secret:
return True # No validation in dev mode
return False # Refuse validation when no secret is configured

try:
jwt.decode(token, self.config.jwt_secret, algorithms=["HS256"])
self._decode_token(token)
return True
except jwt.ExpiredSignatureError:
logger.warning("Token has expired")
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError, InvalidTokenException):
logger.warning("Token validation failed")
return False

@classmethod
Expand Down