htmx-patterns/htmx_patterns/zpages/router.py
2025-04-09 14:31:46 -05:00

91 lines
2.2 KiB
Python

from datetime import datetime, timezone
from kubernetes import client, config as kubernetes_config
from kubernetes.config.config_exception import ConfigException
import os
from pydantic import BaseModel
from fastapi import APIRouter
from htmx_patterns.config import get_config
zpages_router = APIRouter(tags=["zpages"])
config = get_config()
def format_uptime(seconds: int) -> str:
minutes, sec = divmod(seconds, 60)
hours, min = divmod(minutes, 60)
days, hr = divmod(hours, 24)
parts = []
if days:
parts.append(f"{days}d")
if hr:
parts.append(f"{hr}h")
if min:
parts.append(f"{min}m")
parts.append(f"{sec}s")
return " ".join(parts)
class PodInfo(BaseModel):
pod_name: str
namespace: str
node_name: str
container_image: str
start_time: str
pod_uptime: str
class Ready(BaseModel):
ready: bool
timestamp: str
app_version: str
pod_info: PodInfo
def get_pod_info():
pod_name = os.getenv("KUBERNETES_POD_NAME")
namespace = os.getenv("KUBERNETES_POD_NAMESPACE")
try:
kubernetes_config.load_incluster_config()
except ConfigException:
return PodInfo(
pod_name="unknown",
namespace="unknown",
node_name="unknown",
pod_ip="unknown",
container_image="unknown",
start_time="unknown",
pod_uptime="unknown",
)
v1 = client.CoreV1Api()
pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)
start_time = pod.status.start_time
now = datetime.now(timezone.utc)
uptime_seconds = int((now - start_time).total_seconds())
return PodInfo(
pod_name=pod.metadata.name,
namespace=pod.metadata.namespace,
node_name=pod.spec.node_name,
container_image=pod.spec.containers[0].image,
start_time=pod.status.start_time,
pod_uptime=format_uptime(uptime_seconds),
)
@zpages_router.get("/readyz")
async def root() -> Ready:
pod_info = get_pod_info()
ready = Ready(
ready=True,
timestamp=datetime.now(timezone.utc).isoformat(),
app_version=config.app_version,
pod_info=pod_info,
)
return ready