143 lines
4.6 KiB
Python
Executable file
143 lines
4.6 KiB
Python
Executable file
#!/usr/bin/env -S uv run --script
|
|
# /// script
|
|
# requires-python = ">=3.12"
|
|
# dependencies = [
|
|
# "fastapi",
|
|
# "uvicorn[standard]",
|
|
# "python-jose[cryptography]",
|
|
# "python-multipart",
|
|
# ]
|
|
# ///
|
|
from fastapi import FastAPI, Request, Response, HTTPException, Depends
|
|
from fastapi.responses import RedirectResponse, PlainTextResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
|
import secrets
|
|
from jose import JWTError, jwt
|
|
from datetime import datetime, timedelta
|
|
import os
|
|
|
|
# ASGI application plus the HTTP Basic scheme that /login depends on.
app = FastAPI()
security = HTTPBasic()

# --- JWT settings -----------------------------------------------------------
# NOTE(review): the fallback value is a placeholder that is visible in the
# source; export JWT_SECRET_KEY for any deployment that must be trusted.
SECRET_KEY = os.environ.get(
    "JWT_SECRET_KEY",
    "your-super-secure-secret-key-change-this-in-production",
)
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# In-memory account table: username -> plaintext password and role.
# NOTE(review): hard-coded plaintext credentials; presumably demo-only.
USERS = {
    "admin": dict(password="admin", role="admin"),
    "reader": dict(password="reader", role="reader"),
}
|
|
|
|
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    """Return a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The mapping is copied, never mutated.
        expires_delta: Token lifetime. When omitted (or otherwise falsy,
            e.g. a zero ``timedelta``) the token expires after 15 minutes.

    Returns:
        The token produced by ``jwt.encode`` using the module-level
        ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Function-scope import: the file-level import only provides
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    # Use an aware UTC timestamp: ``datetime.utcnow()`` is deprecated since
    # Python 3.12 (the minimum version this script declares) and returns a
    # naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + lifetime
    claims = {**data, "exp": expire}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
def verify_jwt_token(token: str):
    """Decode ``token`` and return its ``sub`` claim.

    Returns ``None`` when ``jwt.decode`` raises ``JWTError`` or when the
    decoded payload has no ``sub`` entry.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None
    # ``dict.get`` already yields None for a missing subject.
    return claims.get("sub")
|
|
|
|
def get_current_user(request: Request):
    """Return the username from the ``access_token`` cookie, or ``None``.

    ``None`` is returned when the cookie is absent or empty, when the token
    does not verify, or when its subject is not a key of ``USERS``.
    """
    cookie_token = request.cookies.get("access_token")
    subject = verify_jwt_token(cookie_token) if cookie_token else None
    return subject if subject and subject in USERS else None
|
|
|
|
def get_current_role(user: str):
    """Return the role stored for ``user`` in ``USERS``.

    Raises:
        KeyError: if ``user`` is not a key of ``USERS``.
    """
    account = USERS[user]
    return account["role"]
|
|
|
|
@app.post("/login")
async def login(credentials: HTTPBasicCredentials = Depends(security)):
    """Check HTTP Basic credentials and set a JWT ``access_token`` cookie.

    Responds with 200 "OK" on success and 401 "Invalid credentials" when the
    username is unknown or the password does not match.
    """
    user = credentials.username
    account = USERS.get(user)

    # Compare UTF-8 bytes, not str: ``secrets.compare_digest`` raises
    # TypeError for str arguments containing non-ASCII characters, which
    # would turn a wrong non-ASCII password into a 500 instead of a 401.
    supplied = credentials.password.encode("utf-8")
    # Run the comparison for unknown users too (against an empty password)
    # so both rejection paths perform the same work.
    expected = (account["password"] if account is not None else "").encode("utf-8")
    password_ok = secrets.compare_digest(expected, supplied)

    if account is None or not password_ok:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Create JWT token
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user, "role": account["role"]},
        expires_delta=access_token_expires,
    )

    resp = Response("OK", status_code=200)
    resp.set_cookie(
        "access_token",
        access_token,
        httponly=True,
        secure=False,  # Set to True in production with HTTPS
        samesite='lax',
        path="/",
        # Cookie lifetime mirrors the token's ``exp`` claim.
        max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
    )
    # Ensure login response isn't cached
    resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0"
    resp.headers["Pragma"] = "no-cache"
    return resp
|
|
|
|
@app.get("/logout")
def logout():
    # Redirects to "/" while deleting the ``access_token`` cookie.
    # (Kept as comments rather than a docstring so the generated OpenAPI
    # description of this route stays unchanged.)
    no_cache_headers = (
        ("Cache-Control", "no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0"),
        ("Pragma", "no-cache"),
        ("Expires", "Thu, 01 Jan 1970 00:00:00 GMT"),
    )

    redirect = RedirectResponse("/")
    redirect.delete_cookie("access_token", path="/")
    # Ensure logout response isn't cached
    for header_name, header_value in no_cache_headers:
        redirect.headers[header_name] = header_value
    return redirect
|
|
|
|
def _normalized_request_path(raw_uri):
    """Return the canonical absolute path of ``raw_uri``, or ``None`` if empty.

    The query string and fragment are dropped, percent-escapes are decoded
    once, and ``.``/``..`` segments and repeated slashes are collapsed.
    """
    from posixpath import normpath
    from urllib.parse import unquote

    if not raw_uri:
        return None
    # Split by hand: ``urlsplit`` would read a leading "//admin" as a host.
    path_only = raw_uri.split("?", 1)[0].split("#", 1)[0]
    # The "/" prefix keeps normpath from returning "." for an empty path.
    collapsed = normpath("/" + unquote(path_only))
    # POSIX normpath preserves exactly two leading slashes; force a single one.
    return "/" + collapsed.lstrip("/")


@app.get("/authz")
def authz(request: Request):
    """Authorization check driven by the ``access_token`` cookie.

    Returns 401 when the cookie is missing or invalid, 403 when a non-admin
    requests a path under ``/admin``, and 200 otherwise.
    """
    token = request.cookies.get("access_token")
    if not token:
        return Response("Not authenticated", status_code=401)

    username = verify_jwt_token(token)
    if not username or username not in USERS:
        return Response("Invalid token", status_code=401)

    user_role = USERS[username]['role']

    # Normalise before the prefix test so spellings such as "//admin",
    # "/x/../admin" or "/%61dmin" cannot dodge the admin check.
    # NOTE(review): presumably the fronting proxy routes on a normalised URI
    # while forwarding the raw one in X-Original-URI -- confirm.
    path = _normalized_request_path(request.headers.get("X-Original-URI"))

    # Only admin may access /admin
    if path and path.startswith("/admin") and user_role != 'admin':
        return Response("Forbidden", status_code=403)

    # Everything else: allowed (including requests without X-Original-URI).
    return Response("OK", status_code=200)
|
|
|
|
@app.get("/me")
def get_current_user_info(request: Request):
    """Debug endpoint to see current user info"""
    # Always returns a dict; failures are reported in the payload via
    # ``authenticated: False`` rather than through an exception.
    cookie_token = request.cookies.get("access_token")
    if not cookie_token:
        return {"authenticated": False, "message": "No token"}

    subject = verify_jwt_token(cookie_token)
    if subject and subject in USERS:
        # The role is read from USERS, not from the token's own claims.
        return {
            "authenticated": True,
            "username": subject,
            "role": USERS[subject]["role"],
        }

    return {"authenticated": False, "message": "Invalid token"}
|
|
|
|
if __name__ == "__main__":
    # Script entry point: uvicorn is only imported when run directly.
    import uvicorn

    # The ./static mount is added here, so it exists only in script mode and
    # not when ``app`` is imported by another module.
    static_app = StaticFiles(directory="static")
    app.mount("/static", static_app, name="static")

    uvicorn.run(app, host="localhost", port=5115)
|