86 lines
2.7 KiB
Python
86 lines
2.7 KiB
Python
"""
|
|
Middleware / dependency di autenticazione per FastAPI (servizio ML).
|
|
Verifica il JWT firmato da auth.mebboat.it (JWT_SECRET condiviso).
|
|
Supporta cookie `auth_token` (SSO via .mebboat.it) e header Authorization: Bearer <jwt>.
|
|
|
|
Il cookie auth_token è condiviso tra i sottodomini grazie a domain=.mebboat.it:
|
|
- console.mebboat.it imposta il cookie al login
|
|
- ml.mebboat.it lo riceve automaticamente dal browser
|
|
|
|
Uso:
|
|
from core.auth import require_auth, require_internal
|
|
|
|
@app.get("/protected")
|
|
async def protected_route(user = Depends(require_auth)):
|
|
return {"user": user}
|
|
"""
|
|
|
|
import hmac
import os
from typing import Optional

import jwt
from fastapi import Cookie, Header, HTTPException, Request, status
|
|
|
|
# Shared HS256 signing secret, read once at import time from JWT_SECRET.
# None when the variable is not set.
SECRET: Optional[str] = os.getenv("JWT_SECRET")

# Key expected in the x-api-key header for service-to-service calls, read
# once at import time from INTERNAL_API_KEY. None when the variable is not set.
INTERNAL_KEY: Optional[str] = os.getenv("INTERNAL_API_KEY")
|
|
|
|
|
|
def _verify(token: Optional[str]):
    """Verify and decode a JWT, returning a normalised payload or ``None``.

    Args:
        token: The raw JWT string (from the cookie or the Bearer header).
            ``None``, empty, non-string, or longer than 2048 characters is
            rejected without attempting to decode.

    Returns:
        A dict with the keys ``user_id`` (from the ``sub`` claim),
        ``username``, ``session_id``, ``iat`` and ``exp`` when the token
        decodes successfully with HS256 against ``SECRET``; ``None`` in
        every other case. Claims absent from the token map to ``None``.
    """
    # Cheap sanity checks first; the length cap bounds how much
    # caller-supplied data is ever handed to the decoder.
    if not token or not isinstance(token, str) or len(token) > 2048:
        return None

    # Fail closed when JWT_SECRET is unset or empty: an empty secret is a
    # forgeable signing key, and a missing one should yield a clean
    # "not authenticated" result rather than an unhandled decoder error.
    if not SECRET:
        return None

    try:
        # Pinning algorithms to HS256 prevents the token header from
        # selecting a different (or no) signature algorithm.
        payload = jwt.decode(token, SECRET, algorithms=["HS256"])
    except jwt.PyJWTError:
        return None

    return {
        "user_id": payload.get("sub"),
        "username": payload.get("username"),
        "session_id": payload.get("session_id"),
        "iat": payload.get("iat"),
        "exp": payload.get("exp"),
    }
|
|
|
|
|
|
async def require_auth(
    request: Request,
    auth_token: Optional[str] = Cookie(default=None),
    authorization: Optional[str] = Header(default=None),
    x_api_key: Optional[str] = Header(default=None),
):
    """FastAPI dependency: accept a logged-in user or an internal call.

    Usage: ``user = Depends(require_auth)``.

    Credentials are checked in this order:

    1. ``x-api-key`` header matching ``INTERNAL_KEY`` (service-to-service).
    2. ``auth_token`` cookie containing a valid JWT.
    3. ``Authorization: Bearer <jwt>`` header containing a valid JWT.

    Args:
        request: The incoming request; ``request.state.internal`` or
            ``request.state.user`` is set on success.
        auth_token: Value of the ``auth_token`` cookie, if present.
        authorization: Value of the ``Authorization`` header, if present.
        x_api_key: Value of the ``x-api-key`` header, if present.

    Returns:
        ``{"internal": True}`` for an internal call, otherwise the user
        dict produced by ``_verify``.

    Raises:
        HTTPException: 401 ``unauthorized`` when no credential is valid.
    """
    # Service-to-service. Compare as bytes in constant time so response
    # timing does not reveal how much of the key matched; encoding first
    # also keeps compare_digest from rejecting non-ASCII header values.
    if x_api_key and INTERNAL_KEY and hmac.compare_digest(
        x_api_key.encode("utf-8"), INTERNAL_KEY.encode("utf-8")
    ):
        request.state.internal = True
        return {"internal": True}

    # Bearer token. The auth scheme name is case-insensitive in HTTP, so
    # "bearer <jwt>" is accepted as well as "Bearer <jwt>".
    bearer = None
    if authorization:
        scheme, _, credentials = authorization.partition(" ")
        if scheme.lower() == "bearer":
            bearer = credentials.strip() or None

    # Try each credential on its own: a stale or invalid cookie must not
    # mask a valid Bearer token sent with the same request.
    user = _verify(auth_token) or _verify(bearer)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="unauthorized",
            headers={"WWW-Authenticate": "Bearer"},
        )
    request.state.user = user
    return user
|
|
|
|
|
|
async def require_internal(x_api_key: Optional[str] = Header(default=None)):
    """FastAPI dependency: allow only service-to-service calls.

    Args:
        x_api_key: Value of the ``x-api-key`` header, if present.

    Returns:
        ``True`` when the header matches ``INTERNAL_KEY``.

    Raises:
        HTTPException: 403 ``forbidden`` when ``INTERNAL_KEY`` is not
            configured, the header is missing, or the values differ.
    """
    # Constant-time comparison on bytes: avoids leaking, via response
    # timing, how many leading characters of the key were correct, and
    # avoids compare_digest rejecting non-ASCII header values.
    authorised = bool(
        INTERNAL_KEY
        and x_api_key
        and hmac.compare_digest(
            x_api_key.encode("utf-8"), INTERNAL_KEY.encode("utf-8")
        )
    )
    if not authorised:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="forbidden",
        )
    return True
|