pick from any namespace

This commit is contained in:
Waylon S. Walker 2025-03-24 14:03:58 -05:00
parent 8335195707
commit 76d55bdf01

View file

@ -4,15 +4,19 @@
# dependencies = [ # dependencies = [
# "typer", # "typer",
# "kubernetes", # "kubernetes",
# "iterfzf"
# ] # ]
# /// # ///
from iterfzf import iterfzf
from kubernetes import client, config from kubernetes import client, config
import subprocess import logging
import typer
import yaml
import time import time
from typing import Any import typer
from typing import Any, Optional
import yaml
logging.basicConfig(level=logging.WARNING)
app = typer.Typer() app = typer.Typer()
@ -34,15 +38,17 @@ def format_volume_mount(vm: client.V1VolumeMount) -> dict[str, Any]:
if vm.mount_path.startswith("/var/run/secrets/kubernetes.io/"): if vm.mount_path.startswith("/var/run/secrets/kubernetes.io/"):
return None return None
return clean_dict({ return clean_dict(
"name": vm.name, {
"mountPath": vm.mount_path, "name": vm.name,
"readOnly": vm.read_only if vm.read_only else None, "mountPath": vm.mount_path,
}) "readOnly": vm.read_only if vm.read_only else None,
}
)
def format_volume(v: client.V1Volume) -> dict[str, Any]: def format_volume(v: client.V1Volume) -> dict[str, Any]:
"""Format volume with only relevant fields.""" """Format volume into a dictionary, return None if it should be skipped"""
# Skip Kubernetes service account volumes # Skip Kubernetes service account volumes
if v.name.startswith("kube-api-access-"): if v.name.startswith("kube-api-access-"):
return None return None
@ -51,14 +57,27 @@ def format_volume(v: client.V1Volume) -> dict[str, Any]:
if v.persistent_volume_claim: if v.persistent_volume_claim:
volume_source = { volume_source = {
"persistentVolumeClaim": { "persistentVolumeClaim": {
"claimName": v.persistent_volume_claim.claim_name, "claimName": v.persistent_volume_claim.claim_name
"readOnly": v.persistent_volume_claim.read_only,
} }
} }
elif v.config_map: elif v.config_map:
volume_source = {"configMap": {"name": v.config_map.name}} volume_source = {"configMap": {"name": v.config_map.name}}
elif v.secret: elif v.secret:
volume_source = {"secret": {"secretName": v.secret.secret_name}} volume_source = {"secret": {"secretName": v.secret.secret_name}}
elif v.host_path: # Add support for hostPath volumes (used for device mounts)
volume_source = {
"hostPath": {
"path": v.host_path.path,
"type": v.host_path.type if v.host_path.type else None
}
}
elif v.empty_dir: # Add support for emptyDir volumes (used for /dev/shm)
volume_source = {
"emptyDir": {
"medium": v.empty_dir.medium if v.empty_dir.medium else None,
"sizeLimit": v.empty_dir.size_limit if v.empty_dir.size_limit else None
}
}
if not volume_source: if not volume_source:
return None return None
@ -66,62 +85,183 @@ def format_volume(v: client.V1Volume) -> dict[str, Any]:
return clean_dict({"name": v.name, **volume_source}) return clean_dict({"name": v.name, **volume_source})
def fuzzy_select(options: list[str]) -> str: def fuzzy_select(items):
fzf = subprocess.Popen( """Use fzf to select from a list of (name, namespace) tuples"""
["fzf"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True if not items:
) return None, None
input_str = "\n".join(options)
selection, _ = fzf.communicate(input=input_str) # Format items as "namespace/name" for display
return selection.strip() formatted_items = [f"{ns}/{name}" for name, ns in items]
logging.debug(f"Found {len(formatted_items)} pods")
try:
# Use iterfzf for selection
selected = iterfzf(formatted_items)
if selected:
namespace, name = selected.split("/")
logging.debug(f"Selected pod {name} in namespace {namespace}")
return name, namespace
else:
logging.debug("No selection made")
return None, None
except Exception as e:
logging.error(f"Error during selection: {e}", exc_info=True)
typer.echo(f"Error during selection: {e}", err=True)
raise typer.Exit(1)
def get_pods(namespace: str) -> list[str]: def get_pods(namespace=None):
"""Get list of pods in the specified namespace or all namespaces"""
config.load_kube_config()
v1 = client.CoreV1Api() v1 = client.CoreV1Api()
return [pod.metadata.name for pod in v1.list_namespaced_pod(namespace).items]
try:
if namespace:
logging.debug(f"Listing pods in namespace {namespace}")
pod_list = v1.list_namespaced_pod(namespace=namespace)
else:
logging.debug("Listing pods in all namespaces")
pod_list = v1.list_pod_for_all_namespaces()
pods = [(pod.metadata.name, pod.metadata.namespace) for pod in pod_list.items]
logging.debug(f"Found {len(pods)} pods")
return pods
except client.exceptions.ApiException as e:
logging.error(f"Error listing pods: {e}")
typer.echo(f"Error listing pods: {e}", err=True)
raise typer.Exit(1)
def get_pod_spec(pod_name: str, namespace: str): def get_pod_spec(pod_name, namespace):
config.load_kube_config()
v1 = client.CoreV1Api() v1 = client.CoreV1Api()
return v1.read_namespaced_pod(pod_name, namespace) return v1.read_namespaced_pod(pod_name, namespace)
@app.command() def get_pod_volumes_and_mounts(pod_spec):
def create_inspector( """Extract all volumes and mounts from a pod spec"""
namespace: str = typer.Option("default", help="Kubernetes namespace"),
):
config.load_kube_config()
pods = get_pods(namespace)
if not pods:
typer.echo("No pods found in the namespace.")
raise typer.Exit(1)
selected_pod = fuzzy_select(pods)
pod_spec = get_pod_spec(selected_pod, namespace)
volume_mounts = [] volume_mounts = []
volumes = []
for container in pod_spec.spec.containers: for container in pod_spec.spec.containers:
volume_mounts.extend(container.volume_mounts) if container.volume_mounts:
volume_mounts.extend(container.volume_mounts)
# Filter out None values from volume mounts and volumes # Filter out None values from volume mounts
volume_mounts = [vm for vm in volume_mounts if format_volume_mount(vm)] volume_mounts = [vm for vm in volume_mounts if format_volume_mount(vm)]
volumes = [v for v in pod_spec.spec.volumes if format_volume(v)]
# Create a unique name using timestamp # Get all volumes, including device mounts
volumes = []
if pod_spec.spec.volumes:
for v in pod_spec.spec.volumes:
# Handle device mounts
if v.name in ["cache-volume"]:
volumes.append(client.V1Volume(
name=v.name,
empty_dir=client.V1EmptyDirVolumeSource(
medium="Memory"
)
))
elif v.name in ["coral-device"]:
volumes.append(client.V1Volume(
name=v.name,
host_path=client.V1HostPathVolumeSource(
path="/dev/apex_0",
type="CharDevice"
)
))
elif v.name in ["qsv-device"]:
volumes.append(client.V1Volume(
name=v.name,
host_path=client.V1HostPathVolumeSource(
path="/dev/dri",
type="Directory"
)
))
else:
volumes.append(v)
# Filter out None values from volumes
volumes = [v for v in volumes if format_volume(v)]
return volume_mounts, volumes
def get_pod_env_and_secrets(api, namespace, pod_name):
pod = api.read_namespaced_pod(pod_name, namespace)
# Get environment variables from the pod
env_vars = []
for container in pod.spec.containers:
if container.env:
for env in container.env:
env_dict = {"name": env.name}
if env.value:
env_dict["value"] = env.value
elif env.value_from:
if env.value_from.config_map_key_ref:
env_dict["valueFrom"] = {
"configMapKeyRef": {
"name": env.value_from.config_map_key_ref.name,
"key": env.value_from.config_map_key_ref.key,
}
}
elif env.value_from.secret_key_ref:
env_dict["valueFrom"] = {
"secretKeyRef": {
"name": env.value_from.secret_key_ref.name,
"key": env.value_from.secret_key_ref.key,
}
}
elif env.value_from.field_ref:
env_dict["valueFrom"] = {
"fieldRef": {
"fieldPath": env.value_from.field_ref.field_path
}
}
env_vars.append(env_dict)
# Get all volume mounts that are secrets
secret_volumes = []
if pod.spec.volumes:
secret_volumes = [v for v in pod.spec.volumes if v.secret]
return env_vars, secret_volumes
def create_inspector_job(api, namespace, pod_name, volume_mounts, volumes):
timestamp = int(time.time()) timestamp = int(time.time())
job_name = f"{selected_pod}-inspector-{timestamp}" job_name = f"{pod_name}-inspector-{timestamp}"
# Format mount and PVC information for environment variables # Get environment variables and secrets from the target pod
env_vars, secret_volumes = get_pod_env_and_secrets(api, namespace, pod_name)
# Add secret volumes to our volumes list
volumes.extend(secret_volumes)
# Create corresponding volume mounts for secrets
secret_mounts = []
for vol in secret_volumes:
secret_mounts.append(
{
"name": vol.name,
"mountPath": f"/mnt/secrets/{vol.secret.secret_name}",
"readOnly": True,
}
)
# Convert volume mounts to dictionaries
formatted_mounts = [format_volume_mount(vm) for vm in volume_mounts]
formatted_mounts.extend(secret_mounts)
# Format mount and PVC info for MOTD
mount_info = [] mount_info = []
for vm in formatted_mounts:
mount_info.append(f"{vm['name']}:{vm['mountPath']}")
pvc_info = [] pvc_info = []
for vm in volume_mounts:
mount_info.append(f"{vm.name}:{vm.mount_path}")
for v in volumes: for v in volumes:
if v.persistent_volume_claim: if hasattr(v, "persistent_volume_claim") and v.persistent_volume_claim:
pvc_info.append(f"{v.name}:{v.persistent_volume_claim.claim_name}") pvc_info.append(f"{v.name}:{v.persistent_volume_claim.claim_name}")
inspector_job = { inspector_job = {
@ -130,15 +270,12 @@ def create_inspector(
"metadata": { "metadata": {
"name": job_name, "name": job_name,
"namespace": namespace, "namespace": namespace,
"labels": {"app": "pvc-inspector"},
}, },
"spec": { "spec": {
"ttlSecondsAfterFinished": 0, # Delete immediately after completion "ttlSecondsAfterFinished": 0, # Delete immediately after completion
"template": { "template": {
"metadata": { "metadata": {"labels": {"app": "pvc-inspector"}},
"labels": {
"app": "pvc-inspector"
}
},
"spec": { "spec": {
"containers": [ "containers": [
{ {
@ -159,7 +296,8 @@ curl -L https://github.com/gokcehan/lf/releases/download/r31/lf-linux-amd64.tar.
apk add ripgrep exa ncdu dust \ apk add ripgrep exa ncdu dust \
file hexyl jq yq bat fd fzf \ file hexyl jq yq bat fd fzf \
htop bottom difftastic \ htop bottom difftastic \
mtr bind-tools mtr bind-tools \
aws-cli sqlite sqlite-dev sqlite-libs
# Function to update MOTD # Function to update MOTD
update_motd() { update_motd() {
@ -173,6 +311,12 @@ $(echo "$MOUNTS" | tr ',' '\\n' | sed 's/^/- /')
Persistent Volume Claims: Persistent Volume Claims:
$(echo "$PVCS" | tr ',' '\\n' | sed 's/^/- /') $(echo "$PVCS" | tr ',' '\\n' | sed 's/^/- /')
Mounted Secrets:
$(for d in /mnt/secrets/*; do if [ -d "$d" ]; then echo "- $(basename $d)"; fi; done)
Environment Variables:
$(env | sort | sed 's/^/- /')
Available Tools: Available Tools:
File Navigation: File Navigation:
- lf: Terminal file manager (run 'lf') - lf: Terminal file manager (run 'lf')
@ -205,6 +349,10 @@ Network Tools:
- dig: DNS lookup - dig: DNS lookup
- mtr: Network diagnostics - mtr: Network diagnostics
Cloud & Database:
- aws: AWS CLI
- sqlite3: SQLite database tool
Type 'tools-help' for detailed usage information Type 'tools-help' for detailed usage information
==================================== ====================================
EOF EOF
@ -261,6 +409,13 @@ tools-help() {
echo "Network Tools:" echo "Network Tools:"
echo " dig domain : DNS lookup" echo " dig domain : DNS lookup"
echo " mtr host : Network diagnostics" echo " mtr host : Network diagnostics"
echo
echo "Cloud & Database:"
echo " aws : AWS CLI tool"
echo " sqlite3 : SQLite database tool"
echo
echo "Secrets:"
echo " ls /mnt/secrets : List mounted secrets"
} }
# Set some helpful environment variables # Set some helpful environment variables
@ -283,35 +438,115 @@ ln -sf /root/.ashrc /etc/shinit
update_motd update_motd
sleep 3600 sleep 3600
""" """,
], ],
"env": [ "env": env_vars
{ + [
"name": "MOUNTS", {"name": "MOUNTS", "value": ",".join(mount_info)},
"value": ",".join(mount_info) {"name": "PVCS", "value": ",".join(pvc_info)},
}, {"name": "ENV", "value": "/root/.ashrc"},
{
"name": "PVCS",
"value": ",".join(pvc_info)
},
{
"name": "ENV",
"value": "/root/.ashrc"
}
], ],
"volumeMounts": [format_volume_mount(vm) for vm in volume_mounts], "volumeMounts": formatted_mounts,
} }
], ],
"volumes": [format_volume(v) for v in volumes if format_volume(v)], "volumes": [format_volume(v) for v in volumes if format_volume(v)],
"restartPolicy": "Never" "restartPolicy": "Never",
} },
} },
} },
} }
return inspector_job
# Apply the job spec
@app.command()
def create_inspector(
namespace: Optional[str] = typer.Option(
None,
help="Kubernetes namespace. If not specified, will search for pods across all namespaces.",
),
):
"""
Create a PVC inspector job. If namespace is not specified, will search for pods across all namespaces.
The inspector job will be created in the same namespace as the selected pod.
"""
pods = get_pods(namespace)
if not pods:
typer.echo("No pods found.")
raise typer.Exit(1)
selected_pod, selected_namespace = fuzzy_select(pods)
if not selected_pod:
typer.echo("No pod selected.")
raise typer.Exit(1)
pod_spec = get_pod_spec(selected_pod, selected_namespace)
volume_mounts, volumes = get_pod_volumes_and_mounts(pod_spec)
inspector_job = create_inspector_job(
client.CoreV1Api(), selected_namespace, selected_pod, volume_mounts, volumes
)
# Output the job manifest
typer.echo(yaml.dump(clean_dict(inspector_job), sort_keys=False)) typer.echo(yaml.dump(clean_dict(inspector_job), sort_keys=False))
@app.command()
def cleanup_inspectors(
namespace: Optional[str] = typer.Option(
None,
help="Kubernetes namespace. If not specified, will cleanup in all namespaces.",
),
):
"""
Delete all PVC inspector jobs in the specified namespace or all namespaces
"""
config.load_kube_config()
batch_api = client.BatchV1Api()
try:
if namespace:
logging.debug(f"Listing jobs in namespace {namespace}")
jobs = batch_api.list_namespaced_job(
namespace=namespace, label_selector="app=pvc-inspector"
)
else:
logging.debug("Listing jobs in all namespaces")
jobs = batch_api.list_job_for_all_namespaces(
label_selector="app=pvc-inspector"
)
if not jobs.items:
typer.echo("No PVC inspector jobs found.")
return
# Delete each job
for job in jobs.items:
try:
logging.debug(
f"Deleting job {job.metadata.namespace}/{job.metadata.name}"
)
batch_api.delete_namespaced_job(
name=job.metadata.name,
namespace=job.metadata.namespace,
body=client.V1DeleteOptions(propagation_policy="Background"),
)
typer.echo(f"Deleted job: {job.metadata.namespace}/{job.metadata.name}")
except client.exceptions.ApiException as e:
logging.error(
f"Failed to delete job {job.metadata.namespace}/{job.metadata.name}: {e}"
)
typer.echo(
f"Failed to delete job {job.metadata.namespace}/{job.metadata.name}: {e}",
err=True,
)
except client.exceptions.ApiException as e:
logging.error(f"Failed to list jobs: {e}")
typer.echo(f"Failed to list jobs: {e}", err=True)
raise typer.Exit(1)
typer.echo("Cleanup complete.")
if __name__ == "__main__": if __name__ == "__main__":
app() app()