#!/usr/bin/env -S uv run --quiet --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
#   "typer",
#   "rich",
#   "pydantic",
#   "pydantic-settings",
#   "iterfzf",
# ]
# ///
"""Workspace management tool built on git worktrees.

A *workspace* is a directory under ``~/<workspaces_name>.workspaces`` holding a
``readme.md`` plus one git worktree per participating repository.  The parent
repositories live under ``~/<workspaces_name>``.
"""

import os
import re
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple

import typer
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from rich.console import Console
from rich.table import Table
from rich.prompt import Prompt, Confirm
from iterfzf import iterfzf

app = typer.Typer(
    help="Workspace management tool",
    invoke_without_command=True,
)
console = Console()

# Matches the ahead/behind counters of a "# branch.ab +A -B" porcelain-v2 line.
_AHEAD_BEHIND_RE = re.compile(r"\+(\d+)\s+-(\d+)")


# ---------------------------------------------------------------------------
# Settings
# ---------------------------------------------------------------------------


class Settings(BaseSettings):
    """
    Global configuration for workspaces.

    Resolution for workspaces_name:
      1. Command-line flag --workspaces-name
      2. Environment variable WORKSPACES_NAME
      3. Default "git"

    Derived paths (see resolve_paths):
      repos_dir      = ~/<workspaces_name>
      workspaces_dir = ~/<workspaces_name>.workspaces
    """

    workspaces_name: str = Field(
        default="git",
        description="Logical name of the workspace group (e.g. 'git', 'work', 'personal').",
    )

    # pydantic v2-style config
    model_config = SettingsConfigDict(
        env_prefix="",
        env_file=None,
        env_nested_delimiter="__",
        extra="ignore",
    )

    @classmethod
    def from_env_and_override(cls, override_workspaces_name: Optional[str]) -> "Settings":
        """
        Construct settings honoring, in priority order:
          1. CLI override (when not None)
          2. WORKSPACES_NAME env (when set and non-empty)
          3. default "git"
        """
        settings = cls()
        if override_workspaces_name is not None:
            settings.workspaces_name = override_workspaces_name
        elif env_val := os.getenv("WORKSPACES_NAME"):
            settings.workspaces_name = env_val
        return settings


def resolve_paths(workspaces_name: Optional[str]) -> Tuple[Settings, Path, Path]:
    """
    Build Settings and derived paths, honoring CLI override of workspaces_name.

    Returns (settings, repos_dir, workspaces_dir) with both paths resolved
    to absolute form.
    """
    base_settings = Settings.from_env_and_override(workspaces_name)
    name = base_settings.workspaces_name
    repos_dir = Path(os.path.expanduser(f"~/{name}")).resolve()
    workspaces_dir = Path(os.path.expanduser(f"~/{name}.workspaces")).resolve()
    return base_settings, repos_dir, workspaces_dir


# ---------------------------------------------------------------------------
# Models / helpers
# ---------------------------------------------------------------------------


@dataclass
class GitStatus:
    """Ahead/behind counters and dirty flag of a git working tree."""

    ahead: int = 0
    behind: int = 0
    dirty: bool = False

    @property
    def indicator(self) -> str:
        """
        Build a compact indicator:
          - clean: "·"
          - ahead 1: "↑1"
          - behind 2: "↓2"
          - both ahead/behind: "↑1 ↓2"
          - add '*' when dirty, e.g. "↑1*" or "↑1 ↓2*"
        """
        parts: List[str] = []
        if self.ahead:
            parts.append(f"↑{self.ahead}")
        if self.behind:
            parts.append(f"↓{self.behind}")
        base = " ".join(parts) if parts else "·"
        if self.dirty:
            base += "*"
        return base


def get_git_status(repo_path: Path) -> GitStatus:
    """
    Get ahead/behind and dirty info for a Git repo.

    Uses `git status --porcelain=v2 --branch` and parses:
      - '# branch.ab +A -B' for ahead/behind
      - any non-comment line for dirty

    Best effort: if git cannot be run or fails, a default (clean) GitStatus
    is returned.
    """
    try:
        out = subprocess.check_output(
            ["git", "status", "--porcelain=v2", "--branch"],
            cwd=repo_path,
            stderr=subprocess.DEVNULL,
            text=True,
        )
    except (subprocess.SubprocessError, OSError, ValueError):
        # ValueError covers undecodable output (UnicodeDecodeError).
        return GitStatus()

    ahead = 0
    behind = 0
    dirty = False
    for line in out.splitlines():
        if line.startswith("# branch.ab"):
            # Example: "# branch.ab +1 -2"
            m = _AHEAD_BEHIND_RE.search(line)
            if m:
                ahead = int(m.group(1))
                behind = int(m.group(2))
        elif not line.startswith("#"):
            dirty = True
    return GitStatus(ahead=ahead, behind=behind, dirty=dirty)


def get_current_branch(repo_path: Path) -> Optional[str]:
    """Return the output of `git rev-parse --abbrev-ref HEAD`, or None on failure."""
    try:
        out = subprocess.check_output(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
            cwd=repo_path,
            stderr=subprocess.DEVNULL,
            text=True,
        )
    except (subprocess.SubprocessError, OSError, ValueError):
        return None
    return out.strip()


def ensure_git_repo(path: Path) -> bool:
    """Return True when `path` contains a `.git` entry."""
    # Works for repos and worktrees (.git file or dir)
    return (path / ".git").exists()


def run_cmd(cmd: List[str], cwd: Optional[Path] = None) -> Tuple[int, str, str]:
    """Run `cmd` (optionally in `cwd`) and return (returncode, stdout, stderr) as text."""
    proc = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True)
    return proc.returncode, proc.stdout, proc.stderr


def _workspace_worktrees(ws_dir: Path) -> List[Path]:
    """Return the sorted sub-directories of `ws_dir` that look like git checkouts."""
    return [
        child
        for child in sorted(p for p in ws_dir.iterdir() if p.is_dir())
        if child.name.lower() != "readme.md" and ensure_git_repo(child)
    ]


def find_workspace_dir(workspaces_dir: Path, workspace_name: Optional[str]) -> Path:
    """
    Resolve the directory of a workspace.

    - If workspace_name given: use workspaces_dir / workspace_name
    - Else: use current working directory (must be inside a workspace under
      workspaces_dir).

    Raises typer.Exit(1) when no workspace can be determined from the
    current directory.
    """
    if workspace_name:
        return workspaces_dir / workspace_name

    cwd = Path.cwd().resolve()
    try:
        rel = cwd.relative_to(workspaces_dir)
    except ValueError:
        console.print(
            f"[red]Not inside workspaces_dir ({workspaces_dir}). "
            "Please use --workspace to specify a workspace.[/red]"
        )
        raise typer.Exit(1)

    if not rel.parts:
        # cwd is workspaces_dir itself: there is no first component to pick.
        console.print(
            f"[red]Current directory is workspaces_dir ({workspaces_dir}) itself, "
            "not a workspace. Please use --workspace to specify a workspace.[/red]"
        )
        raise typer.Exit(1)

    # The top-level workspace directory is the first component under workspaces_dir
    return workspaces_dir / rel.parts[0]


def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]:
    """
    Return (name_from_h1, description_from_rest_of_file).

    If the file is missing, empty, or its first non-empty line is not an
    '# ' heading, the directory name is used as the name.
    """
    readme = ws_dir / "readme.md"
    if not readme.is_file():
        return ws_dir.name, ""

    lines = readme.read_text(encoding="utf-8").splitlines()

    # First non-empty line must be '# ...' per spec
    first_idx = next((i for i, line in enumerate(lines) if line.strip()), None)
    if first_idx is None:
        return ws_dir.name, ""

    first_line = lines[first_idx].strip()
    desc = "\n".join(lines[first_idx + 1 :]).strip()
    if not first_line.startswith("# "):
        # Fallback: no heading, keep the directory name.
        return ws_dir.name, desc
    return first_line[2:].strip(), desc


def write_workspace_readme(ws_dir: Path, name: str, description: str) -> None:
    """Create `ws_dir` if needed and write its readme.md as '# name', blank line, description."""
    ws_dir.mkdir(parents=True, exist_ok=True)
    content = f"# {name}\n\n{description}\n"
    (ws_dir / "readme.md").write_text(content, encoding="utf-8")


# ---------------------------------------------------------------------------
# Commands / main callback
# ---------------------------------------------------------------------------


@app.callback()
def main(
    ctx: typer.Context,
    workspaces_name: Optional[str] = typer.Option(
        None,
        "--workspaces-name",
        "-W",
        help=(
            "Logical name for this workspace set (e.g. 'git', 'work', 'personal'). "
            "Overrides WORKSPACES_NAME env. Defaults to 'git'."
        ),
    ),
):
    """
    Manage workspaces and associated Git worktrees.

    If no command is given, this will list workspaces.
    """
    settings, repos_dir, workspaces_dir = resolve_paths(workspaces_name)
    ctx.obj = {
        "settings": settings,
        "repos_dir": repos_dir,
        "workspaces_dir": workspaces_dir,
    }

    # Default behavior when no subcommand is provided:
    if ctx.invoked_subcommand is None:
        list_workspaces(ctx)
        raise typer.Exit(0)


def get_ctx_paths(ctx: typer.Context) -> Tuple[Settings, Path, Path]:
    """Unpack (settings, repos_dir, workspaces_dir) stored on the context by `main`."""
    obj = ctx.obj or {}
    return obj["settings"], obj["repos_dir"], obj["workspaces_dir"]


# ---------------- list-workspaces ----------------


@app.command("list")
def list_workspaces(
    ctx: typer.Context,
):
    """
    List all workspaces.

    Shows:
      - workspace directory name
      - workspace description (from README markdown, everything after H1)
      - included repos with git status indicators
    """
    _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
    # Creating the directory up front means the listing below never has to
    # handle a missing workspaces_dir.
    workspaces_dir.mkdir(parents=True, exist_ok=True)

    table = Table(title=f"Workspaces ({workspaces_dir})")
    table.add_column("Workspace", style="bold")
    table.add_column("Description", overflow="fold")
    table.add_column("Repos", overflow="fold")

    for ws in sorted(p for p in workspaces_dir.iterdir() if p.is_dir()):
        _name, desc = read_workspace_readme(ws)
        repos: List[str] = []
        for child in _workspace_worktrees(ws):
            status = get_git_status(child)
            branch = get_current_branch(child) or "?"
            repos.append(f"{child.name} [{branch}] {status.indicator}")
        repos_str = "\n".join(repos) if repos else "-"
        table.add_row(ws.name, desc or "-", repos_str)

    console.print(table)


# ---------------- create-workspace ----------------


@app.command("create")
@app.command("new", hidden=True)
def create_workspace(
    ctx: typer.Context,
    name: Optional[str] = typer.Option(
        None, "--name", "-n", help="Name of the new workspace (directory name)."
    ),
    description: Optional[str] = typer.Option(
        None,
        "--description",
        "-d",
        help="Description of the workspace. Will be written into readme.md.",
    ),
):
    """
    Create a new workspace.

    - Asks for name and description if not provided.
    - Creates directory under workspaces_dir.
    - Creates readme.md containing '# NAME' followed by the description.
    """
    _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
    workspaces_dir.mkdir(parents=True, exist_ok=True)

    if not name:
        name = Prompt.ask("Workspace name")
    name = name.strip()
    # The name becomes a single directory under workspaces_dir; reject anything
    # that would resolve to workspaces_dir itself, its parent, or a nested path.
    if not name or name in {".", ".."} or Path(name).name != name:
        console.print(
            f"[red]Invalid workspace name '{name}': "
            "must be a single, non-empty directory name.[/red]"
        )
        raise typer.Exit(1)

    # Only prompt when the option was omitted; an explicit empty
    # --description is a valid choice.
    if description is None:
        description = Prompt.ask("Workspace description", default="")

    ws_dir = workspaces_dir / name
    if ws_dir.exists():
        console.print(f"[red]Workspace '{name}' already exists at {ws_dir}[/red]")
        raise typer.Exit(1)

    write_workspace_readme(ws_dir, name, description)
    console.print(f"[green]Created workspace[/green] {ws_dir}")


# ---------------- list-repos ----------------


@app.command("list-repos")
def list_repos(
    ctx: typer.Context,
    workspace: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Workspace name to inspect. "
            "If omitted, uses the workspace containing the current directory."
        ),
    ),
):
    """
    List repos and branches in the current (or specified) workspace.

    Shows:
      - repo directory name
      - current branch
      - ahead/behind/dirty indicators
    """
    _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
    ws_dir = find_workspace_dir(workspaces_dir, workspace)

    if not ws_dir.exists():
        console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
        raise typer.Exit(1)

    table = Table(title=f"Repos in workspace '{ws_dir.name}'")
    table.add_column("Repo (dir)", style="bold")
    table.add_column("Branch")
    table.add_column("Status")

    for child in _workspace_worktrees(ws_dir):
        branch = get_current_branch(child) or "?"
        status = get_git_status(child)
        table.add_row(child.name, branch, status.indicator)

    console.print(table)


# ---------------- add-repo ----------------


def list_all_repos(repos_dir: Path) -> List[Path]:
    """
    List all directories in repos_dir that appear to be git repos.

    Returns an empty list when repos_dir does not exist.
    """
    if not repos_dir.exists():
        return []
    return [p for p in sorted(repos_dir.iterdir()) if p.is_dir() and ensure_git_repo(p)]


def pick_repo_with_iterfzf(repos: List[Path]) -> Optional[Path]:
    """
    Use iterfzf (Python library) to pick a repo from a list of paths.

    Returns None when the list is empty or nothing was selected.
    """
    if not repos:
        return None

    choice = iterfzf([r.name for r in repos], prompt="pick a repo> ")
    if not choice:
        return None
    return next((r for r in repos if r.name == choice), None)


@app.command("add-repo")
def add_repo(
    ctx: typer.Context,
    workspace: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Workspace to add repo to. "
            "If omitted, uses the workspace containing the current directory."
        ),
    ),
    repo_name: Optional[str] = typer.Option(
        None,
        "--repo",
        "-r",
        help=(
            "Name of repo (directory under repos_dir). "
            "If omitted, uses iterfzf to pick from repos_dir."
        ),
    ),
):
    """
    Add a repo to a workspace.

    - Lists all directories in repos_dir as repos.
    - Uses iterfzf to pick repo if --repo not given.
    - Creates a worktree for branch named after the workspace into
      workspace_dir / repo_name.
    """
    _settings, repos_dir, workspaces_dir = get_ctx_paths(ctx)
    ws_dir = find_workspace_dir(workspaces_dir, workspace)

    if not ws_dir.exists():
        console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
        raise typer.Exit(1)

    ws_name = ws_dir.name

    all_repos = list_all_repos(repos_dir)
    if not all_repos:
        console.print(f"[red]No git repos found in {repos_dir}[/red]")
        raise typer.Exit(1)

    repo_path: Optional[Path] = None
    if repo_name:
        repo_path = next((r for r in all_repos if r.name == repo_name), None)
        if repo_path is None:
            console.print(
                f"[red]Repo '{repo_name}' not found in {repos_dir}. "
                "Use --repo with a valid name or omit to use iterfzf.[/red]"
            )
            raise typer.Exit(1)
    else:
        repo_path = pick_repo_with_iterfzf(all_repos)
        if repo_path is None:
            console.print("[yellow]No repo selected.[/yellow]")
            raise typer.Exit(0)

    target_dir = ws_dir / repo_path.name
    if target_dir.exists():
        console.print(
            f"[yellow]Directory {target_dir} already exists. "
            "Assuming repo already added.[/yellow]"
        )
        raise typer.Exit(0)

    # Ensure branch exists or create it
    branch = ws_name
    code, _out, err = run_cmd(["git", "rev-parse", "--verify", branch], cwd=repo_path)
    if code != 0:
        # create branch from current HEAD
        console.print(
            f"[yellow]Branch '{branch}' does not exist in {repo_path.name}; creating it.[/yellow]"
        )
        code, _out, err = run_cmd(["git", "branch", branch], cwd=repo_path)
        if code != 0:
            console.print(
                f"[red]Failed to create branch '{branch}' in {repo_path.name}:[/red]\n{err}"
            )
            raise typer.Exit(1)

    # Create worktree
    ws_dir.mkdir(parents=True, exist_ok=True)
    code, _out, err = run_cmd(
        ["git", "worktree", "add", str(target_dir), branch],
        cwd=repo_path,
    )
    if code != 0:
        console.print(
            f"[red]Failed to create worktree for repo {repo_path.name} "
            f"on branch '{branch}' into {target_dir}:[/red]\n{err}"
        )
        raise typer.Exit(1)

    console.print(
        f"[green]Added repo[/green] {repo_path.name} "
        f"to workspace [bold]{ws_name}[/bold] at {target_dir}"
    )


# ---------------- rm-workspace ----------------


def find_repo_for_worktree(
    worktree_path: Path, repos_dir: Path
) -> Optional[Path]:
    """
    Try to find the parent repo for a worktree, assuming it lives in repos_dir
    with the same directory name as the worktree.
    """
    candidate = repos_dir / worktree_path.name
    if candidate.exists() and ensure_git_repo(candidate):
        return candidate
    return None


def has_unpushed_commits(repo_path: Path, branch: str) -> bool:
    """
    Detect if the checkout at repo_path has commits not on its upstream.

    Uses 'git rev-list @{u}..HEAD --count'; a positive count means unpushed
    commits.  A missing upstream, a git failure, or unparsable output is
    conservatively reported as True.  `branch` is accepted for interface
    compatibility but not consulted; the check always runs against HEAD.
    """
    # Check if upstream exists
    code, _, _ = run_cmd(
        ["git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"],
        cwd=repo_path,
    )
    if code != 0:
        # No upstream configured; treat as unpushed work.
        return True

    code, out, _ = run_cmd(
        ["git", "rev-list", "@{u}..HEAD", "--count"],
        cwd=repo_path,
    )
    if code != 0:
        return True
    try:
        count = int(out.strip() or "0")
    except ValueError:
        return True
    return count > 0


@app.command("rm")
def remove_workspace(
    ctx: typer.Context,
    workspace: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Workspace to remove. "
            "If omitted, uses the workspace containing the current directory."
        ),
    ),
    force: bool = typer.Option(
        False,
        "--force",
        "-f",
        help="Force removal even if there is dirty or unpushed work.",
    ),
):
    """
    Remove a workspace:

    - For each repo worktree in the workspace:
        * Check for dirty work or unpushed commits.
        * If any found and not --force, abort.
        * Otherwise, run 'git worktree remove'.
    - Finally, delete the workspace directory.
    """
    _settings, repos_dir, workspaces_dir = get_ctx_paths(ctx)
    ws_dir = find_workspace_dir(workspaces_dir, workspace)

    if not ws_dir.exists():
        console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
        raise typer.Exit(1)

    # Collect worktrees (subdirs that look like git repos, excluding readme.md)
    worktrees = _workspace_worktrees(ws_dir)

    if not worktrees:
        # No worktrees, just remove workspace dir
        if not Confirm.ask(
            f"[red]No worktrees found.[/red] Remove empty workspace [bold]{ws_dir.name}[/bold]?",
            default=False,
        ):
            raise typer.Exit(0)
        shutil.rmtree(ws_dir)
        console.print(f"[green]Removed workspace[/green] {ws_dir}")
        raise typer.Exit(0)

    # Check for dirty / unpushed changes
    problems: List[str] = []
    for wt in worktrees:
        status = get_git_status(wt)
        branch = get_current_branch(wt) or "?"
        if status.dirty:
            problems.append(f"{wt.name}: dirty working tree on '{branch}'")
        repo = find_repo_for_worktree(wt, repos_dir)
        if repo is not None and branch != "?":
            if has_unpushed_commits(wt, branch):
                problems.append(f"{wt.name}: unpushed commits on '{branch}'")

    if problems and not force:
        console.print(
            "[red]Refusing to remove workspace; found dirty or unpushed work:[/red]"
        )
        for p in problems:
            console.print(f"  - {p}")
        console.print(
            "\nUse [bold]--force[/bold] to remove the workspace and its worktrees anyway."
        )
        raise typer.Exit(1)

    if not Confirm.ask(
        f"Remove workspace [bold]{ws_dir.name}[/bold] and clean up its worktrees?",
        default=False,
    ):
        raise typer.Exit(0)

    # Remove worktrees via git
    failed: List[str] = []
    for wt in worktrees:
        repo = find_repo_for_worktree(wt, repos_dir)
        display_name = wt.name
        if repo is None:
            console.print(
                f"[yellow]Could not find parent repo for worktree {display_name}; "
                "deleting directory directly.[/yellow]"
            )
            shutil.rmtree(wt)
            continue

        console.print(f"Removing worktree for [bold]{display_name}[/bold]...")
        # 'git worktree remove' only understands --force; '--detach' belongs to
        # 'git worktree add' and would make every non-forced removal fail.
        cmd = ["git", "worktree", "remove"]
        if force:
            cmd.append("--force")
        cmd.append(str(wt))
        code, _out, err = run_cmd(cmd, cwd=repo)
        if code != 0:
            console.print(
                f"[red]Failed to remove worktree {display_name} via git:[/red]\n{err}"
            )
            if not force:
                failed.append(display_name)
                continue
            # As a last resort, if force is given, nuke the dir.
            console.print(
                f"[yellow]Forcing directory removal of {wt} despite git error.[/yellow]"
            )
            if wt.exists():
                shutil.rmtree(wt)
            # Best effort: drop the now-stale worktree registration so the
            # repo can be added to a workspace of the same name again.
            run_cmd(["git", "worktree", "prune"], cwd=repo)
        elif wt.exists():
            # git worktree remove should delete the directory; if not, clean up.
            shutil.rmtree(wt)

    if failed:
        # Deleting the workspace directory now would wipe the remaining
        # worktrees behind git's back and leave stale registrations.
        console.print(
            "[red]Workspace not removed; these worktrees could not be removed:[/red] "
            + ", ".join(failed)
        )
        console.print("Use [bold]--force[/bold] to remove them anyway.")
        raise typer.Exit(1)

    # Finally, remove the workspace directory itself
    shutil.rmtree(ws_dir)
    console.print(f"[green]Removed workspace[/green] {ws_dir}")


if __name__ == "__main__":
    app()