python native exec

This commit is contained in:
Waylon S. Walker 2025-04-18 14:05:48 -05:00
parent cc425cf812
commit 7daa9a3874

View file

@ -1,5 +1,6 @@
import iterfzf
from krayt.templates import env
from kubernetes.stream import stream
from kubernetes import client, config
import logging
import os
@ -8,6 +9,10 @@ import typer
from typing import Any, List, Optional
import yaml
from krayt.__about__ import __version__
import sys
import tty
import termios
import select
logging.basicConfig(level=logging.WARNING)
@ -35,8 +40,8 @@ def format_volume_mount(vm: client.V1VolumeMount) -> dict[str, Any]:
return clean_dict(
{
"name": vm.name,
"mountPath": vm.mount_path,
"readOnly": vm.read_only if vm.read_only else None,
"mount_path": vm.mount_path,
"read_only": vm.read_only if vm.read_only else None,
}
)
@ -271,92 +276,81 @@ def create_inspector_job(
pre_init_hooks: Optional[List[str]] = None,
post_init_hooks: Optional[List[str]] = None,
):
"""Create a Krayt inspector job with the given mounts"""
timestamp = int(time.time())
job_name = f"{pod_name}-krayt-{timestamp}"
# Get environment variables and secret volumes from the target pod
env_vars, secret_volumes = get_env_vars_and_secret_volumes(api, namespace)
# Add secret volumes to our volumes list
volumes.extend(secret_volumes)
# Create corresponding volume mounts for secrets
secret_mounts = []
for vol in secret_volumes:
secret_mounts.append(
{
"name": vol.name,
"mountPath": f"/mnt/secrets/{vol.secret.secret_name}",
"readOnly": True,
}
secret_mounts = [
client.V1VolumeMount(
name=vol.name,
mount_path=f"/mnt/secrets/{vol.secret.secret_name}",
read_only=True,
)
for vol in secret_volumes
]
# Convert volume mounts to dictionaries
formatted_mounts = [format_volume_mount(vm) for vm in volume_mounts]
formatted_mounts = [client.V1VolumeMount(**vm) for vm in formatted_mounts if vm]
formatted_mounts.extend(secret_mounts)
# Format mount and PVC info for MOTD
mount_info = []
for vm in formatted_mounts:
if vm:
mount_info.append(f"{vm['name']}:{vm['mountPath']}")
pvc_info = [
f"{v.name}:{v.persistent_volume_claim.claim_name}"
for v in volumes
if hasattr(v, "persistent_volume_claim") and v.persistent_volume_claim
]
pvc_info = []
for v in volumes:
if hasattr(v, "persistent_volume_claim") and v.persistent_volume_claim:
pvc_info.append(f"{v.name}:{v.persistent_volume_claim.claim_name}")
template_name = "base.sh"
template = env.get_template(template_name)
pvcs = None
pre_init_scripts = None
post_init_scripts = None
pre_init_hooks = None
post_init_hooks = None
template = env.get_template("base.sh")
command = template.render(
volumes=volumes,
pvcs=pvcs,
pvcs=None,
additional_packages=additional_packages,
pre_init_scripts=pre_init_scripts,
post_init_scripts=post_init_scripts,
pre_init_hooks=pre_init_hooks,
post_init_hooks=post_init_hooks,
pre_init_scripts=None,
post_init_scripts=None,
pre_init_hooks=None,
post_init_hooks=None,
)
inspector_job = {
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"name": job_name,
"namespace": namespace,
"labels": {"app": "krayt"},
"annotations": {"pvcs": ",".join(pvc_info) if pvc_info else "none"},
},
"spec": {
"ttlSecondsAfterFinished": 600,
"template": {
"metadata": {"labels": {"app": "krayt"}},
"spec": {
"containers": [
{
"name": "inspector",
"image": image,
"command": ["sh", "-c", command],
"env": env_vars,
"volumeMounts": formatted_mounts,
}
],
"volumes": [format_volume(v) for v in volumes if format_volume(v)],
"imagePullSecrets": [{"name": imagepullsecret}]
container = client.V1Container(
name="inspector",
image=image,
command=["sh", "-c", command],
env=env_vars,
volume_mounts=formatted_mounts,
)
spec = client.V1PodSpec(
containers=[container],
volumes=[format_volume(v) for v in volumes if format_volume(v)],
restart_policy="Never",
image_pull_secrets=[client.V1LocalObjectReference(name=imagepullsecret)]
if imagepullsecret
else None,
"restartPolicy": "Never",
},
},
},
}
return inspector_job
)
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={"app": "krayt"}), spec=spec
)
job_spec = client.V1JobSpec(
template=template,
ttl_seconds_after_finished=600,
)
job = client.V1Job(
api_version="batch/v1",
kind="Job",
metadata=client.V1ObjectMeta(
name=job_name,
namespace=namespace,
labels={"app": "krayt"},
annotations={"pvcs": ",".join(pvc_info) if pvc_info else "none"},
),
spec=job_spec,
)
return job
PROTECTED_NAMESPACES = {
@ -449,6 +443,89 @@ def get_pod(namespace: Optional[str] = None):
return pod_name, pod_namespace
# @app.command()
# def exec(
# namespace: Optional[str] = typer.Option(
# None,
# help="Kubernetes namespace. If not specified, will search for inspectors across all namespaces.",
# ),
# ):
# """
# Enter the Krayt dragon's lair! Connect to a running inspector pod.
# If multiple inspectors are found, you'll get to choose which one to explore.
# """
#
# pod_name, pod_namespace = get_pod(namespace)
# exec_command = [
# "kubectl",
# "exec",
# "-it",
# "-n",
# pod_namespace,
# pod_name,
# "--",
# "/bin/bash",
# "-l",
# ]
#
# os.execvp("kubectl", exec_command)
def interactive_exec(pod_name: str, namespace: str):
# Load kubeconfig from local context (or use load_incluster_config if running inside the cluster)
config.load_kube_config()
core_v1 = client.CoreV1Api()
command = ["/bin/bash", "-i"]
# Save the current terminal settings
oldtty = termios.tcgetattr(sys.stdin)
try:
# Put terminal into raw mode but don't handle local echo ourselves
# Let the remote terminal handle echoing and control characters
tty.setraw(sys.stdin.fileno())
# Create a TTY-enabled exec connection to the pod
resp = stream(
core_v1.connect_get_namespaced_pod_exec,
pod_name,
namespace,
command=command,
stderr=True,
stdin=True,
stdout=True,
tty=True,
_preload_content=False,
)
# Set up a simple select-based event loop to handle I/O
while resp.is_open():
# Update the websocket connection
resp.update(timeout=0.1)
# Handle output from the pod
if resp.peek_stdout():
sys.stdout.write(resp.read_stdout())
sys.stdout.flush()
if resp.peek_stderr():
sys.stderr.write(resp.read_stderr())
sys.stderr.flush()
# Check for input from the user
rlist, _, _ = select.select([sys.stdin], [], [], 0.01)
if sys.stdin in rlist:
# Read input and forward it to the pod without local echo
data = os.read(sys.stdin.fileno(), 1024)
if data:
resp.write_stdin(data.decode())
except Exception as e:
print(f"\nError in interactive session: {e}", file=sys.stderr)
finally:
# Always restore terminal settings
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
@app.command()
def exec(
namespace: Optional[str] = typer.Option(
@ -460,21 +537,28 @@ def exec(
Enter the Krayt dragon's lair! Connect to a running inspector pod.
If multiple inspectors are found, you'll get to choose which one to explore.
"""
config.load_kube_config() # or config.load_incluster_config() if running inside a pod
core_v1 = client.CoreV1Api()
pod_name, pod_namespace = get_pod(namespace)
exec_command = [
"kubectl",
"exec",
"-it",
"-n",
pod_namespace,
pod_name,
"--",
"/bin/bash",
"-l",
]
interactive_exec(pod_name, pod_namespace)
os.execvp("kubectl", exec_command)
# command = ["/bin/bash", "-l"]
# print(f"kubectl exec -it -n {pod_namespace} {pod_name} -- {' '.join(command)}")
# print(
# f"execing into {pod_name} in {pod_namespace} with command {' '.join(command)}"
# )
# resp = stream(
# core_v1.connect_get_namespaced_pod_exec,
# pod_name,
# pod_namespace,
# command=command,
# stderr=True,
# stdin=True,
# stdout=True,
# tty=True,
# )
# print(resp)
@app.command()
@ -687,9 +771,6 @@ def create(
typer.echo("No pod selected.")
raise typer.Exit(1)
# typer.echo(f"Selected pod exists: {selected_pod in (p[0] for p in pods)}")
# typer.echo(f"Selected pod: {selected_pod} ({selected_namespace})")
pod_spec = get_pod_spec(selected_pod, selected_namespace)
volume_mounts, volumes = get_pod_volumes_and_mounts(pod_spec)
@ -709,33 +790,11 @@ def create(
)
# Output the job manifest
job_yaml = yaml.dump(clean_dict(inspector_job), sort_keys=False)
api_client = client.ApiClient()
job_dict = api_client.sanitize_for_serialization(inspector_job)
job_yaml = yaml.dump(job_dict, sort_keys=False)
if apply:
# # Apply the job to the cluster
# import tempfile
# import subprocess
#
# with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml") as temp_file:
# temp_file.write(job_yaml)
# temp_file.flush()
#
# try:
# typer.echo(
# f"Applying job {job_name} to namespace {selected_namespace}..."
# )
# result = subprocess.run(
# ["kubectl", "apply", "-f", temp_file.name],
# capture_output=True,
# text=True,
# check=True,
# )
# typer.echo(result.stdout)
# typer.echo(f"Successfully created inspector job {job_name}")
# except subprocess.CalledProcessError as e:
# typer.echo(f"Error applying job: {e.stderr}", err=True)
# raise typer.Exit(1)
#
batch_api = client.BatchV1Api()
job = batch_api.create_namespaced_job(
namespace=selected_namespace,