workspaces/workspaces.py

791 lines
23 KiB
Python
Executable file

#!/usr/bin/env -S uv run --quiet --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "typer",
# "rich",
# "pydantic",
# "pydantic-settings",
# "iterfzf",
# ]
# ///
import os
import re
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple
import typer
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from rich.console import Console
from rich.table import Table
from rich.prompt import Prompt, Confirm
from iterfzf import iterfzf
# Typer application; invoke_without_command=True lets the callback run even
# when the CLI is started without a subcommand.
app = typer.Typer(help="Workspace management tool", invoke_without_command=True)
# Shared Rich console used for all user-facing output.
console = Console()
# ---------------------------------------------------------------------------
# Settings
# ---------------------------------------------------------------------------
class Settings(BaseSettings):
    """
    Configuration shared by every command.

    ``workspaces_name`` is resolved in this order:
      1. Command-line flag --workspaces-name
      2. Environment variable WORKSPACES_NAME
      3. Default "git"

    Two directories are derived from it (see ``resolve_paths``):
      repos_dir      = ~/<workspaces_name>
      workspaces_dir = ~/<workspaces_name>.workspaces
    """

    workspaces_name: str = Field(
        default="git",
        description="Logical name of the workspace group (e.g. 'git', 'work', 'personal').",
    )

    # pydantic v2-style config
    model_config = SettingsConfigDict(
        env_prefix="",
        env_file=None,
        env_nested_delimiter="__",
        extra="ignore",
    )

    @classmethod
    def from_env_and_override(cls, override_workspaces_name: Optional[str]) -> "Settings":
        """
        Build a Settings instance, letting an explicit CLI value win.

        Precedence: CLI override, then the WORKSPACES_NAME environment
        variable, then the field default ("git").
        """
        settings = cls()

        # An explicit CLI value always wins; the environment is not consulted.
        if override_workspaces_name is not None:
            settings.workspaces_name = override_workspaces_name
            return settings

        # No CLI value: apply a non-empty environment value if one is set.
        from_environment = os.getenv("WORKSPACES_NAME")
        if from_environment:
            settings.workspaces_name = from_environment
        return settings
def resolve_paths(workspaces_name: Optional[str]) -> Tuple[Settings, Path, Path]:
    """
    Return ``(settings, repos_dir, workspaces_dir)`` for the given CLI override.

    Both directories live directly under the user's home directory and are
    returned as resolved absolute paths.
    """
    settings = Settings.from_env_and_override(workspaces_name)

    def _under_home(suffix: str = "") -> Path:
        # Expand "~" first, then resolve to an absolute path.
        return Path(os.path.expanduser(f"~/{settings.workspaces_name}{suffix}")).resolve()

    return settings, _under_home(), _under_home(".workspaces")
# ---------------------------------------------------------------------------
# Models / helpers
# ---------------------------------------------------------------------------
@dataclass
class GitStatus:
    """Ahead/behind commit counts and dirty flag for one working tree."""

    ahead: int = 0  # value parsed from the "+A" part of git's "# branch.ab" line
    behind: int = 0  # value parsed from the "-B" part of git's "# branch.ab" line
    dirty: bool = False  # True when git status reported any non-header entry

    @property
    def indicator(self) -> str:
        """
        Build a compact status indicator:
        - clean: "·"
        - ahead 1: "↑1"
        - behind 2: "↓2"
        - both ahead/behind: "↑1 ↓2"
        - add '*' when dirty, e.g. "↑1*", "↑1 ↓2*" or "·*"
        """
        parts: List[str] = []
        # The arrows are what distinguish "ahead" from "behind"; without them
        # a bare number is ambiguous.
        if self.ahead:
            parts.append(f"↑{self.ahead}")
        if self.behind:
            parts.append(f"↓{self.behind}")
        base = " ".join(parts) if parts else "·"
        if self.dirty:
            base += "*"
        return base
def get_git_status(repo_path: Path) -> GitStatus:
    """
    Summarise the state of the Git working tree at ``repo_path``.

    Runs `git status --porcelain=v2 --branch` and reads:
    - the '# branch.ab +A -B' header for ahead/behind counts
    - any line that is not a '#' header as evidence of a dirty tree

    If the git command cannot be run or fails, a default (clean, 0/0)
    GitStatus is returned.
    """
    try:
        porcelain = subprocess.check_output(
            ["git", "status", "--porcelain=v2", "--branch"],
            cwd=repo_path,
            stderr=subprocess.DEVNULL,
            text=True,
        )
    except Exception:
        return GitStatus()

    result = GitStatus()
    for entry in porcelain.splitlines():
        if not entry.startswith("# branch.ab"):
            # Other '#' lines are headers; anything else is a changed/untracked entry.
            if not entry.startswith("#"):
                result.dirty = True
            continue
        # Example header: "# branch.ab +1 -2"
        counts = re.search(r"\+(\d+)\s+-(\d+)", entry)
        if counts:
            result.ahead = int(counts.group(1))
            result.behind = int(counts.group(2))
    return result
def get_current_branch(repo_path: Path) -> Optional[str]:
    """
    Return the output of `git rev-parse --abbrev-ref HEAD` for ``repo_path``.

    Returns None when the command cannot be run or exits with an error.
    """
    rev_parse = ["git", "rev-parse", "--abbrev-ref", "HEAD"]
    try:
        raw = subprocess.check_output(
            rev_parse,
            cwd=repo_path,
            stderr=subprocess.DEVNULL,
            text=True,
        )
    except Exception:
        return None
    return raw.strip()
def ensure_git_repo(path: Path) -> bool:
    """Return True when ``path`` contains a ``.git`` entry (file or directory)."""
    # A main repo has a .git directory; a worktree has a .git file. Both count.
    git_marker = path.joinpath(".git")
    return git_marker.exists()
def run_cmd(cmd: List[str], cwd: Optional[Path] = None) -> Tuple[int, str, str]:
    """
    Run ``cmd`` (optionally in ``cwd``) and capture its output as text.

    Returns ``(returncode, stdout, stderr)``. A non-zero exit status is
    reported through the return code, not raised.
    """
    completed = subprocess.run(
        cmd,
        cwd=cwd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    return completed.returncode, completed.stdout, completed.stderr
def find_workspace_dir(workspaces_dir: Path, workspace_name: Optional[str]) -> Path:
    """
    Resolve the directory of a workspace.

    - If workspace_name given: use workspaces_dir / workspace_name
      (existence is not checked here).
    - Else: use the current working directory, which must be inside a
      workspace, i.e. strictly below workspaces_dir.

    Raises:
        typer.Exit(1): when no name is given and the current directory is
            outside workspaces_dir, or is workspaces_dir itself.
    """
    if workspace_name:
        return workspaces_dir / workspace_name

    cwd = Path.cwd().resolve()
    try:
        rel = cwd.relative_to(workspaces_dir)
    except ValueError:
        console.print(
            f"[red]Not inside workspaces_dir ({workspaces_dir}). "
            "Please use --workspace to specify a workspace.[/red]"
        )
        raise typer.Exit(1)

    # When cwd *is* workspaces_dir the relative path has no components, so
    # there is no workspace to pick; report it instead of indexing into ().
    if not rel.parts:
        console.print(
            f"[red]Current directory is workspaces_dir itself ({workspaces_dir}), "
            "not a workspace. Please use --workspace to specify a workspace.[/red]"
        )
        raise typer.Exit(1)

    # The top-level workspace directory is the first component under workspaces_dir
    return workspaces_dir / rel.parts[0]
def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]:
    """
    Return (name_from_h1, description_from_rest_of_file) for a workspace.

    Reads ``ws_dir / "readme.md"`` as UTF-8.

    - The first non-empty line is expected to be an H1 ('# <name>'); the text
      after it (stripped) is the description.
    - If the H1 has no text, the directory name is used as the name.
    - If the first non-empty line is not an H1, the directory name is used as
      the name and the whole content (stripped) is the description, so no
      text is lost.
    - If the file is missing, not a regular file, or blank, returns
      (ws_dir.name, "").
    """
    readme = ws_dir / "readme.md"
    # is_file() also rejects a directory called readme.md, which read_text()
    # could not open.
    if not readme.is_file():
        return ws_dir.name, ""

    lines = readme.read_text(encoding="utf-8").splitlines()
    first_idx = next((i for i, line in enumerate(lines) if line.strip()), None)
    if first_idx is None:
        return ws_dir.name, ""

    first_line = lines[first_idx].strip()
    if first_line == "#" or first_line.startswith("# "):
        # A bare "#" is an H1 with an empty title; fall back to the dir name.
        name = first_line[1:].strip() or ws_dir.name
        body = lines[first_idx + 1 :]
    else:
        # No heading: keep the first line as part of the description.
        name = ws_dir.name
        body = lines[first_idx:]
    return name, "\n".join(body).strip()
def write_workspace_readme(ws_dir: Path, name: str, description: str) -> None:
    """
    Write ``ws_dir / "readme.md"`` as an H1 with ``name`` followed by ``description``.

    Creates ``ws_dir`` (and parents) if needed; an existing readme is overwritten.
    """
    ws_dir.mkdir(parents=True, exist_ok=True)
    readme_path = ws_dir / "readme.md"
    readme_path.write_text(f"# {name}\n\n{description}\n", encoding="utf-8")
# --- name/branch helpers ----------------------------------------------------
def slugify_workspace_name(name: str) -> str:
"""
Turn arbitrary workspace name into a safe directory/worktree name.
- Lowercase
- Replace ':' with '-' FIRST (special case so 'fix:my issue' -> 'fix/my-issue' branch later)
- Replace spaces with '-'
- Replace any char not [a-z0-9._/-] with '-'
- Collapse multiple '-' and strip leading/trailing '-' and '/'
"""
name = name.replace(":", "-")
name = name.replace(" ", "-")
safe = []
for ch in name:
if re.match(r"[a-zA-Z0-9._/-]", ch):
safe.append(ch.lower())
else:
safe.append("-")
s = "".join(safe)
s = re.sub(r"-+", "-", s)
s = s.strip("-/")
return s or "workspace"
def branch_name_for_workspace(name: str) -> str:
    """
    Derive a branch name from a workspace name.

    The name is slugified, then only the *first* '-' is turned into '/':
      'fix:my issue' -> slug 'fix-my-issue' -> branch 'fix/my-issue'
    A slug without any '-' is returned unchanged.
    """
    slug = slugify_workspace_name(name)
    prefix, dash, remainder = slug.partition("-")
    if not dash:
        return slug
    return prefix + "/" + remainder
# ---------------------------------------------------------------------------
# Commands / main callback
# ---------------------------------------------------------------------------
@app.callback()
def main(
    ctx: typer.Context,
    workspaces_name: Optional[str] = typer.Option(
        None,
        "--workspaces-name",
        "-W",
        help=(
            "Logical name for this workspace set (e.g. 'git', 'work', 'personal'). "
            "Overrides WORKSPACES_NAME env. Defaults to 'git'."
        ),
    ),
):
    """
    Manage workspaces and associated Git worktrees.
    If no command is given, this will list workspaces.
    """
    resolved_settings, resolved_repos, resolved_workspaces = resolve_paths(workspaces_name)

    # Stash the resolved values on the context; commands read them back
    # through get_ctx_paths().
    ctx.obj = dict(
        settings=resolved_settings,
        repos_dir=resolved_repos,
        workspaces_dir=resolved_workspaces,
    )

    # A subcommand was requested: let Typer dispatch to it.
    if ctx.invoked_subcommand is not None:
        return

    # Default behavior when no subcommand is provided: list and stop.
    list_workspaces(ctx)
    raise typer.Exit(0)
def get_ctx_paths(ctx: typer.Context) -> Tuple[Settings, Path, Path]:
    """
    Return ``(settings, repos_dir, workspaces_dir)`` stored on ``ctx.obj``.

    Raises KeyError if the context object is empty or lacks one of the keys.
    """
    stored = ctx.obj if ctx.obj else {}
    settings = stored["settings"]
    repos_dir = stored["repos_dir"]
    workspaces_dir = stored["workspaces_dir"]
    return settings, repos_dir, workspaces_dir
# ---------------- list-workspaces ----------------
@app.command("list")
def list_workspaces(
    ctx: typer.Context,
):
    """
    List all workspaces.
    Shows:
    - workspace directory name
    - workspace description (from README markdown, everything after H1)
    - included repos with git status indicators
    """
    # Function-scope import: only this command needs it. Table cells are
    # rendered as Rich markup, so text such as "[main]" would be consumed as
    # a style tag and disappear unless escaped.
    from rich.markup import escape

    _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
    # Create the directory on first use so listing an empty setup just shows
    # an empty table (mkdir raises if it cannot be created).
    workspaces_dir.mkdir(parents=True, exist_ok=True)

    table = Table(title=f"Workspaces ({workspaces_dir})")
    table.add_column("Workspace (dir)", style="bold")
    table.add_column("Title", overflow="fold")
    table.add_column("Description", overflow="fold")
    table.add_column("Repos", overflow="fold")

    for ws in sorted(p for p in workspaces_dir.iterdir() if p.is_dir()):
        title, desc = read_workspace_readme(ws)
        repos: List[str] = []
        for child in sorted(p for p in ws.iterdir() if p.is_dir()):
            if child.name.lower() == "readme.md":
                continue
            if not ensure_git_repo(child):
                continue
            status = get_git_status(child)
            branch = get_current_branch(child) or "?"
            repos.append(escape(f"{child.name} [{branch}] {status.indicator}"))
        repos_str = "\n".join(repos) if repos else "-"
        # Title and description come from a user-edited README and may
        # contain square brackets as well.
        table.add_row(
            escape(ws.name),
            escape(title) if title else "-",
            escape(desc) if desc else "-",
            repos_str,
        )
    console.print(table)
# ---------------- create-workspace ----------------
@app.command("create")
def create_workspace(
    ctx: typer.Context,
    name: Optional[str] = typer.Option(
        None,
        "--name",
        "-n",
        help="Name of the new workspace (display name; can contain spaces).",
    ),
    description: Optional[str] = typer.Option(
        None,
        "--description",
        "-d",
        help="Description of the workspace. Will be written into readme.md.",
    ),
):
    """
    Create a new workspace.
    - Asks for name and description if not provided.
    - Workspace directory uses a slugified version of the name.
    - README uses the original name as '# <name>\\n\\n<description>'.
    """
    _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
    workspaces_dir.mkdir(parents=True, exist_ok=True)

    # Prompt interactively for anything missing (or empty) on the command line.
    name = name or Prompt.ask("Workspace name (can contain spaces)")
    description = description or Prompt.ask("Workspace description", default="")

    # The directory uses the slug; the README keeps the original display name.
    dir_name = slugify_workspace_name(name)
    ws_dir = workspaces_dir / dir_name
    if ws_dir.exists():
        console.print(f"[red]Workspace dir '{dir_name}' already exists at {ws_dir}[/red]")
        raise typer.Exit(1)

    write_workspace_readme(ws_dir, name, description)
    console.print(
        f"[green]Created workspace[/green] title='{name}' dir='{ws_dir.name}' at {ws_dir}"
    )
# ---------------- list-repos ----------------
@app.command("list-repos")
def list_repos(
    ctx: typer.Context,
    workspace: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Workspace directory name to inspect. "
            "If omitted, uses the workspace containing the current directory."
        ),
    ),
):
    """
    List repos and branches in the current (or specified) workspace.
    Shows:
    - repo directory name
    - current branch
    - ahead/behind/dirty indicators
    """
    _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
    ws_dir = find_workspace_dir(workspaces_dir, workspace)
    if not ws_dir.exists():
        console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
        raise typer.Exit(1)

    title, _desc = read_workspace_readme(ws_dir)
    table = Table(title=f"Repos in workspace '{title}' (dir: {ws_dir.name})")
    for header, options in (
        ("Repo (dir)", {"style": "bold"}),
        ("Branch", {}),
        ("Status", {}),
    ):
        table.add_column(header, **options)

    # Only sub-directories that look like git repos/worktrees are listed.
    for entry in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
        if entry.name.lower() == "readme.md" or not ensure_git_repo(entry):
            continue
        branch = get_current_branch(entry) or "?"
        table.add_row(entry.name, branch, get_git_status(entry).indicator)

    console.print(table)
# ---------------- add-repo ----------------
def list_all_repos(repos_dir: Path) -> List[Path]:
    """
    Return the sub-directories of ``repos_dir`` that contain a ``.git`` entry,
    sorted by path. A missing ``repos_dir`` yields an empty list.
    """
    if not repos_dir.exists():
        return []
    return [
        entry
        for entry in sorted(repos_dir.iterdir())
        if entry.is_dir() and ensure_git_repo(entry)
    ]
def pick_repo_with_iterfzf(repos: List[Path]) -> Optional[Path]:
    """
    Let the user choose one of ``repos`` by directory name via iterfzf.

    Returns the first path whose name equals the selection, or None when the
    list is empty, nothing was selected, or the selection matches no entry.
    """
    if not repos:
        return None
    selected = iterfzf([repo.name for repo in repos], prompt="pick a repo> ")
    if not selected:
        return None
    return next((repo for repo in repos if repo.name == selected), None)
@app.command("add-repo")
def add_repo(
    ctx: typer.Context,
    workspace: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Workspace directory name to add repo to. "
            "If omitted, uses the workspace containing the current directory."
        ),
    ),
    repo_name: Optional[str] = typer.Option(
        None,
        "--repo",
        "-r",
        help=(
            "Name of repo (directory under repos_dir). "
            "If omitted, uses iterfzf to pick from repos_dir."
        ),
    ),
):
    """
    Add a repo to a workspace.
    - Lists all directories in repos_dir as repos.
    - Uses iterfzf to pick repo if --repo not given.
    - Creates a worktree for a branch derived from the workspace name
    into workspace_dir / repo_name.
    """
    _settings, repos_dir, workspaces_dir = get_ctx_paths(ctx)
    ws_dir = find_workspace_dir(workspaces_dir, workspace)
    if not ws_dir.exists():
        console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
        raise typer.Exit(1)

    # Use display title from README to derive branch name
    title, _desc = read_workspace_readme(ws_dir)
    branch = branch_name_for_workspace(title or ws_dir.name)

    all_repos = list_all_repos(repos_dir)
    if not all_repos:
        console.print(f"[red]No git repos found in {repos_dir}[/red]")
        raise typer.Exit(1)

    repo_path: Optional[Path]
    if repo_name:
        repo_path = next((r for r in all_repos if r.name == repo_name), None)
        if repo_path is None:
            console.print(
                f"[red]Repo '{repo_name}' not found in {repos_dir}. "
                "Use --repo with a valid name or omit to use iterfzf.[/red]"
            )
            raise typer.Exit(1)
    else:
        repo_path = pick_repo_with_iterfzf(all_repos)
        if repo_path is None:
            console.print("[yellow]No repo selected.[/yellow]")
            raise typer.Exit(0)

    target_dir = ws_dir / repo_path.name
    if target_dir.exists():
        console.print(
            f"[yellow]Directory {target_dir} already exists. "
            "Assuming repo already added.[/yellow]"
        )
        raise typer.Exit(0)

    # Ensure the *branch* exists or create it. Checking refs/heads/<branch>
    # avoids a false positive when a tag (or other ref) shares the name, which
    # would skip branch creation and check the worktree out detached.
    code, _out, _err = run_cmd(
        ["git", "rev-parse", "--verify", "--quiet", f"refs/heads/{branch}"],
        cwd=repo_path,
    )
    if code != 0:
        # create branch from current HEAD
        console.print(
            f"[yellow]Branch '{branch}' does not exist in {repo_path.name}; creating it.[/yellow]"
        )
        code, _out, err = run_cmd(["git", "branch", branch], cwd=repo_path)
        if code != 0:
            console.print(
                f"[red]Failed to create branch '{branch}' in {repo_path.name}:[/red]\n{err}"
            )
            raise typer.Exit(1)

    # Create worktree (ws_dir is known to exist at this point)
    code, _out, err = run_cmd(
        ["git", "worktree", "add", str(target_dir), branch],
        cwd=repo_path,
    )
    if code != 0:
        console.print(
            f"[red]Failed to create worktree for repo {repo_path.name} "
            f"on branch '{branch}' into {target_dir}:[/red]\n{err}"
        )
        raise typer.Exit(1)

    console.print(
        f"[green]Added repo[/green] {repo_path.name} "
        f"to workspace [bold]{title or ws_dir.name}[/bold] "
        f"(dir: {ws_dir.name}) on branch '{branch}' at {target_dir}"
    )
# ---------------- rm-workspace ----------------
def find_repo_for_worktree(
    worktree_path: Path, repos_dir: Path
) -> Optional[Path]:
    """
    Guess the parent repo of a worktree by name.

    Looks for a directory in ``repos_dir`` with the same name as the worktree
    directory and returns it if it contains a ``.git`` entry; otherwise None.
    """
    same_named = repos_dir / worktree_path.name
    if not same_named.exists():
        return None
    if not ensure_git_repo(same_named):
        return None
    return same_named
def has_unpushed_commits(repo_path: Path, branch: str) -> bool:
    """
    Report whether HEAD in ``repo_path`` has commits that are not on its upstream.

    Counts commits with 'git rev-list @{u}..HEAD --count'. Any uncertainty
    (no upstream configured, git failure, unparsable output) is reported as
    True so callers err on the side of keeping work.

    ``branch`` is accepted for the caller's convenience but not used; the
    check always applies to the currently checked-out HEAD.
    """
    # Check if upstream exists
    upstream_rc, _, _ = run_cmd(
        ["git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"],
        cwd=repo_path,
    )
    if upstream_rc != 0:
        # No upstream configured; treat as unpushed work.
        return True

    count_rc, count_out, _ = run_cmd(
        ["git", "rev-list", "@{u}..HEAD", "--count"],
        cwd=repo_path,
    )
    if count_rc != 0:
        return True

    digits = count_out.strip() or "0"
    try:
        return int(digits) > 0
    except ValueError:
        return True
@app.command("rm")
def remove_workspace(
    ctx: typer.Context,
    workspace: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Workspace directory name to remove. "
            "If omitted, uses the workspace containing the current directory."
        ),
    ),
    force: bool = typer.Option(
        False,
        "--force",
        "-f",
        help="Force removal even if there is dirty or unpushed work.",
    ),
):
    """
    Remove a workspace:
    - For each repo worktree in the workspace:
    * Check for dirty work or unpushed commits.
    * If any found and not --force, abort.
    * Otherwise, run 'git worktree remove'.
    * If git refuses to remove a worktree and not --force, abort.
    - Finally, delete the workspace directory.
    """
    _settings, repos_dir, workspaces_dir = get_ctx_paths(ctx)
    ws_dir = find_workspace_dir(workspaces_dir, workspace)
    if not ws_dir.exists():
        console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
        raise typer.Exit(1)
    title, _desc = read_workspace_readme(ws_dir)

    # Collect worktrees (subdirs that look like git repos, excluding readme.md)
    worktrees: List[Path] = [
        child
        for child in sorted(p for p in ws_dir.iterdir() if p.is_dir())
        if child.name.lower() != "readme.md" and ensure_git_repo(child)
    ]

    if not worktrees:
        # No worktrees, just remove workspace dir
        if not Confirm.ask(
            f"[red]No worktrees found.[/red] Remove empty workspace "
            f"[bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})?",
            default=False,
        ):
            raise typer.Exit(0)
        shutil.rmtree(ws_dir)
        console.print(f"[green]Removed workspace[/green] {ws_dir}")
        raise typer.Exit(0)

    # Check for dirty / unpushed changes
    problems: List[str] = []
    for wt in worktrees:
        status = get_git_status(wt)
        branch = get_current_branch(wt) or "?"
        if status.dirty:
            problems.append(f"{wt.name}: dirty working tree on '{branch}'")
        repo = find_repo_for_worktree(wt, repos_dir)
        if repo is not None and branch != "?":
            if has_unpushed_commits(wt, branch):
                problems.append(f"{wt.name}: unpushed commits on '{branch}'")

    if problems and not force:
        console.print(
            "[red]Refusing to remove workspace; found dirty or unpushed work:[/red]"
        )
        for p in problems:
            console.print(f"  - {p}")
        console.print(
            "\nUse [bold]--force[/bold] to remove the workspace and its worktrees anyway."
        )
        raise typer.Exit(1)

    if not Confirm.ask(
        f"Remove workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name}) "
        "and clean up its worktrees?",
        default=False,
    ):
        raise typer.Exit(0)

    # 'git worktree remove' accepts only -f/--force; in the normal case no
    # flag is passed ('--detach' belongs to 'worktree add' and is rejected here).
    remove_cmd: List[str] = ["git", "worktree", "remove"]
    if force:
        remove_cmd.append("--force")

    # Remove worktrees via git
    not_removed: List[str] = []
    for wt in worktrees:
        repo = find_repo_for_worktree(wt, repos_dir)
        display_name = wt.name
        if repo is None:
            console.print(
                f"[yellow]Could not find parent repo for worktree {display_name}; "
                "deleting directory directly.[/yellow]"
            )
            shutil.rmtree(wt)
            continue

        console.print(f"Removing worktree for [bold]{display_name}[/bold]...")
        code, _out, err = run_cmd([*remove_cmd, str(wt)], cwd=repo)
        if code != 0:
            console.print(
                f"[red]Failed to remove worktree {display_name} via git:[/red]\n{err}"
            )
            if not force:
                # Git refused; keep the directory rather than destroying it
                # behind git's back.
                not_removed.append(display_name)
                continue
            # As a last resort, if force is given, nuke the dir.
            console.print(
                f"[yellow]Forcing directory removal of {wt} despite git error.[/yellow]"
            )
            if wt.exists():
                shutil.rmtree(wt)
            # Best effort: drop the now-dangling worktree registration.
            run_cmd(["git", "worktree", "prune"], cwd=repo)
        elif wt.exists():
            # git worktree remove should delete the directory; if not, clean up.
            shutil.rmtree(wt)

    if not_removed:
        console.print(
            "[red]Workspace not removed; these worktrees could not be removed:[/red] "
            + ", ".join(not_removed)
        )
        console.print("\nUse [bold]--force[/bold] to remove them anyway.")
        raise typer.Exit(1)

    # Finally, remove the workspace directory itself
    shutil.rmtree(ws_dir)
    console.print(
        f"[green]Removed workspace[/green] title='{title or ws_dir.name}' dir='{ws_dir.name}'"
    )
# Run the Typer application when this file is executed as a script.
if __name__ == "__main__":
    app()