#!/usr/bin/env -S uv run --script # /// script # requires-python = ">=3.12" # dependencies = [ # "fastapi", # "uvicorn[standard]", # "python-jose[cryptography]", # "python-multipart", # ] # /// from fastapi import FastAPI, Request, Response, HTTPException, Depends from fastapi.responses import RedirectResponse, PlainTextResponse from fastapi.staticfiles import StaticFiles from fastapi.security import HTTPBasic, HTTPBasicCredentials import secrets from jose import JWTError, jwt from datetime import datetime, timedelta import os app = FastAPI() security = HTTPBasic() # JWT Configuration SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-super-secure-secret-key-change-this-in-production") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 USERS = { "admin": {"password": "admin", "role": "admin"}, "reader": {"password": "reader", "role": "reader"}, } def create_access_token(data: dict, expires_delta: timedelta = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def verify_jwt_token(token: str): try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: return None return username except JWTError: return None def get_current_user(request: Request): token = request.cookies.get("access_token") if not token: return None username = verify_jwt_token(token) if username and username in USERS: return username return None def get_current_role(user: str): return USERS[user]["role"] @app.post("/login") async def login(credentials: HTTPBasicCredentials = Depends(security)): user = credentials.username pwd = credentials.password if user in USERS and secrets.compare_digest(USERS[user]['password'], pwd): # Create JWT token access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user, "role": USERS[user]["role"]}, expires_delta=access_token_expires ) resp = Response("OK", status_code=200) resp.set_cookie( "access_token", access_token, httponly=True, secure=False, # Set to True in production with HTTPS samesite='lax', path="/", max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60 ) # Ensure login response isn't cached resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0" resp.headers["Pragma"] = "no-cache" return resp raise HTTPException(status_code=401, detail="Invalid credentials") @app.get("/logout") def logout(): resp = RedirectResponse("/") resp.delete_cookie("access_token", path="/") # Ensure logout response isn't cached resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0" resp.headers["Pragma"] = "no-cache" resp.headers["Expires"] = "Thu, 01 Jan 1970 00:00:00 GMT" return resp @app.get("/authz") def authz(request: Request): token = request.cookies.get("access_token") path = request.headers.get("X-Original-URI") if not token: return Response("Not authenticated", status_code=401) username = verify_jwt_token(token) if not username or username not in USERS: return Response("Invalid token", status_code=401) user_role = USERS[username]['role'] # Only admin may access /admin if path and path.startswith("/admin") and user_role != 'admin': return Response("Forbidden", status_code=403) # Everything else: allowed return Response("OK", status_code=200) @app.get("/me") def get_current_user_info(request: Request): """Debug endpoint to see current user info""" token = request.cookies.get("access_token") if not token: return {"authenticated": False, "message": "No token"} username = verify_jwt_token(token) if not username or username not in USERS: return {"authenticated": False, "message": "Invalid token"} return { "authenticated": True, "username": username, "role": USERS[username]["role"] } if __name__ == "__main__": import uvicorn app.mount("/static", StaticFiles(directory="static"), name="static") uvicorn.run(app, host="localhost", port=5115)