""" Middleware / dependency di autenticazione per FastAPI (servizio ML). Verifica il JWT firmato da auth.mebboat.it (JWT_SECRET condiviso). Supporta cookie `auth_token` (SSO via .mebboat.it) e header Authorization: Bearer . Il cookie auth_token รจ condiviso tra i sottodomini grazie a domain=.mebboat.it: - console.mebboat.it imposta il cookie al login - ml.mebboat.it lo riceve automaticamente dal browser Uso: from core.auth import require_auth, require_internal @app.get("/protected") async def protected_route(user = Depends(require_auth)): return {"user": user} """ import os from typing import Optional import jwt from fastapi import Cookie, Header, HTTPException, Request, status SECRET = os.environ.get("JWT_SECRET") INTERNAL_KEY = os.environ.get("INTERNAL_API_KEY") def _verify(token: Optional[str]): """Verifica e decodifica un JWT. Ritorna il payload o None.""" if not token or not isinstance(token, str) or len(token) > 2048: return None try: payload = jwt.decode(token, SECRET, algorithms=["HS256"]) return { "user_id": payload.get("sub"), "username": payload.get("username"), "session_id": payload.get("session_id"), "iat": payload.get("iat"), "exp": payload.get("exp"), } except jwt.PyJWTError: return None async def require_auth( request: Request, auth_token: Optional[str] = Cookie(default=None), authorization: Optional[str] = Header(default=None), x_api_key: Optional[str] = Header(default=None), ): """ FastAPI dependency: accetta utente loggato (cookie/bearer) o chiamata interna. Uso: `user = Depends(require_auth)`. Il cookie auth_token arriva automaticamente dal browser se l'utente ha effettuato il login su auth.mebboat.it (dominio .mebboat.it). """ # Service-to-service if x_api_key and INTERNAL_KEY and x_api_key == INTERNAL_KEY: request.state.internal = True return {"internal": True} # Bearer token bearer = None if authorization and authorization.startswith("Bearer "): bearer = authorization[7:] user = _verify(auth_token or bearer) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="unauthorized", ) request.state.user = user return user async def require_internal(x_api_key: Optional[str] = Header(default=None)): """FastAPI dependency: solo chiamate service-to-service con x-api-key.""" if not INTERNAL_KEY or x_api_key != INTERNAL_KEY: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="forbidden", ) return True