learn-nginx-auth/main_auth.py
2025-11-21 13:47:16 -06:00

200 lines
6.7 KiB
Python
Executable file

#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "fastapi",
# "uvicorn[standard]",
# "python-jose[cryptography]",
# "python-multipart",
# "rich",
# ]
# ///
from fastapi import FastAPI, Request, Response, HTTPException, Depends
from fastapi.responses import RedirectResponse, PlainTextResponse
from fastapi.staticfiles import StaticFiles
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import secrets
from jose import JWTError, jwt
import datetime
import os
import logging
from rich.logging import RichHandler
from rich.console import Console
# Configure rich console for consistent styling
console = Console()
app = FastAPI()
security = HTTPBasic()
# Configure structured logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(name)-12s | %(levelname)-8s | %(message)s",
datefmt="%H:%M:%S",
handlers=[RichHandler(console=console, show_path=False, rich_tracebacks=True)]
)
logger = logging.getLogger("nginx-auth")
# Configure uvicorn loggers to use our format
uvicorn_error_logger = logging.getLogger("uvicorn.error")
uvicorn_error_logger.handlers = [RichHandler(console=console, show_path=False)]
# JWT Configuration
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-super-secure-secret-key-change-this-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
USERS = {
"admin": {"password": "admin", "role": "admin"},
"reader": {"password": "reader", "role": "reader"},
}
def create_access_token(data: dict, expires_delta: datetime.timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.datetime.now(datetime.UTC) + expires_delta
else:
expire = datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_jwt_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
return None
return username
except JWTError:
return None
def get_current_user(request: Request):
token = request.cookies.get("access_token")
if not token:
return None
username = verify_jwt_token(token)
if username and username in USERS:
return username
return None
def get_current_role(user: str):
return USERS[user]["role"]
@app.post("/login")
async def login(request: Request, credentials: HTTPBasicCredentials = Depends(security)):
user = credentials.username
pwd = credentials.password
client_ip = request.client.host if request.client else "unknown"
if user in USERS and secrets.compare_digest(USERS[user]['password'], pwd):
# Create JWT token
access_token_expires = datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user, "role": USERS[user]["role"]},
expires_delta=access_token_expires
)
logger.info(f"🔑 LOGIN SUCCESS | {user}({USERS[user]['role']}) | {client_ip}")
resp = Response("OK", status_code=200)
resp.set_cookie(
"access_token",
access_token,
httponly=True,
secure=False, # Set to True in production with HTTPS
samesite='lax',
path="/",
max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60
)
# Ensure login response isn't cached
resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0"
resp.headers["Pragma"] = "no-cache"
return resp
logger.info(f"🔑 LOGIN FAILED | {user} | {client_ip}")
raise HTTPException(status_code=401, detail="Invalid credentials")
@app.get("/logout")
def logout(request: Request):
client_ip = request.client.host if request.client else "unknown"
# Try to get current user for logging
token = request.cookies.get("access_token")
username = "unknown"
if token:
username = verify_jwt_token(token) or "unknown"
logger.info(f"👋 LOGOUT | {username} | {client_ip}")
resp = RedirectResponse("/")
resp.delete_cookie("access_token", path="/")
# Ensure logout response isn't cached
resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0"
resp.headers["Pragma"] = "no-cache"
resp.headers["Expires"] = "Thu, 01 Jan 1970 00:00:00 GMT"
return resp
@app.get("/authz")
def authz(request: Request):
token = request.cookies.get("access_token")
path = request.headers.get("X-Original-URI", "unknown")
method = request.headers.get("X-Original-Method", "GET")
host = request.headers.get("X-Original-Host", "localhost")
client_ip = request.client.host if request.client else "unknown"
if not token:
logger.info(f"🚫 AUTH DENIED | {method:4} {path} | No token | {client_ip}")
return Response("Not authenticated", status_code=401)
username = verify_jwt_token(token)
if not username or username not in USERS:
logger.info(f"🚫 AUTH DENIED | {method:4} {path} | Invalid token | {client_ip}")
return Response("Invalid token", status_code=401)
user_role = USERS[username]['role']
# Only admin may access /admin
if path and path.startswith("/admin") and user_role != 'admin':
logger.info(f"🚫 AUTH DENIED | {method:4} {path} | {username}({user_role}) | {client_ip}")
return Response("Forbidden", status_code=403)
# Everything else: allowed
logger.info(f"✅ AUTH ALLOW | {method:4} {path} | {username}({user_role}) | {client_ip}")
return Response("OK", status_code=200)
@app.get("/me")
def get_current_user_info(request: Request):
"""Debug endpoint to see current user info"""
token = request.cookies.get("access_token")
if not token:
return {"authenticated": False, "message": "No token"}
username = verify_jwt_token(token)
if not username or username not in USERS:
return {"authenticated": False, "message": "Invalid token"}
return {
"authenticated": True,
"username": username,
"role": USERS[username]["role"]
}
if __name__ == "__main__":
import uvicorn
app.mount("/static", StaticFiles(directory="static"), name="static")
logger.info("🚀 Starting nginx-auth demo server...")
# Disable uvicorn access logs since we have our own structured auth logging
uvicorn.run(
app,
host="localhost",
port=5115,
access_log=False, # Disable to avoid conflicts
log_level="info"
)