#!/usr/bin/env -S uv run --quiet --script # /// script # requires-python = ">=3.12" # dependencies = [ # "typer", # "rich", # "httpx", # "kubernetes", # "pydantic", # ] # /// """ k8s-ingress-debugger Given a Deployment name, this tool inspects the Kubernetes objects around it and runs a set of connectivity checks: • Does an Ingress point to it? • What are the Ingress hosts? • What's the healthcheck route (from readiness/liveness HTTP probes)? • Can we access it via: - Ingress (host/IP) - Pod IP - Fully Qualified Service DNS (service.ns.svc.cluster.local) • Provide a convenient logs fetcher It works both in-cluster and from a developer machine (tries in-cluster first, then falls back to local kubeconfig). All checker functions are importable and usable outside of Typer. Examples -------- Inspect with rich table output: ./k8s_ingress_debug.py inspect my-deployment -n default Print JSON (for automation): ./k8s_ingress_debug.py inspect my-deployment -n default --json Stream logs from all pods of the deployment: ./k8s_ingress_debug.py logs my-deployment -n default -f --tail 200 """ from __future__ import annotations import json import socket import time from dataclasses import dataclass from typing import Dict, Iterable, List, Optional, Tuple import httpx import typer from pydantic import BaseModel, Field from rich import box from rich.console import Console from rich.panel import Panel from rich.table import Table # Kubernetes imports from kubernetes import client, config from kubernetes.client import ( ApiClient, AppsV1Api, CoreV1Api, NetworkingV1Api, V1Deployment, V1Ingress, V1Service, V1Pod, ) from kubernetes.stream import stream app = typer.Typer(add_completion=False, help="Kubernetes Ingress Debugger") console = Console() # ========================= # Kube config + helpers # ========================= @dataclass class KubeCtx: api_client: ApiClient core: CoreV1Api apps: AppsV1Api net: NetworkingV1Api in_cluster: bool def load_kube_ctx() -> KubeCtx: """ Load Kubernetes configuration, preferring in-cluster. Falls back to local kubeconfig. """ in_cluster = False try: config.load_incluster_config() in_cluster = True except Exception: # Not in cluster, try local kubeconfig config.load_kube_config() api_client = client.ApiClient() return KubeCtx( api_client=api_client, core=CoreV1Api(api_client), apps=AppsV1Api(api_client), net=NetworkingV1Api(api_client), in_cluster=in_cluster, ) # ========================= # Discovery models # ========================= class ProbeInfo(BaseModel): kind: str # "readiness" | "liveness" | "startup" path: Optional[str] = None port: Optional[str | int] = None scheme: str = "http" class ServiceBinding(BaseModel): service: str namespace: str port: int target_port: str | int | None = None protocol: str = "TCP" class IngressBinding(BaseModel): ingress: str namespace: str host: str path: str tls: bool service: str service_port: int class Reachability(BaseModel): via: str # "ingress", "pod-ip", "svc-fqdn" target: str url: Optional[str] = None ok: bool = False status: Optional[int] = None error: Optional[str] = None latency_ms: Optional[int] = None class InspectionReport(BaseModel): deployment: str namespace: str in_cluster: bool pods: List[str] pod_ips: Dict[str, str] container_ports: Dict[str, List[int]] health_probe: Optional[ProbeInfo] = None services: List[ServiceBinding] = Field(default_factory=list) ingresses: List[IngressBinding] = Field(default_factory=list) reachability: List[Reachability] = Field(default_factory=list) # ========================= # Core discovery functions # ========================= def find_deployment( ctx: KubeCtx, name: str, namespace: Optional[str] ) -> Tuple[V1Deployment, str]: """ Return (deployment, namespace). If namespace not provided, try to find a unique deployment across all namespaces. """ if namespace: dep = ctx.apps.read_namespaced_deployment(name=name, namespace=namespace) return dep, namespace # Search all namespaces for uniqueness deps = ctx.apps.list_deployment_for_all_namespaces( field_selector=f"metadata.name={name}" ).items if not deps: raise RuntimeError(f"Deployment '{name}' not found in any namespace.") if len(deps) > 1: ns_list = ", ".join(sorted({d.metadata.namespace for d in deps})) raise RuntimeError( f"Deployment '{name}' found in multiple namespaces: {ns_list}. Please specify --namespace." ) d = deps[0] return d, d.metadata.namespace def pods_for_deployment(ctx: KubeCtx, dep: V1Deployment) -> List[V1Pod]: selector = dep.spec.selector.match_labels or {} if not selector: return [] label_selector = ",".join(f"{k}={v}" for k, v in selector.items()) pods = ctx.core.list_namespaced_pod( namespace=dep.metadata.namespace, label_selector=label_selector ).items return [p for p in pods if p.metadata.deletion_timestamp is None] def services_for_deployment(ctx: KubeCtx, dep: V1Deployment) -> List[V1Service]: """ Services whose selector is a subset of deployment's selector labels """ ns = dep.metadata.namespace dep_sel = dep.spec.selector.match_labels or {} svcs = ctx.core.list_namespaced_service(namespace=ns).items matched = [] for s in svcs: sel = s.spec.selector or {} if sel and all(dep_sel.get(k) == v for k, v in sel.items()): matched.append(s) return matched def ingresses_for_services( ctx: KubeCtx, namespace: str, services: Iterable[V1Service] ) -> List[V1Ingress]: svc_names = {s.metadata.name for s in services} ings = ctx.net.list_namespaced_ingress(namespace=namespace).items out = [] for ing in ings: if not ing.spec or not ing.spec.rules: continue for rule in ing.spec.rules: if not rule.http or not rule.http.paths: continue for p in rule.http.paths: backend = p.backend if backend and backend.service and backend.service.name in svc_names: out.append(ing) break # de-dup seen = set() uniq = [] for ing in out: key = (ing.metadata.namespace, ing.metadata.name) if key not in seen: seen.add(key) uniq.append(ing) return uniq def extract_probe(dep: V1Deployment) -> Optional[ProbeInfo]: """ Prefer readiness > liveness > startup HTTP probes. """ tmpl = dep.spec.template if not tmpl or not tmpl.spec or not tmpl.spec.containers: return None def http_probe(container, probe_field: str) -> Optional[ProbeInfo]: pr = getattr(container, probe_field, None) if pr and pr.http_get: path = pr.http_get.path or "/" port = pr.http_get.port scheme = (pr.http_get.scheme or "HTTP").lower() return ProbeInfo( kind=probe_field.replace("_probe", ""), path=path, port=port, scheme="https" if scheme == "https" else "http", ) return None # Check each container, stop at first we find for c in tmpl.spec.containers: for field in ("readiness_probe", "liveness_probe", "startup_probe"): pi = http_probe(c, field) if pi: return pi return None def resolve_service_bindings( dep: V1Deployment, services: List[V1Service], preferred_port: Optional[str | int] ) -> List[ServiceBinding]: """ Build bindings using Service ports; try to align with probe/targetPort when given. """ ns = dep.metadata.namespace bindings: List[ServiceBinding] = [] for s in services: for sp in s.spec.ports or []: port_num = int(sp.port) target = sp.target_port if isinstance(sp.target_port, (str, int)) else None # Prefer the service port that matches preferred_port (by name or number) if preferred_port is not None: if isinstance(preferred_port, int) and ( target == preferred_port or port_num == preferred_port ): bindings.append( ServiceBinding( service=s.metadata.name, namespace=ns, port=port_num, target_port=target, protocol=(sp.protocol or "TCP"), ) ) continue if isinstance(preferred_port, str) and ( sp.name == preferred_port or target == preferred_port ): bindings.append( ServiceBinding( service=s.metadata.name, namespace=ns, port=port_num, target_port=target, protocol=(sp.protocol or "TCP"), ) ) continue # Otherwise include everything; we'll de-dup later bindings.append( ServiceBinding( service=s.metadata.name, namespace=ns, port=port_num, target_port=target, protocol=(sp.protocol or "TCP"), ) ) # de-dup by (svc,port) seen = set() uniq: List[ServiceBinding] = [] for b in bindings: key = (b.service, b.port) if key not in seen: seen.add(key) uniq.append(b) return uniq def extract_ingress_bindings( ingresses: List[V1Ingress], services: List[V1Service] ) -> List[IngressBinding]: svc_names = {s.metadata.name for s in services} bindings: List[IngressBinding] = [] for ing in ingresses: tls_hosts = set() if ing.spec and ing.spec.tls: for t in ing.spec.tls: for h in t.hosts or []: tls_hosts.add(h) if not ing.spec or not ing.spec.rules: continue for rule in ing.spec.rules: host = rule.host or "" if not rule.http or not rule.http.paths: continue for p in rule.http.paths: backend = p.backend if backend and backend.service and backend.service.name in svc_names: svc_port = ( int(backend.service.port.number) if backend.service.port and backend.service.port.number else 80 ) bindings.append( IngressBinding( ingress=ing.metadata.name, namespace=ing.metadata.namespace, host=host, path=p.path or "/", tls=(host in tls_hosts), service=backend.service.name, service_port=svc_port, ) ) return bindings # ========================= # Networking helpers # ========================= def dns_resolves(host: str) -> bool: try: socket.gethostbyname(host) return True except Exception: return False def http_check( url: str, host_header: Optional[str] = None, timeout: float = 5.0, verify_tls: bool = True, ) -> Reachability: start = time.perf_counter() headers = {} if host_header: headers["Host"] = host_header try: with httpx.Client( follow_redirects=True, verify=verify_tls, headers=headers, timeout=timeout ) as s: r = s.get(url) latency_ms = int((time.perf_counter() - start) * 1000) return Reachability( via="ingress" if host_header or url.startswith("http") else "unknown", target=host_header or url, url=url, ok=r.status_code < 500, status=r.status_code, error=None, latency_ms=latency_ms, ) except Exception as e: latency_ms = int((time.perf_counter() - start) * 1000) return Reachability( via="ingress", target=host_header or url, url=url, ok=False, status=None, error=str(e), latency_ms=latency_ms, ) def tcp_check( host: str, port: int, timeout: float = 3.0 ) -> Tuple[bool, Optional[str], Optional[int]]: start = time.perf_counter() try: with socket.create_connection((host, port), timeout=timeout): return True, None, int((time.perf_counter() - start) * 1000) except Exception as e: return False, str(e), int((time.perf_counter() - start) * 1000) def try_exec_http_from_pod( ctx: KubeCtx, namespace: str, pod: str, url: str, timeout: int = 8 ) -> Reachability: """ Execute a lightweight HTTP check from within the given pod (best-effort). Tries curl, then wget. Returns Reachability record with ok status. """ cmd = [ "sh", "-lc", # Prefer curl (status + timing), fallback to wget; if both missing, try /dev/tcp. ( f'(command -v curl >/dev/null && curl -sk -o /dev/null -w "%{{http_code}}" "{url}") || ' f'(command -v wget >/dev/null && wget -qO- "{url}" >/dev/null && printf 200) || ' f"(echo 000)" ), ] try: out = stream( ctx.core.connect_get_namespaced_pod_exec, name=pod, namespace=namespace, command=cmd, stderr=True, stdin=False, stdout=True, tty=False, _request_timeout=timeout, ) code = 0 try: code = int(str(out).strip()[:3]) except Exception: code = 0 return Reachability( via="svc-fqdn", target=url, url=url, ok=200 <= code < 500, status=code, error=None if 200 <= code < 500 else f"code={code}", ) except Exception as e: return Reachability( via="svc-fqdn", target=url, url=url, ok=False, status=None, error=str(e) ) # ========================= # High-level inspection # ========================= def inspect_deployment( name: str, namespace: Optional[str] = None, timeout: float = 5.0, verify_tls: bool = True, ) -> InspectionReport: """ Full inspection routine. Returns a structured report usable by other tools. """ ctx = load_kube_ctx() dep, ns = find_deployment(ctx, name, namespace) pods = pods_for_deployment(ctx, dep) pod_names = [p.metadata.name for p in pods] pod_ips = {p.metadata.name: (p.status.pod_ip or "") for p in pods} # Collect declared containerPorts cports: Dict[str, List[int]] = {} for p in pods: plist = [] for c in p.spec.containers or []: for cp in c.ports or []: if cp.container_port: plist.append(int(cp.container_port)) cports[p.metadata.name] = sorted({*plist}) probe = extract_probe(dep) preferred_port: Optional[int | str] = ( probe.port if probe and probe.port is not None else None ) services = services_for_deployment(ctx, dep) svc_bindings = resolve_service_bindings(dep, services, preferred_port) ingresses = ingresses_for_services(ctx, ns, services) ing_bindings = extract_ingress_bindings(ingresses, services) # Reachability checks reach: List[Reachability] = [] # 1) Through Ingress (best effort) for ib in ing_bindings: scheme = "https" if ib.tls else "http" base = f"{scheme}://{ib.host}" if ib.host else "" path = ib.path or "/" health_path = probe.path if probe and probe.path else "/" url = f"{base}{path.rstrip('/')}{health_path if health_path.startswith('/') else '/' + health_path}" # If host resolves, try directly if ib.host and dns_resolves(ib.host): r = http_check(url, timeout=timeout, verify_tls=verify_tls) r.via = "ingress" r.target = ib.host reach.append(r) else: # Try using load balancer IP/hostname from status with Host header target_ips: List[str] = [] for ing in ingresses: if ing.metadata.name == ib.ingress: if ( ing.status and ing.status.load_balancer and ing.status.load_balancer.ingress ): for ent in ing.status.load_balancer.ingress: if ent.ip: target_ips.append(ent.ip) if ent.hostname: target_ips.append(ent.hostname) if target_ips: t = target_ips[0] alt_url = f"{scheme}://{t}{path.rstrip('/')}{health_path if health_path.startswith('/') else '/' + health_path}" reach.append( http_check( alt_url, host_header=ib.host or None, timeout=timeout, verify_tls=verify_tls, ) ) else: reach.append( Reachability( via="ingress", target=ib.host or "(no-host)", url=url or None, ok=False, error="No DNS for host and no load balancer address found on Ingress.", ) ) # 2) Through Pod IP (TCP + optional HTTP on health path) test_port_candidates: List[int] = [] if preferred_port is not None and isinstance(preferred_port, int): test_port_candidates.append(preferred_port) # Add declared service ports for b in svc_bindings: if b.port not in test_port_candidates: test_port_candidates.append(b.port) # Add container ports if nothing else if not test_port_candidates: for plist in cports.values(): for pnum in plist: if pnum not in test_port_candidates: test_port_candidates.append(pnum) for pod in pods: ip = pod.status.pod_ip if not ip: reach.append( Reachability( via="pod-ip", target=pod.metadata.name, ok=False, error="No Pod IP assigned", ) ) continue # Try TCP on first viable port if test_port_candidates: port = test_port_candidates[0] ok, err, _lat = tcp_check(ip, port, timeout=timeout) if not ok: reach.append( Reachability( via="pod-ip", target=f"{ip}:{port}", ok=False, error=err ) ) else: # Try HTTP GET if we have a health path health_path = probe.path if probe and probe.path else "/" url = f"http://{ip}:{port}{health_path if health_path.startswith('/') else '/' + health_path}" r = http_check(url, timeout=timeout, verify_tls=False) r.via = "pod-ip" r.target = f"{ip}:{port}" reach.append(r) else: reach.append( Reachability( via="pod-ip", target=ip, ok=False, error="No candidate port found to test", ) ) # 3) Through fully qualified Service DNS (from inside cluster if needed) # Choose first service binding if available if svc_bindings: sb = svc_bindings[0] fqdn = f"{sb.service}.{sb.namespace}.svc.cluster.local" health_path = probe.path if probe and probe.path else "/" url = f"http://{fqdn}:{sb.port}{health_path if health_path.startswith('/') else '/' + health_path}" ctx = load_kube_ctx() if ctx.in_cluster and dns_resolves(fqdn): r = http_check(url, timeout=timeout, verify_tls=False) r.via = "svc-fqdn" r.target = fqdn reach.append(r) else: if pods: reach.append( try_exec_http_from_pod( ctx, sb.namespace, pods[0].metadata.name, url ) ) else: reach.append( Reachability( via="svc-fqdn", target=url, url=url, ok=False, error="No pods available to test inside cluster", ) ) else: reach.append( Reachability( via="svc-fqdn", target="(no service)", ok=False, error="No Service bound to the deployment", ) ) return InspectionReport( deployment=dep.metadata.name, namespace=ns, in_cluster=ctx.in_cluster, pods=pod_names, pod_ips=pod_ips, container_ports=cports, health_probe=probe, services=svc_bindings, ingresses=ing_bindings, reachability=reach, ) # ========================= # Logs helpers # ========================= def deployment_pods(ctx: KubeCtx, name: str, namespace: str) -> List[V1Pod]: dep = ctx.apps.read_namespaced_deployment(name=name, namespace=namespace) return pods_for_deployment(ctx, dep) def print_pod_logs( name: str, namespace: str, container: Optional[str] = None, tail: Optional[int] = None, since_seconds: Optional[int] = None, follow: bool = False, ) -> None: ctx = load_kube_ctx() pods = deployment_pods(ctx, name, namespace) if not pods: console.print(f"[red]No pods found for deployment {name} in {namespace}[/red]") raise typer.Exit(1) # If follow, stream each pod in sequence (simple approach) for p in pods: console.rule(f"[bold]Logs: {p.metadata.name}[/bold]") if follow: # naive follow using repeated calls try: for line in ctx.core.read_namespaced_pod_log( name=p.metadata.name, namespace=namespace, container=container, tail_lines=tail, since_seconds=since_seconds, follow=True, _preload_content=False, ).stream(decode_content=True): try: console.print(line.decode("utf-8").rstrip()) except Exception: console.print(line) except KeyboardInterrupt: break else: out = ctx.core.read_namespaced_pod_log( name=p.metadata.name, namespace=namespace, container=container, tail_lines=tail, since_seconds=since_seconds, ) console.print(out) # ========================= # CLI commands # ========================= @app.command("inspect") def cli_inspect( deployment: str = typer.Argument(..., help="Deployment name"), namespace: Optional[str] = typer.Option( None, "--namespace", "-n", help="Namespace (if omitted, will try to auto-detect)", ), timeout: float = typer.Option(5.0, help="HTTP/TCP timeout (seconds)"), insecure: bool = typer.Option(False, help="Skip TLS verification for HTTPS checks"), output_json: bool = typer.Option( False, "--json", help="Print JSON report instead of a table" ), ): """ Inspect a Deployment's Services & Ingresses and run connectivity checks. """ try: report = inspect_deployment( deployment, namespace=namespace, timeout=timeout, verify_tls=(not insecure) ) except Exception as e: console.print(f"[red]Error:[/red] {e}") raise typer.Exit(1) if output_json: console.print_json(json.dumps(report.model_dump(), indent=2)) return hdr = ( f"[bold white]Deployment:[/bold white] {report.deployment} " f"[bold white]Namespace:[/bold white] {report.namespace} " f"[bold white]Context:[/bold white] {'in-cluster' if report.in_cluster else 'local'}" ) console.print(Panel(hdr, border_style="cyan", title="Overview")) # Pods table t = Table(title="Pods", box=box.SIMPLE, show_lines=False) t.add_column("Pod") t.add_column("IP") t.add_column("Ports") for pod in report.pods: ports = ", ".join(str(p) for p in report.container_ports.get(pod, [])) or "-" t.add_row(pod, report.pod_ips.get(pod, "-"), ports) console.print(t) # Health probe if report.health_probe: hp = report.health_probe console.print( Panel( f"[bold]Health Probe[/bold]\nKind: {hp.kind}\nPath: {hp.path or '-'}\nPort: {hp.port or '-'}\nScheme: {hp.scheme}", border_style="green", ) ) else: console.print( Panel( "[yellow]No HTTP health probe found on containers[/yellow]", border_style="yellow", ) ) # Services if report.services: ts = Table(title="Services", box=box.SIMPLE) ts.add_column("Service") ts.add_column("Namespace") ts.add_column("Port") ts.add_column("TargetPort") ts.add_column("Protocol") for s in report.services: ts.add_row( s.service, s.namespace, str(s.port), str(s.target_port or "-"), s.protocol, ) console.print(ts) else: console.print( Panel( "[yellow]No Service selects this deployment[/yellow]", border_style="yellow", ) ) # Ingresses if report.ingresses: ti = Table(title="Ingress Bindings", box=box.SIMPLE) ti.add_column("Ingress") ti.add_column("Host") ti.add_column("Path") ti.add_column("TLS") ti.add_column("Service:Port") for ib in report.ingresses: ti.add_row( ib.ingress, ib.host or "-", ib.path, "yes" if ib.tls else "no", f"{ib.service}:{ib.service_port}", ) console.print(ti) else: console.print( Panel( "[yellow]No Ingress rules reference Services for this deployment[/yellow]", border_style="yellow", ) ) # Reachability tr = Table(title="Reachability", box=box.SIMPLE) tr.add_column("Via") tr.add_column("Target") tr.add_column("URL") tr.add_column("OK") tr.add_column("Status") tr.add_column("Latency (ms)") tr.add_column("Error") for r in report.reachability: tr.add_row( r.via, r.target, r.url or "-", "[green]yes[/green]" if r.ok else "[red]no[/red]", str(r.status or "-"), str(r.latency_ms or "-"), r.error or "-", ) console.print(tr) @app.command("logs") def cli_logs( deployment: str = typer.Argument(..., help="Deployment name"), namespace: str = typer.Option(..., "--namespace", "-n", help="Namespace"), container: Optional[str] = typer.Option( None, "--container", "-c", help="Specific container name" ), tail: Optional[int] = typer.Option(None, "--tail", help="Tail N lines"), since: Optional[int] = typer.Option( None, "--since", help="Only return logs newer than N seconds" ), follow: bool = typer.Option(False, "--follow", "-f", help="Stream logs"), ): """ Print logs from pods belonging to a Deployment. """ try: print_pod_logs( deployment, namespace, container=container, tail=tail, since_seconds=since, follow=follow, ) except Exception as e: console.print(f"[red]Error fetching logs:[/red] {e}") raise typer.Exit(1) @app.callback(invoke_without_command=True) def _root( ctx: typer.Context, ): """ Kubernetes Ingress Debugger CLI. """ if ctx.invoked_subcommand is None: typer.echo(ctx.get_help()) if __name__ == "__main__": app()