#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "fastapi",
#     "uvicorn[standard]",
#     "python-jose[cryptography]",
#     "python-multipart",
#     "rich",
# ]
# ///
"""Small FastAPI service that issues and checks JWT session cookies.

Endpoints:

* ``POST /login``  - HTTP Basic credentials in, ``access_token`` cookie out.
* ``GET  /logout`` - clears the cookie and redirects to ``/``.
* ``GET  /authz``  - answers 200/401/403 for the request described by the
  ``X-Original-*`` headers (presumably set by an nginx ``auth_request``
  sub-request; confirm against the proxy configuration).
* ``GET  /me``     - debug view of the user behind the current cookie.
"""

import datetime
import logging
import os
import posixpath
import secrets
from urllib.parse import unquote

from fastapi import Depends, FastAPI, HTTPException, Request, Response
from fastapi.responses import PlainTextResponse, RedirectResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.staticfiles import StaticFiles
from jose import JWTError, jwt
from rich.console import Console
from rich.logging import RichHandler

# Configure rich console for consistent styling
console = Console()

app = FastAPI()
security = HTTPBasic()

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(name)-12s | %(levelname)-8s | %(message)s",
    datefmt="%H:%M:%S",
    handlers=[RichHandler(console=console, show_path=False, rich_tracebacks=True)],
)
logger = logging.getLogger("nginx-auth")

# Configure uvicorn loggers to use our format
# NOTE(review): uvicorn.run() installs its own logging config by default, which
# may replace this handler at startup - confirm the intended output.
uvicorn_error_logger = logging.getLogger("uvicorn.error")
uvicorn_error_logger.handlers = [RichHandler(console=console, show_path=False)]

# JWT Configuration
_DEFAULT_SECRET_KEY = "your-super-secure-secret-key-change-this-in-production"
SECRET_KEY = os.getenv("JWT_SECRET_KEY", _DEFAULT_SECRET_KEY)
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

if SECRET_KEY == _DEFAULT_SECRET_KEY:
    # Tokens signed with the published placeholder can be forged by anyone
    # who has read this file, so make the misconfiguration visible.
    logger.warning(
        "JWT_SECRET_KEY is not set; using the built-in placeholder key. "
        "Set JWT_SECRET_KEY before exposing this service."
    )

# Demo user database: username -> plaintext password and role.
USERS = {
    "admin": {"password": "admin", "role": "admin"},
    "reader": {"password": "reader", "role": "reader"},
}

# Path prefix that only users with the "admin" role may access.
_ADMIN_PREFIX = "/admin"

# Header value used on login/logout responses so they are never cached.
_NO_CACHE_CONTROL = "no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0"


def _client_ip(request: Request) -> str:
    """Return the peer address of *request*, or ``"unknown"`` if unavailable."""
    return request.client.host if request.client else "unknown"


def _credentials_match(username: str, password: str) -> bool:
    """Return True when *username* exists in USERS and *password* matches.

    Both sides are encoded to UTF-8 bytes because ``secrets.compare_digest``
    raises ``TypeError`` for ``str`` arguments containing non-ASCII
    characters. A comparison is performed even for unknown usernames so the
    two failure modes do the same amount of work.
    """
    record = USERS.get(username)
    expected = record["password"] if record is not None else ""
    matches = secrets.compare_digest(
        expected.encode("utf-8"), password.encode("utf-8")
    )
    return record is not None and matches


def _normalize_request_path(raw_uri: str) -> str:
    """Return *raw_uri* reduced to a canonical absolute path.

    The query string and fragment are dropped, percent-escapes are decoded,
    and repeated slashes and ``.``/``..`` segments are collapsed. Values that
    do not start with ``/`` (for example the ``"unknown"`` placeholder) are
    returned after decoding only.
    """
    path = raw_uri.split("?", 1)[0].split("#", 1)[0]
    path = unquote(path)
    if not path.startswith("/"):
        return path
    # normpath() preserves a leading "//", so collapse leading slashes first.
    return posixpath.normpath("/" + path.lstrip("/"))


def create_access_token(
    data: dict, expires_delta: datetime.timedelta | None = None
) -> str:
    """Return a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed; the dict is copied, not mutated.
        expires_delta: Lifetime of the token. ``None`` means 15 minutes.
            An explicit zero (or negative) delta is honoured as given.
    """
    if expires_delta is None:
        expires_delta = datetime.timedelta(minutes=15)
    to_encode = data.copy()
    to_encode["exp"] = datetime.datetime.now(datetime.UTC) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def verify_jwt_token(token: str) -> str | None:
    """Return the ``sub`` claim of *token*, or None if it cannot be trusted.

    None is returned when decoding raises ``JWTError`` or when the payload
    has no ``sub`` claim.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None
    username: str | None = payload.get("sub")
    if username is None:
        return None
    return username


def get_current_user(request: Request) -> str | None:
    """Return the known username behind the request's cookie, else None."""
    token = request.cookies.get("access_token")
    if not token:
        return None
    username = verify_jwt_token(token)
    if username and username in USERS:
        return username
    return None


def get_current_role(user: str) -> str:
    """Return the role of *user*; raises ``KeyError`` for unknown users."""
    return USERS[user]["role"]


@app.post("/login")
async def login(
    request: Request, credentials: HTTPBasicCredentials = Depends(security)
):
    """Validate HTTP Basic credentials and set the ``access_token`` cookie.

    Returns a 200 ``OK`` response carrying the cookie on success.

    Raises:
        HTTPException: 401 when the credentials do not match a known user.
    """
    user = credentials.username
    pwd = credentials.password
    client_ip = _client_ip(request)

    if not _credentials_match(user, pwd):
        logger.info(f"🔑 LOGIN FAILED | {user} | {client_ip}")
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Create JWT token
    access_token_expires = datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user, "role": USERS[user]["role"]},
        expires_delta=access_token_expires,
    )
    logger.info(f"🔑 LOGIN SUCCESS | {user}({USERS[user]['role']}) | {client_ip}")

    resp = Response("OK", status_code=200)
    resp.set_cookie(
        "access_token",
        access_token,
        httponly=True,
        secure=False,  # Set to True in production with HTTPS
        samesite='lax',
        path="/",
        max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
    )
    # Ensure login response isn't cached
    resp.headers["Cache-Control"] = _NO_CACHE_CONTROL
    resp.headers["Pragma"] = "no-cache"
    return resp


@app.get("/logout")
def logout(request: Request):
    """Delete the ``access_token`` cookie and redirect to ``/``."""
    client_ip = _client_ip(request)

    # Try to get current user for logging
    token = request.cookies.get("access_token")
    username = "unknown"
    if token:
        username = verify_jwt_token(token) or "unknown"
    logger.info(f"👋 LOGOUT | {username} | {client_ip}")

    resp = RedirectResponse("/")
    resp.delete_cookie("access_token", path="/")
    # Ensure logout response isn't cached
    resp.headers["Cache-Control"] = _NO_CACHE_CONTROL
    resp.headers["Pragma"] = "no-cache"
    resp.headers["Expires"] = "Thu, 01 Jan 1970 00:00:00 GMT"
    return resp


@app.get("/authz")
def authz(request: Request):
    """Decide whether the request described by ``X-Original-*`` is allowed.

    Returns 401 without a valid token for a known user, 403 when a
    non-admin targets an admin path, and 200 otherwise.
    """
    token = request.cookies.get("access_token")
    path = request.headers.get("X-Original-URI", "unknown")
    method = request.headers.get("X-Original-Method", "GET")
    host = request.headers.get("X-Original-Host", "localhost")
    client_ip = _client_ip(request)

    if not token:
        logger.info(f"🚫 AUTH DENIED | {method:4} {path} | No token | {client_ip}")
        return Response("Not authenticated", status_code=401)

    username = verify_jwt_token(token)
    if not username or username not in USERS:
        logger.info(f"🚫 AUTH DENIED | {method:4} {path} | Invalid token | {client_ip}")
        return Response("Invalid token", status_code=401)

    user_role = USERS[username]['role']

    # Only admin may access /admin. The raw header is attacker-influenced, so
    # also test its normalized form: otherwise "//admin", "/%61dmin" or
    # "/x/../admin" would slip past a plain prefix check. Checking both the
    # raw and normalized values means this can only deny more, never less.
    targets_admin = any(
        candidate.startswith(_ADMIN_PREFIX)
        for candidate in (path, _normalize_request_path(path))
    )
    if targets_admin and user_role != 'admin':
        logger.info(f"🚫 AUTH DENIED | {method:4} {path} | {username}({user_role}) | {client_ip}")
        return Response("Forbidden", status_code=403)

    # Everything else: allowed
    logger.info(f"✅ AUTH ALLOW | {method:4} {path} | {username}({user_role}) | {client_ip}")
    return Response("OK", status_code=200)


@app.get("/me")
def get_current_user_info(request: Request):
    """Debug endpoint to see current user info"""
    token = request.cookies.get("access_token")
    if not token:
        return {"authenticated": False, "message": "No token"}

    username = verify_jwt_token(token)
    if not username or username not in USERS:
        return {"authenticated": False, "message": "Invalid token"}

    return {
        "authenticated": True,
        "username": username,
        "role": USERS[username]["role"],
    }


if __name__ == "__main__":
    import uvicorn

    app.mount("/static", StaticFiles(directory="static"), name="static")
    logger.info("🚀 Starting nginx-auth demo server...")
    # Disable uvicorn access logs since we have our own structured auth logging
    uvicorn.run(
        app,
        host="localhost",
        port=5115,
        access_log=False,  # Disable to avoid conflicts
        log_level="info",
    )