Browse Source

first commit

master
Matteo Benedetto 6 months ago
commit
142f3aff25
  1. 129
      .gitignore
  2. 1
      .nicegui/storage-user-09afbe0e-d465-433a-b76f-a01a34286165.json
  3. 1
      .nicegui/storage-user-39a9eceb-b117-42d5-b1cb-7fa440d27cbf.json
  4. 1
      .nicegui/storage-user-81a5b05b-9d4a-47e5-99eb-578460741179.json
  5. 236
      README.md
  6. 0
      __init__.py
  7. 24
      app.py
  8. 1
      middlewares/__init__.py
  9. 82
      middlewares/access_middleware.py
  10. 44
      middlewares/auth_middleware.py
  11. 2
      pages/__init__.py
  12. 86
      pages/auth_callback.py
  13. 27
      pages/login.py
  14. 29
      pages/main_page.py
  15. 6
      pages/subpage.py
  16. 7
      requirements.txt
  17. 1
      services/auth/__init__.py
  18. 253
      services/auth/oidc.py
  19. 7
      services/log/__init__.py
  20. 25
      services/log/access_logger.py
  21. 9
      services/log/basic_logger.py
  22. 0
      static/.gitkeep
  23. 0
      tests/__init__.py
  24. 1
      tests/conftest.py
  25. 46
      tests/test_authentication.py

129
.gitignore vendored

@ -0,0 +1,129 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/

1
.nicegui/storage-user-09afbe0e-d465-433a-b76f-a01a34286165.json

@ -0,0 +1 @@
{"id_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsImtpZCI6ImI0MzViMTNmLTc2YzUtNDFkMi05N2M0LTVhNTAzMjRlYTFkYiJ9.eyJpc3MiOiJodHRwOlwvXC9jbG91ZC5lbm5lMi5uZXQiLCJzdWIiOiJlbm5lMiIsImF1ZCI6IlBUVTdMMjlONXdzNkdBZmpZVmJkMHJ0Vk1hNm9saWFQeHVxRVVLNFE5NWpVRDFDSUwzdVgwelVCdklPVm01SHQiLCJleHAiOjE3NDk1OTMwOTAsImF1dGhfdGltZSI6MTc0OTU4ODM4OCwiaWF0IjoxNzQ5NTkyMTkwLCJhY3IiOiIwIiwiYXpwIjoiUFRVN0wyOU41d3M2R0FmallWYmQwcnRWTWE2b2xpYVB4dXFFVUs0UTk1alVEMUNJTDN1WDB6VUJ2SU9WbTVIdCIsInByZWZlcnJlZF91c2VybmFtZSI6ImVubmUyIiwic2NvcGUiOiJvcGVuaWQgcHJvZmlsZSBlbWFpbCIsIm5iZiI6MTc0OTU5MjE5MCwianRpIjoiNTYiLCJ1cGRhdGVkX2F0IjoxNzQ5NTg3MzQ4LCJuYW1lIjoiZW5uZTIiLCJmYW1pbHlfbmFtZSI6ImVubmUyIiwiZ2l2ZW5fbmFtZSI6IiIsIm1pZGRsZV9uYW1lIjoiIiwicGljdHVyZSI6Imh0dHA6XC9cL2Nsb3VkLmVubmUyLm5ldFwvYXZhdGFyXC9lbm5lMlwvNjQifQ.O6kk_HZZeCDRHXAYu3Q8W0UyPzjTR6avL1Olut2IJ8ODH8TaNsgv2YKrVdTq4w-MoXWUf-O9MLqW4Io_XzxKevYTSueoFPHMkEJfpVLuNYKxxcpPXY_RG7h1_i4qyx1uMaToZoulFBIwOQvStHtsnN0e73RVR-B1sYWNAIEwHov_M4ccMvfQxPh1hGQuPygVUKQh09sn6SXc0HQoP8DOkYd5paAlRL4MdH9K54w15hL90jB5GY3gJ7B-y_GMWOUnPRRge8aY8vmuDeyY5feM2RK-DRg19BIQ7_jhrr0dLMs9Itg2otavD2Re3A1qANuaaFUAPW9va7RAxmB0kZff-BzEsvBjqwq-MJmG3dU97GDD5aKoHuoXOJyb3ESE68WeR_JBGdgudImFg6d-WQQXnloTNog9SeGQcs0wGCDLjymDlHpcETJ5wx1-hDXWcRRPYWx-7ilCWqhFkW7dqK7H1snVzFA8H9PRpl3bZ1xyPkcvS0aoByU4uX06E1T9tMMEykp8S4lw6x3q8WC16G2xpy4tWYWJPkXruENWH-6civShy_0hfk_VbE48VnzD1EjFOuCTKsX2BHZnCqGCQddk6ZYYfyQQF6D4VSk0iRrifAZfcjxpXqHS1mY6tMWDvUfS3z-isSThX0fHQqnrQMNzaBjuGs57ecsaZP_p4wOVJpg","username":"enne2","email":null,"user_id":"enne2","authenticated":true,"access_token":"eyJ0eXAiOiJhdCtKV1QiLCJhbGciOiJSUzI1NiIsImtpZCI6ImI0MzViMTNmLTc2YzUtNDFkMi05N2M0LTVhNTAzMjRlYTFkYiJ9.eyJpc3MiOiJodHRwOlwvXC9jbG91ZC5lbm5lMi5uZXQiLCJzdWIiOiJlbm5lMiIsImF1ZCI6Imh0dHBzOlwvXC9ycy5sb2NhbFwvIiwiZXhwIjoxNzQ5NTkzMDkwLCJhdXRoX3RpbWUiOjE3NDk1ODgzODgsImlhdCI6MTc0OTU5MjE5MCwiYWNyIjoiMCIsImNsaWVudF9pZCI6IlBUVTdMMjlONXdzNkdBZmpZVmJkMHJ0Vk1hNm9saWFQeHVxRVVLNFE5NWpVRDFDSUwzdVgwelVCdklPVm01SHQiLCJzY29wZSI6Im9wZW5pZCBwcm9maWxlIGVtYWlsIiwianRpIjoiNTYifQ.c6e576Jv9_cxC2ScrBORsNkp2o7Fa7Js0yvw2xZFA_ZO8VLS-mBN3as51Sg90fQyyOjaBAiJUmPGwYa35ckdvGE1Au6IM23fyu0aIaBnfytvg3_Zfqs7mN2Ll0JnPC3Qs52S047fSF7kEol1s36UmD4o0mMZFosGQZZUunRPVWGk7Mg43185WRvGWJO9fJwt891lx2lGIfeFRAFsO0epuF-Km8C2730pIUeA63H1qIwaZDTkUh-DaIm2w231Mnfh7DEDzs3NoC33-v9lpLCRYIXzeVm0tyqLc0OAGYwliHDzR26Q6z6dPxTrJ1pz5UQnGlY_ecMdYfi34Ni_YGLFWKbEgjs_9k01w42ycloNsMQ9XH6ky77sR6cOOYRV4stORAW075NRItYWE-oPhiD_tNzWZJSG4kdWmU8c-yVjuiXb0KTAuFDiKJNrClhAh_33_aVydGw4GOVHCRgIHv_U0J99uOYJuIwej497qtnMjUaVpIiubJVK7ZoDo-szcTy3sm9exze4SbDLl1A1ObI4iQA7SrmfcNh-jwJL1d6shvXMI8_aYK1IbpWMToJgEpmkCG0J6IEK7u6dBraJ273X5ib5uxGtuSZkmogu0pWSDgcDDc38dwD4GPqOo3sMeeUO-Oz8pIDY7Y-DABA7ynGE1ZB454_K16CsvmNzsce226A","refresh_token":"006qL68FtoOHuKyCMQQHjZ3YjIWl3yZHyyn0Qu2xQyi2FeAT6mz1oyzHNaigtp1SKM0UL6Ag5zkNIBK2XbSfWKs4yAPTCrXEEDxSrv2se75vkHQwVjfkyN1bZ9VlwrxH","token_expires_at":1749589288.921118,"expires_in":900,"last_refreshed":1749592190.9409876}

1
.nicegui/storage-user-39a9eceb-b117-42d5-b1cb-7fa440d27cbf.json

@ -0,0 +1 @@
{}

1
.nicegui/storage-user-81a5b05b-9d4a-47e5-99eb-578460741179.json

@ -0,0 +1 @@
{"username":"enne2","email":null,"user_id":"enne2","authenticated":true,"access_token":"eyJ0eXAiOiJhdCtKV1QiLCJhbGciOiJSUzI1NiIsImtpZCI6ImI0MzViMTNmLTc2YzUtNDFkMi05N2M0LTVhNTAzMjRlYTFkYiJ9.eyJpc3MiOiJodHRwOlwvXC9jbG91ZC5lbm5lMi5uZXQiLCJzdWIiOiJlbm5lMiIsImF1ZCI6Imh0dHBzOlwvXC9ycy5sb2NhbFwvIiwiZXhwIjoxNzQ5MzIyMDc5LCJhdXRoX3RpbWUiOjE3NDkzMjExNzksImlhdCI6MTc0OTMyMTE3OSwiYWNyIjoiMCIsImNsaWVudF9pZCI6IlBUVTdMMjlONXdzNkdBZmpZVmJkMHJ0Vk1hNm9saWFQeHVxRVVLNFE5NWpVRDFDSUwzdVgwelVCdklPVm01SHQiLCJzY29wZSI6Im9wZW5pZCBwcm9maWxlIGVtYWlsIiwianRpIjoiMjcifQ.XVtOLQ1yld0LNm1s9-FWqLjpjlpKaaww9ypfUWDqyxbPccIJXev6jlM8E3XUJOC4jwDsN8lHLbjSrnrMOqnKrRH2xMM6Nbwm9dexA_qw81eQOkp931T66xEtf9XtsRkNRSssutteh_MjQDP1TvM-RRrKl5fXobsC72GpZxWbKRD7Ctems5AEBcfaLet1AqdE-yyUor1WmhGsYc-zpo13bcEdTSgPSI9SnTQ1KRwg8iyr7AdWJYgHz6_1FNySTOshA_AY6Vu0JohwXAOZTDSXSElrxDAJXZKc0avq_x3CaGSu6fcx9WRBvhF6xe4N1qh219NI7AmfJF3MxXHNJ8M271Kw7W0rsPQrs2P92YuugqaU9dRR3Q-DCN4LUu7tUHgpGyghv4KQhgP-i9B4mkygEjeS5z0yMA2mcrzPiQpqdOfmYi8hh5oHbyv1-KZDXuV8GdSlFtx0ydQZTYV_DsYWFzqbzoElCJsK4xF2NDWjJVyHT-60fa8BsOkOxcu-nxUSG1cgJvNGfbOxp6NcCrYw3Mz5JeUIFPqBD_3nEpxACHgp61BgybRwyeYZOsdkpttxxkhUtk_a9AWU4RoCVFXz4l3mECcXg8iUUtefDDR38TXudjbhzYMdNqOEo0gXgwVJ4cgFviKMAOtDEwxJ2a2RQeqcFGeyeZqv2XyYB8F9-zc","refresh_token":"lNzSuzoRExqlpXN53vcyFJmX1Dp0PsQngCjgKrBk3tSUPhQHk22u4EXybA2etkIrrnKcEX9LloUg5ZkYm6JF1N4u8XNq7XJWlsllRIv48cQe43MWJH7InSvxnvJGKrUQ","token_expires_at":1749322079.8674123}

236
README.md

@ -0,0 +1,236 @@
# NiceGUI OIDC Authentication Demo
A modern authentication system built with [NiceGUI](https://nicegui.io/) and FastAPI, demonstrating how to implement OpenID Connect (OIDC) authentication with session management and route protection.
## 🚀 What This App Does
This application demonstrates:
- **OIDC Authentication**: OpenID Connect integration with external identity providers
- **JWT Token Validation**: Proper signature verification using JWKS
- **Session Management**: Secure user sessions with automatic token refresh
- **Route Protection**: Middleware that restricts access to authenticated users only
- **Automatic Redirects**: Redirects users to OIDC provider and back to intended pages
- **Access Logging**: Comprehensive request logging with user context
## 📁 Project Structure
```
authentication/
├── services/
│ ├── auth/
│ │ └── oidc.py # OIDC configuration and token handling
│ └── log/
│ └── __init__.py # Logging configuration
├── pages/
│ ├── main_page.py # Main/home page
│ ├── subpage.py # Example protected page
│ ├── login.py # OIDC login initiation
│ └── auth_callback.py # OIDC callback handler
├── middlewares/
│ ├── auth_middleware.py # OIDC authentication middleware
│ └── access_middleware.py # Request logging middleware
├── routes/
│ └── logout.py # Logout functionality
├── tests/
│ └── test_authentication.py # Test suite
├── app.py # Main application
├── requirements.txt # Dependencies
└── README.md
```
## 🔧 How to Run
1. **Install dependencies**:
```bash
pip install -r requirements.txt
```
2. **Configure OIDC settings** in `services/auth/oidc.py`:
```python
self.client_id = "your-client-id"
self.client_secret = "your-client-secret"
self.discovery_url = "https://your-provider/.well-known/openid-configuration"
self.redirect_uri = "http://127.0.0.1:8080/auth/callback"
```
3. **Run the application**:
```bash
python app.py
```
4. **Access the app**:
- Open your browser to `http://localhost:8080`
- You'll be redirected to `/login`
- Click "Log in with OIDC" to authenticate
## 🧪 Running Tests
```bash
# Run all tests
pytest tests/
# Run with verbose output
pytest tests/ -v
```
## 🏗 How to Build a Similar OIDC App
### Step 1: OIDC Configuration
```python
class OIDCConfig:
def __init__(self):
self.client_id = "your-client-id"
self.client_secret = "your-client-secret"
self.discovery_url = "https://provider/.well-known/openid-configuration"
self.redirect_uri = "http://localhost:8080/auth/callback"
self.scope = "openid profile email"
```
### Step 2: Authentication Middleware
```python
class OIDCAuthMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
excluded_paths = {'/login', '/auth/callback'}
if request.url.path not in excluded_paths:
if not app.storage.user.get('authenticated', False):
app.storage.user['redirect_to'] = str(request.url.path)
return RedirectResponse('/login')
# Validate current token
access_token = app.storage.user.get('access_token')
if access_token and not oidc_config.validate_token(access_token):
app.storage.user.clear()
return RedirectResponse('/login')
return await call_next(request)
```
### Step 3: Login Page
```python
@ui.page('/login')
def login():
if app.storage.user.get('authenticated', False):
return RedirectResponse('/')
def initiate_oidc_login():
state = str(uuid.uuid4())
app.storage.user['oidc_state'] = state
auth_url = oidc_config.get_authorization_url(state)
ui.navigate.to(auth_url, new_tab=False)
with ui.card().classes('absolute-center'):
ui.label('Authentication Required')
ui.button('Log in with OIDC', on_click=initiate_oidc_login)
```
### Step 4: OIDC Callback Handler
```python
@ui.page('/auth/callback')
async def auth_callback(code: str = None, state: str = None):
# Verify state parameter
stored_state = app.storage.user.get('oidc_state')
if not stored_state or stored_state != state:
ui.notify('Invalid state parameter', color='negative')
return
# Exchange code for tokens
tokens = await oidc_config.exchange_code_for_tokens(code, oidc_config.redirect_uri)
user_info = oidc_config.validate_token(tokens['id_token'])
# Store user session
app.storage.user.update({
'username': user_info.get('preferred_username'),
'email': user_info.get('email'),
'authenticated': True,
'access_token': tokens['access_token'],
'refresh_token': tokens.get('refresh_token')
})
redirect_to = app.storage.user.get('redirect_to', '/')
ui.navigate.to(redirect_to)
```
### Step 5: Token Validation with JWKS
```python
def validate_token(self, token: str) -> Optional[Dict]:
try:
unverified_header = jwt.get_unverified_header(token)
kid = unverified_header.get('kid')
jwks = self.get_jwks()
public_key = None
for key in jwks.get('keys', []):
if key.get('kid') == kid:
public_key = RSAAlgorithm.from_jwk(key)
break
payload = jwt.decode(
token, public_key, algorithms=['RS256'],
audience=self.client_id, options={"verify_signature": True}
)
return payload
except Exception as e:
logger.error(f"Token validation error: {e}")
return None
```
## 🔐 Security Features
✅ **Production-ready security:**
- **JWT signature verification** using JWKS from identity provider
- **State parameter protection** against CSRF attacks
- **Secure token storage** in encrypted sessions
- **Automatic token refresh** to maintain sessions
- **Proper logout** with identity provider notification
- **Request logging** with user context and IP tracking
- **HTTPS enforcement** for token endpoints
## 📚 Key Features Explained
### OIDC Flow
1. User accesses protected resource → redirected to `/login`
2. Login page redirects to OIDC provider with state parameter
3. User authenticates with identity provider
4. Provider redirects back to `/auth/callback` with authorization code
5. App exchanges code for access/ID tokens
6. ID token is validated using provider's public keys
7. User session is established with token storage
### Token Management
- **Access tokens** for API authorization
- **ID tokens** for user identity claims
- **Refresh tokens** for automatic session renewal
- **JWKS validation** ensures token authenticity
### Access Logging
Every request is logged with:
- Unique request ID
- Client IP address (proxy-aware)
- User information
- Response time and status
## 🛠 Supported OIDC Providers
This implementation works with any standard OIDC provider:
- **Keycloak**
- **Auth0**
- **Azure AD**
- **Google Identity**
- **Okta**
- **Custom OIDC providers**
## 🤝 Contributing
Feel free to submit issues, feature requests, or pull requests to improve this OIDC authentication example!
## 📄 License
This example is provided as-is for educational purposes.

0
__init__.py

24
app.py

@ -0,0 +1,24 @@
from services.log import basic_logger as logger
from nicegui import app, ui
from middlewares.access_middleware import AccessLoggingMiddleware
from middlewares.auth_middleware import OIDCAuthMiddleware
# Add middlewares in order: Access logging first, then auth
app.add_middleware(AccessLoggingMiddleware)
app.add_middleware(OIDCAuthMiddleware)
# Import pages after middleware setup
from pages import main_page, auth_callback, login, subpage
def main():
logger.info(f"Starting OIDC Authentication Demo with Access Logging")
# Configure app
ui.run(
port=8080,
storage_secret='your-secret-key-change-in-production',
title='OIDC Authentication Demo',
reload=False
)
if __name__ in {"__main__", "__mp_main__"}:
main()

1
middlewares/__init__.py

@ -0,0 +1 @@
"""Middlewares package for the authentication application."""

82
middlewares/access_middleware.py

@ -0,0 +1,82 @@
from services.log import access_logger
import time
from typing import Optional
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from nicegui import app
import uuid
def get_client_ip(request: Request) -> str:
"""Extract client IP address with X-Forwarded-For support"""
# Check X-Forwarded-For header first (for reverse proxies)
forwarded_for = request.headers.get('X-Forwarded-For')
if forwarded_for:
# Take the first IP in the chain (original client)
return forwarded_for.split(',')[0].strip()
# Check X-Real-IP header (nginx)
real_ip = request.headers.get('X-Real-IP')
if real_ip:
return real_ip.strip()
# Fall back to direct connection IP
return request.client.host if request.client else 'unknown'
def get_user_agent(request: Request) -> str:
"""Extract user agent string"""
return request.headers.get('User-Agent', 'unknown')
def get_current_user() -> Optional[str]:
"""Get current authenticated user"""
if app.storage.user.get('authenticated', False):
return app.storage.user.get('username', 'authenticated_user')
return None
class AccessLoggingMiddleware(BaseHTTPMiddleware):
"""Middleware to log all HTTP requests with client IP and user info"""
async def dispatch(self, request: Request, call_next):
start_time = time.time()
request_id = str(uuid.uuid4()) # Generate a unique request ID
client_ip = get_client_ip(request)
user_agent = get_user_agent(request)
user = get_current_user()
# Process the request
response = await call_next(request)
# Calculate processing time
process_time = time.time() - start_time
# Log the access
log_message = (
f"{request.method} {request.url.path} "
f"{response.status_code} "
f"\"{user_agent}\""
)
access_logger.extra = {
'request_id': request_id,
'client_ip': client_ip,
'user_agent': user_agent,
'user': user if user else 'anonymous',
'process_time': f"{process_time:.3f}s",
}
# Use different log levels based on status code
if response.status_code >= 500:
access_logger.error(log_message)
elif response.status_code >= 400:
access_logger.warning(log_message)
else:
access_logger.info(log_message)
return response

44
middlewares/auth_middleware.py

@ -0,0 +1,44 @@
import logging
from nicegui import app
from fastapi import Request
from fastapi.responses import RedirectResponse
from starlette.middleware.base import BaseHTTPMiddleware
from services.auth.oidc import oidc_config
from .access_middleware import get_client_ip
from services.log import access_logger as logger
class OIDCAuthMiddleware(BaseHTTPMiddleware):
"""Middleware to handle OIDC authentication"""
async def dispatch(self, request: Request, call_next):
# Skip authentication for login, callback, and internal routes
excluded_paths = {'/login', '/auth/callback'}
if (request.url.path.startswith('/_nicegui') or
request.url.path in excluded_paths):
logger.debug(f"Skipping auth for excluded path: {request.url.path}")
return await call_next(request)
# Check if user is authenticated
if not app.storage.user.get('authenticated', False):
client_ip = get_client_ip(request)
logger.info(f"Unauthenticated access from {client_ip} to {request.url.path}, redirecting to login")
# Store the original URL for redirect after login
app.storage.user['redirect_to'] = str(request.url.path)
return RedirectResponse('/login')
# Validate current token
access_token = app.storage.user.get('access_token')
if access_token:
payload = oidc_config.validate_token(access_token)
if not payload:
client_ip = get_client_ip(request)
user = app.storage.user.get('username', 'unknown')
logger.warning(f"Invalid/expired token for user {user} from {client_ip} accessing {request.url.path}, clearing session")
# Token is invalid/expired, clear session
app.storage.user.clear()
return RedirectResponse('/login')
else:
logger.debug(f"Valid token for user accessing {request.url.path}")
return await call_next(request)

2
pages/__init__.py

@ -0,0 +1,2 @@
"""Pages package for the authentication application."""
from . import *

86
pages/auth_callback.py

@ -0,0 +1,86 @@
from services.log import access_logger as logger
from nicegui import app, ui
from fastapi import Request
from services.auth.oidc import oidc_config
import time
@ui.page('/auth/callback')
async def auth_callback(request: Request, code: str = None, state: str = None, error: str = None, redirect_to: str = "/") -> None:
logger.info(f"Auth callback received - code: {'present' if code else 'missing'}, state: {state}, error: {error}")
if error:
logger.error(f"Authentication error: {error}")
ui.notify(f'Authentication failed: {error}', color='negative')
ui.label(f'Authentication failed: {error}').classes('text-red-500')
#ui.navigate.to('/login')
return
if not code or not state:
logger.error(f"Missing callback parameters - code: {'present' if code else 'missing'}, state: {'present' if state else 'missing'}")
ui.notify('Invalid callback parameters', color='negative')
ui.label('Invalid callback parameters').classes('text-red-500')
#ui.navigate.to('/login')
return
# Verify state parameter to prevent CSRF attacks
stored_state = app.storage.user.get('oidc_state')
state = app.storage.user.get('oidc_state', 'not set')
# For debugging purposes, you can display the state parameter
ui.label(f'OIDC state: {state}')
if not stored_state or stored_state != state:
logger.error(f"State parameter mismatch - expected: {stored_state}, received: {state}")
ui.notify('Invalid state parameter', color='negative')
ui.label('Invalid state parameter').classes('text-red-500')
#ui.navigate.to('/login')
return
logger.info("State parameter verified successfully")
try:
# Exchange code for tokens
logger.info("Exchanging authorization code for tokens")
tokens = await oidc_config.exchange_code_for_tokens(code, redirect_uri=oidc_config.redirect_uri)
# Validate and decode ID token to get user info
logger.info("Validating ID token")
user_info = oidc_config.validate_token(tokens['id_token'])
if not user_info:
logger.error("Invalid ID token received")
ui.notify('Invalid ID token', color='negative')
#ui.navigate.to('/login')
return
app.storage.user['id_token'] = tokens['id_token']
logger.info("ID token validated successfully")
user_id = user_info.get('sub')
username = user_info.get('preferred_username', user_info.get('email', 'Unknown'))
email = user_info.get('email')
logger.info(f"User authenticated successfully - ID: {user_id}, Username: {username}, Email: {email}")
# Store user session
app.storage.user.update({
'username': username,
'email': email,
'user_id': user_id,
'authenticated': True,
'access_token': tokens['access_token'],
'refresh_token': tokens.get('refresh_token'),
'token_expires_at': time.time() + tokens.get('expires_in', 3600)
})
# Start token refresh timer
logger.info(f"Starting token refresh timer for user {user_id}")
# Redirect to original destination or home
redirect_to = app.storage.user.get('redirect_to', '/')
app.storage.user.pop('redirect_to', None) # Clean up
app.storage.user.pop('oidc_state', None) # Clean up
logger.info(f"Authentication complete for user {username}, redirecting to: {redirect_to}")
ui.navigate.to(redirect_to)
except Exception as e:
logger.error(f"Authentication failed with exception: {str(e)}", exc_info=True)
ui.notify(f'Authentication failed: {str(e)}', color='negative')
#ui.navigate.to('/login')

27
pages/login.py

@ -0,0 +1,27 @@
from typing import Optional
from fastapi.responses import RedirectResponse
from nicegui import app, ui
from services.auth.oidc import oidc_config
import uuid
@ui.page('/login')
def login() -> Optional[RedirectResponse]:
if app.storage.user.get('authenticated', False):
return RedirectResponse('/')
def initiate_oidc_login():
# Generate state parameter for security
state = str(uuid.uuid4())
app.storage.user['oidc_state'] = state
# Get authorization URL and redirect
auth_url = oidc_config.get_authorization_url(state)
ui.navigate.to(auth_url, new_tab=False)
with ui.card().classes('absolute-center'):
ui.label('Authentication Required').classes('text-xl mb-4')
ui.label('Click below to login with your OIDC provider').classes('mb-4')
ui.button('Log in with OIDC', on_click=initiate_oidc_login).classes('w-full')
return None

29
pages/main_page.py

@ -0,0 +1,29 @@
from nicegui import app, ui
from services.auth.oidc import oidc_config
from fastapi import Request
from services.log import access_logger as logger
@ui.page('/')
def main_page(request: Request = None) -> None:
async def logout() -> None:
user_id = app.storage.user.get('user_id')
logger.info(f"User logout initiated")
if oidc_config.logout_user():
ui.navigate.to('/login')
else:
ui.notify('Logout failed', color='negative')
logger.error("Logout failed")
with ui.column().classes('absolute-center items-center'):
username = app.storage.user.get('username', 'Unknown')
email = app.storage.user.get('email', '')
ui.timer(10, oidc_config.refresh_access_token, immediate=False)
ui.label(f'Hello {username}!').classes('text-2xl')
if email:
ui.label(f'Email: {email}').classes('text-sm text-gray-600')
ui.button(on_click=logout, icon='logout').props('outline round')

6
pages/subpage.py

@ -0,0 +1,6 @@
from nicegui import ui
@ui.page('/subpage')
def test_page() -> None:
ui.label('This is a sub page.')

7
requirements.txt

@ -0,0 +1,7 @@
nicegui<2.20
fastapi
pytest
starlette
python-jose[cryptography]
requests
python-multipart

1
services/auth/__init__.py

@ -0,0 +1 @@
"""Authentication package for OIDC integration."""

253
services/auth/oidc.py

@ -0,0 +1,253 @@
import time
import os
from typing import Dict, Optional, Any
import requests
import jwt
from jwt.algorithms import RSAAlgorithm
from nicegui import app
from services.log import access_logger as logger, basic_logger
import urllib.parse
class OIDCConfig:
def __init__(self):
# Configure these for your OIDC provider
self.client_id = "PTU7L29N5ws6GAfjYVbd0rtVMa6oliaPxuqEUK4Q95jUD1CIL3uX0zUBvIOVm5Ht"
self.client_secret = "E0HRxJRDGJtdDueEulvA6Y46oNco0gkaw75a2cRUPFTdVRjQ7RhLPXg3PRfIJ3N2"
self.discovery_url = "https://cloud.enne2.net/index.php/.well-known/openid-configuration"
self.redirect_uri = "http://127.0.0.1:8080/auth/callback"
self.scope = "openid profile email"
# Cache for OIDC configuration
self._config_cache = None
self._jwks_cache = None
basic_logger.info(f"OIDC Config initialized with discovery URL: {self.discovery_url}")
def get_oidc_config(self) -> Dict:
"""Fetch OIDC configuration from discovery endpoint"""
if not self._config_cache:
basic_logger.info(f"Fetching OIDC configuration from {self.discovery_url}")
try:
response = requests.get(self.discovery_url)
response.raise_for_status()
self._config_cache = response.json()
basic_logger.info("OIDC configuration fetched successfully")
except Exception as e:
basic_logger.error(f"Failed to fetch OIDC configuration: {e}")
raise
return self._config_cache
def get_jwks(self) -> Dict:
"""Fetch JSON Web Key Set for token validation"""
if not self._jwks_cache:
config = self.get_oidc_config()
jwks_uri = config['jwks_uri']
basic_logger.info(f"Fetching JWKS from {jwks_uri}")
try:
response = requests.get(jwks_uri)
response.raise_for_status()
self._jwks_cache = response.json()
basic_logger.info("JWKS fetched successfully")
except Exception as e:
basic_logger.error(f"Failed to fetch JWKS: {e}")
raise
return self._jwks_cache
def get_authorization_url(self, state: str) -> str:
"""Generate authorization URL for OIDC login"""
config = self.get_oidc_config()
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': self.redirect_uri,
'scope': self.scope,
'state': state,
}
auth_url = f"{config['authorization_endpoint']}?{urllib.parse.urlencode(params)}"
logger.info(f"Generated authorization URL with state: {state}")
return auth_url
async def exchange_code_for_tokens(self, code: str, redirect_uri: str) -> Optional[Dict[str, Any]]:
"""Exchange authorization code for access and ID tokens"""
try:
data = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
'redirect_uri': redirect_uri
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json'
}
# Use HTTPS for the token endpoint
config = self.get_oidc_config()
token_url = config.get('token_endpoint', 'http://example.com/token').replace('http://', 'https://')
logger.debug(f"Token exchange request data: {data}")
logger.debug(f"Token exchange request URL: {token_url}")
logger.debug(f"Token exchange request headers: {headers}")
response = requests.post(
token_url,
data=data,
headers=headers,
timeout=30
)
logger.debug(f"Token exchange response status: {response.status_code}")
logger.debug(f"Token exchange response content: {response.text}")
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Failed to exchange code for tokens: {e}")
return None
def refresh_access_token(self) -> Dict:
"""Refresh access token using refresh token"""
last_refreshed = app.storage.user.get('last_refreshed', 0)
if time.time() - last_refreshed < os.getenv('TOKEN_REFRESH_MINIMUM_INTERVAL', 300):
#logger.info("Access token recently refreshed, skipping refresh")
return app.storage.user.get('access_token', {})
config = self.get_oidc_config()
refresh_token = app.storage.user.get('refresh_token')
logger.info("Refreshing access token")
data = {
'grant_type': 'refresh_token',
'client_id': self.client_id,
'client_secret': self.client_secret,
'refresh_token': refresh_token,
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json'
}
token_url = config.get('token_endpoint', 'http://example.com/token').replace('http://', 'https://')
try:
response = requests.post(
token_url,
data=data,
headers=headers,
timeout=30
)
response.raise_for_status()
tokens = response.json()
data = {
'access_token': tokens.get('access_token'),
'refresh_token': tokens.get('refresh_token'),
'expires_in': tokens.get('expires_in'),
'id_token': tokens.get('id_token'),
'last_refreshed': time.time(),
}
# Update user session with new tokens
app.storage.user.update(data)
logger.info("Successfully refreshed access token")
return tokens
except Exception as e:
logger.error(f"Failed to refresh access token: {e}")
raise
def validate_token(self, token: str) -> Optional[Dict]:
"""Validate and decode JWT token with proper signature verification"""
try:
# Get the token header to find the key ID
unverified_header = jwt.get_unverified_header(token)
kid = unverified_header.get('kid')
if not kid:
logger.warning("No key ID found in token header")
return None
# Get JWKS and find the matching key
jwks = self.get_jwks()
public_key = None
for key in jwks.get('keys', []):
if key.get('kid') == kid:
# Convert JWK to PEM format
public_key = RSAAlgorithm.from_jwk(key)
break
if not public_key:
logger.warning(f"No matching public key found for kid: {kid}")
return None
# Verify and decode the token
payload = jwt.decode(
token,
public_key,
algorithms=['RS256'],
audience=self.client_id,
options={
"verify_signature": True,
"verify_exp": True,
"verify_aud": False,
"verify_iss": True
}
)
logger.debug("Token validated successfully with signature verification")
return payload
except jwt.ExpiredSignatureError:
logger.warning("Token has expired")
return None
except jwt.InvalidAudienceError:
logger.warning("Invalid token audience")
return None
except jwt.InvalidIssuerError:
logger.warning("Invalid token issuer")
return None
except jwt.InvalidTokenError as e:
logger.warning(f"Invalid token: {e}")
return None
except Exception as e:
logger.error(f"Token validation error: {e}")
return None
def logout_user(self) -> bool:
"""Clear user session and call api to log out"""
config = self.get_oidc_config()
logout_url = config.get('end_session_endpoint')
if not logout_url:
logger.error("No end session endpoint found in OIDC configuration")
# Still clear local session even if remote logout fails
app.storage.user.clear()
return True
# Prepare logout request
user_info = app.storage.user.get('user_info', {})
user_id = user_info.get('sub') or user_info.get('preferred_username', 'unknown')
params = {
'id_token_hint': app.storage.user.get('id_token'),
'client_id': self.client_id,
'post_logout_redirect_uri': self.redirect_uri.replace('/auth/callback', '/login'),
}
# Remove None values
params = {k: v for k, v in params.items() if v is not None}
logout_url = f"{logout_url}?{urllib.parse.urlencode(params)}"
try:
response = requests.get(logout_url, timeout=10)
response.raise_for_status()
logger.info(f"User {user_id} logged out successfully from OIDC provider")
except requests.exceptions.RequestException as e:
logger.error(f"Failed to log out user {user_id} from OIDC provider: {e}")
# Continue with local logout even if remote logout fails
# Clear user session
app.storage.user.clear()
logger.info(f"Local session cleared for user {user_id}")
return True
# Global OIDC config instance
oidc_config = OIDCConfig()

7
services/log/__init__.py

@ -0,0 +1,7 @@
"""Logging package for the authentication application."""
from .basic_logger import basic_logger
from .access_logger import access_logger
# Make loggers available at package level
__all__ = ['basic_logger', 'access_logger']

25
services/log/access_logger.py

@ -0,0 +1,25 @@
import logging
# Access logging configuration
access_formatter = logging.Formatter(
'%(asctime)s - %(request_id)s - %(module)s - %(levelname)s - %(message)s srcip: %(client_ip)s, user: %(user)s'
)
access_handler = logging.StreamHandler()
access_handler.setFormatter(access_formatter)
access_logger = logging.getLogger('access')
access_logger.addHandler(access_handler)
access_logger.setLevel(logging.INFO)
access_logger.propagate = False
# Default extra context for access logger
extra = {
'request_id': 'unknown',
'client_ip': 'unknown',
'user_agent': 'unknown',
'user': 'anonymous',
'process_time': '0.000s'
}
access_logger = logging.LoggerAdapter(access_logger, extra)

9
services/log/basic_logger.py

@ -0,0 +1,9 @@
import logging
# Set up basic logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
basic_logger = logging.getLogger('basic')

0
static/.gitkeep

0
tests/__init__.py

1
tests/conftest.py

@ -0,0 +1 @@
pytest_plugins = ['nicegui.testing.plugin']

46
tests/test_authentication.py

@ -0,0 +1,46 @@
import pytest
from unittest.mock import patch, MagicMock
from nicegui.testing import User
from app import main
# pylint: disable=missing-function-docstring
@pytest.mark.module_under_test(main)
async def test_login_redirect_to_oidc(user: User) -> None:
"""Test that login page redirects to OIDC provider"""
await user.open('/')
await user.should_see('Log in with OIDC')
@pytest.mark.module_under_test(main)
async def test_subpage_access_redirect(user: User) -> None:
"""Test that protected pages redirect to login"""
await user.open('/subpage')
await user.should_see('Authentication Required')
@pytest.mark.module_under_test(main)
@patch('auth.oidc.oidc_config.exchange_code_for_tokens')
@patch('auth.oidc.oidc_config.validate_token')
async def test_oidc_callback_success(mock_validate, mock_exchange, user: User) -> None:
"""Test successful OIDC callback"""
# Mock successful token exchange
mock_exchange.return_value = {
'access_token': 'fake_access_token',
'id_token': 'fake_id_token',
'refresh_token': 'fake_refresh_token',
'expires_in': 3600
}
# Mock successful token validation
mock_validate.return_value = {
'sub': 'user123',
'preferred_username': 'testuser',
'email': 'test@example.com',
'exp': 9999999999 # Far future
}
# Simulate OIDC callback
await user.open('/auth/callback?code=test_code&state=test_state')
# Note: This test would need to be adapted based on your OIDC flow
Loading…
Cancel
Save