Initial Commit for ingress-debugger

Debug ingress and reachability issues.
This commit is contained in:
Waylon Walker 2025-08-30 11:10:18 -05:00
commit 74cafaa556
5 changed files with 1148 additions and 0 deletions

0
.gitignore vendored Normal file
View file

View file

@ -0,0 +1,207 @@
#!/usr/bin/env -S uv run --quiet --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "typer",
# "rich",
# "httpx",
# "kubernetes",
# "pydantic",
# ]
# ///
"""
k8s-ingress-debugger
Given a Deployment name, this tool inspects the Kubernetes objects around it and
runs a set of connectivity checks:
Does an Ingress point to it?
What are the Ingress hosts?
What's the healthcheck route (from readiness/liveness HTTP probes)?
Can we access it via:
- Ingress (host/IP)
- Pod IP
- Fully Qualified Service DNS (service.ns.svc.cluster.local)
Provide a convenient logs fetcher
It works both in-cluster and from a developer machine (tries in-cluster first,
then falls back to local kubeconfig). All checker functions are importable and
usable outside of Typer.
Examples
--------
Inspect with rich table output:
./k8s_ingress_debug.py inspect my-deployment -n default
Print JSON (for automation):
./k8s_ingress_debug.py inspect my-deployment -n default --json
Stream logs from all pods of the deployment:
./k8s_ingress_debug.py logs my-deployment -n default -f --tail 200
"""
from __future__ import annotations
import json
import socket
import time
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple
import httpx
import typer
from pydantic import BaseModel
from rich import box
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from kubernetes import client, config
from kubernetes.client import (
ApiClient,
AppsV1Api,
CoreV1Api,
NetworkingV1Api,
V1Deployment,
V1Ingress,
V1Service,
V1Pod,
)
from kubernetes.stream import stream
app = typer.Typer(add_completion=False, help="Kubernetes Ingress Debugger")
console = Console()
@dataclass
class KubeCtx:
api_client: ApiClient
core: CoreV1Api
apps: AppsV1Api
net: NetworkingV1Api
in_cluster: bool
def load_kube_ctx() -> KubeCtx:
in_cluster = False
try:
config.load_incluster_config()
in_cluster = True
except Exception:
config.load_kube_config()
api_client = client.ApiClient()
return KubeCtx(
api_client=api_client,
core=CoreV1Api(api_client),
apps=AppsV1Api(api_client),
net=NetworkingV1Api(api_client),
in_cluster=in_cluster,
)
class ProbeInfo(BaseModel):
kind: str
path: Optional[str] = None
port: Optional[str | int] = None
scheme: str = "http"
class ServiceBinding(BaseModel):
service: str
namespace: str
port: int
target_port: str | int | None = None
protocol: str = "TCP"
class IngressBinding(BaseModel):
ingress: str
namespace: str
host: str
path: str
tls: bool
service: str
service_port: int
class Reachability(BaseModel):
via: str
target: str
url: Optional[str] = None
ok: bool = False
status: Optional[int] = None
error: Optional[str] = None
latency_ms: Optional[int] = None
class InspectionReport(BaseModel):
deployment: str
namespace: str
in_cluster: bool
pods: List[str]
pod_ips: dict
container_ports: dict
health_probe: Optional[ProbeInfo] = None
services: List[ServiceBinding] = []
ingresses: List[IngressBinding] = []
reachability: List[Reachability] = []
# === Functions omitted for brevity ===
# They include: find_deployment, pods_for_deployment, services_for_deployment,
# ingresses_for_services, extract_probe, resolve_service_bindings,
# extract_ingress_bindings, dns_resolves, http_check, tcp_check,
# try_exec_http_from_pod, inspect_deployment, print_pod_logs
# === CLI commands ===
@app.command("inspect")
def cli_inspect(
deployment: str,
namespace: Optional[str] = typer.Option(None, "--namespace", "-n"),
timeout: float = typer.Option(5.0, help="HTTP/TCP timeout (seconds)"),
insecure: bool = typer.Option(False, help="Skip TLS verification"),
output_json: bool = typer.Option(False, "--json", help="Print JSON report"),
):
try:
report = inspect_deployment(deployment, namespace=namespace, timeout=timeout, verify_tls=(not insecure))
except Exception as e:
console.print(f"[red]Error:[/red] {e}")
raise typer.Exit(1)
if output_json:
console.print_json(json.dumps(report.model_dump(), indent=2))
return
console.print(Panel(f"Deployment: {report.deployment}\nNamespace: {report.namespace}", border_style="cyan"))
t = Table(title="Pods", box=box.SIMPLE)
t.add_column("Pod")
t.add_column("IP")
t.add_column("Ports")
for pod in report.pods:
ports = ", ".join(str(p) for p in report.container_ports.get(pod, [])) or "-"
t.add_row(pod, report.pod_ips.get(pod, "-"), ports)
console.print(t)
@app.command("logs")
def cli_logs(
deployment: str,
namespace: str = typer.Option(..., "--namespace", "-n"),
container: Optional[str] = typer.Option(None, "--container", "-c"),
tail: Optional[int] = typer.Option(None, "--tail"),
since: Optional[int] = typer.Option(None, "--since"),
follow: bool = typer.Option(False, "--follow", "-f"),
):
try:
print_pod_logs(deployment, namespace, container=container, tail=tail, since_seconds=since, follow=follow)
except Exception as e:
console.print(f"[red]Error fetching logs:[/red] {e}")
raise typer.Exit(1)
if __name__ == "__main__":
app()

3
README.md Normal file
View file

@ -0,0 +1,3 @@
# ingress-debugger
Debug ingress and reachability issues.

Binary file not shown.

938
ingress_debugger.py Executable file
View file

@ -0,0 +1,938 @@
#!/usr/bin/env -S uv run --quiet --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "typer",
# "rich",
# "httpx",
# "kubernetes",
# "pydantic",
# ]
# ///
"""
k8s-ingress-debugger
Given a Deployment name, this tool inspects the Kubernetes objects around it and
runs a set of connectivity checks:
Does an Ingress point to it?
What are the Ingress hosts?
What's the healthcheck route (from readiness/liveness HTTP probes)?
Can we access it via:
- Ingress (host/IP)
- Pod IP
- Fully Qualified Service DNS (service.ns.svc.cluster.local)
Provide a convenient logs fetcher
It works both in-cluster and from a developer machine (tries in-cluster first,
then falls back to local kubeconfig). All checker functions are importable and
usable outside of Typer.
Examples
--------
Inspect with rich table output:
./k8s_ingress_debug.py inspect my-deployment -n default
Print JSON (for automation):
./k8s_ingress_debug.py inspect my-deployment -n default --json
Stream logs from all pods of the deployment:
./k8s_ingress_debug.py logs my-deployment -n default -f --tail 200
"""
from __future__ import annotations
import json
import socket
import time
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Tuple
import httpx
import typer
from pydantic import BaseModel, Field
from rich import box
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
# Kubernetes imports
from kubernetes import client, config
from kubernetes.client import (
ApiClient,
AppsV1Api,
CoreV1Api,
NetworkingV1Api,
V1Deployment,
V1Ingress,
V1Service,
V1Pod,
)
from kubernetes.stream import stream
app = typer.Typer(add_completion=False, help="Kubernetes Ingress Debugger")
console = Console()
# =========================
# Kube config + helpers
# =========================
@dataclass
class KubeCtx:
api_client: ApiClient
core: CoreV1Api
apps: AppsV1Api
net: NetworkingV1Api
in_cluster: bool
def load_kube_ctx() -> KubeCtx:
"""
Load Kubernetes configuration, preferring in-cluster.
Falls back to local kubeconfig.
"""
in_cluster = False
try:
config.load_incluster_config()
in_cluster = True
except Exception:
# Not in cluster, try local kubeconfig
config.load_kube_config()
api_client = client.ApiClient()
return KubeCtx(
api_client=api_client,
core=CoreV1Api(api_client),
apps=AppsV1Api(api_client),
net=NetworkingV1Api(api_client),
in_cluster=in_cluster,
)
# =========================
# Discovery models
# =========================
class ProbeInfo(BaseModel):
kind: str # "readiness" | "liveness" | "startup"
path: Optional[str] = None
port: Optional[str | int] = None
scheme: str = "http"
class ServiceBinding(BaseModel):
service: str
namespace: str
port: int
target_port: str | int | None = None
protocol: str = "TCP"
class IngressBinding(BaseModel):
ingress: str
namespace: str
host: str
path: str
tls: bool
service: str
service_port: int
class Reachability(BaseModel):
via: str # "ingress", "pod-ip", "svc-fqdn"
target: str
url: Optional[str] = None
ok: bool = False
status: Optional[int] = None
error: Optional[str] = None
latency_ms: Optional[int] = None
class InspectionReport(BaseModel):
deployment: str
namespace: str
in_cluster: bool
pods: List[str]
pod_ips: Dict[str, str]
container_ports: Dict[str, List[int]]
health_probe: Optional[ProbeInfo] = None
services: List[ServiceBinding] = Field(default_factory=list)
ingresses: List[IngressBinding] = Field(default_factory=list)
reachability: List[Reachability] = Field(default_factory=list)
# =========================
# Core discovery functions
# =========================
def find_deployment(
ctx: KubeCtx, name: str, namespace: Optional[str]
) -> Tuple[V1Deployment, str]:
"""
Return (deployment, namespace).
If namespace not provided, try to find a unique deployment across all namespaces.
"""
if namespace:
dep = ctx.apps.read_namespaced_deployment(name=name, namespace=namespace)
return dep, namespace
# Search all namespaces for uniqueness
deps = ctx.apps.list_deployment_for_all_namespaces(
field_selector=f"metadata.name={name}"
).items
if not deps:
raise RuntimeError(f"Deployment '{name}' not found in any namespace.")
if len(deps) > 1:
ns_list = ", ".join(sorted({d.metadata.namespace for d in deps}))
raise RuntimeError(
f"Deployment '{name}' found in multiple namespaces: {ns_list}. Please specify --namespace."
)
d = deps[0]
return d, d.metadata.namespace
def pods_for_deployment(ctx: KubeCtx, dep: V1Deployment) -> List[V1Pod]:
selector = dep.spec.selector.match_labels or {}
if not selector:
return []
label_selector = ",".join(f"{k}={v}" for k, v in selector.items())
pods = ctx.core.list_namespaced_pod(
namespace=dep.metadata.namespace, label_selector=label_selector
).items
return [p for p in pods if p.metadata.deletion_timestamp is None]
def services_for_deployment(ctx: KubeCtx, dep: V1Deployment) -> List[V1Service]:
"""
Services whose selector is a subset of deployment's selector labels
"""
ns = dep.metadata.namespace
dep_sel = dep.spec.selector.match_labels or {}
svcs = ctx.core.list_namespaced_service(namespace=ns).items
matched = []
for s in svcs:
sel = s.spec.selector or {}
if sel and all(dep_sel.get(k) == v for k, v in sel.items()):
matched.append(s)
return matched
def ingresses_for_services(
ctx: KubeCtx, namespace: str, services: Iterable[V1Service]
) -> List[V1Ingress]:
svc_names = {s.metadata.name for s in services}
ings = ctx.net.list_namespaced_ingress(namespace=namespace).items
out = []
for ing in ings:
if not ing.spec or not ing.spec.rules:
continue
for rule in ing.spec.rules:
if not rule.http or not rule.http.paths:
continue
for p in rule.http.paths:
backend = p.backend
if backend and backend.service and backend.service.name in svc_names:
out.append(ing)
break
# de-dup
seen = set()
uniq = []
for ing in out:
key = (ing.metadata.namespace, ing.metadata.name)
if key not in seen:
seen.add(key)
uniq.append(ing)
return uniq
def extract_probe(dep: V1Deployment) -> Optional[ProbeInfo]:
"""
Prefer readiness > liveness > startup HTTP probes.
"""
tmpl = dep.spec.template
if not tmpl or not tmpl.spec or not tmpl.spec.containers:
return None
def http_probe(container, probe_field: str) -> Optional[ProbeInfo]:
pr = getattr(container, probe_field, None)
if pr and pr.http_get:
path = pr.http_get.path or "/"
port = pr.http_get.port
scheme = (pr.http_get.scheme or "HTTP").lower()
return ProbeInfo(
kind=probe_field.replace("_probe", ""),
path=path,
port=port,
scheme="https" if scheme == "https" else "http",
)
return None
# Check each container, stop at first we find
for c in tmpl.spec.containers:
for field in ("readiness_probe", "liveness_probe", "startup_probe"):
pi = http_probe(c, field)
if pi:
return pi
return None
def resolve_service_bindings(
dep: V1Deployment, services: List[V1Service], preferred_port: Optional[str | int]
) -> List[ServiceBinding]:
"""
Build bindings using Service ports; try to align with probe/targetPort when given.
"""
ns = dep.metadata.namespace
bindings: List[ServiceBinding] = []
for s in services:
for sp in s.spec.ports or []:
port_num = int(sp.port)
target = sp.target_port if isinstance(sp.target_port, (str, int)) else None
# Prefer the service port that matches preferred_port (by name or number)
if preferred_port is not None:
if isinstance(preferred_port, int) and (
target == preferred_port or port_num == preferred_port
):
bindings.append(
ServiceBinding(
service=s.metadata.name,
namespace=ns,
port=port_num,
target_port=target,
protocol=(sp.protocol or "TCP"),
)
)
continue
if isinstance(preferred_port, str) and (
sp.name == preferred_port or target == preferred_port
):
bindings.append(
ServiceBinding(
service=s.metadata.name,
namespace=ns,
port=port_num,
target_port=target,
protocol=(sp.protocol or "TCP"),
)
)
continue
# Otherwise include everything; we'll de-dup later
bindings.append(
ServiceBinding(
service=s.metadata.name,
namespace=ns,
port=port_num,
target_port=target,
protocol=(sp.protocol or "TCP"),
)
)
# de-dup by (svc,port)
seen = set()
uniq: List[ServiceBinding] = []
for b in bindings:
key = (b.service, b.port)
if key not in seen:
seen.add(key)
uniq.append(b)
return uniq
def extract_ingress_bindings(
ingresses: List[V1Ingress], services: List[V1Service]
) -> List[IngressBinding]:
svc_names = {s.metadata.name for s in services}
bindings: List[IngressBinding] = []
for ing in ingresses:
tls_hosts = set()
if ing.spec and ing.spec.tls:
for t in ing.spec.tls:
for h in t.hosts or []:
tls_hosts.add(h)
if not ing.spec or not ing.spec.rules:
continue
for rule in ing.spec.rules:
host = rule.host or ""
if not rule.http or not rule.http.paths:
continue
for p in rule.http.paths:
backend = p.backend
if backend and backend.service and backend.service.name in svc_names:
svc_port = (
int(backend.service.port.number)
if backend.service.port and backend.service.port.number
else 80
)
bindings.append(
IngressBinding(
ingress=ing.metadata.name,
namespace=ing.metadata.namespace,
host=host,
path=p.path or "/",
tls=(host in tls_hosts),
service=backend.service.name,
service_port=svc_port,
)
)
return bindings
# =========================
# Networking helpers
# =========================
def dns_resolves(host: str) -> bool:
try:
socket.gethostbyname(host)
return True
except Exception:
return False
def http_check(
url: str,
host_header: Optional[str] = None,
timeout: float = 5.0,
verify_tls: bool = True,
) -> Reachability:
start = time.perf_counter()
headers = {}
if host_header:
headers["Host"] = host_header
try:
with httpx.Client(
follow_redirects=True, verify=verify_tls, headers=headers, timeout=timeout
) as s:
r = s.get(url)
latency_ms = int((time.perf_counter() - start) * 1000)
return Reachability(
via="ingress" if host_header or url.startswith("http") else "unknown",
target=host_header or url,
url=url,
ok=r.status_code < 500,
status=r.status_code,
error=None,
latency_ms=latency_ms,
)
except Exception as e:
latency_ms = int((time.perf_counter() - start) * 1000)
return Reachability(
via="ingress",
target=host_header or url,
url=url,
ok=False,
status=None,
error=str(e),
latency_ms=latency_ms,
)
def tcp_check(
host: str, port: int, timeout: float = 3.0
) -> Tuple[bool, Optional[str], Optional[int]]:
start = time.perf_counter()
try:
with socket.create_connection((host, port), timeout=timeout):
return True, None, int((time.perf_counter() - start) * 1000)
except Exception as e:
return False, str(e), int((time.perf_counter() - start) * 1000)
def try_exec_http_from_pod(
ctx: KubeCtx, namespace: str, pod: str, url: str, timeout: int = 8
) -> Reachability:
"""
Execute a lightweight HTTP check from within the given pod (best-effort).
Tries curl, then wget. Returns Reachability record with ok status.
"""
cmd = [
"sh",
"-lc",
# Prefer curl (status + timing), fallback to wget; if both missing, try /dev/tcp.
(
f'(command -v curl >/dev/null && curl -sk -o /dev/null -w "%{{http_code}}" "{url}") || '
f'(command -v wget >/dev/null && wget -qO- "{url}" >/dev/null && printf 200) || '
f"(echo 000)"
),
]
try:
out = stream(
ctx.core.connect_get_namespaced_pod_exec,
name=pod,
namespace=namespace,
command=cmd,
stderr=True,
stdin=False,
stdout=True,
tty=False,
_request_timeout=timeout,
)
code = 0
try:
code = int(str(out).strip()[:3])
except Exception:
code = 0
return Reachability(
via="svc-fqdn",
target=url,
url=url,
ok=200 <= code < 500,
status=code,
error=None if 200 <= code < 500 else f"code={code}",
)
except Exception as e:
return Reachability(
via="svc-fqdn", target=url, url=url, ok=False, status=None, error=str(e)
)
# =========================
# High-level inspection
# =========================
def inspect_deployment(
name: str,
namespace: Optional[str] = None,
timeout: float = 5.0,
verify_tls: bool = True,
) -> InspectionReport:
"""
Full inspection routine. Returns a structured report usable by other tools.
"""
ctx = load_kube_ctx()
dep, ns = find_deployment(ctx, name, namespace)
pods = pods_for_deployment(ctx, dep)
pod_names = [p.metadata.name for p in pods]
pod_ips = {p.metadata.name: (p.status.pod_ip or "") for p in pods}
# Collect declared containerPorts
cports: Dict[str, List[int]] = {}
for p in pods:
plist = []
for c in p.spec.containers or []:
for cp in c.ports or []:
if cp.container_port:
plist.append(int(cp.container_port))
cports[p.metadata.name] = sorted({*plist})
probe = extract_probe(dep)
preferred_port: Optional[int | str] = (
probe.port if probe and probe.port is not None else None
)
services = services_for_deployment(ctx, dep)
svc_bindings = resolve_service_bindings(dep, services, preferred_port)
ingresses = ingresses_for_services(ctx, ns, services)
ing_bindings = extract_ingress_bindings(ingresses, services)
# Reachability checks
reach: List[Reachability] = []
# 1) Through Ingress (best effort)
for ib in ing_bindings:
scheme = "https" if ib.tls else "http"
base = f"{scheme}://{ib.host}" if ib.host else ""
path = ib.path or "/"
health_path = probe.path if probe and probe.path else "/"
url = f"{base}{path.rstrip('/')}{health_path if health_path.startswith('/') else '/' + health_path}"
# If host resolves, try directly
if ib.host and dns_resolves(ib.host):
r = http_check(url, timeout=timeout, verify_tls=verify_tls)
r.via = "ingress"
r.target = ib.host
reach.append(r)
else:
# Try using load balancer IP/hostname from status with Host header
target_ips: List[str] = []
for ing in ingresses:
if ing.metadata.name == ib.ingress:
if (
ing.status
and ing.status.load_balancer
and ing.status.load_balancer.ingress
):
for ent in ing.status.load_balancer.ingress:
if ent.ip:
target_ips.append(ent.ip)
if ent.hostname:
target_ips.append(ent.hostname)
if target_ips:
t = target_ips[0]
alt_url = f"{scheme}://{t}{path.rstrip('/')}{health_path if health_path.startswith('/') else '/' + health_path}"
reach.append(
http_check(
alt_url,
host_header=ib.host or None,
timeout=timeout,
verify_tls=verify_tls,
)
)
else:
reach.append(
Reachability(
via="ingress",
target=ib.host or "(no-host)",
url=url or None,
ok=False,
error="No DNS for host and no load balancer address found on Ingress.",
)
)
# 2) Through Pod IP (TCP + optional HTTP on health path)
test_port_candidates: List[int] = []
if preferred_port is not None and isinstance(preferred_port, int):
test_port_candidates.append(preferred_port)
# Add declared service ports
for b in svc_bindings:
if b.port not in test_port_candidates:
test_port_candidates.append(b.port)
# Add container ports if nothing else
if not test_port_candidates:
for plist in cports.values():
for pnum in plist:
if pnum not in test_port_candidates:
test_port_candidates.append(pnum)
for pod in pods:
ip = pod.status.pod_ip
if not ip:
reach.append(
Reachability(
via="pod-ip",
target=pod.metadata.name,
ok=False,
error="No Pod IP assigned",
)
)
continue
# Try TCP on first viable port
if test_port_candidates:
port = test_port_candidates[0]
ok, err, _lat = tcp_check(ip, port, timeout=timeout)
if not ok:
reach.append(
Reachability(
via="pod-ip", target=f"{ip}:{port}", ok=False, error=err
)
)
else:
# Try HTTP GET if we have a health path
health_path = probe.path if probe and probe.path else "/"
url = f"http://{ip}:{port}{health_path if health_path.startswith('/') else '/' + health_path}"
r = http_check(url, timeout=timeout, verify_tls=False)
r.via = "pod-ip"
r.target = f"{ip}:{port}"
reach.append(r)
else:
reach.append(
Reachability(
via="pod-ip",
target=ip,
ok=False,
error="No candidate port found to test",
)
)
# 3) Through fully qualified Service DNS (from inside cluster if needed)
# Choose first service binding if available
if svc_bindings:
sb = svc_bindings[0]
fqdn = f"{sb.service}.{sb.namespace}.svc.cluster.local"
health_path = probe.path if probe and probe.path else "/"
url = f"http://{fqdn}:{sb.port}{health_path if health_path.startswith('/') else '/' + health_path}"
ctx = load_kube_ctx()
if ctx.in_cluster and dns_resolves(fqdn):
r = http_check(url, timeout=timeout, verify_tls=False)
r.via = "svc-fqdn"
r.target = fqdn
reach.append(r)
else:
if pods:
reach.append(
try_exec_http_from_pod(
ctx, sb.namespace, pods[0].metadata.name, url
)
)
else:
reach.append(
Reachability(
via="svc-fqdn",
target=url,
url=url,
ok=False,
error="No pods available to test inside cluster",
)
)
else:
reach.append(
Reachability(
via="svc-fqdn",
target="(no service)",
ok=False,
error="No Service bound to the deployment",
)
)
return InspectionReport(
deployment=dep.metadata.name,
namespace=ns,
in_cluster=ctx.in_cluster,
pods=pod_names,
pod_ips=pod_ips,
container_ports=cports,
health_probe=probe,
services=svc_bindings,
ingresses=ing_bindings,
reachability=reach,
)
# =========================
# Logs helpers
# =========================
def deployment_pods(ctx: KubeCtx, name: str, namespace: str) -> List[V1Pod]:
dep = ctx.apps.read_namespaced_deployment(name=name, namespace=namespace)
return pods_for_deployment(ctx, dep)
def print_pod_logs(
name: str,
namespace: str,
container: Optional[str] = None,
tail: Optional[int] = None,
since_seconds: Optional[int] = None,
follow: bool = False,
) -> None:
ctx = load_kube_ctx()
pods = deployment_pods(ctx, name, namespace)
if not pods:
console.print(f"[red]No pods found for deployment {name} in {namespace}[/red]")
raise typer.Exit(1)
# If follow, stream each pod in sequence (simple approach)
for p in pods:
console.rule(f"[bold]Logs: {p.metadata.name}[/bold]")
if follow:
# naive follow using repeated calls
try:
for line in ctx.core.read_namespaced_pod_log(
name=p.metadata.name,
namespace=namespace,
container=container,
tail_lines=tail,
since_seconds=since_seconds,
follow=True,
_preload_content=False,
).stream(decode_content=True):
try:
console.print(line.decode("utf-8").rstrip())
except Exception:
console.print(line)
except KeyboardInterrupt:
break
else:
out = ctx.core.read_namespaced_pod_log(
name=p.metadata.name,
namespace=namespace,
container=container,
tail_lines=tail,
since_seconds=since_seconds,
)
console.print(out)
# =========================
# CLI commands
# =========================
@app.command("inspect")
def cli_inspect(
deployment: str = typer.Argument(..., help="Deployment name"),
namespace: Optional[str] = typer.Option(
None,
"--namespace",
"-n",
help="Namespace (if omitted, will try to auto-detect)",
),
timeout: float = typer.Option(5.0, help="HTTP/TCP timeout (seconds)"),
insecure: bool = typer.Option(False, help="Skip TLS verification for HTTPS checks"),
output_json: bool = typer.Option(
False, "--json", help="Print JSON report instead of a table"
),
):
"""
Inspect a Deployment's Services & Ingresses and run connectivity checks.
"""
try:
report = inspect_deployment(
deployment, namespace=namespace, timeout=timeout, verify_tls=(not insecure)
)
except Exception as e:
console.print(f"[red]Error:[/red] {e}")
raise typer.Exit(1)
if output_json:
console.print_json(json.dumps(report.model_dump(), indent=2))
return
hdr = (
f"[bold white]Deployment:[/bold white] {report.deployment} "
f"[bold white]Namespace:[/bold white] {report.namespace} "
f"[bold white]Context:[/bold white] {'in-cluster' if report.in_cluster else 'local'}"
)
console.print(Panel(hdr, border_style="cyan", title="Overview"))
# Pods table
t = Table(title="Pods", box=box.SIMPLE, show_lines=False)
t.add_column("Pod")
t.add_column("IP")
t.add_column("Ports")
for pod in report.pods:
ports = ", ".join(str(p) for p in report.container_ports.get(pod, [])) or "-"
t.add_row(pod, report.pod_ips.get(pod, "-"), ports)
console.print(t)
# Health probe
if report.health_probe:
hp = report.health_probe
console.print(
Panel(
f"[bold]Health Probe[/bold]\nKind: {hp.kind}\nPath: {hp.path or '-'}\nPort: {hp.port or '-'}\nScheme: {hp.scheme}",
border_style="green",
)
)
else:
console.print(
Panel(
"[yellow]No HTTP health probe found on containers[/yellow]",
border_style="yellow",
)
)
# Services
if report.services:
ts = Table(title="Services", box=box.SIMPLE)
ts.add_column("Service")
ts.add_column("Namespace")
ts.add_column("Port")
ts.add_column("TargetPort")
ts.add_column("Protocol")
for s in report.services:
ts.add_row(
s.service,
s.namespace,
str(s.port),
str(s.target_port or "-"),
s.protocol,
)
console.print(ts)
else:
console.print(
Panel(
"[yellow]No Service selects this deployment[/yellow]",
border_style="yellow",
)
)
# Ingresses
if report.ingresses:
ti = Table(title="Ingress Bindings", box=box.SIMPLE)
ti.add_column("Ingress")
ti.add_column("Host")
ti.add_column("Path")
ti.add_column("TLS")
ti.add_column("Service:Port")
for ib in report.ingresses:
ti.add_row(
ib.ingress,
ib.host or "-",
ib.path,
"yes" if ib.tls else "no",
f"{ib.service}:{ib.service_port}",
)
console.print(ti)
else:
console.print(
Panel(
"[yellow]No Ingress rules reference Services for this deployment[/yellow]",
border_style="yellow",
)
)
# Reachability
tr = Table(title="Reachability", box=box.SIMPLE)
tr.add_column("Via")
tr.add_column("Target")
tr.add_column("URL")
tr.add_column("OK")
tr.add_column("Status")
tr.add_column("Latency (ms)")
tr.add_column("Error")
for r in report.reachability:
tr.add_row(
r.via,
r.target,
r.url or "-",
"[green]yes[/green]" if r.ok else "[red]no[/red]",
str(r.status or "-"),
str(r.latency_ms or "-"),
r.error or "-",
)
console.print(tr)
@app.command("logs")
def cli_logs(
deployment: str = typer.Argument(..., help="Deployment name"),
namespace: str = typer.Option(..., "--namespace", "-n", help="Namespace"),
container: Optional[str] = typer.Option(
None, "--container", "-c", help="Specific container name"
),
tail: Optional[int] = typer.Option(None, "--tail", help="Tail N lines"),
since: Optional[int] = typer.Option(
None, "--since", help="Only return logs newer than N seconds"
),
follow: bool = typer.Option(False, "--follow", "-f", help="Stream logs"),
):
"""
Print logs from pods belonging to a Deployment.
"""
try:
print_pod_logs(
deployment,
namespace,
container=container,
tail=tail,
since_seconds=since,
follow=follow,
)
except Exception as e:
console.print(f"[red]Error fetching logs:[/red] {e}")
raise typer.Exit(1)
@app.callback(invoke_without_command=True)
def _root(
ctx: typer.Context,
):
"""
Kubernetes Ingress Debugger CLI.
"""
if ctx.invoked_subcommand is None:
typer.echo(ctx.get_help())
if __name__ == "__main__":
app()