This commit is contained in:
Waylon S. Walker 2024-10-13 20:31:03 -05:00
commit a426df12c0
22 changed files with 2819 additions and 0 deletions

View file

@ -0,0 +1,4 @@
# SPDX-FileCopyrightText: 2024-present U.N. Owen <void@some.where>
#
# SPDX-License-Identifier: MIT
__version__ = "0.0.1"

View file

@ -0,0 +1,3 @@
# SPDX-FileCopyrightText: 2024-present U.N. Owen <void@some.where>
#
# SPDX-License-Identifier: MIT

View file

@ -0,0 +1,38 @@
from fastapi import APIRouter, Depends, Request
from fastapi_dynamic_response.base.schema import Message
from fastapi_dynamic_response.dependencies import get_content_type
router = APIRouter()
@router.get("/example")
async def get_example(
request: Request,
content_type: str = Depends(get_content_type),
):
request.state.template_name = "example.html"
return {"message": "Hello, this is an example", "data": [1, 2, 3, 4]}
@router.get("/another-example")
async def another_example(
request: Request,
content_type: str = Depends(get_content_type),
):
request.state.template_name = "another_example.html"
return {
"title": "Another Example",
"message": "Your cart",
"items": ["apple", "banana", "cherry"],
}
@router.post("/message")
async def message(
request: Request,
message: Message,
content_type: str = Depends(get_content_type),
):
request.state.template_name = "post_message.html"
return {"message": message.message}

View file

@ -0,0 +1,5 @@
from pydantic import BaseModel
class Message(BaseModel):
message: str

View file

@ -0,0 +1,9 @@
from fastapi import Query
def get_content_type(
content_type: str = Query(
None, description="Specify the content type of the response"
),
):
return content_type

View file

@ -0,0 +1,4 @@
from fastapi.templating import Jinja2Templates
is_ready = False
templates = Jinja2Templates(directory="templates")

View file

@ -0,0 +1,36 @@
from fastapi import Depends, FastAPI, Request
from fastapi_dynamic_response import globals
from fastapi_dynamic_response.base.router import router as base_router
from fastapi_dynamic_response.dependencies import get_content_type
from fastapi_dynamic_response.middleware import (
catch_exceptions_middleware,
respond_based_on_content_type,
)
from fastapi_dynamic_response.zpages.router import router as zpages_router
app = FastAPI(debug=True)
app.include_router(zpages_router)
app.include_router(base_router)
app.middleware("http")(catch_exceptions_middleware)
app.middleware("http")(respond_based_on_content_type)
# Flag to indicate if the application is ready
@app.on_event("startup")
async def startup_event():
# Perform startup actions, e.g., database connections
# If all startup actions are successful, set is_ready to True
globals.is_ready = True
@app.get("/sitemap")
async def sitemap(
request: Request,
content_type: str = Depends(get_content_type),
):
request.state.template_name = "sitemap.html"
available_routes = [route.path for route in app.router.routes if route.path]
return {"available_routes": available_routes}

View file

@ -0,0 +1,229 @@
from difflib import get_close_matches
from fastapi import Request, Response
from fastapi.exceptions import (
HTTPException as StarletteHTTPException,
RequestValidationError,
)
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse
import html2text
from io import BytesIO
import json
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import traceback
from weasyprint import HTML as WeasyHTML
from fastapi_dynamic_response.globals import templates
async def catch_exceptions_middleware(request: Request, call_next):
try:
return await call_next(request)
except Exception as e:
print(traceback.format_exc())
raise e
def is_browser_request(user_agent: str) -> bool:
browser_keywords = [
"mozilla",
"chrome",
"safari",
"firefox",
"edge",
"wget",
"opera",
]
return any(keyword in user_agent.lower() for keyword in browser_keywords)
def is_rtf_request(user_agent: str) -> bool:
rtf_keywords = ["curl", "httpie", "httpx"]
return any(keyword in user_agent.lower() for keyword in rtf_keywords)
def get_screenshot(html_content: str) -> BytesIO:
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1280x1024")
driver = webdriver.Chrome(options=chrome_options)
driver.get("data:text/html;charset=utf-8," + html_content)
screenshot = driver.get_screenshot_as_png()
driver.quit()
buffer = BytesIO(screenshot)
return buffer
def format_json_as_plain_text(data: dict) -> str:
"""Convert JSON to human-readable plain text format with indentation and bullet points."""
def _format_value(value, indent=2):
if isinstance(value, dict):
return format_json_as_plain_text(value)
elif isinstance(value, list):
return "\n".join([f"{' ' * indent}- {item}" for item in value])
else:
return str(value)
output_lines = []
for key, value in data.items():
if isinstance(value, (dict, list)):
output_lines.append(f"{key}:\n{_format_value(value)}")
else:
output_lines.append(f"{key}: {value}")
return "\n".join(output_lines)
def format_json_as_rich_text(data: dict, template_name: str) -> str:
"""Convert JSON to a human-readable rich text format using rich."""
console = Console()
# pretty_data = Pretty(data, indent_guides=True)
template = templates.get_template(template_name)
html_content = template.render(data=data)
markdown_content = html2text.html2text(html_content)
with console.capture() as capture:
console.print(
Panel(
Markdown(markdown_content),
title="Response Data",
border_style="bold cyan",
)
)
return capture.get()
def handle_not_found(request: Request, data: str):
requested_path = request.url.path
available_routes = [route.path for route in app.router.routes if route.path]
suggestions = get_close_matches(requested_path, available_routes, n=3, cutoff=0.5)
return templates.TemplateResponse(
"404.html",
{
"request": request,
"data": json.loads(data),
"available_routes": available_routes,
"requested_path": requested_path,
"suggestions": suggestions,
},
)
async def respond_based_on_content_type(request: Request, call_next):
requested_path = request.url.path
if requested_path in ["/docs", "/redoc", "/openapi.json"]:
return await call_next(request)
try:
response = await call_next(request)
user_agent = request.headers.get("user-agent", "").lower()
referer = request.headers.get("referer", "")
content_type = request.query_params.get(
"content_type",
request.headers.get("content-type", request.headers.get("Accept")),
)
if "raw" in content_type:
return await call_next(request)
if content_type == "*/*":
content_type = None
if ("/docs" in referer or "/redoc" in referer) and content_type is None:
content_type = "application/json"
elif is_browser_request(user_agent) and content_type is None:
content_type = "text/html"
elif is_rtf_request(user_agent) and content_type is None:
content_type = "application/rtf"
elif content_type is None:
content_type = content_type or "application/json"
body = b"".join([chunk async for chunk in response.body_iterator])
data = body.decode("utf-8")
if response.status_code == 404:
return handle_not_found(request, data)
if response.status_code == 422:
return response
if str(response.status_code)[0] not in "123":
return response
return await handle_response(request, data, content_type)
# except TemplateNotFound:
# return HTMLResponse(content="Template Not Found ", status_code=404)
except StarletteHTTPException as exc:
return HTMLResponse(
content=f"Error {exc.status_code}: {exc.detail}",
status_code=exc.status_code,
)
except RequestValidationError as exc:
return JSONResponse(status_code=422, content={"detail": exc.errors()})
except Exception as e:
print(traceback.format_exc())
return HTMLResponse(content=f"Internal Server Error: {str(e)}", status_code=500)
async def handle_response(request: Request, data: str, content_type: str):
json_data = json.loads(data)
template_name = getattr(request.state, "template_name", "default_template.html")
if content_type == "application/json":
return JSONResponse(content=json_data)
elif content_type == "text/html":
return templates.TemplateResponse(
template_name, {"request": request, "data": json_data}
)
elif content_type == "text/html-partial":
return templates.TemplateResponse(
template_name, {"request": request, "data": json_data}
)
elif (
content_type == "text/markdown"
or content_type == "md"
or content_type == "markdown"
):
import html2text
template = templates.get_template(template_name)
html_content = template.render(data=json_data)
markdown_content = html2text.html2text(html_content)
return PlainTextResponse(content=markdown_content)
elif content_type == "text/plain":
plain_text_content = format_json_as_plain_text(json_data)
return PlainTextResponse(content=plain_text_content)
elif (
content_type == "text/rich"
or content_type == "text/rtf"
or content_type == "application/rtf"
):
rich_text_content = format_json_as_rich_text(json_data, template_name)
return PlainTextResponse(content=rich_text_content)
elif content_type == "image/png":
template = templates.get_template(template_name)
html_content = template.render(data=json_data)
screenshot = get_screenshot(html_content)
return Response(content=screenshot.getvalue(), media_type="image/png")
elif content_type == "application/pdf":
template = templates.get_template(template_name)
html_content = template.render(data=json_data)
pdf = WeasyHTML(string=html_content).write_pdf()
return Response(content=pdf, media_type="application/pdf")
return JSONResponse(content=json_data)

View file

@ -0,0 +1,42 @@
from fastapi import APIRouter, HTTPException, Request
from fastapi_dynamic_response import globals
router = APIRouter()
@router.get("/livez")
async def livez(request: Request):
"""
Liveness probe endpoint.
Returns 200 OK if the application is alive.
"""
request.state.template_name = "status.html"
return {"status": "alive"}
@router.get("/readyz")
async def readyz(request: Request):
"""
Readiness probe endpoint.
Returns 200 OK if the application is ready to receive traffic.
Returns 503 Service Unavailable if not ready.
"""
request.state.template_name = "status.html"
if globals.is_ready:
return {"status": "ready"}
else:
raise HTTPException(status_code=503, detail="Not ready")
@router.get("/healthz")
async def healthz(request: Request):
"""
Health check endpoint.
Returns 200 OK if the application is healthy and ready.
Returns 503 Service Unavailable if not healthy.
"""
request.state.template_name = "status.html"
if is_ready:
return {"status": "healthy"}
else:
raise HTTPException(status_code=503, detail="Unhealthy")