commit 74cafaa556a74c6320bb476a3d1f7c6b0b1b8e71 Author: Waylon S. Walker Date: Sat Aug 30 11:10:18 2025 -0500 Initial Commit for ingress-debugger Debug ingress and reachability issues. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e69de29 diff --git a/.null-ls_818840_ingress_debugger.py b/.null-ls_818840_ingress_debugger.py new file mode 100644 index 0000000..516b3c3 --- /dev/null +++ b/.null-ls_818840_ingress_debugger.py @@ -0,0 +1,207 @@ +#!/usr/bin/env -S uv run --quiet --script +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "typer", +# "rich", +# "httpx", +# "kubernetes", +# "pydantic", +# ] +# /// + +""" +k8s-ingress-debugger + +Given a Deployment name, this tool inspects the Kubernetes objects around it and +runs a set of connectivity checks: + +• Does an Ingress point to it? +• What are the Ingress hosts? +• What's the healthcheck route (from readiness/liveness HTTP probes)? +• Can we access it via: + - Ingress (host/IP) + - Pod IP + - Fully Qualified Service DNS (service.ns.svc.cluster.local) +• Provide a convenient logs fetcher + +It works both in-cluster and from a developer machine (tries in-cluster first, +then falls back to local kubeconfig). All checker functions are importable and +usable outside of Typer. + +Examples +-------- +Inspect with rich table output: + ./k8s_ingress_debug.py inspect my-deployment -n default + +Print JSON (for automation): + ./k8s_ingress_debug.py inspect my-deployment -n default --json + +Stream logs from all pods of the deployment: + ./k8s_ingress_debug.py logs my-deployment -n default -f --tail 200 +""" + +from __future__ import annotations + +import json +import socket +import time +from dataclasses import dataclass +from typing import Iterable, List, Optional, Tuple + +import httpx +import typer +from pydantic import BaseModel +from rich import box +from rich.console import Console +from rich.panel import Panel +from rich.table import Table + +from kubernetes import client, config +from kubernetes.client import ( + ApiClient, + AppsV1Api, + CoreV1Api, + NetworkingV1Api, + V1Deployment, + V1Ingress, + V1Service, + V1Pod, +) +from kubernetes.stream import stream + +app = typer.Typer(add_completion=False, help="Kubernetes Ingress Debugger") +console = Console() + + +@dataclass +class KubeCtx: + api_client: ApiClient + core: CoreV1Api + apps: AppsV1Api + net: NetworkingV1Api + in_cluster: bool + + +def load_kube_ctx() -> KubeCtx: + in_cluster = False + try: + config.load_incluster_config() + in_cluster = True + except Exception: + config.load_kube_config() + + api_client = client.ApiClient() + return KubeCtx( + api_client=api_client, + core=CoreV1Api(api_client), + apps=AppsV1Api(api_client), + net=NetworkingV1Api(api_client), + in_cluster=in_cluster, + ) + + +class ProbeInfo(BaseModel): + kind: str + path: Optional[str] = None + port: Optional[str | int] = None + scheme: str = "http" + + +class ServiceBinding(BaseModel): + service: str + namespace: str + port: int + target_port: str | int | None = None + protocol: str = "TCP" + + +class IngressBinding(BaseModel): + ingress: str + namespace: str + host: str + path: str + tls: bool + service: str + service_port: int + + +class Reachability(BaseModel): + via: str + target: str + url: Optional[str] = None + ok: bool = False + status: Optional[int] = None + error: Optional[str] = None + latency_ms: Optional[int] = None + + +class InspectionReport(BaseModel): + deployment: str + namespace: str + in_cluster: bool + pods: List[str] + pod_ips: dict + 
container_ports: dict + health_probe: Optional[ProbeInfo] = None + services: List[ServiceBinding] = [] + ingresses: List[IngressBinding] = [] + reachability: List[Reachability] = [] + + +# === Functions omitted for brevity === +# They include: find_deployment, pods_for_deployment, services_for_deployment, +# ingresses_for_services, extract_probe, resolve_service_bindings, +# extract_ingress_bindings, dns_resolves, http_check, tcp_check, +# try_exec_http_from_pod, inspect_deployment, print_pod_logs + +# === CLI commands === + +@app.command("inspect") +def cli_inspect( + deployment: str, + namespace: Optional[str] = typer.Option(None, "--namespace", "-n"), + timeout: float = typer.Option(5.0, help="HTTP/TCP timeout (seconds)"), + insecure: bool = typer.Option(False, help="Skip TLS verification"), + output_json: bool = typer.Option(False, "--json", help="Print JSON report"), +): + try: + report = inspect_deployment(deployment, namespace=namespace, timeout=timeout, verify_tls=(not insecure)) + except Exception as e: + console.print(f"[red]Error:[/red] {e}") + raise typer.Exit(1) + + if output_json: + console.print_json(json.dumps(report.model_dump(), indent=2)) + return + + console.print(Panel(f"Deployment: {report.deployment}\nNamespace: {report.namespace}", border_style="cyan")) + + t = Table(title="Pods", box=box.SIMPLE) + t.add_column("Pod") + t.add_column("IP") + t.add_column("Ports") + for pod in report.pods: + ports = ", ".join(str(p) for p in report.container_ports.get(pod, [])) or "-" + t.add_row(pod, report.pod_ips.get(pod, "-"), ports) + console.print(t) + + +@app.command("logs") +def cli_logs( + deployment: str, + namespace: str = typer.Option(..., "--namespace", "-n"), + container: Optional[str] = typer.Option(None, "--container", "-c"), + tail: Optional[int] = typer.Option(None, "--tail"), + since: Optional[int] = typer.Option(None, "--since"), + follow: bool = typer.Option(False, "--follow", "-f"), +): + try: + print_pod_logs(deployment, namespace, container=container, tail=tail, since_seconds=since, follow=follow) + except Exception as e: + console.print(f"[red]Error fetching logs:[/red] {e}") + raise typer.Exit(1) + + +if __name__ == "__main__": + app() diff --git a/README.md b/README.md new file mode 100644 index 0000000..a1803bd --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# ingress-debugger + +Debug ingress and reachability issues. diff --git a/__pycache__/ingress_debugger.cpython-311.pyc b/__pycache__/ingress_debugger.cpython-311.pyc new file mode 100644 index 0000000..07fcaa3 Binary files /dev/null and b/__pycache__/ingress_debugger.cpython-311.pyc differ diff --git a/ingress_debugger.py b/ingress_debugger.py new file mode 100755 index 0000000..7d29768 --- /dev/null +++ b/ingress_debugger.py @@ -0,0 +1,938 @@ +#!/usr/bin/env -S uv run --quiet --script +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "typer", +# "rich", +# "httpx", +# "kubernetes", +# "pydantic", +# ] +# /// + +""" +k8s-ingress-debugger + +Given a Deployment name, this tool inspects the Kubernetes objects around it and +runs a set of connectivity checks: + +• Does an Ingress point to it? +• What are the Ingress hosts? +• What's the healthcheck route (from readiness/liveness HTTP probes)? +• Can we access it via: + - Ingress (host/IP) + - Pod IP + - Fully Qualified Service DNS (service.ns.svc.cluster.local) +• Provide a convenient logs fetcher + +It works both in-cluster and from a developer machine (tries in-cluster first, +then falls back to local kubeconfig). 
All checker functions are importable and +usable outside of Typer. + +Examples +-------- +Inspect with rich table output: + ./k8s_ingress_debug.py inspect my-deployment -n default + +Print JSON (for automation): + ./k8s_ingress_debug.py inspect my-deployment -n default --json + +Stream logs from all pods of the deployment: + ./k8s_ingress_debug.py logs my-deployment -n default -f --tail 200 +""" + +from __future__ import annotations + +import json +import socket +import time +from dataclasses import dataclass +from typing import Dict, Iterable, List, Optional, Tuple + +import httpx +import typer +from pydantic import BaseModel, Field +from rich import box +from rich.console import Console +from rich.panel import Panel +from rich.table import Table + +# Kubernetes imports +from kubernetes import client, config +from kubernetes.client import ( + ApiClient, + AppsV1Api, + CoreV1Api, + NetworkingV1Api, + V1Deployment, + V1Ingress, + V1Service, + V1Pod, +) +from kubernetes.stream import stream + +app = typer.Typer(add_completion=False, help="Kubernetes Ingress Debugger") +console = Console() + + +# ========================= +# Kube config + helpers +# ========================= + + +@dataclass +class KubeCtx: + api_client: ApiClient + core: CoreV1Api + apps: AppsV1Api + net: NetworkingV1Api + in_cluster: bool + + +def load_kube_ctx() -> KubeCtx: + """ + Load Kubernetes configuration, preferring in-cluster. + Falls back to local kubeconfig. + """ + in_cluster = False + try: + config.load_incluster_config() + in_cluster = True + except Exception: + # Not in cluster, try local kubeconfig + config.load_kube_config() + + api_client = client.ApiClient() + return KubeCtx( + api_client=api_client, + core=CoreV1Api(api_client), + apps=AppsV1Api(api_client), + net=NetworkingV1Api(api_client), + in_cluster=in_cluster, + ) + + +# ========================= +# Discovery models +# ========================= + + +class ProbeInfo(BaseModel): + kind: str # "readiness" | "liveness" | "startup" + path: Optional[str] = None + port: Optional[str | int] = None + scheme: str = "http" + + +class ServiceBinding(BaseModel): + service: str + namespace: str + port: int + target_port: str | int | None = None + protocol: str = "TCP" + + +class IngressBinding(BaseModel): + ingress: str + namespace: str + host: str + path: str + tls: bool + service: str + service_port: int + + +class Reachability(BaseModel): + via: str # "ingress", "pod-ip", "svc-fqdn" + target: str + url: Optional[str] = None + ok: bool = False + status: Optional[int] = None + error: Optional[str] = None + latency_ms: Optional[int] = None + + +class InspectionReport(BaseModel): + deployment: str + namespace: str + in_cluster: bool + pods: List[str] + pod_ips: Dict[str, str] + container_ports: Dict[str, List[int]] + health_probe: Optional[ProbeInfo] = None + services: List[ServiceBinding] = Field(default_factory=list) + ingresses: List[IngressBinding] = Field(default_factory=list) + reachability: List[Reachability] = Field(default_factory=list) + + +# ========================= +# Core discovery functions +# ========================= + + +def find_deployment( + ctx: KubeCtx, name: str, namespace: Optional[str] +) -> Tuple[V1Deployment, str]: + """ + Return (deployment, namespace). + If namespace not provided, try to find a unique deployment across all namespaces. 
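+
+    Example (hypothetical names; a sketch of the intended call pattern):
+        ctx = load_kube_ctx()
+        dep, ns = find_deployment(ctx, "my-deployment", None)
+    Raises RuntimeError if the name matches no deployment, or matches
+    deployments in more than one namespace (pass namespace= to disambiguate).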
+ """ + if namespace: + dep = ctx.apps.read_namespaced_deployment(name=name, namespace=namespace) + return dep, namespace + + # Search all namespaces for uniqueness + deps = ctx.apps.list_deployment_for_all_namespaces( + field_selector=f"metadata.name={name}" + ).items + if not deps: + raise RuntimeError(f"Deployment '{name}' not found in any namespace.") + if len(deps) > 1: + ns_list = ", ".join(sorted({d.metadata.namespace for d in deps})) + raise RuntimeError( + f"Deployment '{name}' found in multiple namespaces: {ns_list}. Please specify --namespace." + ) + d = deps[0] + return d, d.metadata.namespace + + +def pods_for_deployment(ctx: KubeCtx, dep: V1Deployment) -> List[V1Pod]: + selector = dep.spec.selector.match_labels or {} + if not selector: + return [] + label_selector = ",".join(f"{k}={v}" for k, v in selector.items()) + pods = ctx.core.list_namespaced_pod( + namespace=dep.metadata.namespace, label_selector=label_selector + ).items + return [p for p in pods if p.metadata.deletion_timestamp is None] + + +def services_for_deployment(ctx: KubeCtx, dep: V1Deployment) -> List[V1Service]: + """ + Services whose selector is a subset of deployment's selector labels + """ + ns = dep.metadata.namespace + dep_sel = dep.spec.selector.match_labels or {} + svcs = ctx.core.list_namespaced_service(namespace=ns).items + matched = [] + for s in svcs: + sel = s.spec.selector or {} + if sel and all(dep_sel.get(k) == v for k, v in sel.items()): + matched.append(s) + return matched + + +def ingresses_for_services( + ctx: KubeCtx, namespace: str, services: Iterable[V1Service] +) -> List[V1Ingress]: + svc_names = {s.metadata.name for s in services} + ings = ctx.net.list_namespaced_ingress(namespace=namespace).items + out = [] + for ing in ings: + if not ing.spec or not ing.spec.rules: + continue + for rule in ing.spec.rules: + if not rule.http or not rule.http.paths: + continue + for p in rule.http.paths: + backend = p.backend + if backend and backend.service and backend.service.name in svc_names: + out.append(ing) + break + # de-dup + seen = set() + uniq = [] + for ing in out: + key = (ing.metadata.namespace, ing.metadata.name) + if key not in seen: + seen.add(key) + uniq.append(ing) + return uniq + + +def extract_probe(dep: V1Deployment) -> Optional[ProbeInfo]: + """ + Prefer readiness > liveness > startup HTTP probes. + """ + tmpl = dep.spec.template + if not tmpl or not tmpl.spec or not tmpl.spec.containers: + return None + + def http_probe(container, probe_field: str) -> Optional[ProbeInfo]: + pr = getattr(container, probe_field, None) + if pr and pr.http_get: + path = pr.http_get.path or "/" + port = pr.http_get.port + scheme = (pr.http_get.scheme or "HTTP").lower() + return ProbeInfo( + kind=probe_field.replace("_probe", ""), + path=path, + port=port, + scheme="https" if scheme == "https" else "http", + ) + return None + + # Check each container, stop at first we find + for c in tmpl.spec.containers: + for field in ("readiness_probe", "liveness_probe", "startup_probe"): + pi = http_probe(c, field) + if pi: + return pi + return None + + +def resolve_service_bindings( + dep: V1Deployment, services: List[V1Service], preferred_port: Optional[str | int] +) -> List[ServiceBinding]: + """ + Build bindings using Service ports; try to align with probe/targetPort when given. 
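+
+    Example (hypothetical objects; a sketch of the expected result):
+        bindings = resolve_service_bindings(dep, services, preferred_port=8080)
+    Every Service port becomes a ServiceBinding, de-duplicated by
+    (service, port); when preferred_port is given (a probe port number or
+    name) it is matched against each entry's port number, name, or targetPort.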
+ """ + ns = dep.metadata.namespace + bindings: List[ServiceBinding] = [] + + for s in services: + for sp in s.spec.ports or []: + port_num = int(sp.port) + target = sp.target_port if isinstance(sp.target_port, (str, int)) else None + # Prefer the service port that matches preferred_port (by name or number) + if preferred_port is not None: + if isinstance(preferred_port, int) and ( + target == preferred_port or port_num == preferred_port + ): + bindings.append( + ServiceBinding( + service=s.metadata.name, + namespace=ns, + port=port_num, + target_port=target, + protocol=(sp.protocol or "TCP"), + ) + ) + continue + if isinstance(preferred_port, str) and ( + sp.name == preferred_port or target == preferred_port + ): + bindings.append( + ServiceBinding( + service=s.metadata.name, + namespace=ns, + port=port_num, + target_port=target, + protocol=(sp.protocol or "TCP"), + ) + ) + continue + # Otherwise include everything; we'll de-dup later + bindings.append( + ServiceBinding( + service=s.metadata.name, + namespace=ns, + port=port_num, + target_port=target, + protocol=(sp.protocol or "TCP"), + ) + ) + + # de-dup by (svc,port) + seen = set() + uniq: List[ServiceBinding] = [] + for b in bindings: + key = (b.service, b.port) + if key not in seen: + seen.add(key) + uniq.append(b) + return uniq + + +def extract_ingress_bindings( + ingresses: List[V1Ingress], services: List[V1Service] +) -> List[IngressBinding]: + svc_names = {s.metadata.name for s in services} + bindings: List[IngressBinding] = [] + for ing in ingresses: + tls_hosts = set() + if ing.spec and ing.spec.tls: + for t in ing.spec.tls: + for h in t.hosts or []: + tls_hosts.add(h) + if not ing.spec or not ing.spec.rules: + continue + for rule in ing.spec.rules: + host = rule.host or "" + if not rule.http or not rule.http.paths: + continue + for p in rule.http.paths: + backend = p.backend + if backend and backend.service and backend.service.name in svc_names: + svc_port = ( + int(backend.service.port.number) + if backend.service.port and backend.service.port.number + else 80 + ) + bindings.append( + IngressBinding( + ingress=ing.metadata.name, + namespace=ing.metadata.namespace, + host=host, + path=p.path or "/", + tls=(host in tls_hosts), + service=backend.service.name, + service_port=svc_port, + ) + ) + return bindings + + +# ========================= +# Networking helpers +# ========================= + + +def dns_resolves(host: str) -> bool: + try: + socket.gethostbyname(host) + return True + except Exception: + return False + + +def http_check( + url: str, + host_header: Optional[str] = None, + timeout: float = 5.0, + verify_tls: bool = True, +) -> Reachability: + start = time.perf_counter() + headers = {} + if host_header: + headers["Host"] = host_header + try: + with httpx.Client( + follow_redirects=True, verify=verify_tls, headers=headers, timeout=timeout + ) as s: + r = s.get(url) + latency_ms = int((time.perf_counter() - start) * 1000) + return Reachability( + via="ingress" if host_header or url.startswith("http") else "unknown", + target=host_header or url, + url=url, + ok=r.status_code < 500, + status=r.status_code, + error=None, + latency_ms=latency_ms, + ) + except Exception as e: + latency_ms = int((time.perf_counter() - start) * 1000) + return Reachability( + via="ingress", + target=host_header or url, + url=url, + ok=False, + status=None, + error=str(e), + latency_ms=latency_ms, + ) + + +def tcp_check( + host: str, port: int, timeout: float = 3.0 +) -> Tuple[bool, Optional[str], Optional[int]]: + start = time.perf_counter() 
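+    # A bare TCP connect separates "port unreachable" from HTTP-level failures.
+    # Returns (ok, error, latency_ms); a hypothetical call:
+    #     ok, err, ms = tcp_check("10.0.12.34", 8080)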
+ try: + with socket.create_connection((host, port), timeout=timeout): + return True, None, int((time.perf_counter() - start) * 1000) + except Exception as e: + return False, str(e), int((time.perf_counter() - start) * 1000) + + +def try_exec_http_from_pod( + ctx: KubeCtx, namespace: str, pod: str, url: str, timeout: int = 8 +) -> Reachability: + """ + Execute a lightweight HTTP check from within the given pod (best-effort). + Tries curl, then wget. Returns Reachability record with ok status. + """ + cmd = [ + "sh", + "-lc", + # Prefer curl (status + timing), fallback to wget; if both missing, try /dev/tcp. + ( + f'(command -v curl >/dev/null && curl -sk -o /dev/null -w "%{{http_code}}" "{url}") || ' + f'(command -v wget >/dev/null && wget -qO- "{url}" >/dev/null && printf 200) || ' + f"(echo 000)" + ), + ] + try: + out = stream( + ctx.core.connect_get_namespaced_pod_exec, + name=pod, + namespace=namespace, + command=cmd, + stderr=True, + stdin=False, + stdout=True, + tty=False, + _request_timeout=timeout, + ) + code = 0 + try: + code = int(str(out).strip()[:3]) + except Exception: + code = 0 + return Reachability( + via="svc-fqdn", + target=url, + url=url, + ok=200 <= code < 500, + status=code, + error=None if 200 <= code < 500 else f"code={code}", + ) + except Exception as e: + return Reachability( + via="svc-fqdn", target=url, url=url, ok=False, status=None, error=str(e) + ) + + +# ========================= +# High-level inspection +# ========================= + + +def inspect_deployment( + name: str, + namespace: Optional[str] = None, + timeout: float = 5.0, + verify_tls: bool = True, +) -> InspectionReport: + """ + Full inspection routine. Returns a structured report usable by other tools. + """ + ctx = load_kube_ctx() + dep, ns = find_deployment(ctx, name, namespace) + pods = pods_for_deployment(ctx, dep) + pod_names = [p.metadata.name for p in pods] + pod_ips = {p.metadata.name: (p.status.pod_ip or "") for p in pods} + + # Collect declared containerPorts + cports: Dict[str, List[int]] = {} + for p in pods: + plist = [] + for c in p.spec.containers or []: + for cp in c.ports or []: + if cp.container_port: + plist.append(int(cp.container_port)) + cports[p.metadata.name] = sorted({*plist}) + + probe = extract_probe(dep) + preferred_port: Optional[int | str] = ( + probe.port if probe and probe.port is not None else None + ) + services = services_for_deployment(ctx, dep) + svc_bindings = resolve_service_bindings(dep, services, preferred_port) + + ingresses = ingresses_for_services(ctx, ns, services) + ing_bindings = extract_ingress_bindings(ingresses, services) + + # Reachability checks + reach: List[Reachability] = [] + + # 1) Through Ingress (best effort) + for ib in ing_bindings: + scheme = "https" if ib.tls else "http" + base = f"{scheme}://{ib.host}" if ib.host else "" + path = ib.path or "/" + health_path = probe.path if probe and probe.path else "/" + url = f"{base}{path.rstrip('/')}{health_path if health_path.startswith('/') else '/' + health_path}" + # If host resolves, try directly + if ib.host and dns_resolves(ib.host): + r = http_check(url, timeout=timeout, verify_tls=verify_tls) + r.via = "ingress" + r.target = ib.host + reach.append(r) + else: + # Try using load balancer IP/hostname from status with Host header + target_ips: List[str] = [] + for ing in ingresses: + if ing.metadata.name == ib.ingress: + if ( + ing.status + and ing.status.load_balancer + and ing.status.load_balancer.ingress + ): + for ent in ing.status.load_balancer.ingress: + if ent.ip: + 
target_ips.append(ent.ip) + if ent.hostname: + target_ips.append(ent.hostname) + if target_ips: + t = target_ips[0] + alt_url = f"{scheme}://{t}{path.rstrip('/')}{health_path if health_path.startswith('/') else '/' + health_path}" + reach.append( + http_check( + alt_url, + host_header=ib.host or None, + timeout=timeout, + verify_tls=verify_tls, + ) + ) + else: + reach.append( + Reachability( + via="ingress", + target=ib.host or "(no-host)", + url=url or None, + ok=False, + error="No DNS for host and no load balancer address found on Ingress.", + ) + ) + + # 2) Through Pod IP (TCP + optional HTTP on health path) + test_port_candidates: List[int] = [] + if preferred_port is not None and isinstance(preferred_port, int): + test_port_candidates.append(preferred_port) + # Add declared service ports + for b in svc_bindings: + if b.port not in test_port_candidates: + test_port_candidates.append(b.port) + # Add container ports if nothing else + if not test_port_candidates: + for plist in cports.values(): + for pnum in plist: + if pnum not in test_port_candidates: + test_port_candidates.append(pnum) + + for pod in pods: + ip = pod.status.pod_ip + if not ip: + reach.append( + Reachability( + via="pod-ip", + target=pod.metadata.name, + ok=False, + error="No Pod IP assigned", + ) + ) + continue + # Try TCP on first viable port + if test_port_candidates: + port = test_port_candidates[0] + ok, err, _lat = tcp_check(ip, port, timeout=timeout) + if not ok: + reach.append( + Reachability( + via="pod-ip", target=f"{ip}:{port}", ok=False, error=err + ) + ) + else: + # Try HTTP GET if we have a health path + health_path = probe.path if probe and probe.path else "/" + url = f"http://{ip}:{port}{health_path if health_path.startswith('/') else '/' + health_path}" + r = http_check(url, timeout=timeout, verify_tls=False) + r.via = "pod-ip" + r.target = f"{ip}:{port}" + reach.append(r) + else: + reach.append( + Reachability( + via="pod-ip", + target=ip, + ok=False, + error="No candidate port found to test", + ) + ) + + # 3) Through fully qualified Service DNS (from inside cluster if needed) + # Choose first service binding if available + if svc_bindings: + sb = svc_bindings[0] + fqdn = f"{sb.service}.{sb.namespace}.svc.cluster.local" + health_path = probe.path if probe and probe.path else "/" + url = f"http://{fqdn}:{sb.port}{health_path if health_path.startswith('/') else '/' + health_path}" + + ctx = load_kube_ctx() + if ctx.in_cluster and dns_resolves(fqdn): + r = http_check(url, timeout=timeout, verify_tls=False) + r.via = "svc-fqdn" + r.target = fqdn + reach.append(r) + else: + if pods: + reach.append( + try_exec_http_from_pod( + ctx, sb.namespace, pods[0].metadata.name, url + ) + ) + else: + reach.append( + Reachability( + via="svc-fqdn", + target=url, + url=url, + ok=False, + error="No pods available to test inside cluster", + ) + ) + else: + reach.append( + Reachability( + via="svc-fqdn", + target="(no service)", + ok=False, + error="No Service bound to the deployment", + ) + ) + + return InspectionReport( + deployment=dep.metadata.name, + namespace=ns, + in_cluster=ctx.in_cluster, + pods=pod_names, + pod_ips=pod_ips, + container_ports=cports, + health_probe=probe, + services=svc_bindings, + ingresses=ing_bindings, + reachability=reach, + ) + + +# ========================= +# Logs helpers +# ========================= + + +def deployment_pods(ctx: KubeCtx, name: str, namespace: str) -> List[V1Pod]: + dep = ctx.apps.read_namespaced_deployment(name=name, namespace=namespace) + return pods_for_deployment(ctx, 
dep) + + +def print_pod_logs( + name: str, + namespace: str, + container: Optional[str] = None, + tail: Optional[int] = None, + since_seconds: Optional[int] = None, + follow: bool = False, +) -> None: + ctx = load_kube_ctx() + pods = deployment_pods(ctx, name, namespace) + if not pods: + console.print(f"[red]No pods found for deployment {name} in {namespace}[/red]") + raise typer.Exit(1) + + # If follow, stream each pod in sequence (simple approach) + for p in pods: + console.rule(f"[bold]Logs: {p.metadata.name}[/bold]") + if follow: + # naive follow using repeated calls + try: + for line in ctx.core.read_namespaced_pod_log( + name=p.metadata.name, + namespace=namespace, + container=container, + tail_lines=tail, + since_seconds=since_seconds, + follow=True, + _preload_content=False, + ).stream(decode_content=True): + try: + console.print(line.decode("utf-8").rstrip()) + except Exception: + console.print(line) + except KeyboardInterrupt: + break + else: + out = ctx.core.read_namespaced_pod_log( + name=p.metadata.name, + namespace=namespace, + container=container, + tail_lines=tail, + since_seconds=since_seconds, + ) + console.print(out) + + +# ========================= +# CLI commands +# ========================= + + +@app.command("inspect") +def cli_inspect( + deployment: str = typer.Argument(..., help="Deployment name"), + namespace: Optional[str] = typer.Option( + None, + "--namespace", + "-n", + help="Namespace (if omitted, will try to auto-detect)", + ), + timeout: float = typer.Option(5.0, help="HTTP/TCP timeout (seconds)"), + insecure: bool = typer.Option(False, help="Skip TLS verification for HTTPS checks"), + output_json: bool = typer.Option( + False, "--json", help="Print JSON report instead of a table" + ), +): + """ + Inspect a Deployment's Services & Ingresses and run connectivity checks. 
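+
+    The same report is available without the CLI (hypothetical names,
+    assuming a reachable cluster):
+        report = inspect_deployment("my-deployment", namespace="default")
+        print(report.model_dump_json(indent=2))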
+ """ + try: + report = inspect_deployment( + deployment, namespace=namespace, timeout=timeout, verify_tls=(not insecure) + ) + except Exception as e: + console.print(f"[red]Error:[/red] {e}") + raise typer.Exit(1) + + if output_json: + console.print_json(json.dumps(report.model_dump(), indent=2)) + return + + hdr = ( + f"[bold white]Deployment:[/bold white] {report.deployment} " + f"[bold white]Namespace:[/bold white] {report.namespace} " + f"[bold white]Context:[/bold white] {'in-cluster' if report.in_cluster else 'local'}" + ) + console.print(Panel(hdr, border_style="cyan", title="Overview")) + + # Pods table + t = Table(title="Pods", box=box.SIMPLE, show_lines=False) + t.add_column("Pod") + t.add_column("IP") + t.add_column("Ports") + for pod in report.pods: + ports = ", ".join(str(p) for p in report.container_ports.get(pod, [])) or "-" + t.add_row(pod, report.pod_ips.get(pod, "-"), ports) + console.print(t) + + # Health probe + if report.health_probe: + hp = report.health_probe + console.print( + Panel( + f"[bold]Health Probe[/bold]\nKind: {hp.kind}\nPath: {hp.path or '-'}\nPort: {hp.port or '-'}\nScheme: {hp.scheme}", + border_style="green", + ) + ) + else: + console.print( + Panel( + "[yellow]No HTTP health probe found on containers[/yellow]", + border_style="yellow", + ) + ) + + # Services + if report.services: + ts = Table(title="Services", box=box.SIMPLE) + ts.add_column("Service") + ts.add_column("Namespace") + ts.add_column("Port") + ts.add_column("TargetPort") + ts.add_column("Protocol") + for s in report.services: + ts.add_row( + s.service, + s.namespace, + str(s.port), + str(s.target_port or "-"), + s.protocol, + ) + console.print(ts) + else: + console.print( + Panel( + "[yellow]No Service selects this deployment[/yellow]", + border_style="yellow", + ) + ) + + # Ingresses + if report.ingresses: + ti = Table(title="Ingress Bindings", box=box.SIMPLE) + ti.add_column("Ingress") + ti.add_column("Host") + ti.add_column("Path") + ti.add_column("TLS") + ti.add_column("Service:Port") + for ib in report.ingresses: + ti.add_row( + ib.ingress, + ib.host or "-", + ib.path, + "yes" if ib.tls else "no", + f"{ib.service}:{ib.service_port}", + ) + console.print(ti) + else: + console.print( + Panel( + "[yellow]No Ingress rules reference Services for this deployment[/yellow]", + border_style="yellow", + ) + ) + + # Reachability + tr = Table(title="Reachability", box=box.SIMPLE) + tr.add_column("Via") + tr.add_column("Target") + tr.add_column("URL") + tr.add_column("OK") + tr.add_column("Status") + tr.add_column("Latency (ms)") + tr.add_column("Error") + for r in report.reachability: + tr.add_row( + r.via, + r.target, + r.url or "-", + "[green]yes[/green]" if r.ok else "[red]no[/red]", + str(r.status or "-"), + str(r.latency_ms or "-"), + r.error or "-", + ) + console.print(tr) + + +@app.command("logs") +def cli_logs( + deployment: str = typer.Argument(..., help="Deployment name"), + namespace: str = typer.Option(..., "--namespace", "-n", help="Namespace"), + container: Optional[str] = typer.Option( + None, "--container", "-c", help="Specific container name" + ), + tail: Optional[int] = typer.Option(None, "--tail", help="Tail N lines"), + since: Optional[int] = typer.Option( + None, "--since", help="Only return logs newer than N seconds" + ), + follow: bool = typer.Option(False, "--follow", "-f", help="Stream logs"), +): + """ + Print logs from pods belonging to a Deployment. 
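+
+    Importable equivalent (hypothetical names):
+        print_pod_logs("my-deployment", "default", tail=200, follow=True)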
+ """ + try: + print_pod_logs( + deployment, + namespace, + container=container, + tail=tail, + since_seconds=since, + follow=follow, + ) + except Exception as e: + console.print(f"[red]Error fetching logs:[/red] {e}") + raise typer.Exit(1) + + +@app.callback(invoke_without_command=True) +def _root( + ctx: typer.Context, +): + """ + Kubernetes Ingress Debugger CLI. + """ + if ctx.invoked_subcommand is None: + typer.echo(ctx.get_help()) + + +if __name__ == "__main__": + app()
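
A quick way to exercise the new script (uv resolves the inline dependencies
declared in the script header; the deployment and namespace names below are
illustrative):

    ./ingress_debugger.py inspect my-deployment -n default
    ./ingress_debugger.py inspect my-deployment -n default --json
    ./ingress_debugger.py logs my-deployment -n default --tail 100 -f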