From d5d081f74376fac9354c2c36c42dc1cba952d929 Mon Sep 17 00:00:00 2001 From: "Waylon S. Walker" Date: Tue, 25 Nov 2025 18:33:39 -0600 Subject: [PATCH 1/4] wip: feat: workspaces tmux support --- workspaces.py | 262 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 261 insertions(+), 1 deletion(-) diff --git a/workspaces.py b/workspaces.py index d8ae44a..b9ce726 100755 --- a/workspaces.py +++ b/workspaces.py @@ -10,6 +10,7 @@ # ] # /// +from __future__ import annotations import os import re import shutil @@ -18,7 +19,9 @@ from dataclasses import dataclass from pathlib import Path from typing import List, Optional, Tuple + import typer + from pydantic import Field from pydantic_settings import BaseSettings, SettingsConfigDict from rich.console import Console @@ -233,7 +236,7 @@ def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]: return ws_dir.name, "" # First non-empty line must be '# ...' per spec - first_non_empty_idx = next((i for i, l in enumerate(lines) if l.strip()), None) + first_non_empty_idx = next((i for i, line in enumerate(lines) if line.strip()), None) if first_non_empty_idx is None: return ws_dir.name, "" @@ -1087,5 +1090,262 @@ def status_workspace( if desc: console.print(Panel(desc, title="Workspace description")) +def not_in_tmux() -> bool: + """Return True if not inside tmux or zellij.""" + return not os.environ.get("TMUX") and not os.environ.get("ZELLIJ") + + +def run_tmux(args: list[str], *, clear_tmux_env: bool = False) -> subprocess.CompletedProcess: + """Run a tmux command.""" + env = os.environ.copy() + if clear_tmux_env: + env["TMUX"] = "" + return subprocess.run( + ["tmux", *args], + env=env, + check=False, + ) + + +def session_exists(session_name: str) -> bool: + """Check if the tmux session exists.""" + result = run_tmux(["has-session", "-t", f"={session_name}"]) + return result.returncode == 0 + + +def pick_project(base_dir: Path) -> Optional[str]: + """Use fzf to pick a subdirectory (project) from base_dir.""" + if not base_dir.is_dir(): + typer.echo(f"[ta] {base_dir} is not a directory.", err=True) + return None + + subdirs = sorted( + [p.name for p in base_dir.iterdir() if p.is_dir()] + ) + if not subdirs: + typer.echo(f"[ta] No subdirectories found in {base_dir}.", err=True) + return None + + if not shutil.which("fzf"): + typer.echo("[ta] fzf not found in PATH.", err=True) + return None + + proc = subprocess.run( + ["fzf", "--reverse", f"--header=Select project from {base_dir.name} >"], + input="\n".join(subdirs), + text=True, + capture_output=True, + ) + + if proc.returncode != 0: + # Cancelled or failed + return None + + choice = proc.stdout.strip() + return choice or None + + +def create_detached_session( + session_name: str, + path_name: Path, + start_mode: bool, +) -> bool: + """ + Create a detached session. + + - If start_mode: just a single window. + - Else: split layout with nvim on top. + """ + # Run tmux as if not in tmux (clear TMUX env) + if start_mode: + r = run_tmux( + ["new-session", "-Ad", "-s", session_name, "-c", str(path_name)], + clear_tmux_env=True, + ) + return r.returncode == 0 + else: + r = run_tmux( + ["new-session", "-Ad", "-s", session_name, "-c", str(path_name)], + clear_tmux_env=True, + ) + if r.returncode != 0: + return False + + r = run_tmux( + [ + "split-window", + "-vb", + "-t", + session_name, + "-c", + str(path_name), + "-p", + "70", + ], + clear_tmux_env=True, + ) + if r.returncode != 0: + return False + + r = run_tmux( + [ + "send-keys", + "-t", + session_name, + "nvim '+Telescope find_files'", + "Enter", + ], + clear_tmux_env=True, + ) + return r.returncode == 0 + + +def create_if_needed_and_attach( + session_name: str, + path_name: Path, + start_mode: bool, +) -> bool: + """ + Mimic the bash logic: + + - If not in tmux: `tmux new-session -As ... -c path_name` + - Else: + - If session doesn't exist: create_detached_session() + - Then `tmux switch-client -t session_name` + """ + if not_in_tmux(): + r = run_tmux( + ["new-session", "-As", session_name, "-c", str(path_name)], + ) + return r.returncode == 0 + + # Inside tmux + if not session_exists(session_name): + if not create_detached_session(session_name, path_name, start_mode): + return False + + r = run_tmux(["switch-client", "-t", session_name]) + return r.returncode == 0 + + +def attach_to_first_session() -> None: + """Fallback: attach to first tmux session and open choose-tree.""" + # Attach to the first listed session + list_proc = subprocess.run( + ["tmux", "list-sessions", "-F", "#{session_name}"], + text=True, + capture_output=True, + ) + if list_proc.returncode != 0 or not list_proc.stdout.strip(): + typer.echo("[ta] No tmux sessions found to attach to.", err=True) + raise typer.Exit(1) + + first_session = list_proc.stdout.strip().splitlines()[0] + + attach_proc = run_tmux(["attach-session", "-t", first_session]) + if attach_proc.returncode != 0: + raise typer.Exit(attach_proc.returncode) + + # After attach, show choose-tree (this will run in the attached session) + run_tmux(["choose-tree", "-Za"]) + +tmux_app = typer.Typer( + name="tmux", + help="tmux commands", + add_completion=False, +) + +@tmux_app.command('attach') +def attach( + ctx: typer.Context, + workspace: Optional[str] = typer.Option( + None, + "--workspace", + "-w", + help=( + "Workspace directory name to show status for. " + "If omitted, uses the workspace containing the current directory." + ), + ), +): + """ + Attach or create a session for a repo in a workspace. + """ + # ws_dir = get_workspace_dir(workspace) + _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) + ws_dir = find_workspace_dir(workspaces_dir, workspace) + if not ws_dir: + console.print(f"[red]Workspace '{workspace}' does not exist at {ws_dir}[/red]") + raise typer.Exit(1) + # pick repo in workspace + repo = pick_repo_with_iterfzf([dir for dir in ws_dir.iterdir() if dir.is_dir()]) + if not repo: + console.print("[red]No repo selected. Exiting.[/red]") + raise typer.Exit(1) + print(ws_dir, repo) + session_name = f'{ws_dir.name}|{repo.name}' + print(f'{ws_dir.name}|{repo.name}') + create_if_needed_and_attach(session_name, repo, False) + + +app.add_typer(tmux_app) + +# @app.command() +# def main( +# dir: Optional[Path] = typer.Argument( +# None, +# help="Base directory containing projects. If omitted, auto-attach or --start behavior.", +# ), +# start: bool = typer.Option( +# False, +# "--start", +# help="Start a new session based on the current directory.", +# ), +# ): +# # Replicate initial "DIR=$1" and attach-or-start logic +# # If no args and no --start +# if dir is None and not start: +# if not_in_tmux(): +# # Try to attach to an existing session +# attach_proc = run_tmux(["attach"]) +# if attach_proc.returncode == 0: +# # Successfully attached; exit like the bash script +# raise typer.Exit(1) +# # If attach failed, fall through to start mode +# start = True +# else: +# # In tmux and no dir/start: nothing to do +# raise typer.Exit(1) +# +# # Figure out session_name and path_name +# if start: +# path_name = Path.cwd() +# session_name = path_name.name.replace(".", "_") +# else: +# if dir is None: +# typer.echo("[ta] DIR argument is required unless --start is used.", err=True) +# raise typer.Exit(1) +# +# dir = dir.expanduser().resolve() +# project = pick_project(dir) +# if not project: +# # cancelled or error +# raise typer.Exit(1) +# +# session_name = project.replace(".", "_") +# path_name = (dir / project).resolve() +# +# typer.echo(f'session name is "{session_name}"') +# typer.echo(f"path name is {path_name}") +# +# if not session_name: +# raise typer.Exit(1) +# +# # Try main attach/create flow; on failure, fall back +# ok = create_if_needed_and_attach(session_name, path_name, start_mode=start) +# if not ok: +# attach_to_first_session() +# + if __name__ == "__main__": app() From 469845020002650e6395febcd0e71eb413430c29 Mon Sep 17 00:00:00 2001 From: "Waylon S. Walker" Date: Tue, 25 Nov 2025 18:51:09 -0600 Subject: [PATCH 2/4] make commit command with message flag --- workspaces.py | 103 ++++++++++++++++++++------------------------------ 1 file changed, 42 insertions(+), 61 deletions(-) diff --git a/workspaces.py b/workspaces.py index b9ce726..4545ec5 100755 --- a/workspaces.py +++ b/workspaces.py @@ -868,7 +868,6 @@ def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> Lis return selected_files - @app.command("wip") def wip_workspace( ctx: typer.Context, @@ -895,9 +894,48 @@ def wip_workspace( if not ws_dir.exists(): console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") raise typer.Exit(1) - title, _desc = read_workspace_readme(ws_dir) commit_message = f"wip: {title or ws_dir.name}" + commit_workspace(ctx, workspace, message=commit_message) + +@app.command("commit") +def commit_workspace( + ctx: typer.Context, + workspace: Optional[str] = typer.Option( + None, + "--workspace", + "-w", + help=( + "Workspace directory name to WIP-commit. " + "If omitted, uses the workspace containing the current directory." + ), + ), + message: Optional[str] = typer.Option( + None, + "--message", + "-m", + help="Commit message to use.", + ), +): + """ + For each repo in the workspace: + + - Show list of changed files. + - Ask whether to stage all, none, or pick some files. + - Stage chosen files. + """ + if not message: + console.print( + "[red]No commit message provided. Exiting.[/red]", + file=sys.stderr, + ) + raise typer.Exit(1) + _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) + ws_dir = find_workspace_dir(workspaces_dir, workspace) + if not ws_dir.exists(): + console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") + raise typer.Exit(1) + title, _desc = read_workspace_readme(ws_dir) worktrees: List[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): @@ -938,14 +976,14 @@ def wip_workspace( continue # Commit - code, _out, err = run_cmd(["git", "commit", "-m", commit_message], cwd=wt) + code, _out, err = run_cmd(["git", "commit", "-m", message], cwd=wt) if code != 0: console.print( f"[red]Failed to commit in {wt.name}:[/red]\n{err}" ) else: console.print( - f" [green]Created WIP commit in {wt.name}:[/green] '{commit_message}'" + f" [green]Created WIP commit in {wt.name}:[/green] '{message}'" ) @@ -1290,62 +1328,5 @@ def attach( app.add_typer(tmux_app) -# @app.command() -# def main( -# dir: Optional[Path] = typer.Argument( -# None, -# help="Base directory containing projects. If omitted, auto-attach or --start behavior.", -# ), -# start: bool = typer.Option( -# False, -# "--start", -# help="Start a new session based on the current directory.", -# ), -# ): -# # Replicate initial "DIR=$1" and attach-or-start logic -# # If no args and no --start -# if dir is None and not start: -# if not_in_tmux(): -# # Try to attach to an existing session -# attach_proc = run_tmux(["attach"]) -# if attach_proc.returncode == 0: -# # Successfully attached; exit like the bash script -# raise typer.Exit(1) -# # If attach failed, fall through to start mode -# start = True -# else: -# # In tmux and no dir/start: nothing to do -# raise typer.Exit(1) -# -# # Figure out session_name and path_name -# if start: -# path_name = Path.cwd() -# session_name = path_name.name.replace(".", "_") -# else: -# if dir is None: -# typer.echo("[ta] DIR argument is required unless --start is used.", err=True) -# raise typer.Exit(1) -# -# dir = dir.expanduser().resolve() -# project = pick_project(dir) -# if not project: -# # cancelled or error -# raise typer.Exit(1) -# -# session_name = project.replace(".", "_") -# path_name = (dir / project).resolve() -# -# typer.echo(f'session name is "{session_name}"') -# typer.echo(f"path name is {path_name}") -# -# if not session_name: -# raise typer.Exit(1) -# -# # Try main attach/create flow; on failure, fall back -# ok = create_if_needed_and_attach(session_name, path_name, start_mode=start) -# if not ok: -# attach_to_first_session() -# - if __name__ == "__main__": app() From 0ee5711dcd7d536dcca742ea30d6c45d5b350327 Mon Sep 17 00:00:00 2001 From: "Waylon S. Walker" Date: Tue, 25 Nov 2025 19:12:44 -0600 Subject: [PATCH 3/4] status and diff commands --- workspaces.py | 103 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 99 insertions(+), 4 deletions(-) diff --git a/workspaces.py b/workspaces.py index 4545ec5..a9bb772 100755 --- a/workspaces.py +++ b/workspaces.py @@ -983,7 +983,7 @@ def commit_workspace( ) else: console.print( - f" [green]Created WIP commit in {wt.name}:[/green] '{message}'" + f" [green]Created commit in {wt.name}:[/green] '{message}'" ) @@ -1091,8 +1091,12 @@ def status_workspace( title, desc = read_workspace_readme(ws_dir) + # if desc: + # console.print(Panel(desc, title=title)) + table = Table( - title=f"Status for workspace '{title}' (dir: {ws_dir.name})", + title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}", + title_justify="left", show_lines=False, ) table.add_column("Repo (dir)", style="bold") @@ -1100,6 +1104,11 @@ def status_workspace( table.add_column("Ahead/Behind/Dirty") table.add_column("Changed Files", justify="right") + files_table = Table(show_lines=False) + files_table.add_column("Repo (dir)", style="bold") + files_table.add_column("File", justify="left") + files_table.add_column("Status", justify="right") + worktrees: List[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": @@ -1123,10 +1132,96 @@ def status_workspace( str(len(changes)), ) + if changes: + for f, s in changes: + files_table.add_row(wt.name, f, s) + console.print(table) - if desc: - console.print(Panel(desc, title="Workspace description")) + console.print("\nFiles with changes:") + if files_table.rows: + console.print(files_table) + +@app.command("diff") +def diff_workspace( + ctx: typer.Context, + workspace: Optional[str] = typer.Option( + None, + "--workspace", + "-w", + help=( + "Workspace directory name to diff. " + "If omitted, uses the workspace containing the current directory." + ), + ), + staged: bool = typer.Option( + False, + "--staged", + "-s", + help="Show only staged changes (git diff --cached).", + ), +): + """ + Show git diff for all repos in the workspace. + + - If --staged: uses `git diff --cached` (only staged changes). + - Otherwise: uses `git diff` (staged + unstaged). + """ + _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) + ws_dir = find_workspace_dir(workspaces_dir, workspace) + if not ws_dir.exists(): + console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") + raise typer.Exit(1) + + title, _desc = read_workspace_readme(ws_dir) + console.print( + f"Showing diffs for workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})" + ) + + # Collect worktrees (repos) in this workspace + worktrees: List[Path] = [] + for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): + if child.name.lower() == "readme.md": + continue + if not ensure_git_repo(child): + continue + worktrees.append(child) + + if not worktrees: + console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]") + raise typer.Exit(0) + + any_diffs = False + + for wt in worktrees: + # Quick check: skip repos with no changes at all + changes = git_status_porcelain(wt) + if not changes: + continue + + any_diffs = True + console.rule(f"[bold]{wt.name}[/bold]") + + cmd = ["git", "diff"] + if staged: + cmd.append("--cached") + + code, out, err = run_cmd(cmd, cwd=wt) + if code != 0: + console.print( + f"[red]Failed to get diff for {wt.name}:[/red]\n{err}" + ) + continue + + if not out.strip(): + console.print("[dim]No diff output.[/dim]") + else: + # Use markup=False so diff characters like [ ] don't get eaten by Rich + console.print(out, markup=False) + + if not any_diffs: + console.print("[green]No changes to diff in any repo.[/green]") + def not_in_tmux() -> bool: """Return True if not inside tmux or zellij.""" From 42d034eef2f5e00f5b4fc89e3b29fa727278208d Mon Sep 17 00:00:00 2001 From: "Waylon S. Walker" Date: Tue, 25 Nov 2025 19:17:57 -0600 Subject: [PATCH 4/4] Syntax theme diff --- workspaces.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/workspaces.py b/workspaces.py index a9bb772..f027dd7 100755 --- a/workspaces.py +++ b/workspaces.py @@ -28,6 +28,8 @@ from rich.console import Console from rich.table import Table from rich.prompt import Prompt, Confirm from rich.panel import Panel +from rich.syntax import Syntax +from rich.panel import Panel from iterfzf import iterfzf app = typer.Typer( @@ -1163,12 +1165,11 @@ def diff_workspace( ): """ Show git diff for all repos in the workspace. - - - If --staged: uses `git diff --cached` (only staged changes). - - Otherwise: uses `git diff` (staged + unstaged). + Uses rich Syntax highlighter for diffs. """ _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) + if not ws_dir.exists(): console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") raise typer.Exit(1) @@ -1178,7 +1179,7 @@ def diff_workspace( f"Showing diffs for workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})" ) - # Collect worktrees (repos) in this workspace + # Collect repos worktrees: List[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": @@ -1194,12 +1195,13 @@ def diff_workspace( any_diffs = False for wt in worktrees: - # Quick check: skip repos with no changes at all + # Skip clean repos changes = git_status_porcelain(wt) if not changes: continue any_diffs = True + console.rule(f"[bold]{wt.name}[/bold]") cmd = ["git", "diff"] @@ -1215,14 +1217,22 @@ def diff_workspace( if not out.strip(): console.print("[dim]No diff output.[/dim]") - else: - # Use markup=False so diff characters like [ ] don't get eaten by Rich - console.print(out, markup=False) + continue + + syntax = Syntax( + out, + "diff", + theme="nord", + line_numbers=False, + word_wrap=False, + ) + + # Wrap each repo diff in a Panel for clarity + console.print(Panel(syntax, title=wt.name, border_style="cyan")) if not any_diffs: console.print("[green]No changes to diff in any repo.[/green]") - def not_in_tmux() -> bool: """Return True if not inside tmux or zellij.""" return not os.environ.get("TMUX") and not os.environ.get("ZELLIJ")