938 lines
29 KiB
Python
Executable file
938 lines
29 KiB
Python
Executable file
#!/usr/bin/env -S uv run --quiet --script
|
|
# /// script
|
|
# requires-python = ">=3.12"
|
|
# dependencies = [
|
|
# "typer",
|
|
# "rich",
|
|
# "httpx",
|
|
# "kubernetes",
|
|
# "pydantic",
|
|
# ]
|
|
# ///
|
|
|
|
"""
|
|
k8s-ingress-debugger
|
|
|
|
Given a Deployment name, this tool inspects the Kubernetes objects around it and
|
|
runs a set of connectivity checks:
|
|
|
|
• Does an Ingress point to it?
|
|
• What are the Ingress hosts?
|
|
• What's the healthcheck route (from readiness/liveness HTTP probes)?
|
|
• Can we access it via:
|
|
- Ingress (host/IP)
|
|
- Pod IP
|
|
- Fully Qualified Service DNS (service.ns.svc.cluster.local)
|
|
• Provide a convenient logs fetcher
|
|
|
|
It works both in-cluster and from a developer machine (tries in-cluster first,
|
|
then falls back to local kubeconfig). All checker functions are importable and
|
|
usable outside of Typer.
|
|
|
|
Examples
|
|
--------
|
|
Inspect with rich table output:
|
|
./k8s_ingress_debug.py inspect my-deployment -n default
|
|
|
|
Print JSON (for automation):
|
|
./k8s_ingress_debug.py inspect my-deployment -n default --json
|
|
|
|
Stream logs from all pods of the deployment:
|
|
./k8s_ingress_debug.py logs my-deployment -n default -f --tail 200
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import socket
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Iterable, List, Optional, Tuple
|
|
|
|
import httpx
|
|
import typer
|
|
from pydantic import BaseModel, Field
|
|
from rich import box
|
|
from rich.console import Console
|
|
from rich.panel import Panel
|
|
from rich.table import Table
|
|
|
|
# Kubernetes imports
|
|
from kubernetes import client, config
|
|
from kubernetes.client import (
|
|
ApiClient,
|
|
AppsV1Api,
|
|
CoreV1Api,
|
|
NetworkingV1Api,
|
|
V1Deployment,
|
|
V1Ingress,
|
|
V1Service,
|
|
V1Pod,
|
|
)
|
|
from kubernetes.stream import stream
|
|
|
|
# Typer application that the CLI commands in this file register on;
# shell-completion installation is switched off.
app = typer.Typer(help="Kubernetes Ingress Debugger", add_completion=False)

# Rich console shared by every function in this file that prints output.
console = Console()
|
|
|
|
|
|
# =========================
|
|
# Kube config + helpers
|
|
# =========================
|
|
|
|
|
|
@dataclass
class KubeCtx:
    """Kubernetes API handles plus a flag recording how the config was loaded."""

    # Low-level API client the typed API objects below are constructed from.
    api_client: ApiClient
    # Core v1 API (pods, services, pod logs, pod exec).
    core: CoreV1Api
    # Apps v1 API (deployments).
    apps: AppsV1Api
    # Networking v1 API (ingresses).
    net: NetworkingV1Api
    # True when the in-cluster configuration was loaded successfully.
    in_cluster: bool
|
|
|
|
|
|
def load_kube_ctx() -> KubeCtx:
    """
    Build a KubeCtx, trying in-cluster configuration before the local kubeconfig.

    Any failure of the in-cluster loader triggers the kubeconfig fallback; an
    error raised by the fallback itself propagates to the caller.
    """
    try:
        config.load_incluster_config()
    except Exception:
        # Not in cluster, try local kubeconfig
        config.load_kube_config()
        running_in_cluster = False
    else:
        running_in_cluster = True

    # All typed API objects share one ApiClient.
    shared_client = client.ApiClient()
    return KubeCtx(
        api_client=shared_client,
        core=CoreV1Api(shared_client),
        apps=AppsV1Api(shared_client),
        net=NetworkingV1Api(shared_client),
        in_cluster=running_in_cluster,
    )
|
|
|
|
|
|
# =========================
|
|
# Discovery models
|
|
# =========================
|
|
|
|
|
|
class ProbeInfo(BaseModel):
    """HTTP probe settings extracted from a container spec."""

    # Probe flavour: "readiness" | "liveness" | "startup"
    kind: str
    # HTTP path requested by the probe.
    path: str | None = None
    # Probe port, either a number or a port name.
    port: str | int | None = None
    # URL scheme used when building check URLs.
    scheme: str = "http"
|
|
|
|
|
|
class ServiceBinding(BaseModel):
    """One exposed port of a Service associated with the deployment."""

    # Service name and the namespace it lives in.
    service: str
    namespace: str
    # Port number exposed by the Service.
    port: int
    # Port (number or name) the Service forwards to, when known.
    target_port: Optional[str | int] = None
    # Transport protocol of the port.
    protocol: str = "TCP"
|
|
|
|
|
|
class IngressBinding(BaseModel):
    """One Ingress rule path whose backend is a Service of the deployment."""

    # Ingress name and namespace.
    ingress: str
    namespace: str
    # Rule host ("" when the rule declares none) and the rule path.
    host: str
    path: str
    # True when the host appears in the Ingress TLS section.
    tls: bool
    # Backend Service name and port number.
    service: str
    service_port: int
|
|
|
|
|
|
class Reachability(BaseModel):
    """Outcome of a single connectivity check."""

    # Route that was tested: "ingress", "pod-ip", "svc-fqdn"
    via: str
    # Human-readable description of what was contacted.
    target: str
    # URL that was requested, if the check got as far as building one.
    url: str | None = None
    # Whether the check is considered successful.
    ok: bool = False
    # HTTP status code, when a response was received.
    status: int | None = None
    # Error text for failed checks.
    error: str | None = None
    # Elapsed time of the check in milliseconds.
    latency_ms: int | None = None
|
|
|
|
|
|
class InspectionReport(BaseModel):
    """Structured result of inspecting one Deployment."""

    # Deployment identity and how the kube config was loaded.
    deployment: str
    namespace: str
    in_cluster: bool
    # Pod names, pod name -> pod IP, pod name -> declared container ports.
    pods: List[str]
    pod_ips: Dict[str, str]
    container_ports: Dict[str, List[int]]
    # HTTP probe used as the health-check route, if one was found.
    health_probe: ProbeInfo | None = None
    # Discovered Service ports, Ingress rule paths and check outcomes.
    services: List[ServiceBinding] = Field(default_factory=list)
    ingresses: List[IngressBinding] = Field(default_factory=list)
    reachability: List[Reachability] = Field(default_factory=list)
|
|
|
|
|
|
# =========================
|
|
# Core discovery functions
|
|
# =========================
|
|
|
|
|
|
def find_deployment(
    ctx: KubeCtx, name: str, namespace: Optional[str]
) -> Tuple[V1Deployment, str]:
    """
    Locate a Deployment and return it together with its namespace.

    With a namespace, the Deployment is read directly from it. Without one,
    all namespaces are searched by name and the match must be unique.

    Raises:
        RuntimeError: when the search finds no Deployment or more than one.
    """
    if not namespace:
        # Search all namespaces for uniqueness
        matches = ctx.apps.list_deployment_for_all_namespaces(
            field_selector=f"metadata.name={name}"
        ).items
        if not matches:
            raise RuntimeError(f"Deployment '{name}' not found in any namespace.")
        if len(matches) == 1:
            only = matches[0]
            return only, only.metadata.namespace
        ns_list = ", ".join(sorted({m.metadata.namespace for m in matches}))
        raise RuntimeError(
            f"Deployment '{name}' found in multiple namespaces: {ns_list}. Please specify --namespace."
        )

    found = ctx.apps.read_namespaced_deployment(name=name, namespace=namespace)
    return found, namespace
|
|
|
|
|
|
def pods_for_deployment(ctx: KubeCtx, dep: V1Deployment) -> List[V1Pod]:
    """
    List the pods matched by the Deployment selector's matchLabels.

    Pods that carry a deletion timestamp are left out. A Deployment without
    matchLabels yields an empty list and no API call is made.
    """
    labels = dep.spec.selector.match_labels
    if not labels:
        return []

    selector_expr = ",".join("%s=%s" % pair for pair in labels.items())
    listing = ctx.core.list_namespaced_pod(
        namespace=dep.metadata.namespace, label_selector=selector_expr
    )
    alive: List[V1Pod] = []
    for pod in listing.items:
        if pod.metadata.deletion_timestamp is None:
            alive.append(pod)
    return alive
|
|
|
|
|
|
def services_for_deployment(ctx: KubeCtx, dep: V1Deployment) -> List[V1Service]:
    """
    Return the Services in the Deployment's namespace that select its pods.

    A Service selects pods by label, so each Service selector is compared with
    the labels of the Deployment's pod template rather than with the
    Deployment's own selector, which may cover only a subset of those labels.
    When the template carries no labels, the selector's matchLabels are used
    instead. Services without a selector never match.
    """
    tmpl = dep.spec.template
    template_labels = tmpl.metadata.labels if tmpl and tmpl.metadata else None
    pod_labels = template_labels or dep.spec.selector.match_labels or {}

    candidates = ctx.core.list_namespaced_service(
        namespace=dep.metadata.namespace
    ).items
    matched: List[V1Service] = []
    for svc in candidates:
        selector = svc.spec.selector or {}
        # Every key/value of the Service selector must be present on the pods.
        if selector and all(pod_labels.get(k) == v for k, v in selector.items()):
            matched.append(svc)
    return matched
|
|
|
|
|
|
def ingresses_for_services(
    ctx: KubeCtx, namespace: str, services: Iterable[V1Service]
) -> List[V1Ingress]:
    """
    Return the Ingresses in ``namespace`` that route to any of ``services``.

    An Ingress qualifies when at least one HTTP path of one of its rules has a
    Service backend whose name belongs to the given Services. Each Ingress is
    returned once, in listing order.
    """
    wanted = {svc.metadata.name for svc in services}

    def routes_to_wanted(ingress) -> bool:
        # True as soon as one rule path points at a wanted Service.
        if not ingress.spec or not ingress.spec.rules:
            return False
        for rule in ingress.spec.rules:
            if not rule.http or not rule.http.paths:
                continue
            for http_path in rule.http.paths:
                backend = http_path.backend
                if backend and backend.service and backend.service.name in wanted:
                    return True
        return False

    selected: List[V1Ingress] = []
    seen_keys = set()
    for ingress in ctx.net.list_namespaced_ingress(namespace=namespace).items:
        if not routes_to_wanted(ingress):
            continue
        # de-dup
        key = (ingress.metadata.namespace, ingress.metadata.name)
        if key in seen_keys:
            continue
        seen_keys.add(key)
        selected.append(ingress)
    return selected
|
|
|
|
|
|
def extract_probe(dep: V1Deployment) -> Optional[ProbeInfo]:
    """
    Return the first HTTP probe declared in the Deployment's pod template.

    Containers are scanned in order; within each container the readiness
    probe is tried first, then liveness, then startup. Probes without an
    httpGet action are skipped. Returns None when no HTTP probe exists.
    """
    template = dep.spec.template
    if not (template and template.spec and template.spec.containers):
        return None

    probe_fields = ("readiness_probe", "liveness_probe", "startup_probe")
    # Check each container, stop at first we find
    for container in template.spec.containers:
        for field_name in probe_fields:
            probe = getattr(container, field_name, None)
            if not probe or not probe.http_get:
                continue
            http_get = probe.http_get
            declared_scheme = (http_get.scheme or "HTTP").lower()
            return ProbeInfo(
                kind=field_name.replace("_probe", ""),
                path=http_get.path or "/",
                port=http_get.port,
                # Anything other than HTTPS is treated as plain HTTP.
                scheme="https" if declared_scheme == "https" else "http",
            )
    return None
|
|
|
|
|
|
def resolve_service_bindings(
    dep: V1Deployment, services: List[V1Service], preferred_port: Optional[str | int]
) -> List[ServiceBinding]:
    """
    Build one ServiceBinding per distinct (service, port) pair.

    Bindings that match ``preferred_port`` are returned ahead of the others so
    that callers picking the first binding get the port aligned with the
    probe. An integer ``preferred_port`` matches the Service port or
    targetPort number; a string matches the Service port name or a named
    targetPort. Relative order is otherwise the order in which the ports are
    declared.

    Args:
        dep: Deployment whose namespace is recorded on every binding.
        services: Services to expand into bindings.
        preferred_port: Probe port (number or name), or None for no preference.

    Returns:
        De-duplicated bindings, preferred matches first.
    """
    ns = dep.metadata.namespace

    def is_preferred(sp, port_num: int, target) -> bool:
        # Decide whether this Service port lines up with the preferred port.
        if preferred_port is None:
            return False
        if isinstance(preferred_port, int):
            return target == preferred_port or port_num == preferred_port
        if isinstance(preferred_port, str):
            return sp.name == preferred_port or target == preferred_port
        return False

    preferred: List[ServiceBinding] = []
    others: List[ServiceBinding] = []
    seen = set()
    for s in services:
        for sp in s.spec.ports or []:
            port_num = int(sp.port)
            # de-dup by (svc,port): the first declaration wins
            key = (s.metadata.name, port_num)
            if key in seen:
                continue
            seen.add(key)
            target = sp.target_port if isinstance(sp.target_port, (str, int)) else None
            binding = ServiceBinding(
                service=s.metadata.name,
                namespace=ns,
                port=port_num,
                target_port=target,
                protocol=(sp.protocol or "TCP"),
            )
            if is_preferred(sp, port_num, target):
                preferred.append(binding)
            else:
                others.append(binding)
    return preferred + others
|
|
|
|
|
|
def extract_ingress_bindings(
    ingresses: List[V1Ingress], services: List[V1Service]
) -> List[IngressBinding]:
    """
    Flatten Ingress rules into one IngressBinding per matching HTTP path.

    Only paths whose backend is one of ``services`` are reported. The backend
    port is taken from the backend's port number; when the backend refers to
    the Service port by name, the name is resolved against the ports of the
    matching Service. Port 80 is used only when neither yields a number.
    A binding is flagged as TLS when its host is listed in the Ingress TLS
    section.

    Args:
        ingresses: Ingress objects to scan.
        services: Services whose backends are of interest.

    Returns:
        Bindings in rule/path order.
    """
    # service name -> {port name -> port number}, used to resolve named ports
    port_numbers_by_service: Dict[str, Dict[str, int]] = {}
    for svc in services:
        svc_ports = (svc.spec.ports if svc.spec else None) or []
        port_numbers_by_service[svc.metadata.name] = {
            sp.name: int(sp.port) for sp in svc_ports if sp.name
        }

    def backend_port(service_backend) -> int:
        # Resolve the backend's port reference to a number.
        port = service_backend.port
        if port and port.number:
            return int(port.number)
        if port and port.name:
            resolved = port_numbers_by_service.get(service_backend.name, {}).get(port.name)
            if resolved is not None:
                return resolved
        return 80

    bindings: List[IngressBinding] = []
    for ing in ingresses:
        if not ing.spec or not ing.spec.rules:
            continue
        tls_hosts = {h for t in (ing.spec.tls or []) for h in (t.hosts or [])}
        for rule in ing.spec.rules:
            if not rule.http or not rule.http.paths:
                continue
            host = rule.host or ""
            for p in rule.http.paths:
                service_backend = p.backend.service if p.backend else None
                if not service_backend or service_backend.name not in port_numbers_by_service:
                    continue
                bindings.append(
                    IngressBinding(
                        ingress=ing.metadata.name,
                        namespace=ing.metadata.namespace,
                        host=host,
                        path=p.path or "/",
                        tls=(host in tls_hosts),
                        service=service_backend.name,
                        service_port=backend_port(service_backend),
                    )
                )
    return bindings
|
|
|
|
|
|
# =========================
|
|
# Networking helpers
|
|
# =========================
|
|
|
|
|
|
def dns_resolves(host: str) -> bool:
    """
    Report whether ``host`` resolves to at least one address.

    Uses ``getaddrinfo`` so that IPv6-only names and IPv6 literals count as
    resolvable, not just names with an IPv4 address. Never raises: an empty
    host, a resolver failure or a name that cannot be encoded returns False.
    """
    if not host:
        return False
    try:
        socket.getaddrinfo(host, None)
    except (OSError, ValueError):
        # OSError covers resolver failures (socket.gaierror); ValueError covers
        # names rejected before lookup, e.g. IDNA encoding errors.
        return False
    return True
|
|
|
|
|
|
def http_check(
    url: str,
    host_header: Optional[str] = None,
    timeout: float = 5.0,
    verify_tls: bool = True,
) -> Reachability:
    """
    Issue one GET request (following redirects) and describe the outcome.

    Args:
        url: URL to request.
        host_header: Value sent as the Host header, when given.
        timeout: Timeout in seconds handed to the HTTP client.
        verify_tls: Whether TLS certificates are verified.

    Returns:
        A Reachability record. Any response with a status below 500 counts as
        ok; any exception yields ok=False with the exception text as error.
        Latency is measured from function entry in both cases.
    """
    began = time.perf_counter()
    request_headers = {"Host": host_header} if host_header else {}
    try:
        with httpx.Client(
            headers=request_headers,
            timeout=timeout,
            verify=verify_tls,
            follow_redirects=True,
        ) as http:
            response = http.get(url)
            elapsed = int((time.perf_counter() - began) * 1000)
            if host_header or url.startswith("http"):
                route = "ingress"
            else:
                route = "unknown"
            return Reachability(
                via=route,
                target=host_header or url,
                url=url,
                ok=response.status_code < 500,
                status=response.status_code,
                error=None,
                latency_ms=elapsed,
            )
    except Exception as exc:
        elapsed = int((time.perf_counter() - began) * 1000)
        return Reachability(
            via="ingress",
            target=host_header or url,
            url=url,
            ok=False,
            status=None,
            error=str(exc),
            latency_ms=elapsed,
        )
|
|
|
|
|
|
def tcp_check(
    host: str, port: int, timeout: float = 3.0
) -> Tuple[bool, Optional[str], Optional[int]]:
    """
    Try to open a TCP connection to ``host:port``.

    Returns:
        ``(ok, error, latency_ms)``: ``error`` is None on success and the
        exception text on failure; ``latency_ms`` is the elapsed time in
        milliseconds either way.
    """
    began = time.perf_counter()

    def elapsed_ms() -> int:
        return int((time.perf_counter() - began) * 1000)

    try:
        # The connection is closed again as soon as it has been established.
        with socket.create_connection((host, port), timeout=timeout):
            outcome = (True, None, elapsed_ms())
    except Exception as exc:
        outcome = (False, str(exc), elapsed_ms())
    return outcome
|
|
|
|
|
|
def try_exec_http_from_pod(
    ctx: KubeCtx, namespace: str, pod: str, url: str, timeout: int = 8
) -> Reachability:
    """
    Execute a lightweight HTTP check from within the given pod (best-effort).

    The pod runs a shell one-liner that prefers curl (printing the HTTP status
    code), falls back to wget (printing 200 on success) and prints 000 when
    neither tool produced a result.

    Args:
        ctx: Kubernetes context used for the exec call.
        namespace: Namespace of the pod.
        pod: Name of the pod to exec into.
        url: URL to request from inside the pod.
        timeout: Request timeout in seconds for the exec call.

    Returns:
        A Reachability record with ``via="svc-fqdn"``. Status codes from 200
        up to 499 count as ok. Exec failures yield ok=False with the exception
        text as error.
    """
    import shlex  # local import: only this function needs shell quoting

    # The URL may contain text taken from cluster objects (e.g. a probe path);
    # quote it so the shell in the pod treats it as a single literal word.
    quoted_url = shlex.quote(url)
    script = (
        f'(command -v curl >/dev/null && curl -sk -o /dev/null -w "%{{http_code}}" {quoted_url}) || '
        f"(command -v wget >/dev/null && wget -qO- {quoted_url} >/dev/null && printf 200) || "
        "(echo 000)"
    )
    cmd = ["sh", "-lc", script]

    started = time.perf_counter()
    try:
        out = stream(
            ctx.core.connect_get_namespaced_pod_exec,
            name=pod,
            namespace=namespace,
            command=cmd,
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _request_timeout=timeout,
        )
    except Exception as e:
        return Reachability(
            via="svc-fqdn",
            target=url,
            url=url,
            ok=False,
            status=None,
            error=str(e),
            latency_ms=int((time.perf_counter() - started) * 1000),
        )
    latency_ms = int((time.perf_counter() - started) * 1000)

    # Each stage prints its own code, so a failed curl ("000") may be followed
    # by a later stage's output; the last three characters are the final verdict.
    try:
        code = int(str(out).strip()[-3:])
    except ValueError:
        code = 0

    reachable = 200 <= code < 500
    return Reachability(
        via="svc-fqdn",
        target=url,
        url=url,
        ok=reachable,
        status=code,
        error=None if reachable else f"code={code}",
        latency_ms=latency_ms,
    )
|
|
|
|
|
|
# =========================
|
|
# High-level inspection
|
|
# =========================
|
|
|
|
|
|
def _health_suffix(probe: Optional[ProbeInfo]) -> str:
    """Return the probe's HTTP path (default "/") with a guaranteed leading slash."""
    path = probe.path if probe and probe.path else "/"
    return path if path.startswith("/") else "/" + path


def _url_host(address: str) -> str:
    """Wrap a literal IPv6 address in brackets so it can be embedded in a URL."""
    if ":" in address and not address.startswith("["):
        return f"[{address}]"
    return address


def _ingress_lb_addresses(ingresses: List[V1Ingress], ingress_name: str) -> List[str]:
    """Collect load-balancer IPs/hostnames from the status of the named Ingress."""
    addresses: List[str] = []
    for ing in ingresses:
        if ing.metadata.name != ingress_name:
            continue
        lb = ing.status.load_balancer if ing.status else None
        for entry in (lb.ingress if lb and lb.ingress else []):
            if entry.ip:
                addresses.append(entry.ip)
            if entry.hostname:
                addresses.append(entry.hostname)
    return addresses


def _named_container_ports(dep: V1Deployment) -> Dict[str, int]:
    """Map named containerPorts declared in the pod template to their numbers."""
    tmpl = dep.spec.template
    containers = (tmpl.spec.containers if tmpl and tmpl.spec else None) or []
    return {
        cp.name: int(cp.container_port)
        for c in containers
        for cp in (c.ports or [])
        if cp.name and cp.container_port
    }


def _as_port_number(port: Optional[str | int], named: Dict[str, int]) -> Optional[int]:
    """Resolve a number-or-name port reference to a number, or None if unknown."""
    if isinstance(port, int):
        return port
    if isinstance(port, str):
        return named.get(port)
    return None


def _pod_port_candidates(
    probe_port: Optional[int],
    svc_bindings: List[ServiceBinding],
    named: Dict[str, int],
    cports: Dict[str, List[int]],
) -> List[int]:
    """Order the ports worth testing on a pod IP: probe port, Service targets, then containerPorts."""
    candidates: List[int] = []
    if probe_port is not None:
        candidates.append(probe_port)
    for b in svc_bindings:
        # Pods listen on the Service's targetPort, not on the Service port
        # itself. Fall back to the Service port when targetPort is unset or is
        # a name that cannot be resolved.
        resolved = _as_port_number(b.target_port, named)
        port = resolved if resolved is not None else b.port
        if port not in candidates:
            candidates.append(port)
    # Add container ports if nothing else
    if not candidates:
        for plist in cports.values():
            for pnum in plist:
                if pnum not in candidates:
                    candidates.append(pnum)
    return candidates


def _check_ingresses(
    ing_bindings: List[IngressBinding],
    ingresses: List[V1Ingress],
    probe: Optional[ProbeInfo],
    timeout: float,
    verify_tls: bool,
) -> List[Reachability]:
    """Run one best-effort HTTP check per Ingress binding."""
    results: List[Reachability] = []
    suffix = _health_suffix(probe)
    for ib in ing_bindings:
        scheme = "https" if ib.tls else "http"
        route = f"{(ib.path or '/').rstrip('/')}{suffix}"
        url = f"{scheme}://{ib.host}{route}" if ib.host else route

        # If host resolves, try directly
        if ib.host and dns_resolves(ib.host):
            r = http_check(url, timeout=timeout, verify_tls=verify_tls)
            r.via = "ingress"
            r.target = ib.host
            results.append(r)
            continue

        # Try using load balancer IP/hostname from status with Host header
        addresses = _ingress_lb_addresses(ingresses, ib.ingress)
        if addresses:
            alt_url = f"{scheme}://{_url_host(addresses[0])}{route}"
            results.append(
                http_check(
                    alt_url,
                    host_header=ib.host or None,
                    timeout=timeout,
                    verify_tls=verify_tls,
                )
            )
        else:
            results.append(
                Reachability(
                    via="ingress",
                    target=ib.host or "(no-host)",
                    url=url or None,
                    ok=False,
                    error="No DNS for host and no load balancer address found on Ingress.",
                )
            )
    return results


def _check_pod_ips(
    pods: List[V1Pod],
    ports: List[int],
    probe: Optional[ProbeInfo],
    probe_port: Optional[int],
    timeout: float,
) -> List[Reachability]:
    """TCP-connect to each pod IP on the first candidate port, then GET the health path."""
    results: List[Reachability] = []
    suffix = _health_suffix(probe)
    for pod in pods:
        ip = pod.status.pod_ip
        if not ip:
            results.append(
                Reachability(
                    via="pod-ip",
                    target=pod.metadata.name,
                    ok=False,
                    error="No Pod IP assigned",
                )
            )
            continue
        if not ports:
            results.append(
                Reachability(
                    via="pod-ip",
                    target=ip,
                    ok=False,
                    error="No candidate port found to test",
                )
            )
            continue

        # Try TCP on first viable port
        port = ports[0]
        ok, err, _lat = tcp_check(ip, port, timeout=timeout)
        if not ok:
            results.append(
                Reachability(via="pod-ip", target=f"{ip}:{port}", ok=False, error=err)
            )
            continue

        # Use the probe's scheme only when we are hitting the probe's own port;
        # any other port is assumed to speak plain HTTP.
        scheme = probe.scheme if probe and port == probe_port else "http"
        url = f"{scheme}://{_url_host(ip)}:{port}{suffix}"
        r = http_check(url, timeout=timeout, verify_tls=False)
        r.via = "pod-ip"
        r.target = f"{ip}:{port}"
        results.append(r)
    return results


def _check_service_fqdn(
    ctx: KubeCtx,
    svc_bindings: List[ServiceBinding],
    pods: List[V1Pod],
    probe: Optional[ProbeInfo],
) -> Reachability:
    """Check the first Service binding through its cluster-local DNS name."""
    if not svc_bindings:
        return Reachability(
            via="svc-fqdn",
            target="(no service)",
            ok=False,
            error="No Service bound to the deployment",
        )

    sb = svc_bindings[0]
    fqdn = f"{sb.service}.{sb.namespace}.svc.cluster.local"
    url = f"http://{fqdn}:{sb.port}{_health_suffix(probe)}"
    return fqdn, url, sb


def inspect_deployment(
    name: str,
    namespace: Optional[str] = None,
    timeout: float = 5.0,
    verify_tls: bool = True,
) -> InspectionReport:
    """
    Full inspection routine. Returns a structured report usable by other tools.

    Discovers the Deployment's pods, Services and Ingresses, then runs three
    groups of connectivity checks: through each Ingress binding, against each
    pod IP, and through the first Service's cluster-local DNS name (directly
    when running in-cluster and the name resolves, otherwise by exec-ing into
    the first pod).

    Args:
        name: Deployment name.
        namespace: Namespace of the Deployment; searched for when omitted.
        timeout: Timeout in seconds for the HTTP and TCP checks.
        verify_tls: Whether Ingress HTTPS checks verify certificates.

    Returns:
        The InspectionReport with discovery data and check outcomes.

    Raises:
        RuntimeError: propagated from find_deployment when the Deployment
            cannot be identified uniquely.
    """
    ctx = load_kube_ctx()
    dep, ns = find_deployment(ctx, name, namespace)
    pods = pods_for_deployment(ctx, dep)
    pod_names = [p.metadata.name for p in pods]
    pod_ips = {p.metadata.name: (p.status.pod_ip or "") for p in pods}

    # Collect declared containerPorts
    cports: Dict[str, List[int]] = {
        p.metadata.name: sorted(
            {
                int(cp.container_port)
                for c in (p.spec.containers or [])
                for cp in (c.ports or [])
                if cp.container_port
            }
        )
        for p in pods
    }

    probe = extract_probe(dep)
    preferred_port: Optional[int | str] = (
        probe.port if probe and probe.port is not None else None
    )
    services = services_for_deployment(ctx, dep)
    svc_bindings = resolve_service_bindings(dep, services, preferred_port)

    ingresses = ingresses_for_services(ctx, ns, services)
    ing_bindings = extract_ingress_bindings(ingresses, services)

    # Named ports (probe port, Service targetPort) resolve against the template.
    named_ports = _named_container_ports(dep)
    probe_port = _as_port_number(preferred_port, named_ports)

    # Reachability checks
    reach: List[Reachability] = []

    # 1) Through Ingress (best effort)
    reach.extend(_check_ingresses(ing_bindings, ingresses, probe, timeout, verify_tls))

    # 2) Through Pod IP (TCP + optional HTTP on health path)
    pod_ports = _pod_port_candidates(probe_port, svc_bindings, named_ports, cports)
    reach.extend(_check_pod_ips(pods, pod_ports, probe, probe_port, timeout))

    # 3) Through fully qualified Service DNS (from inside cluster if needed)
    svc_target = _check_service_fqdn(ctx, svc_bindings, pods, probe)
    if isinstance(svc_target, tuple):
        fqdn, url, sb = svc_target
        if ctx.in_cluster and dns_resolves(fqdn):
            r = http_check(url, timeout=timeout, verify_tls=False)
            r.via = "svc-fqdn"
            r.target = fqdn
            reach.append(r)
        elif pods:
            reach.append(
                try_exec_http_from_pod(ctx, sb.namespace, pods[0].metadata.name, url)
            )
        else:
            reach.append(
                Reachability(
                    via="svc-fqdn",
                    target=url,
                    url=url,
                    ok=False,
                    error="No pods available to test inside cluster",
                )
            )
    else:
        # No Service at all: the helper already produced the failure record.
        reach.append(svc_target)

    return InspectionReport(
        deployment=dep.metadata.name,
        namespace=ns,
        in_cluster=ctx.in_cluster,
        pods=pod_names,
        pod_ips=pod_ips,
        container_ports=cports,
        health_probe=probe,
        services=svc_bindings,
        ingresses=ing_bindings,
        reachability=reach,
    )
|
|
|
|
|
|
# =========================
|
|
# Logs helpers
|
|
# =========================
|
|
|
|
|
|
def deployment_pods(ctx: KubeCtx, name: str, namespace: str) -> List[V1Pod]:
    """Read the named Deployment from ``namespace`` and return its pods."""
    return pods_for_deployment(
        ctx, ctx.apps.read_namespaced_deployment(name=name, namespace=namespace)
    )
|
|
|
|
|
|
def print_pod_logs(
    name: str,
    namespace: str,
    container: Optional[str] = None,
    tail: Optional[int] = None,
    since_seconds: Optional[int] = None,
    follow: bool = False,
) -> None:
    """
    Print the logs of every pod belonging to a Deployment.

    Pods are handled one after another. With ``follow`` the log of a pod is
    streamed until the stream ends; Ctrl-C stops streaming and skips the
    remaining pods.

    Args:
        name: Deployment name.
        namespace: Namespace of the Deployment.
        container: Container to read logs from, when given.
        tail: Number of trailing lines to request, when given.
        since_seconds: Only request logs newer than this many seconds.
        follow: Stream the logs instead of fetching a snapshot.

    Raises:
        typer.Exit: with code 1 when the Deployment has no pods.
    """
    ctx = load_kube_ctx()
    pods = deployment_pods(ctx, name, namespace)
    if not pods:
        console.print(f"[red]No pods found for deployment {name} in {namespace}[/red]")
        raise typer.Exit(1)

    def emit(text: str) -> None:
        # Log text is arbitrary: it must not be interpreted as Rich markup.
        console.print(text, markup=False)

    for p in pods:
        console.rule(f"[bold]Logs: {p.metadata.name}[/bold]")
        if not follow:
            emit(
                ctx.core.read_namespaced_pod_log(
                    name=p.metadata.name,
                    namespace=namespace,
                    container=container,
                    tail_lines=tail,
                    since_seconds=since_seconds,
                )
            )
            continue

        # Follow mode streams each pod in sequence (simple approach).
        resp = None
        try:
            resp = ctx.core.read_namespaced_pod_log(
                name=p.metadata.name,
                namespace=namespace,
                container=container,
                tail_lines=tail,
                since_seconds=since_seconds,
                follow=True,
                _preload_content=False,
            )
            # The stream yields arbitrary byte chunks, so buffer until a
            # newline is seen and print only complete lines.
            pending = b""
            for chunk in resp.stream(decode_content=True):
                *complete, pending = (pending + chunk).split(b"\n")
                for raw in complete:
                    emit(raw.decode("utf-8", errors="replace").rstrip())
            if pending:
                emit(pending.decode("utf-8", errors="replace").rstrip())
        except KeyboardInterrupt:
            break
        finally:
            # Always give the HTTP connection back, even when interrupted.
            if resp is not None:
                resp.close()
                resp.release_conn()
|
|
|
|
|
|
# =========================
|
|
# CLI commands
|
|
# =========================
|
|
|
|
|
|
@app.command("inspect")
def cli_inspect(
    deployment: str = typer.Argument(..., help="Deployment name"),
    namespace: Optional[str] = typer.Option(
        None,
        "--namespace",
        "-n",
        help="Namespace (if omitted, will try to auto-detect)",
    ),
    timeout: float = typer.Option(5.0, help="HTTP/TCP timeout (seconds)"),
    insecure: bool = typer.Option(False, help="Skip TLS verification for HTTPS checks"),
    output_json: bool = typer.Option(
        False, "--json", help="Print JSON report instead of a table"
    ),
):
    """
    Inspect a Deployment's Services & Ingresses and run connectivity checks.
    """
    # NOTE: the docstring above doubles as the command's help text, so
    # implementation notes live in comments instead.
    #
    # Runs inspect_deployment and prints either the JSON form of the report or
    # a series of Rich panels/tables. Any inspection error is printed and the
    # command exits with status 1.
    from rich.markup import escape  # local import: only needed for rendering

    try:
        report = inspect_deployment(
            deployment, namespace=namespace, timeout=timeout, verify_tls=(not insecure)
        )
    except Exception as e:
        # Error text comes from the cluster/API; escape it so square brackets
        # are printed literally instead of being parsed as markup.
        console.print(f"[red]Error:[/red] {escape(str(e))}")
        raise typer.Exit(1) from e

    if output_json:
        console.print_json(json.dumps(report.model_dump(), indent=2))
        return

    hdr = (
        f"[bold white]Deployment:[/bold white] {report.deployment} "
        f"[bold white]Namespace:[/bold white] {report.namespace} "
        f"[bold white]Context:[/bold white] {'in-cluster' if report.in_cluster else 'local'}"
    )
    console.print(Panel(hdr, border_style="cyan", title="Overview"))

    # Pods table
    t = Table(title="Pods", box=box.SIMPLE, show_lines=False)
    t.add_column("Pod")
    t.add_column("IP")
    t.add_column("Ports")
    for pod in report.pods:
        ports = ", ".join(str(p) for p in report.container_ports.get(pod, [])) or "-"
        t.add_row(pod, report.pod_ips.get(pod, "-"), ports)
    console.print(t)

    # Health probe
    if report.health_probe:
        hp = report.health_probe
        console.print(
            Panel(
                f"[bold]Health Probe[/bold]\nKind: {hp.kind}\nPath: {escape(hp.path or '-')}\nPort: {hp.port or '-'}\nScheme: {hp.scheme}",
                border_style="green",
            )
        )
    else:
        console.print(
            Panel(
                "[yellow]No HTTP health probe found on containers[/yellow]",
                border_style="yellow",
            )
        )

    # Services
    if report.services:
        ts = Table(title="Services", box=box.SIMPLE)
        ts.add_column("Service")
        ts.add_column("Namespace")
        ts.add_column("Port")
        ts.add_column("TargetPort")
        ts.add_column("Protocol")
        for s in report.services:
            ts.add_row(
                s.service,
                s.namespace,
                str(s.port),
                str(s.target_port or "-"),
                s.protocol,
            )
        console.print(ts)
    else:
        console.print(
            Panel(
                "[yellow]No Service selects this deployment[/yellow]",
                border_style="yellow",
            )
        )

    # Ingresses
    if report.ingresses:
        ti = Table(title="Ingress Bindings", box=box.SIMPLE)
        ti.add_column("Ingress")
        ti.add_column("Host")
        ti.add_column("Path")
        ti.add_column("TLS")
        ti.add_column("Service:Port")
        for ib in report.ingresses:
            ti.add_row(
                ib.ingress,
                escape(ib.host) if ib.host else "-",
                # Ingress paths may be regexes such as "/[a-z]+".
                escape(ib.path),
                "yes" if ib.tls else "no",
                f"{ib.service}:{ib.service_port}",
            )
        console.print(ti)
    else:
        console.print(
            Panel(
                "[yellow]No Ingress rules reference Services for this deployment[/yellow]",
                border_style="yellow",
            )
        )

    # Reachability
    tr = Table(title="Reachability", box=box.SIMPLE)
    tr.add_column("Via")
    tr.add_column("Target")
    tr.add_column("URL")
    tr.add_column("OK")
    tr.add_column("Status")
    tr.add_column("Latency (ms)")
    tr.add_column("Error")
    for r in report.reachability:
        tr.add_row(
            r.via,
            escape(r.target),
            escape(r.url) if r.url else "-",
            "[green]yes[/green]" if r.ok else "[red]no[/red]",
            # 0 is a legitimate value for both columns; only None means "no data".
            "-" if r.status is None else str(r.status),
            "-" if r.latency_ms is None else str(r.latency_ms),
            escape(r.error) if r.error else "-",
        )
    console.print(tr)
|
|
|
|
|
|
@app.command("logs")
def cli_logs(
    deployment: str = typer.Argument(..., help="Deployment name"),
    namespace: str = typer.Option(..., "--namespace", "-n", help="Namespace"),
    container: Optional[str] = typer.Option(
        None, "--container", "-c", help="Specific container name"
    ),
    tail: Optional[int] = typer.Option(None, "--tail", help="Tail N lines"),
    since: Optional[int] = typer.Option(
        None, "--since", help="Only return logs newer than N seconds"
    ),
    follow: bool = typer.Option(False, "--follow", "-f", help="Stream logs"),
):
    """
    Print logs from pods belonging to a Deployment.
    """
    # NOTE: the docstring above doubles as the command's help text, so
    # implementation notes live in comments instead.
    from rich.markup import escape  # local import: only needed on the error path

    try:
        print_pod_logs(
            deployment,
            namespace,
            container=container,
            tail=tail,
            since_seconds=since,
            follow=follow,
        )
    except typer.Exit:
        # print_pod_logs exits deliberately (and has already printed why) when
        # no pods exist; typer.Exit is an Exception subclass, so let it through
        # instead of reporting it as a log-fetching error.
        raise
    except Exception as e:
        # Escape the message so square brackets in API errors print literally.
        console.print(f"[red]Error fetching logs:[/red] {escape(str(e))}")
        raise typer.Exit(1) from e
|
|
|
|
|
|
@app.callback(invoke_without_command=True)
def _root(ctx: typer.Context):
    """
    Kubernetes Ingress Debugger CLI.
    """
    # A subcommand was given: nothing to do at the root level.
    if ctx.invoked_subcommand is not None:
        return
    # Bare invocation: show the help text.
    typer.echo(ctx.get_help())
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: hand control to the Typer application.
    app()
|