cosmoguard-bd/src/pif_compiler/api/routes/api_auth.py
2026-03-13 23:48:34 +01:00

114 lines
3.4 KiB
Python

import os
import httpx
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, EmailStr
from pif_compiler.functions.auth import get_current_user
from pif_compiler.functions.common_log import get_logger
# Module-wide logger obtained from the project's logging helper.
logger = get_logger()

# Router on which the authentication endpoints of this module are registered.
router = APIRouter()

# Supabase connection settings, read once at import time. Both fall back to
# an empty string when the environment variable is not set.
SUPABASE_URL = os.environ.get("SUPABASE_URL", "")
SUPABASE_SECRET_KEY = os.environ.get("SUPABASE_SECRET_KEY", "")

# Headers attached to every request this module sends to the Supabase auth API.
_SUPABASE_HEADERS = {
    "apikey": SUPABASE_SECRET_KEY,
    "Content-Type": "application/json",
}
class LoginRequest(BaseModel):
    """Request body carrying the credentials for a password login."""

    # Account address; validated as an e-mail address by ``EmailStr``.
    email: EmailStr
    # Account password, accepted as an arbitrary string.
    password: str
class RefreshRequest(BaseModel):
    """Request body carrying a refresh token."""

    # Refresh token previously issued to the client.
    refresh_token: str
def _supabase_auth_url(path: str) -> str:
    """Build the absolute URL of a Supabase auth endpoint.

    Args:
        path: Endpoint path relative to ``/auth/v1/``, optionally followed by
            a query string (e.g. ``"token?grant_type=password"``).

    Returns:
        ``SUPABASE_URL`` joined with ``/auth/v1/`` and ``path``, with exactly
        one slash at each junction.
    """
    # A base URL configured with a trailing slash, or a path passed with a
    # leading one, would otherwise produce a double slash in the request URL.
    base = SUPABASE_URL.rstrip("/")
    return f"{base}/auth/v1/{path.lstrip('/')}"
@router.post("/auth/login", tags=["Auth"])
async def login(request: LoginRequest):
    """Authenticate with e-mail and password.

    Forwards the credentials to the Supabase password-grant token endpoint.

    Args:
        request: Validated login payload (e-mail and password).

    Returns:
        A dict with ``access_token``, ``refresh_token`` and ``expires_in``
        copied from the Supabase response, plus ``token_type`` fixed to
        ``"bearer"``.

    Raises:
        HTTPException: 401 when Supabase answers 400 (credentials rejected);
            502 when Supabase cannot be reached, answers with any other
            non-200 status, or returns a body without the expected fields.
    """
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                _supabase_auth_url("token?grant_type=password"),
                headers=_SUPABASE_HEADERS,
                json={"email": request.email, "password": request.password},
                timeout=10,
            )
    except httpx.HTTPError as exc:
        # Timeouts and connection failures are upstream problems: report them
        # as 502 like any other Supabase failure instead of an unhandled 500.
        logger.error(f"login: richiesta a Supabase fallita: {exc!r}")
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Errore nel servizio di autenticazione"
        ) from exc

    if resp.status_code == 400:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Credenziali non valide"
        )
    if resp.status_code != 200:
        logger.error(f"login: Supabase HTTP {resp.status_code}: {resp.text}")
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Errore nel servizio di autenticazione"
        )

    try:
        data = resp.json()
        tokens = {
            "access_token": data["access_token"],
            "refresh_token": data["refresh_token"],
            "expires_in": data["expires_in"],
            "token_type": "bearer",
        }
    except (ValueError, KeyError, TypeError) as exc:
        # The body is deliberately not logged here: it may contain tokens.
        logger.error(f"login: risposta Supabase non valida: {exc!r}")
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Errore nel servizio di autenticazione"
        ) from exc

    logger.info(f"login: accesso di {request.email}")
    return tokens
@router.post("/auth/refresh", tags=["Auth"])
async def refresh(request: RefreshRequest):
    """Exchange a refresh token for a new access token.

    Forwards the refresh token to the Supabase refresh-token grant endpoint.

    Args:
        request: Validated payload holding the refresh token.

    Returns:
        A dict with ``access_token``, ``refresh_token`` and ``expires_in``
        copied from the Supabase response, plus ``token_type`` fixed to
        ``"bearer"``.

    Raises:
        HTTPException: 401 when Supabase rejects the refresh token (any
            non-200 status below 500); 502 when Supabase cannot be reached,
            answers with a 5xx status, or returns a body without the
            expected fields.
    """
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                _supabase_auth_url("token?grant_type=refresh_token"),
                headers=_SUPABASE_HEADERS,
                json={"refresh_token": request.refresh_token},
                timeout=10,
            )
    except httpx.HTTPError as exc:
        logger.error(f"refresh: richiesta a Supabase fallita: {exc!r}")
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Errore nel servizio di autenticazione"
        ) from exc

    if resp.status_code >= 500:
        # An upstream outage says nothing about the token's validity; do not
        # tell the client to log in again because Supabase is failing.
        logger.error(f"refresh: Supabase HTTP {resp.status_code}: {resp.text}")
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Errore nel servizio di autenticazione"
        )
    if resp.status_code != 200:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Refresh token non valido o scaduto — effettua nuovamente il login"
        )

    try:
        data = resp.json()
        return {
            "access_token": data["access_token"],
            "refresh_token": data["refresh_token"],
            "expires_in": data["expires_in"],
            "token_type": "bearer",
        }
    except (ValueError, KeyError, TypeError) as exc:
        # The body is deliberately not logged here: it may contain tokens.
        logger.error(f"refresh: risposta Supabase non valida: {exc!r}")
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Errore nel servizio di autenticazione"
        ) from exc
@router.get("/auth/me", tags=["Auth"])
async def get_me(user: dict = Depends(get_current_user)):
    """Return basic profile information about the authenticated user.

    Args:
        user: Mapping supplied by the ``get_current_user`` dependency
            (presumably the decoded JWT claims; verify against that helper).

    Returns:
        A dict with ``id`` (the ``sub`` entry), ``email`` and ``name``
        (``full_name`` from the user metadata, falling back to ``name``).
        Entries that are missing come back as ``None``.
    """
    # ``user_metadata`` may be absent or explicitly null; treat both as
    # "no metadata" rather than failing on ``None.get``.
    meta = user.get("user_metadata") or {}
    return {
        "id": user.get("sub"),
        "email": user.get("email"),
        "name": meta.get("full_name") or meta.get("name"),
    }
@router.post("/auth/logout", tags=["Auth"])
async def logout(request: RefreshRequest):
    """Ask Supabase to invalidate the session tied to the refresh token.

    The call is best-effort: once Supabase has answered, the endpoint reports
    success regardless of the upstream status, but a failing status is logged
    so that it no longer goes unnoticed.

    Args:
        request: Validated payload holding the refresh token.

    Returns:
        ``{"success": True}`` whenever the request to Supabase completed.

    Raises:
        HTTPException: 502 when Supabase cannot be reached (connection error
            or timeout).
    """
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                _supabase_auth_url("logout"),
                headers=_SUPABASE_HEADERS,
                json={"refresh_token": request.refresh_token},
                timeout=10,
            )
    except httpx.HTTPError as exc:
        logger.error(f"logout: richiesta a Supabase fallita: {exc!r}")
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Errore nel servizio di autenticazione"
        ) from exc

    if resp.status_code >= 400:
        # NOTE(review): the Supabase logout endpoint appears to identify the
        # session from a user access token in the Authorization header; this
        # call sends only the apikey and a refresh_token body. Confirm that
        # the session is actually revoked, or pass the access token through.
        logger.error(f"logout: Supabase HTTP {resp.status_code}: {resp.text}")
    return {"success": True}