cosmoguard-bd/src/pif_compiler/functions/auth.py
2026-03-09 21:26:33 +01:00

70 lines
2.3 KiB
Python

import os
import jwt
from jwt import PyJWKClient
from fastapi import Request, HTTPException, status
from pif_compiler.functions.common_log import get_logger
logger = get_logger()
SUPABASE_URL = os.getenv("SUPABASE_URL", "")
# Client JWKS inizializzato una volta sola — caching automatico delle chiavi pubbliche
_jwks_client: PyJWKClient | None = None
def _get_jwks_client() -> PyJWKClient:
global _jwks_client
if _jwks_client is None:
if not SUPABASE_URL:
logger.error("SUPABASE_URL non configurato")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Configurazione auth mancante sul server"
)
_jwks_client = PyJWKClient(f"{SUPABASE_URL}/auth/v1/.well-known/jwks.json")
return _jwks_client
def verify_jwt(token: str) -> dict:
"""Verifica il JWT Supabase tramite JWKS (RS256/ES256). Ritorna il payload decodificato."""
try:
client = _get_jwks_client()
signing_key = client.get_signing_key_from_jwt(token)
payload = jwt.decode(
token,
signing_key.key,
algorithms=["RS256", "ES256"],
audience="authenticated",
)
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token scaduto"
)
except jwt.InvalidTokenError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token non valido: {e}"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"verify_jwt: errore inatteso — {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Errore nella verifica del token"
)
def get_current_user(request: Request) -> dict:
"""FastAPI dependency: estrae e verifica il JWT dall'header Authorization."""
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authorization header mancante o malformato"
)
token = auth_header.removeprefix("Bearer ")
return verify_jwt(token)