workspaces/workspaces.py
Waylon S. Walker 6d93dc8177 consistent tmux-session names (#5)
Reviewed-on: #5
Co-authored-by: Waylon S. Walker <waylon@waylonwalker.com>
Co-committed-by: Waylon S. Walker <waylon@waylonwalker.com>
2025-11-27 20:27:26 -06:00

1665 lines
48 KiB
Python
Executable file

#!/usr/bin/env -S uv run --quiet --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "typer",
# "rich",
# "pydantic",
# "pydantic-settings",
# "iterfzf",
# ]
# ///
from __future__ import annotations
import os
import re
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
import typer
from iterfzf import iterfzf
from pydantic import Field
from pydantic_settings import BaseSettings
from pydantic_settings import SettingsConfigDict
from rich.console import Console
from rich.panel import Panel
from rich.prompt import Confirm
from rich.prompt import Prompt
from rich.syntax import Syntax
from rich.table import Table
app = typer.Typer(
help="Workspace management tool",
invoke_without_command=True,
)
console = Console()
# ---------------------------------------------------------------------------
# Settings
# ---------------------------------------------------------------------------
class Settings(BaseSettings):
"""Global configuration for workspaces.
Resolution for workspaces_name:
1. Command-line flag --workspaces-name
2. Environment variable WORKSPACES_NAME
3. Default "git"
repos_dir = ~/workspaces_name
workspaces_dir = ~/workspaces_name + ".workspaces"
"""
workspaces_name: str = Field(
default="git",
description="Logical name of the workspace group (e.g. 'git', 'work', 'personal').",
)
# pydantic v2-style config
model_config = SettingsConfigDict(
env_prefix="",
env_file=None,
env_nested_delimiter="__",
extra="ignore",
)
@classmethod
def from_env_and_override(
cls, override_workspaces_name: str | None
) -> Settings:
"""Construct settings honoring:
1. CLI override
2. WORKSPACES_NAME env
3. default "git"
"""
s = cls()
if override_workspaces_name is not None:
s.workspaces_name = override_workspaces_name
env_val = os.getenv("WORKSPACES_NAME")
if env_val and override_workspaces_name is None:
s.workspaces_name = env_val
return s
def resolve_paths(workspaces_name: str | None) -> tuple[Settings, Path, Path]:
"""Build Settings and derived paths, honoring CLI override of workspaces_name.
"""
base_settings = Settings.from_env_and_override(workspaces_name)
name = base_settings.workspaces_name
repos_dir = Path(f"~/{name}").expanduser().resolve()
workspaces_dir = Path(f"~/{name}.workspaces").expanduser().resolve()
return base_settings, repos_dir, workspaces_dir
# ---------------------------------------------------------------------------
# Models / helpers
# ---------------------------------------------------------------------------
@dataclass
class GitStatus:
ahead: int = 0
behind: int = 0
dirty: bool = False
@property
def indicator(self) -> str:
"""Build ASCII indicator:
- clean: "·"
- ahead 1: "↑1"
- behind 2: "↓2"
- both ahead/behind: "↑1 ↓2"
- add '*' when dirty, e.g. "↑1*" or "↑1 ↓2*"
"""
parts: list[str] = []
if self.ahead:
parts.append(f"{self.ahead}")
if self.behind:
parts.append(f"{self.behind}")
base = " ".join(parts) if parts else "·"
if self.dirty:
base += "*"
return base
def get_git_status(repo_path: Path) -> GitStatus:
"""Get ahead/behind and dirty info for a Git repo.
Uses `git status --porcelain=v2 --branch` and parses:
- '# branch.ab +A -B' for ahead/behind
- any non-comment line for dirty
"""
try:
out = subprocess.check_output(
["git", "status", "--porcelain=v2", "--branch"],
cwd=repo_path,
stderr=subprocess.DEVNULL,
text=True,
)
except Exception:
return GitStatus()
ahead = 0
behind = 0
dirty = False
for line in out.splitlines():
if line.startswith("# branch.ab"):
# Example:
# branch.ab +1 -2
m = re.search(r"\+(\d+)\s+-(\d+)", line)
if m:
ahead = int(m.group(1))
behind = int(m.group(2))
elif not line.startswith("#"):
dirty = True
return GitStatus(ahead=ahead, behind=behind, dirty=dirty)
def get_current_branch(repo_path: Path) -> str | None:
try:
out = subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
cwd=repo_path,
stderr=subprocess.DEVNULL,
text=True,
)
return out.strip()
except Exception:
return None
def ensure_git_repo(path: Path) -> bool:
# Works for repos and worktrees (.git file or dir)
return (path / ".git").exists()
def run_cmd(cmd: list[str], cwd: Path | None = None) -> tuple[int, str, str]:
proc = subprocess.Popen(
cmd,
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
out, err = proc.communicate()
return proc.returncode, out, err
def find_workspace_dir(workspaces_dir: Path, workspace_name: str | None) -> Path:
"""Resolve the directory of a workspace.
- If workspace_name given: use workspaces_dir / workspace_name
- Else: use current working directory (must be inside workspaces_dir).
"""
if workspace_name:
return workspaces_dir / workspace_name
cwd = Path.cwd().resolve()
try:
cwd.relative_to(workspaces_dir)
except ValueError as e:
workspace_root = pick_workspace_with_iterfzf(list_workspaces(workspaces_dir))
if workspace_root is None:
console.print("[red]No workspace selected. Exiting.[/red]")
raise typer.Exit(code=1) from e
return workspace_root
# The top-level workspace directory is the first component under workspaces_dir
rel = cwd.relative_to(workspaces_dir)
workspace_root = workspaces_dir / rel.parts[0]
console.print(f"Using workspace: {workspace_root}")
return workspace_root
def read_workspace_readme(ws_dir: Path) -> tuple[str, str]:
"""Return (name_from_h1, description_from_rest_of_file).
If file missing or malformed, fallback appropriately.
"""
readme = ws_dir / "readme.md"
if not readme.exists():
name = ws_dir.name
return name, ""
text = readme.read_text(encoding="utf-8")
lines = text.splitlines()
if not lines:
return ws_dir.name, ""
# First non-empty line must be '# ...' per spec
first_non_empty_idx = next(
(i for i, line in enumerate(lines) if line.strip()), None
)
if first_non_empty_idx is None:
return ws_dir.name, ""
first_line = lines[first_non_empty_idx].strip()
if not first_line.startswith("# "):
# Fallback
return ws_dir.name, "\n".join(lines[first_non_empty_idx + 1 :]).strip()
name = first_line[2:].strip()
desc = "\n".join(lines[first_non_empty_idx + 1 :]).strip()
return name, desc
def write_workspace_readme(ws_dir: Path, name: str, description: str) -> None:
ws_dir.mkdir(parents=True, exist_ok=True)
content = f"# {name}\n\n{description}\n"
(ws_dir / "readme.md").write_text(content, encoding="utf-8")
# --- name/branch helpers ----------------------------------------------------
def slugify_workspace_name(name: str) -> str:
"""Turn arbitrary workspace name into a safe directory/worktree name.
- Lowercase
- Replace ':' with '-' FIRST (special case so 'fix:my issue' -> 'fix/my-issue' branch later)
- Replace spaces with '-'
- Replace any char not [a-z0-9._/-] with '-'
- Collapse multiple '-' and strip leading/trailing '-' and '/'
"""
name = name.replace(":", "-")
name = name.replace(" ", "-")
safe = []
for ch in name:
if re.match(r"[a-zA-Z0-9._/-]", ch):
safe.append(ch.lower())
else:
safe.append("-")
s = "".join(safe)
s = re.sub(r"-+", "-", s)
s = s.strip("-/")
return s or "workspace"
def branch_name_for_workspace(name: str) -> str:
"""Compute branch name from workspace name.
Rules:
- Start from slugified name (no spaces/specials).
- Replace *first* '-' with '/' so:
'fix-my-issue' -> 'fix/my-issue'
(original 'fix:my issue' -> slug 'fix-my-issue' -> branch 'fix/my-issue')
"""
base = slugify_workspace_name(name)
if "-" in base:
return base.replace("-", "/", 1)
return base
# ---------------------------------------------------------------------------
# Commands / main callback
# ---------------------------------------------------------------------------
@app.callback()
def main(
ctx: typer.Context,
workspaces_name: str | None = typer.Option(
None,
"--workspaces-name",
"-W",
help=(
"Logical name for this workspace set (e.g. 'git', 'work', 'personal'). "
"Overrides WORKSPACES_NAME env. Defaults to 'git'."
),
),
):
"""Manage workspaces and associated Git worktrees.
If no command is given, this will list workspaces.
"""
settings, repos_dir, workspaces_dir = resolve_paths(workspaces_name)
ctx.obj = {
"settings": settings,
"repos_dir": repos_dir,
"workspaces_dir": workspaces_dir,
}
# Default behavior when no subcommand is provided:
if ctx.invoked_subcommand is None:
cli_list_workspaces(ctx)
raise typer.Exit(0)
def get_ctx_paths(ctx: typer.Context) -> tuple[Settings, Path, Path]:
obj = ctx.obj or {}
return obj["settings"], obj["repos_dir"], obj["workspaces_dir"]
# ---------------- list-workspaces ----------------
def list_workspaces(workspaces_dir: Path):
workspaces_dir.mkdir(parents=True, exist_ok=True)
return sorted(p for p in workspaces_dir.iterdir() if p.is_dir())
@app.command("list")
@app.command("ls", hidden=True)
def cli_list_workspaces(
ctx: typer.Context,
):
"""List all workspaces.
Shows:
- workspace directory name
- workspace description (from README markdown, everything after H1)
- included repos with git status indicators
"""
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
workspaces_dir.mkdir(parents=True, exist_ok=True)
table = Table(title=f"Workspaces ({workspaces_dir})")
table.add_column("Workspace (dir)", style="bold")
table.add_column("Title", overflow="fold")
table.add_column("Description", overflow="fold")
table.add_column("Repos", overflow="fold")
if not workspaces_dir.exists():
console.print(f"[yellow]No workspaces_dir found at {workspaces_dir}[/yellow]")
raise typer.Exit(0)
for ws in sorted(p for p in workspaces_dir.iterdir() if p.is_dir()):
title, desc = read_workspace_readme(ws)
repos: list[str] = []
for child in sorted(p for p in ws.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
if not ensure_git_repo(child):
continue
status = get_git_status(child)
branch = get_current_branch(child) or "?"
indicator = status.indicator
repos.append(f"{child.name} [{branch}] {indicator}")
repos_str = "\n".join(repos) if repos else "-"
table.add_row(ws.name, title or "-", desc or "-", repos_str)
console.print(table)
# ---------------- create-workspace ----------------
@app.command("create")
@app.command("new", hidden=True)
def create_workspace(
ctx: typer.Context,
name: str | None = typer.Option(
None,
"--name",
"-n",
help="Name of the new workspace (display name; can contain spaces).",
),
description: str | None = typer.Option(
None,
"--description",
"-d",
help="Description of the workspace. Will be written into readme.md.",
),
):
"""Create a new workspace.
- Asks for name and description if not provided.
- Workspace directory uses a slugified version of the name.
- README uses the original name as '# <name>\\n\\n<description>'.
"""
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
workspaces_dir.mkdir(parents=True, exist_ok=True)
if not name:
name = Prompt.ask("Workspace name (can contain spaces)")
if not description:
description = Prompt.ask("Workspace description", default="")
dir_name = slugify_workspace_name(name)
ws_dir = workspaces_dir / dir_name
if ws_dir.exists():
console.print(
f"[red]Workspace dir '{dir_name}' already exists at {ws_dir}[/red]"
)
raise typer.Exit(1)
write_workspace_readme(ws_dir, name, description)
console.print(
f"[green]Created workspace[/green] title='{name}' dir='{ws_dir.name}' at {ws_dir}"
)
# ---------------- list-repos ----------------
@app.command("list-repos")
def list_repos(
ctx: typer.Context,
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to inspect. "
"If omitted, uses the workspace containing the current directory."
),
),
):
"""List repos and branches in the current (or specified) workspace.
Shows:
- repo directory name
- current branch
- ahead/behind/dirty indicators
"""
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
workspace = ws_dir.name
if not ws_dir.exists():
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
table = Table(title=f"Repos in workspace '{title}' (dir: {ws_dir.name})")
table.add_column("Repo (dir)", style="bold")
table.add_column("Branch")
table.add_column("Status")
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
if not ensure_git_repo(child):
continue
branch = get_current_branch(child) or "?"
status = get_git_status(child)
table.add_row(child.name, branch, status.indicator)
console.print(table)
# ---------------- add-repo ----------------
def list_all_repos(repos_dir: Path) -> list[Path]:
"""List all directories in repos_dir that appear to be git repos.
"""
if not repos_dir.exists():
return []
repos = []
for p in sorted(repos_dir.iterdir()):
if p.is_dir() and ensure_git_repo(p):
repos.append(p)
return repos
def pick_repo_with_iterfzf(repos: list[Path]) -> Path | None:
"""Use iterfzf (Python library) to pick a repo from a list of paths.
"""
if not repos:
return None
names = [r.name for r in repos]
choice = iterfzf(names, prompt="pick a repo> ")
if not choice:
return None
for r in repos:
if r.name == choice:
return r
return None
def pick_workspace_with_iterfzf(workspaces: list[Path]) -> Path | None:
"""Use iterfzf (Python library) to pick a workspace from a list of paths.
"""
names = [w.name for w in workspaces]
if not names:
return None
choice = iterfzf(names, prompt="pick a workspace> ")
if not choice:
return None
for w in workspaces:
if w.name == choice:
return w
return None
@app.command("add-repo")
@app.command("add", hidden=True)
def add_repo(
ctx: typer.Context,
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to add repo to. "
"If omitted, uses the workspace containing the current directory."
),
),
repo_name: str | None = typer.Option(
None,
"--repo",
"-r",
help=(
"Name of repo (directory under repos_dir). "
"If omitted, uses iterfzf to pick from repos_dir."
),
),
):
"""Add a repo to a workspace.
- Lists all directories in repos_dir as repos.
- Uses iterfzf to pick repo if --repo not given.
- Creates a worktree for a branch derived from the workspace name
into workspace_dir / repo_name.
"""
_settings, repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
workspace = ws_dir.name
if not ws_dir.exists():
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
# Use display title from README to derive branch name
title, _desc = read_workspace_readme(ws_dir)
branch = branch_name_for_workspace(title or ws_dir.name)
all_repos = list_all_repos(repos_dir)
if not all_repos:
console.print(f"[red]No git repos found in {repos_dir}[/red]")
raise typer.Exit(1)
repo_path: Path | None = None
if repo_name:
for r in all_repos:
if r.name == repo_name:
repo_path = r
break
if repo_path is None:
console.print(
f"[red]Repo '{repo_name}' not found in {repos_dir}. "
"Use --repo with a valid name or omit to use iterfzf.[/red]"
)
raise typer.Exit(1)
else:
repo_path = pick_repo_with_iterfzf(all_repos)
if repo_path is None:
console.print("[yellow]No repo selected.[/yellow]")
raise typer.Exit(0)
target_dir = ws_dir / repo_path.name
if target_dir.exists():
console.print(
f"[yellow]Directory {target_dir} already exists. "
"Assuming repo already added.[/yellow]"
)
raise typer.Exit(0)
# Ensure branch exists or create it
code, _out, err = run_cmd(["git", "rev-parse", "--verify", branch], cwd=repo_path)
if code != 0:
# create branch from current HEAD
console.print(
f"[yellow]Branch '{branch}' does not exist in {repo_path.name}; creating it.[/yellow]"
)
code, _out, err = run_cmd(["git", "branch", branch], cwd=repo_path)
if code != 0:
console.print(
f"[red]Failed to create branch '{branch}' in {repo_path.name}:[/red]\n{err}"
)
raise typer.Exit(1)
# Create worktree (dir name is safe because repo_path.name is)
ws_dir.mkdir(parents=True, exist_ok=True)
code, _out, err = run_cmd(
["git", "worktree", "add", str(target_dir), branch],
cwd=repo_path,
)
if code != 0:
console.print(
f"[red]Failed to create worktree for repo {repo_path.name} "
f"on branch '{branch}' into {target_dir}:[/red]\n{err}"
)
raise typer.Exit(1)
console.print(
f"[green]Added repo[/green] {repo_path.name} "
f"to workspace [bold]{title or ws_dir.name}[/bold] "
f"(dir: {ws_dir.name}) on branch '{branch}' at {target_dir}"
)
# ---------------- rm-workspace ----------------
def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Path | None:
"""Try to find the parent repo for a worktree, assuming it lives in repos_dir
with the same directory name as the worktree.
"""
candidate = repos_dir / worktree_path.name
if candidate.exists() and ensure_git_repo(candidate):
return candidate
return None
def has_unpushed_commits(repo_path: Path) -> bool:
"""Detect if branch has commits not on its upstream.
Uses 'git rev-list @{u}..HEAD'; if non-empty, there are unpushed commits.
"""
# Check if upstream exists
code, _, _ = run_cmd(
["git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"],
cwd=repo_path,
)
if code != 0:
# No upstream configured; treat as unpushed work.
return True
code, out, _ = run_cmd(
["git", "rev-list", "@{u}..HEAD", "--count"],
cwd=repo_path,
)
if code != 0:
return True
try:
count = int(out.strip() or "0")
except ValueError:
return True
return count > 0
def is_branch_integrated_into_main(
repo_path: Path,
branch: str,
main_ref: str = "origin/main",
) -> bool:
"""Heuristic: is `branch`'s content already in `main_ref`?
Returns True if:
- branch tip is an ancestor of main_ref (normal merge / FF / rebase-before-merge), OR
- the branch tip's tree hash appears somewhere in main_ref's history
(common for squash merges and cherry-picks).
This does not know about PRs, only Git history.
"""
# Make sure we have latest main (best-effort; ignore fetch errors)
_code, _out, _err = run_cmd(["git", "fetch", "--quiet", "origin"], cwd=repo_path)
# 1) Simple case: branch is ancestor of main_ref
code, _out, _err = run_cmd(
["git", "merge-base", "--is-ancestor", branch, main_ref],
cwd=repo_path,
)
if code == 0:
return True
# 2) Squash / cherry-pick heuristic: same tree exists on main_ref
code, out, err = run_cmd(
["git", "show", "-s", "--format=%T", branch],
cwd=repo_path,
)
if code != 0:
console.print(
f"[red]Failed to get tree for branch {branch} in {repo_path.name}:[/red]\n{err}"
)
return False
branch_tree = out.strip()
if not branch_tree:
return False
code, out, err = run_cmd(
["git", "log", "--format=%T", main_ref],
cwd=repo_path,
)
if code != 0:
console.print(
f"[red]Failed to walk {main_ref} in {repo_path.name}:[/red]\n{err}"
)
return False
main_trees = {line.strip() for line in out.splitlines() if line.strip()}
return branch_tree in main_trees
@app.command("rm")
def remove_workspace(
ctx: typer.Context,
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to remove. "
"If omitted, uses the workspace containing the current directory."
),
),
force: bool = typer.Option(
False,
"--force",
"-f",
help="Force removal even if there is dirty or unpushed work.",
),
main_ref: str = typer.Option(
"origin/main",
"--main-ref",
help="Ref to consider as the integration target (default: origin/main).",
),
):
"""Remove a workspace:
- For each repo worktree in the workspace:
* Check for dirty work or unpushed commits.
* If any found and not --force, abort.
* Otherwise, run 'git worktree remove'.
- Finally, delete the workspace directory.
"""
_settings, repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
workspace = ws_dir.name
if not ws_dir.exists():
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
# Collect worktrees (subdirs that look like git repos, excluding readme.md)
worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
if not ensure_git_repo(child):
continue
worktrees.append(child)
if not worktrees:
# No worktrees, just remove workspace dir
if not Confirm.ask(
f"[red]No worktrees found.[/red] Remove empty workspace "
f"[bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})?",
default=False,
):
raise typer.Exit(0)
shutil.rmtree(ws_dir)
console.print(f"[green]Removed workspace[/green] {ws_dir}")
raise typer.Exit(0)
# Check for dirty / unpushed changes
problems: list[str] = []
for wt in worktrees:
status = get_git_status(wt)
branch = get_current_branch(wt) or "?"
repo = find_repo_for_worktree(wt, repos_dir)
integrated = False
if repo is not None and branch != "?":
integrated = is_branch_integrated_into_main(wt, branch, main_ref=main_ref)
if status.dirty:
problems.append(f"{wt.name}: dirty working tree on '{branch}'")
# Only care about "unpushed commits" if the branch is NOT integrated into main.
if repo is not None and branch != "?" and not integrated and has_unpushed_commits(wt):
problems.append(f"{wt.name}: unpushed commits on '{branch}'")
if problems and not force:
console.print(
"[red]Refusing to remove workspace; found dirty or unpushed work:[/red]"
)
for p in problems:
console.print(f" - {p}")
console.print(
"\nUse [bold]--force[/bold] to remove the workspace and its worktrees anyway."
)
raise typer.Exit(1)
if not Confirm.ask(
f"Remove workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name}) "
"and clean up its worktrees?",
default=False,
):
raise typer.Exit(0)
# Remove worktrees via git
for wt in worktrees:
repo = find_repo_for_worktree(wt, repos_dir)
display_name = wt.name
if repo is None:
console.print(
f"[yellow]Could not find parent repo for worktree {display_name}; "
"deleting directory directly.[/yellow]"
)
shutil.rmtree(wt)
continue
console.print(f"Removing worktree for [bold]{display_name}[/bold]...")
args = ["git", "worktree", "remove"]
if force:
args.append("--force")
args.append(str(wt))
code, _out, err = run_cmd(args, cwd=repo)
if code != 0:
console.print(
f"[red]Failed to remove worktree {display_name} via git:[/red]\n{err}"
)
# As a last resort, if force is given, nuke the dir.
if force:
console.print(
f"[yellow]Forcing directory removal of {wt} despite git error.[/yellow]"
)
shutil.rmtree(wt)
elif wt.exists():
# git worktree remove *should* delete the directory; if not, clean up.
shutil.rmtree(wt)
if in_tmux():
session_name = get_session_name(ws_dir, repo)
remove_session_if_exists(session_name)
# Finally, remove the workspace directory itself
shutil.rmtree(ws_dir)
console.print(
f"[green]Removed workspace[/green] title='{title or ws_dir.name}' dir='{ws_dir.name}'"
)
# ---------------- wip (stage + commit) ----------------
def git_status_porcelain(repo_path: Path) -> list[tuple[str, str]]:
"""Return a list of (status_code, path) for changes in the repo/worktree.
Uses 'git status --porcelain'.
"""
code, out, err = run_cmd(["git", "status", "--porcelain"], cwd=repo_path)
if code != 0:
console.print(f"[red]Failed to get status for {repo_path.name}:[/red]\n{err}")
return []
changes: list[tuple[str, str]] = []
for line in out.splitlines():
if not line.strip():
continue
status = line[:2]
path = line[3:]
changes.append((status, path))
return changes
def choose_files_for_wip(repo_path: Path, changes: list[tuple[str, str]]) -> list[str]:
"""Interactively choose which files to stage for WIP.
- Show list of changed files.
- Ask user:
* stage all
* stage none
* pick some using iterfzf (multi-select)
"""
if not changes:
return []
files = [p for _s, p in changes]
console.print(
Panel(
"\n".join(files),
title=f"Changed files in {repo_path.name}",
subtitle="Use 'all', 'none', or 'pick' when prompted",
)
)
choice = Prompt.ask(
"Stage which files?",
choices=["all", "none", "pick"],
default="all",
)
if choice == "none":
return []
if choice == "all":
return files
# Multi-select via iterfzf
selected = iterfzf(files, multi=True, prompt="pick files> ")
if not selected:
return []
# iterfzf may return a single string or list depending on version;
# normalize to list of strings.
return [selected] if isinstance(selected, str) else list(selected)
@app.command("wip")
def wip_workspace(
ctx: typer.Context,
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to WIP-commit. "
"If omitted, uses the workspace containing the current directory."
),
),
):
"""For each repo in the workspace:
- Show list of changed files.
- Ask whether to stage all, none, or pick some files.
- Stage chosen files.
- Make commit: 'wip: <workspace display name>'.
"""
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
workspace = ws_dir.name
if not ws_dir.exists():
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
commit_message = f"wip: {title or ws_dir.name}"
commit_workspace(ctx, workspace, message=commit_message)
@app.command("commit")
def commit_workspace(
ctx: typer.Context,
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to WIP-commit. "
"If omitted, uses the workspace containing the current directory."
),
),
message: str | None = typer.Option(
None,
"--message",
"-m",
help="Commit message to use.",
),
):
"""For each repo in the workspace:
- Show list of changed files.
- Ask whether to stage all, none, or pick some files.
- Stage chosen files.
"""
if not message:
console.print(
"[red]No commit message provided. Exiting.[/red]",
)
raise typer.Exit(1)
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
workspace = ws_dir.name
if not ws_dir.exists():
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
if not ensure_git_repo(child):
continue
worktrees.append(child)
if not worktrees:
console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
raise typer.Exit(0)
for wt in worktrees:
console.print(f"\n[bold]Repo:[/bold] {wt.name}")
changes = git_status_porcelain(wt)
if not changes:
console.print(" [green]No changes.[/green]")
continue
files_to_stage = choose_files_for_wip(wt, changes)
if not files_to_stage:
console.print(" [yellow]Nothing selected to stage.[/yellow]")
continue
# Stage selected files
for f in files_to_stage:
code, _out, err = run_cmd(["git", "add", f], cwd=wt)
if code != 0:
console.print(f"[red]Failed to add {f} in {wt.name}:[/red]\n{err}")
# Check if there is anything to commit
code, out, _err = run_cmd(["git", "diff", "--cached", "--name-only"], cwd=wt)
if code != 0 or not out.strip():
console.print(" [yellow]No staged changes to commit.[/yellow]")
continue
# Commit
code, _out, err = run_cmd(["git", "commit", "-m", message], cwd=wt)
if code != 0:
console.print(f"[red]Failed to commit in {wt.name}:[/red]\n{err}")
else:
console.print(f" [green]Created commit in {wt.name}:[/green] '{message}'")
# ---------------- push ----------------
@app.command("push")
def push_workspace(
ctx: typer.Context,
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to push. "
"If omitted, uses the workspace containing the current directory."
),
),
remote: str = typer.Option(
"origin",
"--remote",
"-r",
help="Remote name to push to (default: origin).",
),
):
"""For each repo in the workspace, run 'git push <remote> <current-branch>'.
Skips repos with no current branch or when push fails.
"""
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
workspace = ws_dir.name
if not ws_dir.exists():
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
console.print(
f"Pushing all repos in workspace [bold]{title or ws_dir.name}[/bold] "
f"(dir: {ws_dir.name}) to remote '{remote}'"
)
worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
if not ensure_git_repo(child):
continue
worktrees.append(child)
if not worktrees:
console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
raise typer.Exit(0)
for wt in worktrees:
branch = get_current_branch(wt)
if not branch or branch == "HEAD":
console.print(
f"[yellow]Skipping {wt.name}: no current branch (detached HEAD?).[/yellow]"
)
continue
console.print(f"\n[bold]Repo:[/bold] {wt.name} [bold]Branch:[/bold] {branch}")
code, _out, err = run_cmd(["git", "push", remote, branch], cwd=wt)
if code != 0:
console.print(
f"[red]Failed to push {wt.name} ({branch}) to {remote}:[/red]\n{err}"
)
else:
console.print(f"[green]Pushed {wt.name} ({branch}) to {remote}[/green]")
# ---------------- status (current workspace) ----------------
@app.command("status")
def status_workspace(
ctx: typer.Context,
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to show status for. "
"If omitted, uses the workspace containing the current directory."
),
),
):
"""Show detailed status for the current (or specified) workspace.
For each repo in the workspace:
- repo directory name
- branch
- ahead/behind/dirty indicator
- number of changed files
"""
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
workspace = ws_dir.name
if not ws_dir.exists():
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
title, desc = read_workspace_readme(ws_dir)
table = Table(
title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}",
title_justify="left",
show_lines=False,
)
table.add_column("Repo (dir)", style="bold")
table.add_column("Branch")
table.add_column("Ahead/Behind/Dirty")
table.add_column("Changed Files", justify="right")
files_table = Table(show_lines=False)
files_table.add_column("Repo (dir)", style="bold")
files_table.add_column("File", justify="left")
files_table.add_column("Status", justify="right")
worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
if not ensure_git_repo(child):
continue
worktrees.append(child)
if not worktrees:
console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
raise typer.Exit(0)
for wt in worktrees:
branch = get_current_branch(wt) or "?"
status = get_git_status(wt)
changes = git_status_porcelain(wt)
table.add_row(
wt.name,
branch,
status.indicator,
str(len(changes)),
)
if changes:
for f, s in changes:
files_table.add_row(wt.name, f, s)
console.print(table)
console.print("\nFiles with changes:")
if files_table.rows:
console.print(files_table)
@app.command("diff")
def diff_workspace(
ctx: typer.Context,
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to diff. "
"If omitted, uses the workspace containing the current directory."
),
),
staged: bool = typer.Option(
False,
"--staged",
"-s",
help="Show only staged changes (git diff --cached).",
),
):
"""Show git diff for all repos in the workspace.
Uses rich Syntax highlighter for diffs.
"""
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
workspace = ws_dir.name
if not ws_dir.exists():
console.print(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
console.print(
f"Showing diffs for workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})"
)
# Collect repos
worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
if not ensure_git_repo(child):
continue
worktrees.append(child)
if not worktrees:
console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
raise typer.Exit(0)
any_diffs = False
for wt in worktrees:
# Skip clean repos
changes = git_status_porcelain(wt)
if not changes:
continue
any_diffs = True
console.rule(f"[bold]{wt.name}[/bold]")
cmd = ["git", "diff"]
if staged:
cmd.append("--cached")
code, out, err = run_cmd(cmd, cwd=wt)
if code != 0:
console.print(f"[red]Failed to get diff for {wt.name}:[/red]\n{err}")
continue
if not out.strip():
console.print("[dim]No diff output.[/dim]")
continue
syntax = Syntax(
out,
"diff",
theme="nord",
line_numbers=False,
word_wrap=False,
)
# Wrap each repo diff in a Panel for clarity
console.print(Panel(syntax, title=wt.name, border_style="cyan"))
if not any_diffs:
console.print("[green]No changes to diff in any repo.[/green]")
def in_tmux() -> bool:
"""Return True if inside tmux"""
return "TMUX" in os.environ
def not_in_tmux() -> bool:
"""Return True if not inside tmux"""
return not in_tmux()
def run_tmux(
args: list[str], *, clear_tmux_env: bool = False
) -> subprocess.CompletedProcess:
"""Run a tmux command."""
env = os.environ.copy()
if clear_tmux_env:
env["TMUX"] = ""
return subprocess.run(
["tmux", *args],
env=env,
check=False,
capture_output=True,
)
def get_tmux_sessions() -> list[str]:
"""List all tmux sessions."""
result = run_tmux(["list-sessions", "-F", "#{session_name}"])
return result.stdout.decode().splitlines()
def session_exists(session_name: str) -> bool:
"""Check if the tmux session exists."""
result = run_tmux(["has-session", "-t", f"={session_name}"])
return result.returncode == 0
def pick_project(base_dir: Path) -> str | None:
"""Use fzf to pick a subdirectory (project) from base_dir."""
if not base_dir.is_dir():
typer.echo(f"[ta] {base_dir} is not a directory.", err=True)
return None
subdirs = sorted([p.name for p in base_dir.iterdir() if p.is_dir()])
if not subdirs:
typer.echo(f"[ta] No subdirectories found in {base_dir}.", err=True)
return None
if not shutil.which("fzf"):
typer.echo("[ta] fzf not found in PATH.", err=True)
return None
proc = subprocess.run(
["fzf", "--reverse", f"--header=Select project from {base_dir.name} >"],
input="\n".join(subdirs),
text=True,
check=False,
capture_output=True,
)
if proc.returncode != 0:
# Cancelled or failed
return None
choice = proc.stdout.strip()
return choice or None
def create_detached_session(
session_name: str,
path_name: Path,
start_mode: bool,
) -> bool:
"""Create a detached session.
- If start_mode: just a single window.
- Else: split layout with nvim on top.
"""
# Run tmux as if not in tmux (clear TMUX env)
if start_mode:
r = run_tmux(
["new-session", "-Ad", "-s", session_name, "-c", str(path_name)],
clear_tmux_env=True,
)
return r.returncode == 0
r = run_tmux(
["new-session", "-Ad", "-s", session_name, "-c", str(path_name)],
clear_tmux_env=True,
)
if r.returncode != 0:
return False
r = run_tmux(
[
"split-window",
"-vb",
"-t",
session_name,
"-c",
str(path_name),
"-p",
"70",
],
clear_tmux_env=True,
)
if r.returncode != 0:
return False
r = run_tmux(
[
"send-keys",
"-t",
session_name,
"nvim '+Telescope find_files'",
"Enter",
],
clear_tmux_env=True,
)
return r.returncode == 0
def create_if_needed_and_attach(
session_name: str,
path_name: Path,
start_mode: bool,
) -> bool:
"""Mimic the bash logic:
- If not in tmux: `tmux new-session -As ... -c path_name`
- Else:
- If session doesn't exist: create_detached_session()
- Then `tmux switch-client -t session_name`
"""
session_name = session_name.strip().replace(" ", "_").replace(':', '_').replace('.', '_').replace('/', '_')
if not_in_tmux():
r = run_tmux(
["new-session", "-As", session_name, "-c", str(path_name)],
)
return r.returncode == 0
# Inside tmux
if not session_exists(session_name) and not create_detached_session(session_name, path_name, start_mode):
return False
r = run_tmux(["switch-client", "-t", session_name])
return r.returncode == 0
def attach_to_first_session() -> None:
"""Fallback: attach to first tmux session and open choose-tree."""
# Attach to the first listed session
list_proc = subprocess.run(
["tmux", "list-sessions", "-F", "#{session_name}"],
text=True,
check=False,
capture_output=True,
)
if list_proc.returncode != 0 or not list_proc.stdout.strip():
typer.echo("[ta] No tmux sessions found to attach to.", err=True)
raise typer.Exit(1)
first_session = list_proc.stdout.strip().splitlines()[0]
attach_proc = run_tmux(["attach-session", "-t", first_session])
if attach_proc.returncode != 0:
raise typer.Exit(attach_proc.returncode)
# After attach, show choose-tree (this will run in the attached session)
run_tmux(["choose-tree", "-Za"])
def remove_session_if_exists(session_name: str) -> None:
r = run_tmux(["has-session", "-t", session_name])
if r.returncode == 0:
remove_session(session_name)
def remove_session(session_name: str) -> None:
r = run_tmux(["kill-session", "-t", session_name])
if r.returncode != 0:
raise typer.Exit(r.returncode)
def get_session_name(ws_dir: Path, repo: Path):
session_name = f"ω|{ws_dir.name}|{repo.name}"
session_name = session_name.strip().replace(" ", "_").replace(':', '_').replace('.', '_').replace('/', '_')
return session_name
return result.stdout.decode().strip()
tmux_app = typer.Typer(
name="tmux",
help="tmux commands",
add_completion=False,
)
@tmux_app.command("attach")
def tmux_cli_attach(
ctx: typer.Context,
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to show status for. "
"If omitted, uses the workspace containing the current directory."
),
),
):
"""
Attach or create a tmux session for a repo in a workspace.
"""
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
workspace = ws_dir.name
if not ws_dir:
console.print(f"[red]Workspace '{workspace}' does not exist at {ws_dir}[/red]")
raise typer.Exit(1)
# pick repo in workspace
repos = [directory for directory in ws_dir.iterdir() if directory.is_dir()]
if not repos:
console.print("[red]No repos found in workspace. Add One.[/red]")
add_repo(ctx=ctx, workspace=workspace, repo_name=None)
repos = [directory for directory in ws_dir.iterdir() if directory.is_dir()]
repo = pick_repo_with_iterfzf(repos)
if not repo:
console.print("[red]No repo selected. Exiting.[/red]")
raise typer.Exit(1)
session_name = get_session_name(ws_dir, repo)
console.print(f"Session name: {session_name}")
create_if_needed_and_attach(session_name, repo, False)
@tmux_app.command("list-sessions")
def tmux_cli_list_sessions(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to show status for. "
"If omitted, uses the workspace containing the current directory."
),
),
):
"""
List tmux workspace sessions.
"""
if not_in_tmux():
console.print("[red]Not in tmux. Exiting.[/red]")
raise typer.Exit(1)
if not workspace:
tmux_sessions = [session for session in get_tmux_sessions() if "|" in session]
console.print(tmux_sessions)
return
tmux_sessions = [
session for session in get_tmux_sessions() if session.startswith(workspace)
]
console.print(tmux_sessions)
@tmux_app.command("remove")
def tmux_cli_remove(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to show status for. "
"If omitted, uses the workspace containing the current directory."
),
),
dry_run: bool = typer.Option(
False,
"--dry-run",
"-d",
help="Don't actually remove the session.",
),
):
"""
Remove tmux workspace sessions.
"""
if not_in_tmux():
console.print("[red]Not in tmux. Exiting.[/red]")
raise typer.Exit(1)
if workspace:
tmux_sessions = [
session for session in get_tmux_sessions() if session.startswith(workspace)
]
else:
tmux_sessions = [
session for session in get_tmux_sessions() if session.startswith("ω")
]
if not dry_run and not Confirm.ask(
f"Are you sure you want to remove all tmux sessions?\n* {'\n* '.join(tmux_sessions)}\n",
default=False,
):
raise typer.Exit(1)
if dry_run:
console.print(tmux_sessions)
return
for session_name in tmux_sessions:
remove_session(session_name)
app.add_typer(tmux_app)
@app.command("attach")
def attach(
ctx: typer.Context,
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to show status for. "
"If omitted, uses the workspace containing the current directory."
),
),
):
"""
Attach to a workspace.
"""
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
if len(list_workspaces(workspaces_dir)) == 0:
import sys
cmd = sys.executable
console.print("[red]No workspaces found. Exiting.[/red]")
console.print(f"Create a workspace with `[yellow]workspaces create[/yellow]` first.")
raise typer.Exit(1)
if in_tmux():
tmux_cli_attach(ctx, workspace)
if __name__ == "__main__":
app()