implement port forward

This commit is contained in:
Waylon S. Walker 2025-04-17 09:09:40 -05:00
parent 2899ee23eb
commit 47e7f1cb5e

View file

@ -388,17 +388,7 @@ def version_callback(value: bool):
raise typer.Exit() raise typer.Exit()
@app.command() def get_pod(namespace: Optional[str] = None):
def exec(
namespace: Optional[str] = typer.Option(
None,
help="Kubernetes namespace. If not specified, will search for inspectors across all namespaces.",
),
):
"""
Enter the Krayt dragon's lair! Connect to a running inspector pod.
If multiple inspectors are found, you'll get to choose which one to explore.
"""
config.load_kube_config() config.load_kube_config()
batch_api = client.BatchV1Api() batch_api = client.BatchV1Api()
@ -438,25 +428,76 @@ def exec(
typer.echo("No inspector selected.") typer.echo("No inspector selected.")
raise typer.Exit(1) raise typer.Exit(1)
exec_command = [
"kubectl",
"exec",
"-it",
"-n",
pod_namespace,
pod_name,
"--",
"/bin/bash",
"-l",
]
os.execvp("kubectl", exec_command)
except client.exceptions.ApiException as e: except client.exceptions.ApiException as e:
logging.error(f"Failed to list jobs: {e}") logging.error(f"Failed to list jobs: {e}")
typer.echo(f"Failed to list jobs: {e}", err=True) typer.echo(f"Failed to list jobs: {e}", err=True)
raise typer.Exit(1) raise typer.Exit(1)
return pod_name, pod_namespace
@app.command()
def exec(
namespace: Optional[str] = typer.Option(
None,
help="Kubernetes namespace. If not specified, will search for inspectors across all namespaces.",
),
):
"""
Enter the Krayt dragon's lair! Connect to a running inspector pod.
If multiple inspectors are found, you'll get to choose which one to explore.
"""
pod_name, pod_namespace = get_pod(namespace)
exec_command = [
"kubectl",
"exec",
"-it",
"-n",
pod_namespace,
pod_name,
"--",
"/bin/bash",
"-l",
]
os.execvp("kubectl", exec_command)
@app.command()
def port_forward(
namespace: Optional[str] = typer.Option(
None,
help="Kubernetes namespace. If not specified, will search for inspectors across all namespaces.",
),
port: str = typer.Option(
"8080:8080",
"--port",
"-p",
help="Port to forward to the inspector pod",
),
):
"""
Enter the Krayt dragon's lair! Connect to a running inspector pod.
If multiple inspectors are found, you'll get to choose which one to explore.
"""
if ":" not in port:
# if port does not contain a ":" it should be an int
port = int(port)
port = f"{port}:{port}"
pod_name, pod_namespace = get_pod(namespace)
port_forward_command = [
"kubectl",
"port-forward",
"-n",
pod_namespace,
pod_name,
port,
]
os.execvp("kubectl", port_forward_command)
@app.command() @app.command()
def clean( def clean(