You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

87 lines
3.3 KiB

from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
import httpx
import yaml
from typing import Optional, Dict, Any
import logging
logger = logging.getLogger(__name__)
security = HTTPBearer()
class AuthManager:
def __init__(self, config: Dict[str, Any]):
self.algorithm = config.get('algorithm', 'RS256')
self.jwks_url = config.get('jwks_url')
self.issuer = config.get('issuer')
self.audience = config.get('audience')
self.jwks_cache: Optional[Dict] = None
async def get_jwks(self) -> Dict:
"""Recupera JWKS dal provider"""
if self.jwks_cache is None:
try:
async with httpx.AsyncClient() as client:
response = await client.get(self.jwks_url)
response.raise_for_status()
self.jwks_cache = response.json()
except Exception as e:
logger.error(f"Errore nel recupero JWKS: {e}")
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Servizio di autenticazione non disponibile"
)
return self.jwks_cache
async def verify_token(self, token: str) -> Dict[str, Any]:
"""Verifica token JWT e restituisci i claims"""
try:
# Per lo sviluppo, saltiamo la verifica JWT effettiva
# In produzione, implementare la verifica JWKS appropriata
unverified_payload = jwt.get_unverified_claims(token)
# Estrai informazioni utente dal token
user_info = {
'user_id': unverified_payload.get('sub', 'sconosciuto'),
'email': unverified_payload.get('email'),
'name': unverified_payload.get('name'),
'roles': unverified_payload.get('roles', [])
}
return user_info
except JWTError as e:
logger.error(f"Verifica JWT fallita: {e}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token di autenticazione non valido",
headers={"WWW-Authenticate": "Bearer"},
)
# Gestore autenticazione globale
auth_manager: Optional[AuthManager] = None
def initialize_auth(config: Dict[str, Any]):
"""Inizializza il gestore di autenticazione"""
global auth_manager
auth_manager = AuthManager(config)
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict[str, Any]:
"""Dipendenza per ottenere l'utente attualmente autenticato"""
if auth_manager is None:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Autenticazione non configurata"
)
return await auth_manager.verify_token(credentials.credentials)
async def get_current_admin_user(current_user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]:
"""Dipendenza per assicurarsi che l'utente abbia ruolo amministratore"""
if 'admin' not in current_user.get('roles', []):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Privilegi di amministratore richiesti"
)
return current_user