From 5301b5384cfed63ffd6b6f9b16b7279a1c50d648 Mon Sep 17 00:00:00 2001 From: "Waylon S. Walker" Date: Wed, 26 Nov 2025 10:35:18 -0600 Subject: [PATCH] improve tmux --- workspaces.py | 268 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 212 insertions(+), 56 deletions(-) diff --git a/workspaces.py b/workspaces.py index 99b88ab..3dac0d0 100755 --- a/workspaces.py +++ b/workspaces.py @@ -27,7 +27,6 @@ from pydantic_settings import BaseSettings, SettingsConfigDict from rich.console import Console from rich.table import Table from rich.prompt import Prompt, Confirm -from rich.panel import Panel from rich.syntax import Syntax from rich.panel import Panel from iterfzf import iterfzf @@ -71,7 +70,9 @@ class Settings(BaseSettings): ) @classmethod - def from_env_and_override(cls, override_workspaces_name: Optional[str]) -> "Settings": + def from_env_and_override( + cls, override_workspaces_name: Optional[str] + ) -> "Settings": """ Construct settings honoring: 1. CLI override @@ -209,15 +210,16 @@ def find_workspace_dir(workspaces_dir: Path, workspace_name: Optional[str]) -> P try: cwd.relative_to(workspaces_dir) except ValueError: - console.print( - f"[red]Not inside workspaces_dir ({workspaces_dir}). " - "Please use --workspace to specify a workspace.[/red]" - ) - raise typer.Exit(1) + workspace_root = pick_workspace_with_iterfzf(list_workspaces(workspaces_dir)) + if workspace_root is None: + console.print("[red]No workspace selected. Exiting.[/red]") + raise typer.Exit(code=1) + return workspace_root # The top-level workspace directory is the first component under workspaces_dir rel = cwd.relative_to(workspaces_dir) workspace_root = workspaces_dir / rel.parts[0] + print(f"Using workspace: {workspace_root}") return workspace_root @@ -238,7 +240,9 @@ def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]: return ws_dir.name, "" # First non-empty line must be '# ...' per spec - first_non_empty_idx = next((i for i, line in enumerate(lines) if line.strip()), None) + first_non_empty_idx = next( + (i for i, line in enumerate(lines) if line.strip()), None + ) if first_non_empty_idx is None: return ws_dir.name, "" @@ -333,7 +337,7 @@ def main( # Default behavior when no subcommand is provided: if ctx.invoked_subcommand is None: - list_workspaces(ctx) + cli_list_workspaces(ctx) raise typer.Exit(0) @@ -345,8 +349,14 @@ def get_ctx_paths(ctx: typer.Context) -> Tuple[Settings, Path, Path]: # ---------------- list-workspaces ---------------- +def list_workspaces(workspaces_dir: Path): + workspaces_dir.mkdir(parents=True, exist_ok=True) + return sorted(p for p in workspaces_dir.iterdir() if p.is_dir()) + + @app.command("list") -def list_workspaces( +@app.command("ls", hidden=True) +def cli_list_workspaces( ctx: typer.Context, ): """ @@ -427,7 +437,9 @@ def create_workspace( dir_name = slugify_workspace_name(name) ws_dir = workspaces_dir / dir_name if ws_dir.exists(): - console.print(f"[red]Workspace dir '{dir_name}' already exists at {ws_dir}[/red]") + console.print( + f"[red]Workspace dir '{dir_name}' already exists at {ws_dir}[/red]" + ) raise typer.Exit(1) write_workspace_readme(ws_dir, name, description) @@ -464,7 +476,9 @@ def list_repos( ws_dir = find_workspace_dir(workspaces_dir, workspace) if not ws_dir.exists(): - console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") + console.print( + f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" + ) raise typer.Exit(1) title, _desc = read_workspace_readme(ws_dir) @@ -520,6 +534,22 @@ def pick_repo_with_iterfzf(repos: List[Path]) -> Optional[Path]: return None +def pick_workspace_with_iterfzf(workspaces: List[Path]) -> Optional[Path]: + """ + Use iterfzf (Python library) to pick a workspace from a list of paths. + """ + names = [w.name for w in workspaces] + choice = iterfzf(names, prompt="pick a workspace> ") + + if not choice: + return None + + for w in workspaces: + if w.name == choice: + return w + return None + + @app.command("add-repo") @app.command("add", hidden=True) def add_repo( @@ -554,7 +584,9 @@ def add_repo( _settings, repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) if not ws_dir.exists(): - console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") + console.print( + f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" + ) raise typer.Exit(1) # Use display title from README to derive branch name @@ -629,9 +661,7 @@ def add_repo( # ---------------- rm-workspace ---------------- -def find_repo_for_worktree( - worktree_path: Path, repos_dir: Path -) -> Optional[Path]: +def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Optional[Path]: """ Try to find the parent repo for a worktree, assuming it lives in repos_dir with the same directory name as the worktree. @@ -723,6 +753,7 @@ def is_branch_integrated_into_main( main_trees = {line.strip() for line in out.splitlines() if line.strip()} return branch_tree in main_trees + @app.command("rm") def remove_workspace( ctx: typer.Context, @@ -760,7 +791,9 @@ def remove_workspace( ws_dir = find_workspace_dir(workspaces_dir, workspace) if not ws_dir.exists(): - console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") + console.print( + f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" + ) raise typer.Exit(1) title, _desc = read_workspace_readme(ws_dir) @@ -855,6 +888,10 @@ def remove_workspace( elif wt.exists(): # git worktree remove *should* delete the directory; if not, clean up. shutil.rmtree(wt) + + if in_tmux(): + session_name = f"{ws_dir.name}|{repo.name}" + remove_session_if_exists(session_name) # Finally, remove the workspace directory itself shutil.rmtree(ws_dir) console.print( @@ -873,9 +910,7 @@ def git_status_porcelain(repo_path: Path) -> List[Tuple[str, str]]: """ code, out, err = run_cmd(["git", "status", "--porcelain"], cwd=repo_path) if code != 0: - console.print( - f"[red]Failed to get status for {repo_path.name}:[/red]\n{err}" - ) + console.print(f"[red]Failed to get status for {repo_path.name}:[/red]\n{err}") return [] changes: List[Tuple[str, str]] = [] @@ -937,6 +972,7 @@ def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> Lis return selected_files + @app.command("wip") def wip_workspace( ctx: typer.Context, @@ -961,12 +997,15 @@ def wip_workspace( _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) if not ws_dir.exists(): - console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") + console.print( + f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" + ) raise typer.Exit(1) title, _desc = read_workspace_readme(ws_dir) commit_message = f"wip: {title or ws_dir.name}" commit_workspace(ctx, workspace, message=commit_message) + @app.command("commit") def commit_workspace( ctx: typer.Context, @@ -996,13 +1035,14 @@ def commit_workspace( if not message: console.print( "[red]No commit message provided. Exiting.[/red]", - file=sys.stderr, ) raise typer.Exit(1) _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) if not ws_dir.exists(): - console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") + console.print( + f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" + ) raise typer.Exit(1) title, _desc = read_workspace_readme(ws_dir) @@ -1034,9 +1074,7 @@ def commit_workspace( for f in files_to_stage: code, _out, err = run_cmd(["git", "add", f], cwd=wt) if code != 0: - console.print( - f"[red]Failed to add {f} in {wt.name}:[/red]\n{err}" - ) + console.print(f"[red]Failed to add {f} in {wt.name}:[/red]\n{err}") # Check if there is anything to commit code, out, _err = run_cmd(["git", "diff", "--cached", "--name-only"], cwd=wt) @@ -1047,13 +1085,9 @@ def commit_workspace( # Commit code, _out, err = run_cmd(["git", "commit", "-m", message], cwd=wt) if code != 0: - console.print( - f"[red]Failed to commit in {wt.name}:[/red]\n{err}" - ) + console.print(f"[red]Failed to commit in {wt.name}:[/red]\n{err}") else: - console.print( - f" [green]Created commit in {wt.name}:[/green] '{message}'" - ) + console.print(f" [green]Created commit in {wt.name}:[/green] '{message}'") # ---------------- push ---------------- @@ -1086,7 +1120,9 @@ def push_workspace( _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) if not ws_dir.exists(): - console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") + console.print( + f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" + ) raise typer.Exit(1) title, _desc = read_workspace_readme(ws_dir) @@ -1122,9 +1158,7 @@ def push_workspace( f"[red]Failed to push {wt.name} ({branch}) to {remote}:[/red]\n{err}" ) else: - console.print( - f"[green]Pushed {wt.name} ({branch}) to {remote}[/green]" - ) + console.print(f"[green]Pushed {wt.name} ({branch}) to {remote}[/green]") # ---------------- status (current workspace) ---------------- @@ -1155,7 +1189,9 @@ def status_workspace( _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) if not ws_dir.exists(): - console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") + console.print( + f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" + ) raise typer.Exit(1) title, desc = read_workspace_readme(ws_dir) @@ -1164,8 +1200,8 @@ def status_workspace( # console.print(Panel(desc, title=title)) table = Table( - title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}", - title_justify="left", + title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}", + title_justify="left", show_lines=False, ) table.add_column("Repo (dir)", style="bold") @@ -1211,6 +1247,7 @@ def status_workspace( if files_table.rows: console.print(files_table) + @app.command("diff") def diff_workspace( ctx: typer.Context, @@ -1238,7 +1275,9 @@ def diff_workspace( ws_dir = find_workspace_dir(workspaces_dir, workspace) if not ws_dir.exists(): - console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") + console.print( + f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" + ) raise typer.Exit(1) title, _desc = read_workspace_readme(ws_dir) @@ -1277,9 +1316,7 @@ def diff_workspace( code, out, err = run_cmd(cmd, cwd=wt) if code != 0: - console.print( - f"[red]Failed to get diff for {wt.name}:[/red]\n{err}" - ) + console.print(f"[red]Failed to get diff for {wt.name}:[/red]\n{err}") continue if not out.strip(): @@ -1300,12 +1337,20 @@ def diff_workspace( if not any_diffs: console.print("[green]No changes to diff in any repo.[/green]") + +def in_tmux() -> bool: + """Return True if inside tmux""" + return "TMUX" in os.environ + + def not_in_tmux() -> bool: - """Return True if not inside tmux or zellij.""" - return not os.environ.get("TMUX") and not os.environ.get("ZELLIJ") + """Return True if not inside tmux""" + return not in_tmux() -def run_tmux(args: list[str], *, clear_tmux_env: bool = False) -> subprocess.CompletedProcess: +def run_tmux( + args: list[str], *, clear_tmux_env: bool = False +) -> subprocess.CompletedProcess: """Run a tmux command.""" env = os.environ.copy() if clear_tmux_env: @@ -1314,9 +1359,16 @@ def run_tmux(args: list[str], *, clear_tmux_env: bool = False) -> subprocess.Com ["tmux", *args], env=env, check=False, + capture_output=True, ) +def get_tmux_sessions() -> List[str]: + """List all tmux sessions.""" + result = run_tmux(["list-sessions", "-F", "#{session_name}"]) + return result.stdout.decode().splitlines() + + def session_exists(session_name: str) -> bool: """Check if the tmux session exists.""" result = run_tmux(["has-session", "-t", f"={session_name}"]) @@ -1329,9 +1381,7 @@ def pick_project(base_dir: Path) -> Optional[str]: typer.echo(f"[ta] {base_dir} is not a directory.", err=True) return None - subdirs = sorted( - [p.name for p in base_dir.iterdir() if p.is_dir()] - ) + subdirs = sorted([p.name for p in base_dir.iterdir() if p.is_dir()]) if not subdirs: typer.echo(f"[ta] No subdirectories found in {base_dir}.", err=True) return None @@ -1459,14 +1509,28 @@ def attach_to_first_session() -> None: # After attach, show choose-tree (this will run in the attached session) run_tmux(["choose-tree", "-Za"]) + +def remove_session_if_exists(session_name: str) -> None: + r = run_tmux(["has-session", "-t", session_name]) + if r.returncode == 0: + remove_session(session_name) + + +def remove_session(session_name: str) -> None: + r = run_tmux(["kill-session", "-t", session_name]) + if r.returncode != 0: + raise typer.Exit(r.returncode) + + tmux_app = typer.Typer( name="tmux", help="tmux commands", add_completion=False, ) -@tmux_app.command('attach') -def attach( + +@tmux_app.command("attach") +def tmux_cli_attach( ctx: typer.Context, workspace: Optional[str] = typer.Option( None, @@ -1492,15 +1556,107 @@ def attach( if not repo: console.print("[red]No repo selected. Exiting.[/red]") raise typer.Exit(1) - print(ws_dir, repo) - session_name = f'{ws_dir.name}|{repo.name}' - print(f'{ws_dir.name}|{repo.name}') + session_name = f"ω|{ws_dir.name}|{repo.name}" + print(f"Session name: {session_name}") create_if_needed_and_attach(session_name, repo, False) +@tmux_app.command("list-sessions") +def tmux_cli_list_sessions( + ctx: typer.Context, + workspace: Optional[str] = typer.Option( + None, + "--workspace", + "-w", + help=( + "Workspace directory name to show status for. " + "If omitted, uses the workspace containing the current directory." + ), + ), +): + if not_in_tmux(): + console.print("[red]Not in tmux. Exiting.[/red]") + raise typer.Exit(1) + + if not workspace: + tmux_sessions = [session for session in get_tmux_sessions() if "|" in session] + console.print(tmux_sessions) + return + + tmux_sessions = [ + session for session in get_tmux_sessions() if session.startswith(workspace) + ] + console.print(tmux_sessions) + + +@tmux_app.command("remove") +def tmux_cli_remove( + ctx: typer.Context, + workspace: Optional[str] = typer.Option( + None, + "--workspace", + "-w", + help=( + "Workspace directory name to show status for. " + "If omitted, uses the workspace containing the current directory." + ), + ), + dry_run: bool = typer.Option( + False, + "--dry-run", + "-d", + help="Don't actually remove the session.", + ), +): + if not_in_tmux(): + console.print("[red]Not in tmux. Exiting.[/red]") + raise typer.Exit(1) + + if workspace: + tmux_sessions = [ + session for session in get_tmux_sessions() if session.startswith(workspace) + ] + else: + tmux_sessions = [ + session for session in get_tmux_sessions() if session.startswith("ω") + ] + if not dry_run: + if not Confirm.ask( + f"Are you sure you want to remove all tmux sessions?\n* {'\n* '.join(tmux_sessions)}\n", + default=False, + ): + raise typer.Exit(1) + + if dry_run: + console.print(tmux_sessions) + return + + for session_name in tmux_sessions: + remove_session(session_name) + + # + # remove_session_if_exists(session_name) + + app.add_typer(tmux_app) + +@app.command("attach") +def attach( + ctx: typer.Context, + workspace: Optional[str] = typer.Option( + None, + "--workspace", + "-w", + help=( + "Workspace directory name to show status for. " + "If omitted, uses the workspace containing the current directory." + ), + ), +): + if in_tmux(): + tmux_cli_attach(ctx, workspace) + + if __name__ == "__main__": app() - -