This commit is contained in:
Waylon S. Walker 2024-10-21 08:27:28 -05:00
parent e334e711cc
commit feb21a4292
7 changed files with 207 additions and 3 deletions

View file

@ -0,0 +1,117 @@
from fastapi import HTTPException, Request
from functools import wraps
from starlette.authentication import AuthCredentials, AuthenticationBackend, SimpleUser
from typing import Dict
# In-memory user database for demonstration purposes
AUTH_DB: Dict[str, str] = {
"user1": "password123",
"user2": "securepassword",
"user3": "supersecurepassword",
}
SCOPES = {
"authenticated": "Authenticated users",
"admin": "Admin users",
"superuser": "Superuser",
}
USER_SCOPES = {
"user1": ["authenticated"],
"user2": ["authenticated", "admin"],
"user3": ["authenticated", "admin", "superuser"],
}
def authenticated(func: callable):
@wraps(func)
async def wrapper(request: Request, *args, **kwargs):
if not request.user.is_authenticated:
raise HTTPException(status_code=401, detail="Authentication required")
return await func(request, *args, **kwargs)
return wrapper
def admin(func: callable):
@wraps(func)
async def wrapper(request: Request, *args, **kwargs):
if not request.user.is_authenticated:
raise HTTPException(status_code=401, detail="Authentication required")
if "admin" not in request.user.scopes:
raise HTTPException(status_code=403, detail="Admin access required")
return await func(request, *args, **kwargs)
return wrapper
def has_scope(scope: str):
def decorator(func: callable):
@wraps(func)
async def wrapper(request: Request, *args, **kwargs):
if not request.user.is_authenticated:
raise HTTPException(status_code=401, detail="Authentication required")
if scope not in request.auth.scopes:
raise HTTPException(status_code=403, detail="Access denied")
return await func(request, *args, **kwargs)
return wrapper
return decorator
class BasicAuthBackend(AuthenticationBackend):
"""Custom authentication backend that validates Basic auth credentials."""
async def authenticate(self, request: Request):
# Extract the 'Authorization' header from the request
auth_header = request.headers.get("Authorization")
if not auth_header:
return None # No credentials provided
try:
# Basic authentication: "Basic <username>:<password>"
auth_type, credentials = auth_header.split(" ", 1)
if auth_type != "Basic":
return None # Unsupported auth type
username, password = credentials.split(":")
except ValueError:
raise HTTPException(status_code=400, detail="Invalid Authorization format")
# Validate credentials against the in-memory AUTH_DB
if AUTH_DB.get(username) != password:
raise HTTPException(status_code=401, detail="Invalid credentials")
# If valid, return user object and auth credentials
return AuthCredentials(USER_SCOPES[username]), SimpleUser(username)
# # Initialize FastAPI app
# app = FastAPI()
#
# # Add AuthenticationMiddleware to FastAPI with the custom backend
# app.add_middleware(AuthenticationMiddleware, backend=BasicAuthBackend())
#
# # Add SessionMiddleware with a secret key
# app.add_middleware(SessionMiddleware, secret_key="your-secret-key")
# @app.get("/")
# async def public():
# """Public route."""
# return {"message": "This route is accessible to everyone!"}
#
# @app.get("/private")
# async def private(request: Request):
# """Private route that requires authentication."""
# if not request.user.is_authenticated:
# raise HTTPException(status_code=401, detail="Authentication required")
#
# return {"message": f"Welcome, {request.user.display_name}!"}
#
# @app.get("/session")
# async def session_info(request: Request):
# """Access session data."""
# request.session["example"] = "This is session data"
# return {"session_data": request.session}

View file

@ -1,5 +1,6 @@
from fastapi import APIRouter, Depends, Request
from fastapi_dynamic_response.auth import admin, authenticated, has_scope
from fastapi_dynamic_response.base.schema import Message
from fastapi_dynamic_response.dependencies import get_content_type
@ -15,6 +16,36 @@ async def get_example(
return {"message": "Hello, this is an example", "data": [1, 2, 3, 4]}
@router.get("/private")
@authenticated
async def get_private(
request: Request,
content_type: str = Depends(get_content_type),
):
request.state.template_name = "example.html"
return {"message": "This page is private", "data": [1, 2, 3, 4]}
@router.get("/admin")
@admin
async def get_admin(
request: Request,
content_type: str = Depends(get_content_type),
):
request.state.template_name = "example.html"
return {"message": "This is only for admin users", "data": [1, 2, 3, 4]}
@router.get("/superuser")
@has_scope("superuser")
async def get_superuser(
request: Request,
content_type: str = Depends(get_content_type),
):
request.state.template_name = "example.html"
return {"message": "This is only for superusers", "data": [1, 2, 3, 4]}
@router.get("/error")
async def get_error(
request: Request,

View file

@ -1,4 +1,4 @@
from fastapi import Depends, FastAPI, Request
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi_dynamic_response import globals
@ -52,8 +52,13 @@ app.middleware("http")(catch_exceptions_middleware)
app.middleware("http")(set_bound_logger)
app.mount("/static", StaticFiles(directory="static"), name="static")
from fastapi import Depends, Request
from fastapi_dynamic_response.auth import BasicAuthBackend
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.middleware.sessions import SessionMiddleware
# Flag to indicate if the application is ready
app.add_middleware(AuthenticationMiddleware, backend=BasicAuthBackend())
app.add_middleware(SessionMiddleware, secret_key="your-secret-key")
@app.on_event("startup")

View file

@ -1,5 +1,4 @@
from difflib import get_close_matches
from fastapi_dynamic_response.settings import settings
from io import BytesIO
import json