You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
87 lines
3.3 KiB
87 lines
3.3 KiB
import logging
import time
from typing import Optional, Dict, Any

import httpx
import yaml
from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Bearer-token security scheme; injected below via ``Depends(security)`` to
# obtain the caller's ``HTTPAuthorizationCredentials``.
security = HTTPBearer()
|
|
|
class AuthManager:
    """Validate bearer JWTs and extract the caller's identity claims.

    When ``jwks_url`` is configured, tokens are cryptographically verified
    against the provider's JWKS (signature, expiry, issuer and, if set,
    audience). When it is not configured, the manager falls back to the
    original development behaviour of reading the claims WITHOUT verifying
    the signature, and logs a warning on every call.
    """

    def __init__(self, config: Dict[str, Any]):
        """Read JWT validation settings from *config*.

        Recognised keys: ``algorithm`` (default ``'RS256'``), ``jwks_url``,
        ``issuer``, ``audience`` and ``jwks_cache_ttl`` (seconds a fetched
        JWKS stays fresh, default 3600).
        """
        self.algorithm = config.get('algorithm', 'RS256')
        self.jwks_url = config.get('jwks_url')
        self.issuer = config.get('issuer')
        self.audience = config.get('audience')
        self.jwks_cache: Optional[Dict] = None
        self.jwks_cache_ttl = config.get('jwks_cache_ttl', 3600)
        # Monotonic deadline of the cached JWKS. None means "no deadline", so
        # a cache assigned directly to ``jwks_cache`` is served indefinitely,
        # exactly as before the TTL existed.
        self._jwks_expires_at: Optional[float] = None

    async def get_jwks(self) -> Dict:
        """Return the provider's JWKS, fetching it when missing or expired.

        Returns:
            The parsed JWKS document.

        Raises:
            HTTPException: 503 when the JWKS cannot be fetched and no
                previously cached copy is available.
        """
        now = time.monotonic()
        cached = self.jwks_cache
        if cached is not None and (
            self._jwks_expires_at is None or now < self._jwks_expires_at
        ):
            return cached

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(self.jwks_url)
                response.raise_for_status()
                fresh = response.json()
        except Exception as e:
            # Boundary handler: any transport, status, URL or JSON failure is
            # reported to the caller as "auth service unavailable".
            logger.error(f"Errore nel recupero JWKS: {e}")
            if cached is not None:
                # Prefer slightly stale keys over rejecting every request
                # while the provider is temporarily unreachable.
                return cached
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Servizio di autenticazione non disponibile"
            ) from e

        self.jwks_cache = fresh
        self._jwks_expires_at = now + self.jwks_cache_ttl
        return fresh

    async def verify_token(self, token: str) -> Dict[str, Any]:
        """Verify a JWT and return the user information it carries.

        Args:
            token: The raw bearer token string.

        Returns:
            A dict with ``user_id`` (``sub`` claim, ``'sconosciuto'`` when
            absent), ``email``, ``name`` and ``roles`` (always a list).

        Raises:
            HTTPException: 401 when the token is malformed or fails
                verification; 503 when the JWKS cannot be obtained.
        """
        try:
            if self.jwks_url:
                jwks = await self.get_jwks()
                claims = jwt.decode(
                    token,
                    jwks,
                    algorithms=[self.algorithm],
                    audience=self.audience,
                    issuer=self.issuer,
                    options={
                        # Without a configured audience, do not reject tokens
                        # merely because they carry an ``aud`` claim.
                        'verify_aud': self.audience is not None,
                        # No access token is available here to compare
                        # against an ``at_hash`` claim.
                        'verify_at_hash': False,
                    },
                )
            else:
                # NOTE(review): development-only path kept for backward
                # compatibility; the signature is NOT checked. Configure
                # ``jwks_url`` in every real deployment.
                logger.warning(
                    "jwks_url non configurato: firma del token NON verificata"
                )
                claims = jwt.get_unverified_claims(token)
        except JWTError as e:
            logger.error(f"Verifica JWT fallita: {e}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token di autenticazione non valido",
                headers={"WWW-Authenticate": "Bearer"},
            )

        # Normalise roles so callers can rely on list membership: a null
        # claim becomes [], a single string becomes a one-element list.
        roles = claims.get('roles') or []
        if isinstance(roles, str):
            roles = [roles]

        return {
            'user_id': claims.get('sub', 'sconosciuto'),
            'email': claims.get('email'),
            'name': claims.get('name'),
            'roles': roles,
        }
|
|
|
# Global authentication manager; remains None until initialize_auth() runs.
auth_manager: Optional[AuthManager] = None


def initialize_auth(config: Dict[str, Any]) -> None:
    """Create the module-wide authentication manager from *config*.

    Rebinds the global ``auth_manager`` to a new ``AuthManager(config)``;
    calling it again replaces the previous instance.
    """
    global auth_manager
    auth_manager = AuthManager(config)
|
|
|
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict[str, Any]:
    """Dependency that resolves the currently authenticated user.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.

    Returns:
        The user-info dict produced by ``AuthManager.verify_token``.

    Raises:
        HTTPException: 503 when ``initialize_auth`` has not been called yet;
            otherwise whatever ``verify_token`` raises.
    """
    if auth_manager is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Autenticazione non configurata"
        )

    return await auth_manager.verify_token(credentials.credentials)
|
|
|
async def get_current_admin_user(current_user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]:
    """Dependency that only lets users holding the ``admin`` role through.

    Args:
        current_user: The user-info dict resolved by ``get_current_user``.

    Returns:
        ``current_user`` unchanged when it carries the ``admin`` role.

    Raises:
        HTTPException: 403 when the ``admin`` role is missing.
    """
    # A null ``roles`` value would make ``in`` raise TypeError, and a bare
    # string would turn the check into a substring match ("notadmin" would
    # pass), so normalise to a list before testing membership.
    roles = current_user.get('roles') or []
    if isinstance(roles, str):
        roles = [roles]

    if 'admin' not in roles:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Privilegi di amministratore richiesti"
        )
    return current_user
|
|
|