improve tmux

This commit is contained in:
Waylon Walker 2025-11-26 10:35:18 -06:00
parent dd7742cf18
commit 5301b5384c

View file

@ -27,7 +27,6 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
from rich.console import Console
from rich.table import Table
from rich.prompt import Prompt, Confirm
from rich.panel import Panel
from rich.syntax import Syntax
from rich.panel import Panel
from iterfzf import iterfzf
@ -71,7 +70,9 @@ class Settings(BaseSettings):
)
@classmethod
def from_env_and_override(cls, override_workspaces_name: Optional[str]) -> "Settings":
def from_env_and_override(
cls, override_workspaces_name: Optional[str]
) -> "Settings":
"""
Construct settings honoring:
1. CLI override
@ -209,15 +210,16 @@ def find_workspace_dir(workspaces_dir: Path, workspace_name: Optional[str]) -> P
try:
cwd.relative_to(workspaces_dir)
except ValueError:
console.print(
f"[red]Not inside workspaces_dir ({workspaces_dir}). "
"Please use --workspace to specify a workspace.[/red]"
)
raise typer.Exit(1)
workspace_root = pick_workspace_with_iterfzf(list_workspaces(workspaces_dir))
if workspace_root is None:
console.print("[red]No workspace selected. Exiting.[/red]")
raise typer.Exit(code=1)
return workspace_root
# The top-level workspace directory is the first component under workspaces_dir
rel = cwd.relative_to(workspaces_dir)
workspace_root = workspaces_dir / rel.parts[0]
print(f"Using workspace: {workspace_root}")
return workspace_root
@ -238,7 +240,9 @@ def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]:
return ws_dir.name, ""
# First non-empty line must be '# ...' per spec
first_non_empty_idx = next((i for i, line in enumerate(lines) if line.strip()), None)
first_non_empty_idx = next(
(i for i, line in enumerate(lines) if line.strip()), None
)
if first_non_empty_idx is None:
return ws_dir.name, ""
@ -333,7 +337,7 @@ def main(
# Default behavior when no subcommand is provided:
if ctx.invoked_subcommand is None:
list_workspaces(ctx)
cli_list_workspaces(ctx)
raise typer.Exit(0)
@ -345,8 +349,14 @@ def get_ctx_paths(ctx: typer.Context) -> Tuple[Settings, Path, Path]:
# ---------------- list-workspaces ----------------
def list_workspaces(workspaces_dir: Path):
workspaces_dir.mkdir(parents=True, exist_ok=True)
return sorted(p for p in workspaces_dir.iterdir() if p.is_dir())
@app.command("list")
def list_workspaces(
@app.command("ls", hidden=True)
def cli_list_workspaces(
ctx: typer.Context,
):
"""
@ -427,7 +437,9 @@ def create_workspace(
dir_name = slugify_workspace_name(name)
ws_dir = workspaces_dir / dir_name
if ws_dir.exists():
console.print(f"[red]Workspace dir '{dir_name}' already exists at {ws_dir}[/red]")
console.print(
f"[red]Workspace dir '{dir_name}' already exists at {ws_dir}[/red]"
)
raise typer.Exit(1)
write_workspace_readme(ws_dir, name, description)
@ -464,7 +476,9 @@ def list_repos(
ws_dir = find_workspace_dir(workspaces_dir, workspace)
if not ws_dir.exists():
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
@ -520,6 +534,22 @@ def pick_repo_with_iterfzf(repos: List[Path]) -> Optional[Path]:
return None
def pick_workspace_with_iterfzf(workspaces: List[Path]) -> Optional[Path]:
"""
Use iterfzf (Python library) to pick a workspace from a list of paths.
"""
names = [w.name for w in workspaces]
choice = iterfzf(names, prompt="pick a workspace> ")
if not choice:
return None
for w in workspaces:
if w.name == choice:
return w
return None
@app.command("add-repo")
@app.command("add", hidden=True)
def add_repo(
@ -554,7 +584,9 @@ def add_repo(
_settings, repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
if not ws_dir.exists():
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
# Use display title from README to derive branch name
@ -629,9 +661,7 @@ def add_repo(
# ---------------- rm-workspace ----------------
def find_repo_for_worktree(
worktree_path: Path, repos_dir: Path
) -> Optional[Path]:
def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Optional[Path]:
"""
Try to find the parent repo for a worktree, assuming it lives in repos_dir
with the same directory name as the worktree.
@ -723,6 +753,7 @@ def is_branch_integrated_into_main(
main_trees = {line.strip() for line in out.splitlines() if line.strip()}
return branch_tree in main_trees
@app.command("rm")
def remove_workspace(
ctx: typer.Context,
@ -760,7 +791,9 @@ def remove_workspace(
ws_dir = find_workspace_dir(workspaces_dir, workspace)
if not ws_dir.exists():
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
@ -855,6 +888,10 @@ def remove_workspace(
elif wt.exists():
# git worktree remove *should* delete the directory; if not, clean up.
shutil.rmtree(wt)
if in_tmux():
session_name = f"{ws_dir.name}|{repo.name}"
remove_session_if_exists(session_name)
# Finally, remove the workspace directory itself
shutil.rmtree(ws_dir)
console.print(
@ -873,9 +910,7 @@ def git_status_porcelain(repo_path: Path) -> List[Tuple[str, str]]:
"""
code, out, err = run_cmd(["git", "status", "--porcelain"], cwd=repo_path)
if code != 0:
console.print(
f"[red]Failed to get status for {repo_path.name}:[/red]\n{err}"
)
console.print(f"[red]Failed to get status for {repo_path.name}:[/red]\n{err}")
return []
changes: List[Tuple[str, str]] = []
@ -937,6 +972,7 @@ def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> Lis
return selected_files
@app.command("wip")
def wip_workspace(
ctx: typer.Context,
@ -961,12 +997,15 @@ def wip_workspace(
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
if not ws_dir.exists():
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
commit_message = f"wip: {title or ws_dir.name}"
commit_workspace(ctx, workspace, message=commit_message)
@app.command("commit")
def commit_workspace(
ctx: typer.Context,
@ -996,13 +1035,14 @@ def commit_workspace(
if not message:
console.print(
"[red]No commit message provided. Exiting.[/red]",
file=sys.stderr,
)
raise typer.Exit(1)
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
if not ws_dir.exists():
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
@ -1034,9 +1074,7 @@ def commit_workspace(
for f in files_to_stage:
code, _out, err = run_cmd(["git", "add", f], cwd=wt)
if code != 0:
console.print(
f"[red]Failed to add {f} in {wt.name}:[/red]\n{err}"
)
console.print(f"[red]Failed to add {f} in {wt.name}:[/red]\n{err}")
# Check if there is anything to commit
code, out, _err = run_cmd(["git", "diff", "--cached", "--name-only"], cwd=wt)
@ -1047,13 +1085,9 @@ def commit_workspace(
# Commit
code, _out, err = run_cmd(["git", "commit", "-m", message], cwd=wt)
if code != 0:
console.print(
f"[red]Failed to commit in {wt.name}:[/red]\n{err}"
)
console.print(f"[red]Failed to commit in {wt.name}:[/red]\n{err}")
else:
console.print(
f" [green]Created commit in {wt.name}:[/green] '{message}'"
)
console.print(f" [green]Created commit in {wt.name}:[/green] '{message}'")
# ---------------- push ----------------
@ -1086,7 +1120,9 @@ def push_workspace(
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
if not ws_dir.exists():
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
@ -1122,9 +1158,7 @@ def push_workspace(
f"[red]Failed to push {wt.name} ({branch}) to {remote}:[/red]\n{err}"
)
else:
console.print(
f"[green]Pushed {wt.name} ({branch}) to {remote}[/green]"
)
console.print(f"[green]Pushed {wt.name} ({branch}) to {remote}[/green]")
# ---------------- status (current workspace) ----------------
@ -1155,7 +1189,9 @@ def status_workspace(
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
if not ws_dir.exists():
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
title, desc = read_workspace_readme(ws_dir)
@ -1164,8 +1200,8 @@ def status_workspace(
# console.print(Panel(desc, title=title))
table = Table(
title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}",
title_justify="left",
title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}",
title_justify="left",
show_lines=False,
)
table.add_column("Repo (dir)", style="bold")
@ -1211,6 +1247,7 @@ def status_workspace(
if files_table.rows:
console.print(files_table)
@app.command("diff")
def diff_workspace(
ctx: typer.Context,
@ -1238,7 +1275,9 @@ def diff_workspace(
ws_dir = find_workspace_dir(workspaces_dir, workspace)
if not ws_dir.exists():
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
@ -1277,9 +1316,7 @@ def diff_workspace(
code, out, err = run_cmd(cmd, cwd=wt)
if code != 0:
console.print(
f"[red]Failed to get diff for {wt.name}:[/red]\n{err}"
)
console.print(f"[red]Failed to get diff for {wt.name}:[/red]\n{err}")
continue
if not out.strip():
@ -1300,12 +1337,20 @@ def diff_workspace(
if not any_diffs:
console.print("[green]No changes to diff in any repo.[/green]")
def in_tmux() -> bool:
"""Return True if inside tmux"""
return "TMUX" in os.environ
def not_in_tmux() -> bool:
"""Return True if not inside tmux or zellij."""
return not os.environ.get("TMUX") and not os.environ.get("ZELLIJ")
"""Return True if not inside tmux"""
return not in_tmux()
def run_tmux(args: list[str], *, clear_tmux_env: bool = False) -> subprocess.CompletedProcess:
def run_tmux(
args: list[str], *, clear_tmux_env: bool = False
) -> subprocess.CompletedProcess:
"""Run a tmux command."""
env = os.environ.copy()
if clear_tmux_env:
@ -1314,9 +1359,16 @@ def run_tmux(args: list[str], *, clear_tmux_env: bool = False) -> subprocess.Com
["tmux", *args],
env=env,
check=False,
capture_output=True,
)
def get_tmux_sessions() -> List[str]:
"""List all tmux sessions."""
result = run_tmux(["list-sessions", "-F", "#{session_name}"])
return result.stdout.decode().splitlines()
def session_exists(session_name: str) -> bool:
"""Check if the tmux session exists."""
result = run_tmux(["has-session", "-t", f"={session_name}"])
@ -1329,9 +1381,7 @@ def pick_project(base_dir: Path) -> Optional[str]:
typer.echo(f"[ta] {base_dir} is not a directory.", err=True)
return None
subdirs = sorted(
[p.name for p in base_dir.iterdir() if p.is_dir()]
)
subdirs = sorted([p.name for p in base_dir.iterdir() if p.is_dir()])
if not subdirs:
typer.echo(f"[ta] No subdirectories found in {base_dir}.", err=True)
return None
@ -1459,14 +1509,28 @@ def attach_to_first_session() -> None:
# After attach, show choose-tree (this will run in the attached session)
run_tmux(["choose-tree", "-Za"])
def remove_session_if_exists(session_name: str) -> None:
r = run_tmux(["has-session", "-t", session_name])
if r.returncode == 0:
remove_session(session_name)
def remove_session(session_name: str) -> None:
r = run_tmux(["kill-session", "-t", session_name])
if r.returncode != 0:
raise typer.Exit(r.returncode)
tmux_app = typer.Typer(
name="tmux",
help="tmux commands",
add_completion=False,
)
@tmux_app.command('attach')
def attach(
@tmux_app.command("attach")
def tmux_cli_attach(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
None,
@ -1492,15 +1556,107 @@ def attach(
if not repo:
console.print("[red]No repo selected. Exiting.[/red]")
raise typer.Exit(1)
print(ws_dir, repo)
session_name = f'{ws_dir.name}|{repo.name}'
print(f'{ws_dir.name}|{repo.name}')
session_name = f"ω|{ws_dir.name}|{repo.name}"
print(f"Session name: {session_name}")
create_if_needed_and_attach(session_name, repo, False)
@tmux_app.command("list-sessions")
def tmux_cli_list_sessions(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to show status for. "
"If omitted, uses the workspace containing the current directory."
),
),
):
if not_in_tmux():
console.print("[red]Not in tmux. Exiting.[/red]")
raise typer.Exit(1)
if not workspace:
tmux_sessions = [session for session in get_tmux_sessions() if "|" in session]
console.print(tmux_sessions)
return
tmux_sessions = [
session for session in get_tmux_sessions() if session.startswith(workspace)
]
console.print(tmux_sessions)
@tmux_app.command("remove")
def tmux_cli_remove(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to show status for. "
"If omitted, uses the workspace containing the current directory."
),
),
dry_run: bool = typer.Option(
False,
"--dry-run",
"-d",
help="Don't actually remove the session.",
),
):
if not_in_tmux():
console.print("[red]Not in tmux. Exiting.[/red]")
raise typer.Exit(1)
if workspace:
tmux_sessions = [
session for session in get_tmux_sessions() if session.startswith(workspace)
]
else:
tmux_sessions = [
session for session in get_tmux_sessions() if session.startswith("ω")
]
if not dry_run:
if not Confirm.ask(
f"Are you sure you want to remove all tmux sessions?\n* {'\n* '.join(tmux_sessions)}\n",
default=False,
):
raise typer.Exit(1)
if dry_run:
console.print(tmux_sessions)
return
for session_name in tmux_sessions:
remove_session(session_name)
#
# remove_session_if_exists(session_name)
app.add_typer(tmux_app)
@app.command("attach")
def attach(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to show status for. "
"If omitted, uses the workspace containing the current directory."
),
),
):
if in_tmux():
tmux_cli_attach(ctx, workspace)
if __name__ == "__main__":
app()