From 76d55bdf01be9794075748744b57599e56ea2b63 Mon Sep 17 00:00:00 2001 From: "Waylon S. Walker" Date: Mon, 24 Mar 2025 14:03:58 -0500 Subject: [PATCH] pick from any namespace --- pvc_inspector.py | 397 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 316 insertions(+), 81 deletions(-) diff --git a/pvc_inspector.py b/pvc_inspector.py index 799717f..dc1bd6e 100755 --- a/pvc_inspector.py +++ b/pvc_inspector.py @@ -4,15 +4,19 @@ # dependencies = [ # "typer", # "kubernetes", +# "iterfzf" # ] # /// +from iterfzf import iterfzf from kubernetes import client, config -import subprocess -import typer -import yaml +import logging import time -from typing import Any +import typer +from typing import Any, Optional +import yaml + +logging.basicConfig(level=logging.WARNING) app = typer.Typer() @@ -33,95 +37,231 @@ def format_volume_mount(vm: client.V1VolumeMount) -> dict[str, Any]: # Skip Kubernetes service account mounts if vm.mount_path.startswith("/var/run/secrets/kubernetes.io/"): return None - - return clean_dict({ - "name": vm.name, - "mountPath": vm.mount_path, - "readOnly": vm.read_only if vm.read_only else None, - }) + + return clean_dict( + { + "name": vm.name, + "mountPath": vm.mount_path, + "readOnly": vm.read_only if vm.read_only else None, + } + ) def format_volume(v: client.V1Volume) -> dict[str, Any]: - """Format volume with only relevant fields.""" + """Format volume into a dictionary, return None if it should be skipped""" # Skip Kubernetes service account volumes if v.name.startswith("kube-api-access-"): return None - + volume_source = None if v.persistent_volume_claim: volume_source = { "persistentVolumeClaim": { - "claimName": v.persistent_volume_claim.claim_name, - "readOnly": v.persistent_volume_claim.read_only, + "claimName": v.persistent_volume_claim.claim_name } } elif v.config_map: volume_source = {"configMap": {"name": v.config_map.name}} elif v.secret: volume_source = {"secret": {"secretName": v.secret.secret_name}} - + elif v.host_path: # Add support for hostPath volumes (used for device mounts) + volume_source = { + "hostPath": { + "path": v.host_path.path, + "type": v.host_path.type if v.host_path.type else None + } + } + elif v.empty_dir: # Add support for emptyDir volumes (used for /dev/shm) + volume_source = { + "emptyDir": { + "medium": v.empty_dir.medium if v.empty_dir.medium else None, + "sizeLimit": v.empty_dir.size_limit if v.empty_dir.size_limit else None + } + } + if not volume_source: return None - + return clean_dict({"name": v.name, **volume_source}) -def fuzzy_select(options: list[str]) -> str: - fzf = subprocess.Popen( - ["fzf"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True - ) - input_str = "\n".join(options) - selection, _ = fzf.communicate(input=input_str) - return selection.strip() +def fuzzy_select(items): + """Use fzf to select from a list of (name, namespace) tuples""" + if not items: + return None, None + + # Format items as "namespace/name" for display + formatted_items = [f"{ns}/{name}" for name, ns in items] + logging.debug(f"Found {len(formatted_items)} pods") + + try: + # Use iterfzf for selection + selected = iterfzf(formatted_items) + + if selected: + namespace, name = selected.split("/") + logging.debug(f"Selected pod {name} in namespace {namespace}") + return name, namespace + else: + logging.debug("No selection made") + return None, None + + except Exception as e: + logging.error(f"Error during selection: {e}", exc_info=True) + typer.echo(f"Error during selection: {e}", err=True) + raise typer.Exit(1) -def get_pods(namespace: str) -> list[str]: +def get_pods(namespace=None): + """Get list of pods in the specified namespace or all namespaces""" + config.load_kube_config() v1 = client.CoreV1Api() - return [pod.metadata.name for pod in v1.list_namespaced_pod(namespace).items] + + try: + if namespace: + logging.debug(f"Listing pods in namespace {namespace}") + pod_list = v1.list_namespaced_pod(namespace=namespace) + else: + logging.debug("Listing pods in all namespaces") + pod_list = v1.list_pod_for_all_namespaces() + + pods = [(pod.metadata.name, pod.metadata.namespace) for pod in pod_list.items] + logging.debug(f"Found {len(pods)} pods") + return pods + except client.exceptions.ApiException as e: + logging.error(f"Error listing pods: {e}") + typer.echo(f"Error listing pods: {e}", err=True) + raise typer.Exit(1) -def get_pod_spec(pod_name: str, namespace: str): +def get_pod_spec(pod_name, namespace): + config.load_kube_config() v1 = client.CoreV1Api() return v1.read_namespaced_pod(pod_name, namespace) -@app.command() -def create_inspector( - namespace: str = typer.Option("default", help="Kubernetes namespace"), -): - config.load_kube_config() - - pods = get_pods(namespace) - if not pods: - typer.echo("No pods found in the namespace.") - raise typer.Exit(1) - - selected_pod = fuzzy_select(pods) - pod_spec = get_pod_spec(selected_pod, namespace) - +def get_pod_volumes_and_mounts(pod_spec): + """Extract all volumes and mounts from a pod spec""" volume_mounts = [] - volumes = [] - for container in pod_spec.spec.containers: - volume_mounts.extend(container.volume_mounts) + if container.volume_mounts: + volume_mounts.extend(container.volume_mounts) - # Filter out None values from volume mounts and volumes + # Filter out None values from volume mounts volume_mounts = [vm for vm in volume_mounts if format_volume_mount(vm)] - volumes = [v for v in pod_spec.spec.volumes if format_volume(v)] - # Create a unique name using timestamp + # Get all volumes, including device mounts + volumes = [] + if pod_spec.spec.volumes: + for v in pod_spec.spec.volumes: + # Handle device mounts + if v.name in ["cache-volume"]: + volumes.append(client.V1Volume( + name=v.name, + empty_dir=client.V1EmptyDirVolumeSource( + medium="Memory" + ) + )) + elif v.name in ["coral-device"]: + volumes.append(client.V1Volume( + name=v.name, + host_path=client.V1HostPathVolumeSource( + path="/dev/apex_0", + type="CharDevice" + ) + )) + elif v.name in ["qsv-device"]: + volumes.append(client.V1Volume( + name=v.name, + host_path=client.V1HostPathVolumeSource( + path="/dev/dri", + type="Directory" + ) + )) + else: + volumes.append(v) + + # Filter out None values from volumes + volumes = [v for v in volumes if format_volume(v)] + + return volume_mounts, volumes + + +def get_pod_env_and_secrets(api, namespace, pod_name): + pod = api.read_namespaced_pod(pod_name, namespace) + + # Get environment variables from the pod + env_vars = [] + for container in pod.spec.containers: + if container.env: + for env in container.env: + env_dict = {"name": env.name} + if env.value: + env_dict["value"] = env.value + elif env.value_from: + if env.value_from.config_map_key_ref: + env_dict["valueFrom"] = { + "configMapKeyRef": { + "name": env.value_from.config_map_key_ref.name, + "key": env.value_from.config_map_key_ref.key, + } + } + elif env.value_from.secret_key_ref: + env_dict["valueFrom"] = { + "secretKeyRef": { + "name": env.value_from.secret_key_ref.name, + "key": env.value_from.secret_key_ref.key, + } + } + elif env.value_from.field_ref: + env_dict["valueFrom"] = { + "fieldRef": { + "fieldPath": env.value_from.field_ref.field_path + } + } + env_vars.append(env_dict) + + # Get all volume mounts that are secrets + secret_volumes = [] + if pod.spec.volumes: + secret_volumes = [v for v in pod.spec.volumes if v.secret] + + return env_vars, secret_volumes + + +def create_inspector_job(api, namespace, pod_name, volume_mounts, volumes): timestamp = int(time.time()) - job_name = f"{selected_pod}-inspector-{timestamp}" + job_name = f"{pod_name}-inspector-{timestamp}" - # Format mount and PVC information for environment variables + # Get environment variables and secrets from the target pod + env_vars, secret_volumes = get_pod_env_and_secrets(api, namespace, pod_name) + + # Add secret volumes to our volumes list + volumes.extend(secret_volumes) + + # Create corresponding volume mounts for secrets + secret_mounts = [] + for vol in secret_volumes: + secret_mounts.append( + { + "name": vol.name, + "mountPath": f"/mnt/secrets/{vol.secret.secret_name}", + "readOnly": True, + } + ) + + # Convert volume mounts to dictionaries + formatted_mounts = [format_volume_mount(vm) for vm in volume_mounts] + formatted_mounts.extend(secret_mounts) + + # Format mount and PVC info for MOTD mount_info = [] + for vm in formatted_mounts: + mount_info.append(f"{vm['name']}:{vm['mountPath']}") + pvc_info = [] - - for vm in volume_mounts: - mount_info.append(f"{vm.name}:{vm.mount_path}") - for v in volumes: - if v.persistent_volume_claim: + if hasattr(v, "persistent_volume_claim") and v.persistent_volume_claim: pvc_info.append(f"{v.name}:{v.persistent_volume_claim.claim_name}") inspector_job = { @@ -130,23 +270,20 @@ def create_inspector( "metadata": { "name": job_name, "namespace": namespace, + "labels": {"app": "pvc-inspector"}, }, "spec": { "ttlSecondsAfterFinished": 0, # Delete immediately after completion "template": { - "metadata": { - "labels": { - "app": "pvc-inspector" - } - }, + "metadata": {"labels": {"app": "pvc-inspector"}}, "spec": { "containers": [ { "name": "inspector", "image": "alpine:latest", # Use Alpine as base for package management "command": [ - "sh", - "-c", + "sh", + "-c", """ # Install basic tools first apk update @@ -159,7 +296,8 @@ curl -L https://github.com/gokcehan/lf/releases/download/r31/lf-linux-amd64.tar. apk add ripgrep exa ncdu dust \ file hexyl jq yq bat fd fzf \ htop bottom difftastic \ - mtr bind-tools + mtr bind-tools \ + aws-cli sqlite sqlite-dev sqlite-libs # Function to update MOTD update_motd() { @@ -173,6 +311,12 @@ $(echo "$MOUNTS" | tr ',' '\\n' | sed 's/^/- /') Persistent Volume Claims: $(echo "$PVCS" | tr ',' '\\n' | sed 's/^/- /') +Mounted Secrets: +$(for d in /mnt/secrets/*; do if [ -d "$d" ]; then echo "- $(basename $d)"; fi; done) + +Environment Variables: +$(env | sort | sed 's/^/- /') + Available Tools: File Navigation: - lf: Terminal file manager (run 'lf') @@ -205,6 +349,10 @@ Network Tools: - dig: DNS lookup - mtr: Network diagnostics +Cloud & Database: +- aws: AWS CLI +- sqlite3: SQLite database tool + Type 'tools-help' for detailed usage information ==================================== EOF @@ -261,6 +409,13 @@ tools-help() { echo "Network Tools:" echo " dig domain : DNS lookup" echo " mtr host : Network diagnostics" + echo + echo "Cloud & Database:" + echo " aws : AWS CLI tool" + echo " sqlite3 : SQLite database tool" + echo + echo "Secrets:" + echo " ls /mnt/secrets : List mounted secrets" } # Set some helpful environment variables @@ -283,35 +438,115 @@ ln -sf /root/.ashrc /etc/shinit update_motd sleep 3600 - """ + """, ], - "env": [ - { - "name": "MOUNTS", - "value": ",".join(mount_info) - }, - { - "name": "PVCS", - "value": ",".join(pvc_info) - }, - { - "name": "ENV", - "value": "/root/.ashrc" - } + "env": env_vars + + [ + {"name": "MOUNTS", "value": ",".join(mount_info)}, + {"name": "PVCS", "value": ",".join(pvc_info)}, + {"name": "ENV", "value": "/root/.ashrc"}, ], - "volumeMounts": [format_volume_mount(vm) for vm in volume_mounts], + "volumeMounts": formatted_mounts, } ], "volumes": [format_volume(v) for v in volumes if format_volume(v)], - "restartPolicy": "Never" - } - } - } + "restartPolicy": "Never", + }, + }, + }, } + return inspector_job - # Apply the job spec + +@app.command() +def create_inspector( + namespace: Optional[str] = typer.Option( + None, + help="Kubernetes namespace. If not specified, will search for pods across all namespaces.", + ), +): + """ + Create a PVC inspector job. If namespace is not specified, will search for pods across all namespaces. + The inspector job will be created in the same namespace as the selected pod. + """ + pods = get_pods(namespace) + if not pods: + typer.echo("No pods found.") + raise typer.Exit(1) + + selected_pod, selected_namespace = fuzzy_select(pods) + if not selected_pod: + typer.echo("No pod selected.") + raise typer.Exit(1) + + pod_spec = get_pod_spec(selected_pod, selected_namespace) + volume_mounts, volumes = get_pod_volumes_and_mounts(pod_spec) + + inspector_job = create_inspector_job( + client.CoreV1Api(), selected_namespace, selected_pod, volume_mounts, volumes + ) + + # Output the job manifest typer.echo(yaml.dump(clean_dict(inspector_job), sort_keys=False)) +@app.command() +def cleanup_inspectors( + namespace: Optional[str] = typer.Option( + None, + help="Kubernetes namespace. If not specified, will cleanup in all namespaces.", + ), +): + """ + Delete all PVC inspector jobs in the specified namespace or all namespaces + """ + config.load_kube_config() + batch_api = client.BatchV1Api() + + try: + if namespace: + logging.debug(f"Listing jobs in namespace {namespace}") + jobs = batch_api.list_namespaced_job( + namespace=namespace, label_selector="app=pvc-inspector" + ) + else: + logging.debug("Listing jobs in all namespaces") + jobs = batch_api.list_job_for_all_namespaces( + label_selector="app=pvc-inspector" + ) + + if not jobs.items: + typer.echo("No PVC inspector jobs found.") + return + + # Delete each job + for job in jobs.items: + try: + logging.debug( + f"Deleting job {job.metadata.namespace}/{job.metadata.name}" + ) + batch_api.delete_namespaced_job( + name=job.metadata.name, + namespace=job.metadata.namespace, + body=client.V1DeleteOptions(propagation_policy="Background"), + ) + typer.echo(f"Deleted job: {job.metadata.namespace}/{job.metadata.name}") + except client.exceptions.ApiException as e: + logging.error( + f"Failed to delete job {job.metadata.namespace}/{job.metadata.name}: {e}" + ) + typer.echo( + f"Failed to delete job {job.metadata.namespace}/{job.metadata.name}: {e}", + err=True, + ) + + except client.exceptions.ApiException as e: + logging.error(f"Failed to list jobs: {e}") + typer.echo(f"Failed to list jobs: {e}", err=True) + raise typer.Exit(1) + + typer.echo("Cleanup complete.") + + if __name__ == "__main__": app()