#!/usr/bin/env -S uv run --quiet --script # /// script # requires-python = ">=3.12" # dependencies = [ # "typer", # "rich", # "pydantic", # "pydantic-settings", # "iterfzf", # ] # /// from __future__ import annotations import os import re import shutil import subprocess from dataclasses import dataclass from pathlib import Path import typer from iterfzf import iterfzf from pydantic import Field from pydantic_settings import BaseSettings from pydantic_settings import SettingsConfigDict from rich.console import Console from rich.panel import Panel from rich.prompt import Confirm from rich.prompt import Prompt from rich.syntax import Syntax from rich.table import Table app = typer.Typer( help="Workspace management tool", invoke_without_command=True, ) console = Console() # --------------------------------------------------------------------------- # Settings # --------------------------------------------------------------------------- class Settings(BaseSettings): """Global configuration for workspaces. Resolution for workspaces_name: 1. Command-line flag --workspaces-name 2. Environment variable WORKSPACES_NAME 3. Default "git" repos_dir = ~/workspaces_name workspaces_dir = ~/workspaces_name + ".workspaces" """ workspaces_name: str = Field( default="git", description="Logical name of the workspace group (e.g. 'git', 'work', 'personal').", ) # pydantic v2-style config model_config = SettingsConfigDict( env_prefix="", env_file=None, env_nested_delimiter="__", extra="ignore", ) @classmethod def from_env_and_override( cls, override_workspaces_name: str | None ) -> Settings: """Construct settings honoring: 1. CLI override 2. WORKSPACES_NAME env 3. default "git" """ s = cls() if override_workspaces_name is not None: s.workspaces_name = override_workspaces_name env_val = os.getenv("WORKSPACES_NAME") if env_val and override_workspaces_name is None: s.workspaces_name = env_val return s def resolve_paths(workspaces_name: str | None) -> tuple[Settings, Path, Path]: """Build Settings and derived paths, honoring CLI override of workspaces_name. """ base_settings = Settings.from_env_and_override(workspaces_name) name = base_settings.workspaces_name repos_dir = Path(f"~/{name}").expanduser().resolve() workspaces_dir = Path(f"~/{name}.workspaces").expanduser().resolve() return base_settings, repos_dir, workspaces_dir # --------------------------------------------------------------------------- # Models / helpers # --------------------------------------------------------------------------- @dataclass class GitStatus: ahead: int = 0 behind: int = 0 dirty: bool = False @property def indicator(self) -> str: """Build ASCII indicator: - clean: "·" - ahead 1: "↑1" - behind 2: "↓2" - both ahead/behind: "↑1 ↓2" - add '*' when dirty, e.g. "↑1*" or "↑1 ↓2*" """ parts: list[str] = [] if self.ahead: parts.append(f"↑{self.ahead}") if self.behind: parts.append(f"↓{self.behind}") base = " ".join(parts) if parts else "·" if self.dirty: base += "*" return base def get_git_status(repo_path: Path) -> GitStatus: """Get ahead/behind and dirty info for a Git repo. Uses `git status --porcelain=v2 --branch` and parses: - '# branch.ab +A -B' for ahead/behind - any non-comment line for dirty """ try: out = subprocess.check_output( ["git", "status", "--porcelain=v2", "--branch"], cwd=repo_path, stderr=subprocess.DEVNULL, text=True, ) except Exception: return GitStatus() ahead = 0 behind = 0 dirty = False for line in out.splitlines(): if line.startswith("# branch.ab"): # Example: # branch.ab +1 -2 m = re.search(r"\+(\d+)\s+-(\d+)", line) if m: ahead = int(m.group(1)) behind = int(m.group(2)) elif not line.startswith("#"): dirty = True return GitStatus(ahead=ahead, behind=behind, dirty=dirty) def get_current_branch(repo_path: Path) -> str | None: try: out = subprocess.check_output( ["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=repo_path, stderr=subprocess.DEVNULL, text=True, ) return out.strip() except Exception: return None def ensure_git_repo(path: Path) -> bool: # Works for repos and worktrees (.git file or dir) return (path / ".git").exists() def run_cmd(cmd: list[str], cwd: Path | None = None) -> tuple[int, str, str]: proc = subprocess.Popen( cmd, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) out, err = proc.communicate() return proc.returncode, out, err def find_workspace_dir(workspaces_dir: Path, workspace_name: str | None) -> Path: """Resolve the directory of a workspace. - If workspace_name given: use workspaces_dir / workspace_name - Else: use current working directory (must be inside workspaces_dir). """ if workspace_name: return workspaces_dir / workspace_name cwd = Path.cwd().resolve() try: cwd.relative_to(workspaces_dir) except ValueError as e: workspace_root = pick_workspace_with_iterfzf(list_workspaces(workspaces_dir)) if workspace_root is None: console.print("[red]No workspace selected. Exiting.[/red]") raise typer.Exit(code=1) from e return workspace_root # The top-level workspace directory is the first component under workspaces_dir rel = cwd.relative_to(workspaces_dir) workspace_root = workspaces_dir / rel.parts[0] console.print(f"Using workspace: {workspace_root}") return workspace_root def read_workspace_readme(ws_dir: Path) -> tuple[str, str]: """Return (name_from_h1, description_from_rest_of_file). If file missing or malformed, fallback appropriately. """ readme = ws_dir / "readme.md" if not readme.exists(): name = ws_dir.name return name, "" text = readme.read_text(encoding="utf-8") lines = text.splitlines() if not lines: return ws_dir.name, "" # First non-empty line must be '# ...' per spec first_non_empty_idx = next( (i for i, line in enumerate(lines) if line.strip()), None ) if first_non_empty_idx is None: return ws_dir.name, "" first_line = lines[first_non_empty_idx].strip() if not first_line.startswith("# "): # Fallback return ws_dir.name, "\n".join(lines[first_non_empty_idx + 1 :]).strip() name = first_line[2:].strip() desc = "\n".join(lines[first_non_empty_idx + 1 :]).strip() return name, desc def write_workspace_readme(ws_dir: Path, name: str, description: str) -> None: ws_dir.mkdir(parents=True, exist_ok=True) content = f"# {name}\n\n{description}\n" (ws_dir / "readme.md").write_text(content, encoding="utf-8") # --- name/branch helpers ---------------------------------------------------- def slugify_workspace_name(name: str) -> str: """Turn arbitrary workspace name into a safe directory/worktree name. - Lowercase - Replace ':' with '-' FIRST (special case so 'fix:my issue' -> 'fix/my-issue' branch later) - Replace spaces with '-' - Replace any char not [a-z0-9._/-] with '-' - Collapse multiple '-' and strip leading/trailing '-' and '/' """ name = name.replace(":", "-") name = name.replace(" ", "-") safe = [] for ch in name: if re.match(r"[a-zA-Z0-9._/-]", ch): safe.append(ch.lower()) else: safe.append("-") s = "".join(safe) s = re.sub(r"-+", "-", s) s = s.strip("-/") return s or "workspace" def branch_name_for_workspace(name: str) -> str: """Compute branch name from workspace name. Rules: - Start from slugified name (no spaces/specials). - Replace *first* '-' with '/' so: 'fix-my-issue' -> 'fix/my-issue' (original 'fix:my issue' -> slug 'fix-my-issue' -> branch 'fix/my-issue') """ base = slugify_workspace_name(name) if "-" in base: return base.replace("-", "/", 1) return base # --------------------------------------------------------------------------- # Commands / main callback # --------------------------------------------------------------------------- @app.callback() def main( ctx: typer.Context, workspaces_name: str | None = typer.Option( None, "--workspaces-name", "-W", help=( "Logical name for this workspace set (e.g. 'git', 'work', 'personal'). " "Overrides WORKSPACES_NAME env. Defaults to 'git'." ), ), ): """Manage workspaces and associated Git worktrees. If no command is given, this will list workspaces. """ settings, repos_dir, workspaces_dir = resolve_paths(workspaces_name) ctx.obj = { "settings": settings, "repos_dir": repos_dir, "workspaces_dir": workspaces_dir, } # Default behavior when no subcommand is provided: if ctx.invoked_subcommand is None: cli_list_workspaces(ctx) raise typer.Exit(0) def get_ctx_paths(ctx: typer.Context) -> tuple[Settings, Path, Path]: obj = ctx.obj or {} return obj["settings"], obj["repos_dir"], obj["workspaces_dir"] # ---------------- list-workspaces ---------------- def list_workspaces(workspaces_dir: Path): workspaces_dir.mkdir(parents=True, exist_ok=True) return sorted(p for p in workspaces_dir.iterdir() if p.is_dir()) @app.command("list") @app.command("ls", hidden=True) def cli_list_workspaces( ctx: typer.Context, ): """List all workspaces. Shows: - workspace directory name - workspace description (from README markdown, everything after H1) - included repos with git status indicators """ _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) workspaces_dir.mkdir(parents=True, exist_ok=True) table = Table(title=f"Workspaces ({workspaces_dir})") table.add_column("Workspace (dir)", style="bold") table.add_column("Title", overflow="fold") table.add_column("Description", overflow="fold") table.add_column("Repos", overflow="fold") if not workspaces_dir.exists(): console.print(f"[yellow]No workspaces_dir found at {workspaces_dir}[/yellow]") raise typer.Exit(0) for ws in sorted(p for p in workspaces_dir.iterdir() if p.is_dir()): title, desc = read_workspace_readme(ws) repos: list[str] = [] for child in sorted(p for p in ws.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": continue if not ensure_git_repo(child): continue status = get_git_status(child) branch = get_current_branch(child) or "?" indicator = status.indicator repos.append(f"{child.name} [{branch}] {indicator}") repos_str = "\n".join(repos) if repos else "-" table.add_row(ws.name, title or "-", desc or "-", repos_str) console.print(table) # ---------------- create-workspace ---------------- @app.command("create") @app.command("new", hidden=True) def create_workspace( ctx: typer.Context, name: str | None = typer.Option( None, "--name", "-n", help="Name of the new workspace (display name; can contain spaces).", ), description: str | None = typer.Option( None, "--description", "-d", help="Description of the workspace. Will be written into readme.md.", ), ): """Create a new workspace. - Asks for name and description if not provided. - Workspace directory uses a slugified version of the name. - README uses the original name as '# \\n\\n'. """ _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) workspaces_dir.mkdir(parents=True, exist_ok=True) if not name: name = Prompt.ask("Workspace name (can contain spaces)") if not description: description = Prompt.ask("Workspace description", default="") dir_name = slugify_workspace_name(name) ws_dir = workspaces_dir / dir_name if ws_dir.exists(): console.print( f"[red]Workspace dir '{dir_name}' already exists at {ws_dir}[/red]" ) raise typer.Exit(1) write_workspace_readme(ws_dir, name, description) console.print( f"[green]Created workspace[/green] title='{name}' dir='{ws_dir.name}' at {ws_dir}" ) # ---------------- list-repos ---------------- @app.command("list-repos") def list_repos( ctx: typer.Context, workspace: str | None = typer.Option( None, "--workspace", "-w", help=( "Workspace directory name to inspect. " "If omitted, uses the workspace containing the current directory." ), ), ): """List repos and branches in the current (or specified) workspace. Shows: - repo directory name - current branch - ahead/behind/dirty indicators """ _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) workspace = ws_dir.name if not ws_dir.exists(): console.print( f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" ) raise typer.Exit(1) title, _desc = read_workspace_readme(ws_dir) table = Table(title=f"Repos in workspace '{title}' (dir: {ws_dir.name})") table.add_column("Repo (dir)", style="bold") table.add_column("Branch") table.add_column("Status") for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": continue if not ensure_git_repo(child): continue branch = get_current_branch(child) or "?" status = get_git_status(child) table.add_row(child.name, branch, status.indicator) console.print(table) # ---------------- add-repo ---------------- def list_all_repos(repos_dir: Path) -> list[Path]: """List all directories in repos_dir that appear to be git repos. """ if not repos_dir.exists(): return [] repos = [] for p in sorted(repos_dir.iterdir()): if p.is_dir() and ensure_git_repo(p): repos.append(p) return repos def pick_repo_with_iterfzf(repos: list[Path]) -> Path | None: """Use iterfzf (Python library) to pick a repo from a list of paths. """ if not repos: return None names = [r.name for r in repos] if len(names) == 1: return repos[0] elif not names: console.print("[red]No repos found.[/red]") console.print("Use 'workspaces add' to add repos.") return None choice = iterfzf(names, prompt="pick a repo> ") if not choice: return None for r in repos: if r.name == choice: return r return None def pick_workspace_with_iterfzf(workspaces: list[Path]) -> Path | None: """Use iterfzf (Python library) to pick a workspace from a list of paths. """ names = [w.name for w in workspaces] if not names: return None choice = iterfzf(names, prompt="pick a workspace> ") if not choice: return None for w in workspaces: if w.name == choice: return w return None @app.command("add-repo") @app.command("add", hidden=True) def add_repo( ctx: typer.Context, workspace: str | None = typer.Option( None, "--workspace", "-w", help=( "Workspace directory name to add repo to. " "If omitted, uses the workspace containing the current directory." ), ), repo_name: str | None = typer.Option( None, "--repo", "-r", help=( "Name of repo (directory under repos_dir). " "If omitted, uses iterfzf to pick from repos_dir." ), ), ): """Add a repo to a workspace. - Lists all directories in repos_dir as repos. - Uses iterfzf to pick repo if --repo not given. - Creates a worktree for a branch derived from the workspace name into workspace_dir / repo_name. """ _settings, repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) workspace = ws_dir.name if not ws_dir.exists(): console.print( f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" ) raise typer.Exit(1) # Use display title from README to derive branch name title, _desc = read_workspace_readme(ws_dir) branch = branch_name_for_workspace(title or ws_dir.name) all_repos = list_all_repos(repos_dir) if not all_repos: console.print(f"[red]No git repos found in {repos_dir}[/red]") raise typer.Exit(1) repo_path: Path | None = None if repo_name: for r in all_repos: if r.name == repo_name: repo_path = r break if repo_path is None: console.print( f"[red]Repo '{repo_name}' not found in {repos_dir}. " "Use --repo with a valid name or omit to use iterfzf.[/red]" ) raise typer.Exit(1) else: repo_path = pick_repo_with_iterfzf(all_repos) if repo_path is None: console.print("[yellow]No repo selected.[/yellow]") raise typer.Exit(0) target_dir = ws_dir / repo_path.name if target_dir.exists(): console.print( f"[yellow]Directory {target_dir} already exists. " "Assuming repo already added.[/yellow]" ) raise typer.Exit(0) # Ensure branch exists or create it code, _out, err = run_cmd(["git", "rev-parse", "--verify", branch], cwd=repo_path) if code != 0: # create branch from current HEAD console.print( f"[yellow]Branch '{branch}' does not exist in {repo_path.name}; creating it.[/yellow]" ) code, _out, err = run_cmd(["git", "branch", branch], cwd=repo_path) if code != 0: console.print( f"[red]Failed to create branch '{branch}' in {repo_path.name}:[/red]\n{err}" ) raise typer.Exit(1) # Create worktree (dir name is safe because repo_path.name is) ws_dir.mkdir(parents=True, exist_ok=True) code, _out, err = run_cmd( ["git", "worktree", "add", str(target_dir), branch], cwd=repo_path, ) if code != 0: console.print( f"[red]Failed to create worktree for repo {repo_path.name} " f"on branch '{branch}' into {target_dir}:[/red]\n{err}" ) raise typer.Exit(1) console.print( f"[green]Added repo[/green] {repo_path.name} " f"to workspace [bold]{title or ws_dir.name}[/bold] " f"(dir: {ws_dir.name}) on branch '{branch}' at {target_dir}" ) # ---------------- rm-workspace ---------------- def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Path | None: """Try to find the parent repo for a worktree, assuming it lives in repos_dir with the same directory name as the worktree. """ candidate = repos_dir / worktree_path.name if candidate.exists() and ensure_git_repo(candidate): return candidate return None def has_unpushed_commits(repo_path: Path) -> bool: """Detect if branch has commits not on its upstream. Uses 'git rev-list @{u}..HEAD'; if non-empty, there are unpushed commits. """ # Check if upstream exists code, _, _ = run_cmd( ["git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"], cwd=repo_path, ) if code != 0: # No upstream configured; treat as unpushed work. return True code, out, _ = run_cmd( ["git", "rev-list", "@{u}..HEAD", "--count"], cwd=repo_path, ) if code != 0: return True try: count = int(out.strip() or "0") except ValueError: return True return count > 0 def is_branch_integrated_into_main( repo_path: Path, branch: str, main_ref: str = "origin/main", ) -> bool: """Heuristic: is `branch`'s content already in `main_ref`? Returns True if: - branch tip is an ancestor of main_ref (normal merge / FF / rebase-before-merge), OR - the branch tip's tree hash appears somewhere in main_ref's history (common for squash merges and cherry-picks). This does not know about PRs, only Git history. """ # Make sure we have latest main (best-effort; ignore fetch errors) _code, _out, _err = run_cmd(["git", "fetch", "--quiet", "origin"], cwd=repo_path) # 1) Simple case: branch is ancestor of main_ref code, _out, _err = run_cmd( ["git", "merge-base", "--is-ancestor", branch, main_ref], cwd=repo_path, ) if code == 0: return True # 2) Squash / cherry-pick heuristic: same tree exists on main_ref code, out, err = run_cmd( ["git", "show", "-s", "--format=%T", branch], cwd=repo_path, ) if code != 0: console.print( f"[red]Failed to get tree for branch {branch} in {repo_path.name}:[/red]\n{err}" ) return False branch_tree = out.strip() if not branch_tree: return False code, out, err = run_cmd( ["git", "log", "--format=%T", main_ref], cwd=repo_path, ) if code != 0: console.print( f"[red]Failed to walk {main_ref} in {repo_path.name}:[/red]\n{err}" ) return False main_trees = {line.strip() for line in out.splitlines() if line.strip()} return branch_tree in main_trees @app.command("rm") def remove_workspace( ctx: typer.Context, workspace: str | None = typer.Option( None, "--workspace", "-w", help=( "Workspace directory name to remove. " "If omitted, uses the workspace containing the current directory." ), ), force: bool = typer.Option( False, "--force", "-f", help="Force removal even if there is dirty or unpushed work.", ), main_ref: str = typer.Option( "origin/main", "--main-ref", help="Ref to consider as the integration target (default: origin/main).", ), ): """Remove a workspace: - For each repo worktree in the workspace: * Check for dirty work or unpushed commits. * If any found and not --force, abort. * Otherwise, run 'git worktree remove'. - Finally, delete the workspace directory. """ _settings, repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) workspace = ws_dir.name if not ws_dir.exists(): console.print( f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" ) raise typer.Exit(1) title, _desc = read_workspace_readme(ws_dir) # Collect worktrees (subdirs that look like git repos, excluding readme.md) worktrees: list[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": continue if not ensure_git_repo(child): continue worktrees.append(child) if not worktrees: # No worktrees, just remove workspace dir if not Confirm.ask( f"[red]No worktrees found.[/red] Remove empty workspace " f"[bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})?", default=False, ): raise typer.Exit(0) shutil.rmtree(ws_dir) console.print(f"[green]Removed workspace[/green] {ws_dir}") raise typer.Exit(0) # Check for dirty / unpushed changes problems: list[str] = [] for wt in worktrees: status = get_git_status(wt) branch = get_current_branch(wt) or "?" repo = find_repo_for_worktree(wt, repos_dir) integrated = False if repo is not None and branch != "?": integrated = is_branch_integrated_into_main(wt, branch, main_ref=main_ref) if status.dirty: problems.append(f"{wt.name}: dirty working tree on '{branch}'") # Only care about "unpushed commits" if the branch is NOT integrated into main. if repo is not None and branch != "?" and not integrated and has_unpushed_commits(wt): problems.append(f"{wt.name}: unpushed commits on '{branch}'") if problems and not force: console.print( "[red]Refusing to remove workspace; found dirty or unpushed work:[/red]" ) for p in problems: console.print(f" - {p}") console.print( "\nUse [bold]--force[/bold] to remove the workspace and its worktrees anyway." ) raise typer.Exit(1) if not Confirm.ask( f"Remove workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name}) " "and clean up its worktrees?", default=False, ): raise typer.Exit(0) # Remove worktrees via git for wt in worktrees: repo = find_repo_for_worktree(wt, repos_dir) display_name = wt.name if repo is None: console.print( f"[yellow]Could not find parent repo for worktree {display_name}; " "deleting directory directly.[/yellow]" ) shutil.rmtree(wt) continue console.print(f"Removing worktree for [bold]{display_name}[/bold]...") args = ["git", "worktree", "remove"] if force: args.append("--force") args.append(str(wt)) code, _out, err = run_cmd(args, cwd=repo) if code != 0: console.print( f"[red]Failed to remove worktree {display_name} via git:[/red]\n{err}" ) # As a last resort, if force is given, nuke the dir. if force: console.print( f"[yellow]Forcing directory removal of {wt} despite git error.[/yellow]" ) shutil.rmtree(wt) elif wt.exists(): # git worktree remove *should* delete the directory; if not, clean up. shutil.rmtree(wt) if in_tmux(): session_name = get_session_name(ws_dir, repo) remove_session_if_exists(session_name) # Finally, remove the workspace directory itself shutil.rmtree(ws_dir) console.print( f"[green]Removed workspace[/green] title='{title or ws_dir.name}' dir='{ws_dir.name}'" ) # ---------------- wip (stage + commit) ---------------- def git_status_porcelain(repo_path: Path) -> list[tuple[str, str]]: """Return a list of (status_code, path) for changes in the repo/worktree. Uses 'git status --porcelain'. """ code, out, err = run_cmd(["git", "status", "--porcelain"], cwd=repo_path) if code != 0: console.print(f"[red]Failed to get status for {repo_path.name}:[/red]\n{err}") return [] changes: list[tuple[str, str]] = [] for line in out.splitlines(): if not line.strip(): continue status = line[:2] path = line[3:] changes.append((status, path)) return changes def choose_files_for_wip(repo_path: Path, changes: list[tuple[str, str]]) -> list[str]: """Interactively choose which files to stage for WIP. - Show list of changed files. - Ask user: * stage all * stage none * pick some using iterfzf (multi-select) """ if not changes: return [] files = [p for _s, p in changes] console.print( Panel( "\n".join(files), title=f"Changed files in {repo_path.name}", subtitle="Use 'all', 'none', or 'pick' when prompted", ) ) choice = Prompt.ask( "Stage which files?", choices=["all", "none", "pick"], default="all", ) if choice == "none": return [] if choice == "all": return files # Multi-select via iterfzf selected = iterfzf(files, multi=True, prompt="pick files> ") if not selected: return [] # iterfzf may return a single string or list depending on version; # normalize to list of strings. return [selected] if isinstance(selected, str) else list(selected) @app.command("wip") def wip_workspace( ctx: typer.Context, workspace: str | None = typer.Option( None, "--workspace", "-w", help=( "Workspace directory name to WIP-commit. " "If omitted, uses the workspace containing the current directory." ), ), ): """For each repo in the workspace: - Show list of changed files. - Ask whether to stage all, none, or pick some files. - Stage chosen files. - Make commit: 'wip: '. """ _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) workspace = ws_dir.name if not ws_dir.exists(): console.print( f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" ) raise typer.Exit(1) title, _desc = read_workspace_readme(ws_dir) commit_message = f"wip: {title or ws_dir.name}" commit_workspace(ctx, workspace, message=commit_message) @app.command("commit") def commit_workspace( ctx: typer.Context, workspace: str | None = typer.Option( None, "--workspace", "-w", help=( "Workspace directory name to WIP-commit. " "If omitted, uses the workspace containing the current directory." ), ), message: str | None = typer.Option( None, "--message", "-m", help="Commit message to use.", ), ): """For each repo in the workspace: - Show list of changed files. - Ask whether to stage all, none, or pick some files. - Stage chosen files. """ if not message: console.print( "[red]No commit message provided. Exiting.[/red]", ) raise typer.Exit(1) _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) workspace = ws_dir.name if not ws_dir.exists(): console.print( f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" ) raise typer.Exit(1) worktrees: list[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": continue if not ensure_git_repo(child): continue worktrees.append(child) if not worktrees: console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]") raise typer.Exit(0) for wt in worktrees: console.print(f"\n[bold]Repo:[/bold] {wt.name}") changes = git_status_porcelain(wt) if not changes: console.print(" [green]No changes.[/green]") continue files_to_stage = choose_files_for_wip(wt, changes) if not files_to_stage: console.print(" [yellow]Nothing selected to stage.[/yellow]") continue # Stage selected files for f in files_to_stage: code, _out, err = run_cmd(["git", "add", f], cwd=wt) if code != 0: console.print(f"[red]Failed to add {f} in {wt.name}:[/red]\n{err}") # Check if there is anything to commit code, out, _err = run_cmd(["git", "diff", "--cached", "--name-only"], cwd=wt) if code != 0 or not out.strip(): console.print(" [yellow]No staged changes to commit.[/yellow]") continue # Commit code, _out, err = run_cmd(["git", "commit", "-m", message], cwd=wt) if code != 0: console.print(f"[red]Failed to commit in {wt.name}:[/red]\n{err}") else: console.print(f" [green]Created commit in {wt.name}:[/green] '{message}'") # ---------------- push ---------------- @app.command("push") def push_workspace( ctx: typer.Context, workspace: str | None = typer.Option( None, "--workspace", "-w", help=( "Workspace directory name to push. " "If omitted, uses the workspace containing the current directory." ), ), remote: str = typer.Option( "origin", "--remote", "-r", help="Remote name to push to (default: origin).", ), ): """For each repo in the workspace, run 'git push '. Skips repos with no current branch or when push fails. """ _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) workspace = ws_dir.name if not ws_dir.exists(): console.print( f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" ) raise typer.Exit(1) title, _desc = read_workspace_readme(ws_dir) console.print( f"Pushing all repos in workspace [bold]{title or ws_dir.name}[/bold] " f"(dir: {ws_dir.name}) to remote '{remote}'" ) worktrees: list[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": continue if not ensure_git_repo(child): continue worktrees.append(child) if not worktrees: console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]") raise typer.Exit(0) for wt in worktrees: branch = get_current_branch(wt) if not branch or branch == "HEAD": console.print( f"[yellow]Skipping {wt.name}: no current branch (detached HEAD?).[/yellow]" ) continue console.print(f"\n[bold]Repo:[/bold] {wt.name} [bold]Branch:[/bold] {branch}") code, _out, err = run_cmd(["git", "push", remote, branch], cwd=wt) if code != 0: console.print( f"[red]Failed to push {wt.name} ({branch}) to {remote}:[/red]\n{err}" ) else: console.print(f"[green]Pushed {wt.name} ({branch}) to {remote}[/green]") # ---------------- status (current workspace) ---------------- @app.command("status") def status_workspace( ctx: typer.Context, workspace: str | None = typer.Option( None, "--workspace", "-w", help=( "Workspace directory name to show status for. " "If omitted, uses the workspace containing the current directory." ), ), ): """Show detailed status for the current (or specified) workspace. For each repo in the workspace: - repo directory name - branch - ahead/behind/dirty indicator - number of changed files """ _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) workspace = ws_dir.name if not ws_dir.exists(): console.print( f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" ) raise typer.Exit(1) title, desc = read_workspace_readme(ws_dir) table = Table( title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}", title_justify="left", show_lines=False, ) table.add_column("Repo (dir)", style="bold") table.add_column("Branch") table.add_column("Ahead/Behind/Dirty") table.add_column("Changed Files", justify="right") files_table = Table(show_lines=False) files_table.add_column("Repo (dir)", style="bold") files_table.add_column("File", justify="left") files_table.add_column("Status", justify="right") worktrees: list[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": continue if not ensure_git_repo(child): continue worktrees.append(child) if not worktrees: console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]") raise typer.Exit(0) for wt in worktrees: branch = get_current_branch(wt) or "?" status = get_git_status(wt) changes = git_status_porcelain(wt) table.add_row( wt.name, branch, status.indicator, str(len(changes)), ) if changes: for f, s in changes: files_table.add_row(wt.name, f, s) console.print(table) console.print("\nFiles with changes:") if files_table.rows: console.print(files_table) @app.command("diff") def diff_workspace( ctx: typer.Context, workspace: str | None = typer.Option( None, "--workspace", "-w", help=( "Workspace directory name to diff. " "If omitted, uses the workspace containing the current directory." ), ), staged: bool = typer.Option( False, "--staged", "-s", help="Show only staged changes (git diff --cached).", ), ): """Show git diff for all repos in the workspace. Uses rich Syntax highlighter for diffs. """ _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) workspace = ws_dir.name if not ws_dir.exists(): console.print( f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" ) raise typer.Exit(1) title, _desc = read_workspace_readme(ws_dir) console.print( f"Showing diffs for workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})" ) # Collect repos worktrees: list[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": continue if not ensure_git_repo(child): continue worktrees.append(child) if not worktrees: console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]") raise typer.Exit(0) any_diffs = False for wt in worktrees: # Skip clean repos changes = git_status_porcelain(wt) if not changes: continue any_diffs = True console.rule(f"[bold]{wt.name}[/bold]") cmd = ["git", "diff"] if staged: cmd.append("--cached") code, out, err = run_cmd(cmd, cwd=wt) if code != 0: console.print(f"[red]Failed to get diff for {wt.name}:[/red]\n{err}") continue if not out.strip(): console.print("[dim]No diff output.[/dim]") continue syntax = Syntax( out, "diff", theme="nord", line_numbers=False, word_wrap=False, ) # Wrap each repo diff in a Panel for clarity console.print(Panel(syntax, title=wt.name, border_style="cyan")) if not any_diffs: console.print("[green]No changes to diff in any repo.[/green]") def get_branches_merged(repo_dir: Path, remote: bool = False, main_branch: str="main") -> list[str]: cmd = ["git", "branch", "--merged", main_branch] if remote: cmd.append("--remotes") branches = run_cmd(cmd, cwd=repo_dir) current_branch = get_current_branch(repo_dir) branches = [b.strip() for b in branches[1].splitlines() if not b.startswith("*") and not b.startswith("+")] branches = [ b for b in branches if b not in {main_branch, f"origin/{main_branch}", "origin/HEAD -> origin/main", current_branch} and "->" not in b # filters symbolic refs ] return branches @app.command("clean") def clean_workspace( ctx: typer.Context, workspace: str | None = typer.Option( None, "--workspace", "-w", help=( "Workspace directory name to diff. " "If omitted, uses the workspace containing the current directory." ), ), dry_run: bool = typer.Option( False, "--dry-run", "-n", help="Print what would be deleted, but don't actually delete.", ), ): """Show detailed status for the current (or specified) workspace. For each repo in the workspace: - look up all local branches that are merged into main - ask user to delete local branches that are merged into main - look up all remote branches that are merged into main - ask user to delete remote branches that are merged into main """ _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) workspace = ws_dir.name if not ws_dir: console.print(f"[red]Workspace '{workspace}' does not exist at {ws_dir}[/red]") raise typer.Exit(1) repos = [directory for directory in ws_dir.iterdir() if directory.is_dir()] repo = pick_repo_with_iterfzf(repos) if not repo: console.print("[red]No repo selected.[/red]") raise typer.Exit(1) title, _desc = read_workspace_readme(ws_dir) console.print( f"Cleaning workspace [bold]{title or ws_dir.name}[/bold] (dir: {repo.name})" ) run_cmd(["git", "fetch", "--all", "--prune"], cwd=repo) local_branches_merged = get_branches_merged(repo, remote=False) remote_branches_merged = get_branches_merged(repo, remote=True) if len(local_branches_merged) == 0 and len(remote_branches_merged) == 0: console.print("[green]No branches to delete.[/green]") raise typer.Exit(0) console.print( f"[green]Would delete {len(local_branches_merged)} local branches[/green]" ) for b in local_branches_merged: console.print(f"[yellow]{b}[/yellow]") console.print( f"[green]Would delete {len(remote_branches_merged)} remote branches[/green]" ) for b in remote_branches_merged: console.print(f"[yellow]{b}[/yellow]") if dry_run: console.print("[grey]Dry run. Exiting.[/grey]") raise typer.Exit(0) if not Confirm.ask("Delete these branches?"): console.print("[red]Aborting.[/red]") raise typer.Exit(1) for b in local_branches_merged: cmd = ["git", "branch", "-D", b] run_cmd(cmd, cwd=repo) for b in remote_branches_merged: # b looks like "origin/feat/workspaces-tmux-support" if "->" in b: # safety: skip symbolic refs like "origin/HEAD -> origin/main" continue try: remote, branch = b.split("/", 1) except ValueError: # fallback: no slash? assume origin + raw name remote, branch = "origin", b cmd = ["git", "push", remote, "--delete", branch] status, out, err = run_cmd(cmd, cwd=repo) if status != 0: console.print(f"[red]Failed to delete {remote}/{branch}:[/red]\n{err}") continue console.print("[green]Done.[/green]") def in_tmux() -> bool: """Return True if inside tmux""" return "TMUX" in os.environ def not_in_tmux() -> bool: """Return True if not inside tmux""" return not in_tmux() def run_tmux( args: list[str], *, clear_tmux_env: bool = False ) -> subprocess.CompletedProcess: """Run a tmux command.""" env = os.environ.copy() if clear_tmux_env: env["TMUX"] = "" return subprocess.run( ["tmux", *args], env=env, check=False, capture_output=True, ) def get_tmux_sessions() -> list[str]: """List all tmux sessions.""" result = run_tmux(["list-sessions", "-F", "#{session_name}"]) return result.stdout.decode().splitlines() def session_exists(session_name: str) -> bool: """Check if the tmux session exists.""" result = run_tmux(["has-session", "-t", f"={session_name}"]) return result.returncode == 0 def pick_project(base_dir: Path) -> str | None: """Use fzf to pick a subdirectory (project) from base_dir.""" if not base_dir.is_dir(): typer.echo(f"[ta] {base_dir} is not a directory.", err=True) return None subdirs = sorted([p.name for p in base_dir.iterdir() if p.is_dir()]) if not subdirs: typer.echo(f"[ta] No subdirectories found in {base_dir}.", err=True) return None if not shutil.which("fzf"): typer.echo("[ta] fzf not found in PATH.", err=True) return None proc = subprocess.run( ["fzf", "--reverse", f"--header=Select project from {base_dir.name} >"], input="\n".join(subdirs), text=True, check=False, capture_output=True, ) if proc.returncode != 0: # Cancelled or failed return None choice = proc.stdout.strip() return choice or None def create_detached_session( session_name: str, path_name: Path, start_mode: bool, ) -> bool: """Create a detached session. - If start_mode: just a single window. - Else: split layout with nvim on top. """ # Run tmux as if not in tmux (clear TMUX env) if start_mode: r = run_tmux( ["new-session", "-Ad", "-s", session_name, "-c", str(path_name)], clear_tmux_env=True, ) return r.returncode == 0 r = run_tmux( ["new-session", "-Ad", "-s", session_name, "-c", str(path_name)], clear_tmux_env=True, ) if r.returncode != 0: return False r = run_tmux( [ "split-window", "-vb", "-t", session_name, "-c", str(path_name), "-p", "70", ], clear_tmux_env=True, ) if r.returncode != 0: return False r = run_tmux( [ "send-keys", "-t", session_name, "nvim '+Telescope find_files'", "Enter", ], clear_tmux_env=True, ) return r.returncode == 0 def create_if_needed_and_attach( session_name: str, path_name: Path, start_mode: bool, ) -> bool: """Mimic the bash logic: - If not in tmux: `tmux new-session -As ... -c path_name` - Else: - If session doesn't exist: create_detached_session() - Then `tmux switch-client -t session_name` """ session_name = session_name.strip().replace(" ", "_").replace(':', '_').replace('.', '_').replace('/', '_') if not_in_tmux(): r = run_tmux( ["new-session", "-As", session_name, "-c", str(path_name)], ) return r.returncode == 0 # Inside tmux if not session_exists(session_name) and not create_detached_session(session_name, path_name, start_mode): return False r = run_tmux(["switch-client", "-t", session_name]) return r.returncode == 0 def attach_to_first_session() -> None: """Fallback: attach to first tmux session and open choose-tree.""" # Attach to the first listed session list_proc = subprocess.run( ["tmux", "list-sessions", "-F", "#{session_name}"], text=True, check=False, capture_output=True, ) if list_proc.returncode != 0 or not list_proc.stdout.strip(): typer.echo("[ta] No tmux sessions found to attach to.", err=True) raise typer.Exit(1) first_session = list_proc.stdout.strip().splitlines()[0] attach_proc = run_tmux(["attach-session", "-t", first_session]) if attach_proc.returncode != 0: raise typer.Exit(attach_proc.returncode) # After attach, show choose-tree (this will run in the attached session) run_tmux(["choose-tree", "-Za"]) def remove_session_if_exists(session_name: str) -> None: r = run_tmux(["has-session", "-t", session_name]) if r.returncode == 0: remove_session(session_name) def remove_session(session_name: str) -> None: r = run_tmux(["kill-session", "-t", session_name]) if r.returncode != 0: raise typer.Exit(r.returncode) def get_session_name(ws_dir: Path, repo: Path): session_name = f"ω|{ws_dir.name}|{repo.name}" session_name = session_name.strip().replace(" ", "_").replace(':', '_').replace('.', '_').replace('/', '_') return session_name return result.stdout.decode().strip() tmux_app = typer.Typer( name="tmux", help="tmux commands", add_completion=False, ) @tmux_app.command("attach") def tmux_cli_attach( ctx: typer.Context, workspace: str | None = typer.Option( None, "--workspace", "-w", help=( "Workspace directory name to show status for. " "If omitted, uses the workspace containing the current directory." ), ), ): """ Attach or create a tmux session for a repo in a workspace. """ _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) workspace = ws_dir.name if not ws_dir: console.print(f"[red]Workspace '{workspace}' does not exist at {ws_dir}[/red]") raise typer.Exit(1) # pick repo in workspace repos = [directory for directory in ws_dir.iterdir() if directory.is_dir()] if not repos: console.print("[red]No repos found in workspace. Add One.[/red]") add_repo(ctx=ctx, workspace=workspace, repo_name=None) repos = [directory for directory in ws_dir.iterdir() if directory.is_dir()] repo = pick_repo_with_iterfzf(repos) if not repo: console.print("[red]No repo selected. Exiting.[/red]") raise typer.Exit(1) session_name = get_session_name(ws_dir, repo) console.print(f"Session name: {session_name}") create_if_needed_and_attach(session_name, repo, False) @tmux_app.command("list-sessions") def tmux_cli_list_sessions( workspace: str | None = typer.Option( None, "--workspace", "-w", help=( "Workspace directory name to show status for. " "If omitted, uses the workspace containing the current directory." ), ), ): """ List tmux workspace sessions. """ if not_in_tmux(): console.print("[red]Not in tmux. Exiting.[/red]") raise typer.Exit(1) if not workspace: tmux_sessions = [session for session in get_tmux_sessions() if "|" in session] console.print(tmux_sessions) return tmux_sessions = [ session for session in get_tmux_sessions() if session.startswith(workspace) ] console.print(tmux_sessions) @tmux_app.command("remove") def tmux_cli_remove( workspace: str | None = typer.Option( None, "--workspace", "-w", help=( "Workspace directory name to show status for. " "If omitted, uses the workspace containing the current directory." ), ), dry_run: bool = typer.Option( False, "--dry-run", "-d", help="Don't actually remove the session.", ), ): """ Remove tmux workspace sessions. """ if not_in_tmux(): console.print("[red]Not in tmux. Exiting.[/red]") raise typer.Exit(1) if workspace: tmux_sessions = [ session for session in get_tmux_sessions() if session.startswith(workspace) ] else: tmux_sessions = [ session for session in get_tmux_sessions() if session.startswith("ω") ] if not dry_run and not Confirm.ask( f"Are you sure you want to remove all tmux sessions?\n* {'\n* '.join(tmux_sessions)}\n", default=False, ): raise typer.Exit(1) if dry_run: console.print(tmux_sessions) return for session_name in tmux_sessions: remove_session(session_name) app.add_typer(tmux_app) @app.command("attach") def attach( ctx: typer.Context, workspace: str | None = typer.Option( None, "--workspace", "-w", help=( "Workspace directory name to show status for. " "If omitted, uses the workspace containing the current directory." ), ), ): """ Attach to a workspace. """ _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) if len(list_workspaces(workspaces_dir)) == 0: import sys cmd = sys.executable console.print("[red]No workspaces found. Exiting.[/red]") console.print(f"Create a workspace with `[yellow]workspaces create[/yellow]` first.") raise typer.Exit(1) if in_tmux(): tmux_cli_attach(ctx, workspace) if __name__ == "__main__": app()