This commit is contained in:
Waylon S. Walker 2025-03-25 09:42:49 -05:00
parent 5238ef5f6b
commit 256e62377e

288
krayt.py
View file

@ -16,12 +16,11 @@ Hunt down storage issues and explore your persistent data like a true Tatooine d
May the Force be with your volumes!
"""
import os
import glob
from pathlib import Path
from iterfzf import iterfzf
from kubernetes import client, config
import logging
import os
from pathlib import Path
import time
import typer
from typing import Any, Optional
@ -101,47 +100,57 @@ def fuzzy_select(items):
if not items:
return None, None
# Format items as "namespace/name" for display
formatted_items = [f"{ns}/{name}" for name, ns in items]
logging.debug(f"Found {len(formatted_items)} pods")
# If there's only one item, return it without prompting
if len(items) == 1:
return items[0]
# Format items for display
formatted_items = [f"{name} ({namespace})" for name, namespace in items]
# Use fzf for selection
try:
# Use iterfzf for selection
selected = iterfzf(formatted_items)
if selected:
namespace, name = selected.split("/")
logging.debug(f"Selected pod {name} in namespace {namespace}")
return name, namespace
else:
logging.debug("No selection made")
if not selected:
return None, None
# Parse selection back into name and namespace
# Example: "pod-name (namespace)" -> ("pod-name", "namespace")
name = selected.split(" (")[0]
namespace = selected.split(" (")[1][:-1]
return name, namespace
except Exception as e:
logging.error(f"Error during selection: {e}", exc_info=True)
typer.echo(f"Error during selection: {e}", err=True)
raise typer.Exit(1)
typer.echo(f"Error during selection: {e}")
return None, None
def get_pods(namespace=None):
def get_pods(
namespace=None,
label_selector: str = "app=krayt",
):
"""Get list of pods in the specified namespace or all namespaces"""
config.load_kube_config()
v1 = client.CoreV1Api()
try:
config.load_kube_config()
api = client.CoreV1Api()
if namespace:
logging.debug(f"Listing pods in namespace {namespace}")
pod_list = v1.list_namespaced_pod(namespace=namespace)
pods = api.list_namespaced_pod(
namespace=namespace,
label_selector=label_selector,
)
else:
logging.debug("Listing pods in all namespaces")
pod_list = v1.list_pod_for_all_namespaces()
pods = api.list_pod_for_all_namespaces(
label_selector=label_selector,
)
pods = [(pod.metadata.name, pod.metadata.namespace) for pod in pod_list.items]
logging.debug(f"Found {len(pods)} pods")
return pods
except client.exceptions.ApiException as e:
logging.error(f"Error listing pods: {e}")
typer.echo(f"Error listing pods: {e}", err=True)
# Convert to list of (name, namespace) tuples
pod_list = []
for pod in pods.items:
if pod.metadata.namespace not in PROTECTED_NAMESPACES:
pod_list.append((pod.metadata.name, pod.metadata.namespace))
return pod_list
except client.rest.ApiException as e:
typer.echo(f"Error listing pods: {e}")
raise typer.Exit(1)
@ -206,8 +215,14 @@ def get_env_vars_and_secret_volumes(api, namespace: str):
volumes = []
# Add proxy environment variables if they exist in the host environment
proxy_vars = ["HTTP_PROXY", "HTTPS_PROXY", "NO_PROXY",
"http_proxy", "https_proxy", "no_proxy"]
proxy_vars = [
"HTTP_PROXY",
"HTTPS_PROXY",
"NO_PROXY",
"http_proxy",
"https_proxy",
"no_proxy",
]
for var in proxy_vars:
if var in os.environ:
@ -218,9 +233,8 @@ def get_env_vars_and_secret_volumes(api, namespace: str):
secrets = api.list_namespaced_secret(namespace)
for secret in secrets.items:
# Skip service account tokens and other system secrets
if (
secret.type != "Opaque"
or secret.metadata.name.startswith("default-token-")
if secret.type != "Opaque" or secret.metadata.name.startswith(
"default-token-"
):
continue
@ -259,15 +273,15 @@ def get_init_scripts():
for script in scripts:
try:
with open(script, 'r') as f:
with open(script, "r") as f:
script_content = f.read()
if script_content:
init_script += f"echo '=== Running {script.name} ==='\n"
# Write each script to a separate file
init_script += f"cat > /tmp/{script.name} << 'EOFSCRIPT'\n"
init_script += script_content
if not script_content.endswith('\n'):
init_script += '\n'
if not script_content.endswith("\n"):
init_script += "\n"
init_script += "EOFSCRIPT\n\n"
# Make it executable and run it
init_script += f"chmod +x /tmp/{script.name}\n"
@ -284,7 +298,7 @@ def get_motd_script(mount_info, pvc_info):
"""Generate the MOTD script with proper escaping"""
return f"""
# Create MOTD
cat << 'EOF' > /etc/motd
cat << EOF > /etc/motd
====================================
Krayt Dragon's Lair
A safe haven for volume inspection
@ -293,21 +307,27 @@ A safe haven for volume inspection
"Inside every volume lies a pearl of wisdom waiting to be discovered."
Mounted Volumes:
$(echo "{','.join(mount_info)}" | tr ',' '\\n' | sed 's/^/- /')
$(echo "{",".join(mount_info)}" | tr ',' '\\n' | sed 's/^/- /')
Persistent Volume Claims:
$(echo "{','.join(pvc_info)}" | tr ',' '\\n' | sed 's/^/- /')
$(echo "{",".join(pvc_info)}" | tr ',' '\\n' | sed 's/^/- /')
Mounted Secrets:
$(for d in /mnt/secrets/*; do if [ -d "$d" ]; then echo "- $(basename $d)"; fi; done)
Init Script Status:
$(if [ -f /tmp/init.log ]; then echo "View initialization log at /tmp/init.log"; fi)
EOF"""
EOF
"""
def create_inspector_job(
api, namespace: str, pod_name: str, volume_mounts: list, volumes: list, image: str = "alpine:latest"
api,
namespace: str,
pod_name: str,
volume_mounts: list,
volumes: list,
image: str = "alpine:latest",
):
"""Create a Krayt inspector job with the given mounts"""
timestamp = int(time.time())
@ -351,7 +371,8 @@ def create_inspector_job(
command_parts = []
# Configure apk proxy settings BEFORE any package installation
command_parts.extend([
command_parts.extend(
[
"# Configure apk proxy settings",
"mkdir -p /etc/apk",
"cat > /etc/apk/repositories << 'EOF'",
@ -359,8 +380,8 @@ def create_inspector_job(
"https://dl-cdn.alpinelinux.org/alpine/latest-stable/community",
"EOF",
"",
"if [ ! -z \"$HTTP_PROXY\" ]; then",
" echo \"Setting up apk proxy configuration...\"",
'if [ ! -z "$HTTP_PROXY" ]; then',
' echo "Setting up apk proxy configuration..."',
" mkdir -p /etc/apk/",
" cat > /etc/apk/repositories << EOF",
"#/media/cdrom/apks",
@ -371,12 +392,59 @@ def create_inspector_job(
"proxy=$HTTP_PROXY",
"EOF",
"fi",
""
])
"",
"# Install basic tools first",
"apk update",
"apk add curl",
"",
"# Install uv CLI",
"echo 'Installing uv CLI...'",
"curl -LsSf https://astral.sh/uv/install.sh | sh",
"echo 'uv version:'",
"uv --version",
"",
"echo 'Installing starship...'",
"curl -sS https://starship.rs/install.sh | sh -s -- -y",
"echo 'starship version:'",
"starship --version",
"",
"",
"# Install additional tools",
"apk add "
+ " ".join(
[
"ripgrep",
"exa",
"ncdu",
"dust",
"file",
"hexyl",
"jq",
"yq",
"bat",
"fd",
"fzf",
"htop",
"bottom",
"difftastic",
"mtr",
"bind-tools",
"aws-cli",
"sqlite",
"sqlite-dev",
"sqlite-libs",
"bash",
"neovim",
]
),
"",
]
)
# Add init scripts if present
if init_scripts:
command_parts.extend([
command_parts.extend(
[
"# Write and run init scripts",
"mkdir -p /tmp/init.d",
"cat > /tmp/init.sh << 'EOFSCRIPT'",
@ -387,27 +455,30 @@ def create_inspector_job(
"chmod +x /tmp/init.sh",
"/tmp/init.sh 2>&1 | tee /tmp/init.log",
"echo 'Init script log available at /tmp/init.log'",
""
])
"",
]
)
# Add base installation commands AFTER proxy configuration
command_parts.extend([
"# Install basic tools first",
"apk update",
"apk add curl",
"",
"# Install additional tools",
"apk add ripgrep exa ncdu dust file hexyl jq yq bat fd fzf htop bottom difftastic mtr bind-tools aws-cli sqlite sqlite-dev sqlite-libs",
"",
# Add shell setup and MOTD
command_parts.extend(
[
"# Create .ashrc with MOTD",
"cat > /root/.ashrc << 'EOF'",
"# Display MOTD on login",
"[ -f /etc/motd ] && cat /etc/motd",
"EOF",
"",
"# Set up shell environment",
"export EDITOR=vi",
"export PAGER=less",
"# Set up aliases",
"alias ll='ls -la'",
"alias l='ls -la'",
"alias la='ls -la'",
"alias vi='vim'",
"# Set up PATH",
"export PATH=/root/.local/bin:$PATH",
'eval "$(starship init bash)"',
"EOF",
"",
"",
"# Set up environment to always source our RC file",
"echo 'export ENV=/root/.ashrc' > /etc/profile",
@ -425,8 +496,9 @@ def create_inspector_job(
get_motd_script(mount_info, pvc_info),
"",
"# Keep container running",
"tail -f /dev/null"
])
"tail -f /dev/null",
]
)
inspector_job = {
"apiVersion": "batch/v1",
@ -435,25 +507,17 @@ def create_inspector_job(
"name": job_name,
"namespace": namespace,
"labels": {"app": "krayt"},
"annotations": {
"pvcs": ",".join(pvc_info) if pvc_info else "none"
}
"annotations": {"pvcs": ",".join(pvc_info) if pvc_info else "none"},
},
"spec": {
"template": {
"metadata": {
"labels": {"app": "krayt"}
},
"metadata": {"labels": {"app": "krayt"}},
"spec": {
"containers": [
{
"name": "inspector",
"image": image,
"command": [
"sh",
"-c",
"\n".join(command_parts)
],
"command": ["sh", "-c", "\n".join(command_parts)],
"env": env_vars,
"volumeMounts": formatted_mounts,
}
@ -495,7 +559,7 @@ def load_init_scripts():
for script in scripts:
try:
with open(script, 'r') as f:
with open(script, "r") as f:
exec(f.read(), globals())
logging.debug(f"Loaded init script: {script}")
except Exception as e:
@ -505,8 +569,14 @@ def load_init_scripts():
def setup_environment():
"""Set up the environment with proxy settings and other configurations"""
# Load environment variables for proxies
proxy_vars = ["HTTP_PROXY", "HTTPS_PROXY", "NO_PROXY",
"http_proxy", "https_proxy", "no_proxy"]
proxy_vars = [
"HTTP_PROXY",
"HTTPS_PROXY",
"NO_PROXY",
"http_proxy",
"https_proxy",
"no_proxy",
]
for var in proxy_vars:
if var in os.environ:
@ -596,7 +666,7 @@ def exec(
"--",
"/bin/sh",
"-c",
"cat /etc/motd; exec /bin/ash -l"
"cat /etc/motd; exec /bin/ash -l",
]
os.execvp("kubectl", exec_command)
@ -706,7 +776,8 @@ def create(
If namespace is not specified, will search for pods across all namespaces.
The inspector will be created in the same namespace as the selected pod.
"""
pods = get_pods(namespace)
# For create, we want to list all pods, not just Krayt pods
pods = get_pods(namespace, label_selector=None)
if not pods:
typer.echo("No pods found.")
raise typer.Exit(1)
@ -720,7 +791,12 @@ def create(
volume_mounts, volumes = get_pod_volumes_and_mounts(pod_spec)
inspector_job = create_inspector_job(
client.CoreV1Api(), selected_namespace, selected_pod, volume_mounts, volumes, image=image
client.CoreV1Api(),
selected_namespace,
selected_pod,
volume_mounts,
volumes,
image=image,
)
# Output the job manifest
@ -733,6 +809,54 @@ def version():
typer.echo(f"Version: {KRAYT_VERSION}")
@app.command()
def logs(
namespace: Optional[str] = typer.Option(
None,
help="Kubernetes namespace. If not specified, will search for inspectors across all namespaces.",
),
follow: bool = typer.Option(
False,
"--follow",
"-f",
help="Follow the logs in real-time",
),
):
"""
View logs from a Krayt inspector pod.
If multiple inspectors are found, you'll get to choose which one to explore.
"""
pods = get_pods(namespace)
if not pods:
typer.echo("No pods found.")
raise typer.Exit(1)
selected_pod, selected_namespace = fuzzy_select(pods)
if not selected_pod:
typer.echo("No pod selected.")
raise typer.Exit(1)
try:
config.load_kube_config()
api = client.CoreV1Api()
logs = api.read_namespaced_pod_log(
name=selected_pod,
namespace=selected_namespace,
follow=follow,
_preload_content=False,
)
if follow:
for line in logs:
typer.echo(line.decode("utf-8").strip())
else:
typer.echo(logs.read().decode("utf-8"))
except client.rest.ApiException as e:
typer.echo(f"Error reading logs: {e}")
raise typer.Exit(1)
def main():
setup_environment()
load_init_scripts()