1427 lines
42 KiB
Python
Executable file
1427 lines
42 KiB
Python
Executable file
#!/usr/bin/env -S uv run --quiet --script
|
|
# /// script
|
|
# requires-python = ">=3.12"
|
|
# dependencies = [
|
|
# "typer",
|
|
# "rich",
|
|
# "pydantic",
|
|
# "pydantic-settings",
|
|
# "iterfzf",
|
|
# ]
|
|
# ///
|
|
|
|
from __future__ import annotations
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import List, Optional, Tuple
|
|
|
|
|
|
import typer
|
|
|
|
from pydantic import Field
|
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
from rich.prompt import Prompt, Confirm
|
|
from rich.panel import Panel
|
|
from iterfzf import iterfzf
|
|
|
|
app = typer.Typer(
|
|
help="Workspace management tool",
|
|
invoke_without_command=True,
|
|
)
|
|
console = Console()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Settings
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class Settings(BaseSettings):
|
|
"""
|
|
Global configuration for workspaces.
|
|
|
|
Resolution for workspaces_name:
|
|
1. Command-line flag --workspaces-name
|
|
2. Environment variable WORKSPACES_NAME
|
|
3. Default "git"
|
|
|
|
repos_dir = ~/workspaces_name
|
|
workspaces_dir = ~/workspaces_name + ".workspaces"
|
|
"""
|
|
|
|
workspaces_name: str = Field(
|
|
default="git",
|
|
description="Logical name of the workspace group (e.g. 'git', 'work', 'personal').",
|
|
)
|
|
|
|
# pydantic v2-style config
|
|
model_config = SettingsConfigDict(
|
|
env_prefix="",
|
|
env_file=None,
|
|
env_nested_delimiter="__",
|
|
extra="ignore",
|
|
)
|
|
|
|
@classmethod
|
|
def from_env_and_override(cls, override_workspaces_name: Optional[str]) -> "Settings":
|
|
"""
|
|
Construct settings honoring:
|
|
1. CLI override
|
|
2. WORKSPACES_NAME env
|
|
3. default "git"
|
|
"""
|
|
s = cls()
|
|
if override_workspaces_name is not None:
|
|
s.workspaces_name = override_workspaces_name
|
|
env_val = os.getenv("WORKSPACES_NAME")
|
|
if env_val and override_workspaces_name is None:
|
|
s.workspaces_name = env_val
|
|
return s
|
|
|
|
|
|
def resolve_paths(workspaces_name: Optional[str]) -> Tuple[Settings, Path, Path]:
|
|
"""
|
|
Build Settings and derived paths, honoring CLI override of workspaces_name.
|
|
"""
|
|
base_settings = Settings.from_env_and_override(workspaces_name)
|
|
name = base_settings.workspaces_name
|
|
repos_dir = Path(os.path.expanduser(f"~/{name}")).resolve()
|
|
workspaces_dir = Path(os.path.expanduser(f"~/{name}.workspaces")).resolve()
|
|
return base_settings, repos_dir, workspaces_dir
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Models / helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
|
|
class GitStatus:
|
|
ahead: int = 0
|
|
behind: int = 0
|
|
dirty: bool = False
|
|
|
|
@property
|
|
def indicator(self) -> str:
|
|
"""
|
|
Build ASCII indicator:
|
|
- clean: "·"
|
|
- ahead 1: "↑1"
|
|
- behind 2: "↓2"
|
|
- both ahead/behind: "↑1 ↓2"
|
|
- add '*' when dirty, e.g. "↑1*" or "↑1 ↓2*"
|
|
"""
|
|
parts: List[str] = []
|
|
if self.ahead:
|
|
parts.append(f"↑{self.ahead}")
|
|
if self.behind:
|
|
parts.append(f"↓{self.behind}")
|
|
base = " ".join(parts) if parts else "·"
|
|
if self.dirty:
|
|
base += "*"
|
|
return base
|
|
|
|
|
|
def get_git_status(repo_path: Path) -> GitStatus:
|
|
"""
|
|
Get ahead/behind and dirty info for a Git repo.
|
|
|
|
Uses `git status --porcelain=v2 --branch` and parses:
|
|
- '# branch.ab +A -B' for ahead/behind
|
|
- any non-comment line for dirty
|
|
"""
|
|
try:
|
|
out = subprocess.check_output(
|
|
["git", "status", "--porcelain=v2", "--branch"],
|
|
cwd=repo_path,
|
|
stderr=subprocess.DEVNULL,
|
|
text=True,
|
|
)
|
|
except Exception:
|
|
return GitStatus()
|
|
|
|
ahead = 0
|
|
behind = 0
|
|
dirty = False
|
|
|
|
for line in out.splitlines():
|
|
if line.startswith("# branch.ab"):
|
|
# Example: "# branch.ab +1 -2"
|
|
m = re.search(r"\+(\d+)\s+-(\d+)", line)
|
|
if m:
|
|
ahead = int(m.group(1))
|
|
behind = int(m.group(2))
|
|
elif not line.startswith("#"):
|
|
dirty = True
|
|
|
|
return GitStatus(ahead=ahead, behind=behind, dirty=dirty)
|
|
|
|
|
|
def get_current_branch(repo_path: Path) -> Optional[str]:
|
|
try:
|
|
out = subprocess.check_output(
|
|
["git", "rev-parse", "--abbrev-ref", "HEAD"],
|
|
cwd=repo_path,
|
|
stderr=subprocess.DEVNULL,
|
|
text=True,
|
|
)
|
|
return out.strip()
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def ensure_git_repo(path: Path) -> bool:
|
|
# Works for repos and worktrees (.git file or dir)
|
|
return (path / ".git").exists()
|
|
|
|
|
|
def run_cmd(cmd: List[str], cwd: Optional[Path] = None) -> Tuple[int, str, str]:
|
|
proc = subprocess.Popen(
|
|
cmd,
|
|
cwd=cwd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
)
|
|
out, err = proc.communicate()
|
|
return proc.returncode, out, err
|
|
|
|
|
|
def find_workspace_dir(workspaces_dir: Path, workspace_name: Optional[str]) -> Path:
|
|
"""
|
|
Resolve the directory of a workspace.
|
|
|
|
- If workspace_name given: use workspaces_dir / workspace_name
|
|
- Else: use current working directory (must be inside workspaces_dir).
|
|
"""
|
|
if workspace_name:
|
|
return workspaces_dir / workspace_name
|
|
|
|
cwd = Path.cwd().resolve()
|
|
try:
|
|
cwd.relative_to(workspaces_dir)
|
|
except ValueError:
|
|
console.print(
|
|
f"[red]Not inside workspaces_dir ({workspaces_dir}). "
|
|
"Please use --workspace to specify a workspace.[/red]"
|
|
)
|
|
raise typer.Exit(1)
|
|
|
|
# The top-level workspace directory is the first component under workspaces_dir
|
|
rel = cwd.relative_to(workspaces_dir)
|
|
workspace_root = workspaces_dir / rel.parts[0]
|
|
return workspace_root
|
|
|
|
|
|
def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]:
|
|
"""
|
|
Return (name_from_h1, description_from_rest_of_file).
|
|
If file missing or malformed, fallback appropriately.
|
|
"""
|
|
readme = ws_dir / "readme.md"
|
|
if not readme.exists():
|
|
name = ws_dir.name
|
|
return name, ""
|
|
|
|
text = readme.read_text(encoding="utf-8")
|
|
lines = text.splitlines()
|
|
|
|
if not lines:
|
|
return ws_dir.name, ""
|
|
|
|
# First non-empty line must be '# ...' per spec
|
|
first_non_empty_idx = next((i for i, line in enumerate(lines) if line.strip()), None)
|
|
if first_non_empty_idx is None:
|
|
return ws_dir.name, ""
|
|
|
|
first_line = lines[first_non_empty_idx].strip()
|
|
if not first_line.startswith("# "):
|
|
# Fallback
|
|
return ws_dir.name, "\n".join(lines[first_non_empty_idx + 1 :]).strip()
|
|
|
|
name = first_line[2:].strip()
|
|
desc = "\n".join(lines[first_non_empty_idx + 1 :]).strip()
|
|
return name, desc
|
|
|
|
|
|
def write_workspace_readme(ws_dir: Path, name: str, description: str) -> None:
|
|
ws_dir.mkdir(parents=True, exist_ok=True)
|
|
content = f"# {name}\n\n{description}\n"
|
|
(ws_dir / "readme.md").write_text(content, encoding="utf-8")
|
|
|
|
|
|
# --- name/branch helpers ----------------------------------------------------
|
|
|
|
|
|
def slugify_workspace_name(name: str) -> str:
|
|
"""
|
|
Turn arbitrary workspace name into a safe directory/worktree name.
|
|
|
|
- Lowercase
|
|
- Replace ':' with '-' FIRST (special case so 'fix:my issue' -> 'fix/my-issue' branch later)
|
|
- Replace spaces with '-'
|
|
- Replace any char not [a-z0-9._/-] with '-'
|
|
- Collapse multiple '-' and strip leading/trailing '-' and '/'
|
|
"""
|
|
name = name.replace(":", "-")
|
|
name = name.replace(" ", "-")
|
|
safe = []
|
|
for ch in name:
|
|
if re.match(r"[a-zA-Z0-9._/-]", ch):
|
|
safe.append(ch.lower())
|
|
else:
|
|
safe.append("-")
|
|
s = "".join(safe)
|
|
s = re.sub(r"-+", "-", s)
|
|
s = s.strip("-/")
|
|
return s or "workspace"
|
|
|
|
|
|
def branch_name_for_workspace(name: str) -> str:
|
|
"""
|
|
Compute branch name from workspace name.
|
|
|
|
Rules:
|
|
- Start from slugified name (no spaces/specials).
|
|
- Replace *first* '-' with '/' so:
|
|
'fix-my-issue' -> 'fix/my-issue'
|
|
(original 'fix:my issue' -> slug 'fix-my-issue' -> branch 'fix/my-issue')
|
|
"""
|
|
base = slugify_workspace_name(name)
|
|
if "-" in base:
|
|
return base.replace("-", "/", 1)
|
|
return base
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Commands / main callback
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@app.callback()
|
|
def main(
|
|
ctx: typer.Context,
|
|
workspaces_name: Optional[str] = typer.Option(
|
|
None,
|
|
"--workspaces-name",
|
|
"-W",
|
|
help=(
|
|
"Logical name for this workspace set (e.g. 'git', 'work', 'personal'). "
|
|
"Overrides WORKSPACES_NAME env. Defaults to 'git'."
|
|
),
|
|
),
|
|
):
|
|
"""
|
|
Manage workspaces and associated Git worktrees.
|
|
|
|
If no command is given, this will list workspaces.
|
|
"""
|
|
settings, repos_dir, workspaces_dir = resolve_paths(workspaces_name)
|
|
ctx.obj = {
|
|
"settings": settings,
|
|
"repos_dir": repos_dir,
|
|
"workspaces_dir": workspaces_dir,
|
|
}
|
|
|
|
# Default behavior when no subcommand is provided:
|
|
if ctx.invoked_subcommand is None:
|
|
list_workspaces(ctx)
|
|
raise typer.Exit(0)
|
|
|
|
|
|
def get_ctx_paths(ctx: typer.Context) -> Tuple[Settings, Path, Path]:
|
|
obj = ctx.obj or {}
|
|
return obj["settings"], obj["repos_dir"], obj["workspaces_dir"]
|
|
|
|
|
|
# ---------------- list-workspaces ----------------
|
|
|
|
|
|
@app.command("list")
|
|
def list_workspaces(
|
|
ctx: typer.Context,
|
|
):
|
|
"""
|
|
List all workspaces.
|
|
|
|
Shows:
|
|
- workspace directory name
|
|
- workspace description (from README markdown, everything after H1)
|
|
- included repos with git status indicators
|
|
"""
|
|
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
|
workspaces_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
table = Table(title=f"Workspaces ({workspaces_dir})")
|
|
table.add_column("Workspace (dir)", style="bold")
|
|
table.add_column("Title", overflow="fold")
|
|
table.add_column("Description", overflow="fold")
|
|
table.add_column("Repos", overflow="fold")
|
|
|
|
if not workspaces_dir.exists():
|
|
console.print(f"[yellow]No workspaces_dir found at {workspaces_dir}[/yellow]")
|
|
raise typer.Exit(0)
|
|
|
|
for ws in sorted(p for p in workspaces_dir.iterdir() if p.is_dir()):
|
|
title, desc = read_workspace_readme(ws)
|
|
repos: List[str] = []
|
|
for child in sorted(p for p in ws.iterdir() if p.is_dir()):
|
|
if child.name.lower() == "readme.md":
|
|
continue
|
|
if not ensure_git_repo(child):
|
|
continue
|
|
status = get_git_status(child)
|
|
branch = get_current_branch(child) or "?"
|
|
indicator = status.indicator
|
|
repos.append(f"{child.name} [{branch}] {indicator}")
|
|
repos_str = "\n".join(repos) if repos else "-"
|
|
table.add_row(ws.name, title or "-", desc or "-", repos_str)
|
|
|
|
console.print(table)
|
|
|
|
|
|
# ---------------- create-workspace ----------------
|
|
|
|
|
|
@app.command("create")
|
|
@app.command("new", hidden=True)
|
|
def create_workspace(
|
|
ctx: typer.Context,
|
|
name: Optional[str] = typer.Option(
|
|
None,
|
|
"--name",
|
|
"-n",
|
|
help="Name of the new workspace (display name; can contain spaces).",
|
|
),
|
|
description: Optional[str] = typer.Option(
|
|
None,
|
|
"--description",
|
|
"-d",
|
|
help="Description of the workspace. Will be written into readme.md.",
|
|
),
|
|
):
|
|
"""
|
|
Create a new workspace.
|
|
|
|
- Asks for name and description if not provided.
|
|
- Workspace directory uses a slugified version of the name.
|
|
- README uses the original name as '# <name>\\n\\n<description>'.
|
|
"""
|
|
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
|
workspaces_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
if not name:
|
|
name = Prompt.ask("Workspace name (can contain spaces)")
|
|
|
|
if not description:
|
|
description = Prompt.ask("Workspace description", default="")
|
|
|
|
dir_name = slugify_workspace_name(name)
|
|
ws_dir = workspaces_dir / dir_name
|
|
if ws_dir.exists():
|
|
console.print(f"[red]Workspace dir '{dir_name}' already exists at {ws_dir}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
write_workspace_readme(ws_dir, name, description)
|
|
console.print(
|
|
f"[green]Created workspace[/green] title='{name}' dir='{ws_dir.name}' at {ws_dir}"
|
|
)
|
|
|
|
|
|
# ---------------- list-repos ----------------
|
|
|
|
|
|
@app.command("list-repos")
|
|
def list_repos(
|
|
ctx: typer.Context,
|
|
workspace: Optional[str] = typer.Option(
|
|
None,
|
|
"--workspace",
|
|
"-w",
|
|
help=(
|
|
"Workspace directory name to inspect. "
|
|
"If omitted, uses the workspace containing the current directory."
|
|
),
|
|
),
|
|
):
|
|
"""
|
|
List repos and branches in the current (or specified) workspace.
|
|
|
|
Shows:
|
|
- repo directory name
|
|
- current branch
|
|
- ahead/behind/dirty indicators
|
|
"""
|
|
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
|
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
|
|
|
if not ws_dir.exists():
|
|
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
title, _desc = read_workspace_readme(ws_dir)
|
|
table = Table(title=f"Repos in workspace '{title}' (dir: {ws_dir.name})")
|
|
table.add_column("Repo (dir)", style="bold")
|
|
table.add_column("Branch")
|
|
table.add_column("Status")
|
|
|
|
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
|
if child.name.lower() == "readme.md":
|
|
continue
|
|
if not ensure_git_repo(child):
|
|
continue
|
|
branch = get_current_branch(child) or "?"
|
|
status = get_git_status(child)
|
|
table.add_row(child.name, branch, status.indicator)
|
|
|
|
console.print(table)
|
|
|
|
|
|
# ---------------- add-repo ----------------
|
|
|
|
|
|
def list_all_repos(repos_dir: Path) -> List[Path]:
|
|
"""
|
|
List all directories in repos_dir that appear to be git repos.
|
|
"""
|
|
if not repos_dir.exists():
|
|
return []
|
|
repos = []
|
|
for p in sorted(repos_dir.iterdir()):
|
|
if p.is_dir() and ensure_git_repo(p):
|
|
repos.append(p)
|
|
return repos
|
|
|
|
|
|
def pick_repo_with_iterfzf(repos: List[Path]) -> Optional[Path]:
|
|
"""
|
|
Use iterfzf (Python library) to pick a repo from a list of paths.
|
|
"""
|
|
if not repos:
|
|
return None
|
|
|
|
names = [r.name for r in repos]
|
|
choice = iterfzf(names, prompt="pick a repo> ")
|
|
|
|
if not choice:
|
|
return None
|
|
|
|
for r in repos:
|
|
if r.name == choice:
|
|
return r
|
|
return None
|
|
|
|
|
|
@app.command("add-repo")
|
|
@app.command("add", hidden=True)
|
|
def add_repo(
|
|
ctx: typer.Context,
|
|
workspace: Optional[str] = typer.Option(
|
|
None,
|
|
"--workspace",
|
|
"-w",
|
|
help=(
|
|
"Workspace directory name to add repo to. "
|
|
"If omitted, uses the workspace containing the current directory."
|
|
),
|
|
),
|
|
repo_name: Optional[str] = typer.Option(
|
|
None,
|
|
"--repo",
|
|
"-r",
|
|
help=(
|
|
"Name of repo (directory under repos_dir). "
|
|
"If omitted, uses iterfzf to pick from repos_dir."
|
|
),
|
|
),
|
|
):
|
|
"""
|
|
Add a repo to a workspace.
|
|
|
|
- Lists all directories in repos_dir as repos.
|
|
- Uses iterfzf to pick repo if --repo not given.
|
|
- Creates a worktree for a branch derived from the workspace name
|
|
into workspace_dir / repo_name.
|
|
"""
|
|
_settings, repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
|
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
|
if not ws_dir.exists():
|
|
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
# Use display title from README to derive branch name
|
|
title, _desc = read_workspace_readme(ws_dir)
|
|
branch = branch_name_for_workspace(title or ws_dir.name)
|
|
|
|
all_repos = list_all_repos(repos_dir)
|
|
if not all_repos:
|
|
console.print(f"[red]No git repos found in {repos_dir}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
repo_path: Optional[Path] = None
|
|
if repo_name:
|
|
for r in all_repos:
|
|
if r.name == repo_name:
|
|
repo_path = r
|
|
break
|
|
if repo_path is None:
|
|
console.print(
|
|
f"[red]Repo '{repo_name}' not found in {repos_dir}. "
|
|
"Use --repo with a valid name or omit to use iterfzf.[/red]"
|
|
)
|
|
raise typer.Exit(1)
|
|
else:
|
|
repo_path = pick_repo_with_iterfzf(all_repos)
|
|
if repo_path is None:
|
|
console.print("[yellow]No repo selected.[/yellow]")
|
|
raise typer.Exit(0)
|
|
|
|
target_dir = ws_dir / repo_path.name
|
|
if target_dir.exists():
|
|
console.print(
|
|
f"[yellow]Directory {target_dir} already exists. "
|
|
"Assuming repo already added.[/yellow]"
|
|
)
|
|
raise typer.Exit(0)
|
|
|
|
# Ensure branch exists or create it
|
|
code, _out, err = run_cmd(["git", "rev-parse", "--verify", branch], cwd=repo_path)
|
|
if code != 0:
|
|
# create branch from current HEAD
|
|
console.print(
|
|
f"[yellow]Branch '{branch}' does not exist in {repo_path.name}; creating it.[/yellow]"
|
|
)
|
|
code, _out, err = run_cmd(["git", "branch", branch], cwd=repo_path)
|
|
if code != 0:
|
|
console.print(
|
|
f"[red]Failed to create branch '{branch}' in {repo_path.name}:[/red]\n{err}"
|
|
)
|
|
raise typer.Exit(1)
|
|
|
|
# Create worktree (dir name is safe because repo_path.name is)
|
|
ws_dir.mkdir(parents=True, exist_ok=True)
|
|
code, _out, err = run_cmd(
|
|
["git", "worktree", "add", str(target_dir), branch],
|
|
cwd=repo_path,
|
|
)
|
|
if code != 0:
|
|
console.print(
|
|
f"[red]Failed to create worktree for repo {repo_path.name} "
|
|
f"on branch '{branch}' into {target_dir}:[/red]\n{err}"
|
|
)
|
|
raise typer.Exit(1)
|
|
|
|
console.print(
|
|
f"[green]Added repo[/green] {repo_path.name} "
|
|
f"to workspace [bold]{title or ws_dir.name}[/bold] "
|
|
f"(dir: {ws_dir.name}) on branch '{branch}' at {target_dir}"
|
|
)
|
|
|
|
|
|
# ---------------- rm-workspace ----------------
|
|
|
|
|
|
def find_repo_for_worktree(
|
|
worktree_path: Path, repos_dir: Path
|
|
) -> Optional[Path]:
|
|
"""
|
|
Try to find the parent repo for a worktree, assuming it lives in repos_dir
|
|
with the same directory name as the worktree.
|
|
"""
|
|
candidate = repos_dir / worktree_path.name
|
|
if candidate.exists() and ensure_git_repo(candidate):
|
|
return candidate
|
|
return None
|
|
|
|
|
|
def has_unpushed_commits(repo_path: Path, branch: str) -> bool:
|
|
"""
|
|
Detect if branch has commits not on its upstream.
|
|
Uses 'git rev-list @{u}..HEAD'; if non-empty, there are unpushed commits.
|
|
"""
|
|
# Check if upstream exists
|
|
code, _, _ = run_cmd(
|
|
["git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"],
|
|
cwd=repo_path,
|
|
)
|
|
if code != 0:
|
|
# No upstream configured; treat as unpushed work.
|
|
return True
|
|
|
|
code, out, _ = run_cmd(
|
|
["git", "rev-list", "@{u}..HEAD", "--count"],
|
|
cwd=repo_path,
|
|
)
|
|
if code != 0:
|
|
return True
|
|
try:
|
|
count = int(out.strip() or "0")
|
|
except ValueError:
|
|
return True
|
|
return count > 0
|
|
|
|
|
|
@app.command("rm")
|
|
def remove_workspace(
|
|
ctx: typer.Context,
|
|
workspace: Optional[str] = typer.Option(
|
|
None,
|
|
"--workspace",
|
|
"-w",
|
|
help=(
|
|
"Workspace directory name to remove. "
|
|
"If omitted, uses the workspace containing the current directory."
|
|
),
|
|
),
|
|
force: bool = typer.Option(
|
|
False,
|
|
"--force",
|
|
"-f",
|
|
help="Force removal even if there is dirty or unpushed work.",
|
|
),
|
|
):
|
|
"""
|
|
Remove a workspace:
|
|
|
|
- For each repo worktree in the workspace:
|
|
* Check for dirty work or unpushed commits.
|
|
* If any found and not --force, abort.
|
|
* Otherwise, run 'git worktree remove'.
|
|
- Finally, delete the workspace directory.
|
|
"""
|
|
_settings, repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
|
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
|
|
|
if not ws_dir.exists():
|
|
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
title, _desc = read_workspace_readme(ws_dir)
|
|
|
|
# Collect worktrees (subdirs that look like git repos, excluding readme.md)
|
|
worktrees: List[Path] = []
|
|
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
|
if child.name.lower() == "readme.md":
|
|
continue
|
|
if not ensure_git_repo(child):
|
|
continue
|
|
worktrees.append(child)
|
|
|
|
if not worktrees:
|
|
# No worktrees, just remove workspace dir
|
|
if not Confirm.ask(
|
|
f"[red]No worktrees found.[/red] Remove empty workspace "
|
|
f"[bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})?",
|
|
default=False,
|
|
):
|
|
raise typer.Exit(0)
|
|
shutil.rmtree(ws_dir)
|
|
console.print(f"[green]Removed workspace[/green] {ws_dir}")
|
|
raise typer.Exit(0)
|
|
|
|
# Check for dirty / unpushed changes
|
|
problems: List[str] = []
|
|
for wt in worktrees:
|
|
status = get_git_status(wt)
|
|
branch = get_current_branch(wt) or "?"
|
|
if status.dirty:
|
|
problems.append(f"{wt.name}: dirty working tree on '{branch}'")
|
|
repo = find_repo_for_worktree(wt, repos_dir)
|
|
if repo is not None and branch != "?":
|
|
if has_unpushed_commits(wt, branch):
|
|
problems.append(f"{wt.name}: unpushed commits on '{branch}'")
|
|
|
|
if problems and not force:
|
|
console.print(
|
|
"[red]Refusing to remove workspace; found dirty or unpushed work:[/red]"
|
|
)
|
|
for p in problems:
|
|
console.print(f" - {p}")
|
|
console.print(
|
|
"\nUse [bold]--force[/bold] to remove the workspace and its worktrees anyway."
|
|
)
|
|
raise typer.Exit(1)
|
|
|
|
if not Confirm.ask(
|
|
f"Remove workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name}) "
|
|
"and clean up its worktrees?",
|
|
default=False,
|
|
):
|
|
raise typer.Exit(0)
|
|
|
|
# Remove worktrees via git
|
|
for wt in worktrees:
|
|
repo = find_repo_for_worktree(wt, repos_dir)
|
|
display_name = wt.name
|
|
if repo is None:
|
|
console.print(
|
|
f"[yellow]Could not find parent repo for worktree {display_name}; "
|
|
"deleting directory directly.[/yellow]"
|
|
)
|
|
shutil.rmtree(wt)
|
|
continue
|
|
|
|
console.print(f"Removing worktree for [bold]{display_name}[/bold]...")
|
|
code, _out, err = run_cmd(
|
|
["git", "worktree", "remove", "--force" if force else "--detach", str(wt)],
|
|
cwd=repo,
|
|
)
|
|
if code != 0:
|
|
console.print(
|
|
f"[red]Failed to remove worktree {display_name} via git:[/red]\n{err}"
|
|
)
|
|
# As a last resort, if force is given, nuke the dir.
|
|
if force:
|
|
console.print(
|
|
f"[yellow]Forcing directory removal of {wt} despite git error.[/yellow]"
|
|
)
|
|
shutil.rmtree(wt)
|
|
elif wt.exists():
|
|
# git worktree remove should delete the directory; if not, clean up.
|
|
shutil.rmtree(wt)
|
|
|
|
# Finally, remove the workspace directory itself
|
|
shutil.rmtree(ws_dir)
|
|
console.print(
|
|
f"[green]Removed workspace[/green] title='{title or ws_dir.name}' dir='{ws_dir.name}'"
|
|
)
|
|
|
|
|
|
# ---------------- wip (stage + commit) ----------------
|
|
|
|
|
|
def git_status_porcelain(repo_path: Path) -> List[Tuple[str, str]]:
|
|
"""
|
|
Return a list of (status_code, path) for changes in the repo/worktree.
|
|
|
|
Uses 'git status --porcelain'.
|
|
"""
|
|
code, out, err = run_cmd(["git", "status", "--porcelain"], cwd=repo_path)
|
|
if code != 0:
|
|
console.print(
|
|
f"[red]Failed to get status for {repo_path.name}:[/red]\n{err}"
|
|
)
|
|
return []
|
|
|
|
changes: List[Tuple[str, str]] = []
|
|
for line in out.splitlines():
|
|
if not line.strip():
|
|
continue
|
|
# Format: 'XY path'
|
|
status = line[:2]
|
|
path = line[3:]
|
|
changes.append((status, path))
|
|
return changes
|
|
|
|
|
|
def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> List[str]:
|
|
"""
|
|
Interactively choose which files to stage for WIP.
|
|
|
|
- Show list of changed files.
|
|
- Ask user:
|
|
* stage all
|
|
* stage none
|
|
* pick some using iterfzf (multi-select)
|
|
"""
|
|
if not changes:
|
|
return []
|
|
|
|
files = [p for _s, p in changes]
|
|
|
|
console.print(
|
|
Panel(
|
|
"\n".join(files),
|
|
title=f"Changed files in {repo_path.name}",
|
|
subtitle="Use 'all', 'none', or 'pick' when prompted",
|
|
)
|
|
)
|
|
|
|
choice = Prompt.ask(
|
|
"Stage which files?",
|
|
choices=["all", "none", "pick"],
|
|
default="all",
|
|
)
|
|
|
|
if choice == "none":
|
|
return []
|
|
|
|
if choice == "all":
|
|
return files
|
|
|
|
# Multi-select via iterfzf
|
|
selected = iterfzf(files, multi=True, prompt="pick files> ")
|
|
if not selected:
|
|
return []
|
|
# iterfzf may return a single string or list depending on version;
|
|
# normalize to list of strings.
|
|
if isinstance(selected, str):
|
|
selected_files = [selected]
|
|
else:
|
|
selected_files = list(selected)
|
|
|
|
return selected_files
|
|
|
|
@app.command("wip")
|
|
def wip_workspace(
|
|
ctx: typer.Context,
|
|
workspace: Optional[str] = typer.Option(
|
|
None,
|
|
"--workspace",
|
|
"-w",
|
|
help=(
|
|
"Workspace directory name to WIP-commit. "
|
|
"If omitted, uses the workspace containing the current directory."
|
|
),
|
|
),
|
|
):
|
|
"""
|
|
For each repo in the workspace:
|
|
|
|
- Show list of changed files.
|
|
- Ask whether to stage all, none, or pick some files.
|
|
- Stage chosen files.
|
|
- Make commit: 'wip: <workspace display name>'.
|
|
"""
|
|
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
|
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
|
if not ws_dir.exists():
|
|
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
|
raise typer.Exit(1)
|
|
title, _desc = read_workspace_readme(ws_dir)
|
|
commit_message = f"wip: {title or ws_dir.name}"
|
|
commit_workspace(ctx, workspace, message=commit_message)
|
|
|
|
@app.command("commit")
|
|
def commit_workspace(
|
|
ctx: typer.Context,
|
|
workspace: Optional[str] = typer.Option(
|
|
None,
|
|
"--workspace",
|
|
"-w",
|
|
help=(
|
|
"Workspace directory name to WIP-commit. "
|
|
"If omitted, uses the workspace containing the current directory."
|
|
),
|
|
),
|
|
message: Optional[str] = typer.Option(
|
|
None,
|
|
"--message",
|
|
"-m",
|
|
help="Commit message to use.",
|
|
),
|
|
):
|
|
"""
|
|
For each repo in the workspace:
|
|
|
|
- Show list of changed files.
|
|
- Ask whether to stage all, none, or pick some files.
|
|
- Stage chosen files.
|
|
"""
|
|
if not message:
|
|
console.print(
|
|
"[red]No commit message provided. Exiting.[/red]",
|
|
file=sys.stderr,
|
|
)
|
|
raise typer.Exit(1)
|
|
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
|
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
|
if not ws_dir.exists():
|
|
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
|
raise typer.Exit(1)
|
|
title, _desc = read_workspace_readme(ws_dir)
|
|
|
|
worktrees: List[Path] = []
|
|
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
|
if child.name.lower() == "readme.md":
|
|
continue
|
|
if not ensure_git_repo(child):
|
|
continue
|
|
worktrees.append(child)
|
|
|
|
if not worktrees:
|
|
console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
|
|
raise typer.Exit(0)
|
|
|
|
for wt in worktrees:
|
|
console.print(f"\n[bold]Repo:[/bold] {wt.name}")
|
|
changes = git_status_porcelain(wt)
|
|
if not changes:
|
|
console.print(" [green]No changes.[/green]")
|
|
continue
|
|
|
|
files_to_stage = choose_files_for_wip(wt, changes)
|
|
if not files_to_stage:
|
|
console.print(" [yellow]Nothing selected to stage.[/yellow]")
|
|
continue
|
|
|
|
# Stage selected files
|
|
for f in files_to_stage:
|
|
code, _out, err = run_cmd(["git", "add", f], cwd=wt)
|
|
if code != 0:
|
|
console.print(
|
|
f"[red]Failed to add {f} in {wt.name}:[/red]\n{err}"
|
|
)
|
|
|
|
# Check if there is anything to commit
|
|
code, out, _err = run_cmd(["git", "diff", "--cached", "--name-only"], cwd=wt)
|
|
if code != 0 or not out.strip():
|
|
console.print(" [yellow]No staged changes to commit.[/yellow]")
|
|
continue
|
|
|
|
# Commit
|
|
code, _out, err = run_cmd(["git", "commit", "-m", message], cwd=wt)
|
|
if code != 0:
|
|
console.print(
|
|
f"[red]Failed to commit in {wt.name}:[/red]\n{err}"
|
|
)
|
|
else:
|
|
console.print(
|
|
f" [green]Created commit in {wt.name}:[/green] '{message}'"
|
|
)
|
|
|
|
|
|
# ---------------- push ----------------
|
|
|
|
|
|
@app.command("push")
|
|
def push_workspace(
|
|
ctx: typer.Context,
|
|
workspace: Optional[str] = typer.Option(
|
|
None,
|
|
"--workspace",
|
|
"-w",
|
|
help=(
|
|
"Workspace directory name to push. "
|
|
"If omitted, uses the workspace containing the current directory."
|
|
),
|
|
),
|
|
remote: str = typer.Option(
|
|
"origin",
|
|
"--remote",
|
|
"-r",
|
|
help="Remote name to push to (default: origin).",
|
|
),
|
|
):
|
|
"""
|
|
For each repo in the workspace, run 'git push <remote> <current-branch>'.
|
|
|
|
Skips repos with no current branch or when push fails.
|
|
"""
|
|
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
|
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
|
if not ws_dir.exists():
|
|
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
title, _desc = read_workspace_readme(ws_dir)
|
|
console.print(
|
|
f"Pushing all repos in workspace [bold]{title or ws_dir.name}[/bold] "
|
|
f"(dir: {ws_dir.name}) to remote '{remote}'"
|
|
)
|
|
|
|
worktrees: List[Path] = []
|
|
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
|
if child.name.lower() == "readme.md":
|
|
continue
|
|
if not ensure_git_repo(child):
|
|
continue
|
|
worktrees.append(child)
|
|
|
|
if not worktrees:
|
|
console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
|
|
raise typer.Exit(0)
|
|
|
|
for wt in worktrees:
|
|
branch = get_current_branch(wt)
|
|
if not branch or branch == "HEAD":
|
|
console.print(
|
|
f"[yellow]Skipping {wt.name}: no current branch (detached HEAD?).[/yellow]"
|
|
)
|
|
continue
|
|
|
|
console.print(f"\n[bold]Repo:[/bold] {wt.name} [bold]Branch:[/bold] {branch}")
|
|
code, _out, err = run_cmd(["git", "push", remote, branch], cwd=wt)
|
|
if code != 0:
|
|
console.print(
|
|
f"[red]Failed to push {wt.name} ({branch}) to {remote}:[/red]\n{err}"
|
|
)
|
|
else:
|
|
console.print(
|
|
f"[green]Pushed {wt.name} ({branch}) to {remote}[/green]"
|
|
)
|
|
|
|
|
|
# ---------------- status (current workspace) ----------------
|
|
|
|
|
|
@app.command("status")
|
|
def status_workspace(
|
|
ctx: typer.Context,
|
|
workspace: Optional[str] = typer.Option(
|
|
None,
|
|
"--workspace",
|
|
"-w",
|
|
help=(
|
|
"Workspace directory name to show status for. "
|
|
"If omitted, uses the workspace containing the current directory."
|
|
),
|
|
),
|
|
):
|
|
"""
|
|
Show detailed status for the current (or specified) workspace.
|
|
|
|
For each repo in the workspace:
|
|
- repo directory name
|
|
- branch
|
|
- ahead/behind/dirty indicator
|
|
- number of changed files
|
|
"""
|
|
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
|
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
|
if not ws_dir.exists():
|
|
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
title, desc = read_workspace_readme(ws_dir)
|
|
|
|
# if desc:
|
|
# console.print(Panel(desc, title=title))
|
|
|
|
table = Table(
|
|
title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}",
|
|
title_justify="left",
|
|
show_lines=False,
|
|
)
|
|
table.add_column("Repo (dir)", style="bold")
|
|
table.add_column("Branch")
|
|
table.add_column("Ahead/Behind/Dirty")
|
|
table.add_column("Changed Files", justify="right")
|
|
|
|
files_table = Table(show_lines=False)
|
|
files_table.add_column("Repo (dir)", style="bold")
|
|
files_table.add_column("File", justify="left")
|
|
files_table.add_column("Status", justify="right")
|
|
|
|
worktrees: List[Path] = []
|
|
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
|
if child.name.lower() == "readme.md":
|
|
continue
|
|
if not ensure_git_repo(child):
|
|
continue
|
|
worktrees.append(child)
|
|
|
|
if not worktrees:
|
|
console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
|
|
raise typer.Exit(0)
|
|
|
|
for wt in worktrees:
|
|
branch = get_current_branch(wt) or "?"
|
|
status = get_git_status(wt)
|
|
changes = git_status_porcelain(wt)
|
|
table.add_row(
|
|
wt.name,
|
|
branch,
|
|
status.indicator,
|
|
str(len(changes)),
|
|
)
|
|
|
|
if changes:
|
|
for f, s in changes:
|
|
files_table.add_row(wt.name, f, s)
|
|
|
|
console.print(table)
|
|
|
|
console.print("\nFiles with changes:")
|
|
if files_table.rows:
|
|
console.print(files_table)
|
|
|
|
@app.command("diff")
|
|
def diff_workspace(
|
|
ctx: typer.Context,
|
|
workspace: Optional[str] = typer.Option(
|
|
None,
|
|
"--workspace",
|
|
"-w",
|
|
help=(
|
|
"Workspace directory name to diff. "
|
|
"If omitted, uses the workspace containing the current directory."
|
|
),
|
|
),
|
|
staged: bool = typer.Option(
|
|
False,
|
|
"--staged",
|
|
"-s",
|
|
help="Show only staged changes (git diff --cached).",
|
|
),
|
|
):
|
|
"""
|
|
Show git diff for all repos in the workspace.
|
|
|
|
- If --staged: uses `git diff --cached` (only staged changes).
|
|
- Otherwise: uses `git diff` (staged + unstaged).
|
|
"""
|
|
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
|
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
|
if not ws_dir.exists():
|
|
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
title, _desc = read_workspace_readme(ws_dir)
|
|
console.print(
|
|
f"Showing diffs for workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})"
|
|
)
|
|
|
|
# Collect worktrees (repos) in this workspace
|
|
worktrees: List[Path] = []
|
|
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
|
if child.name.lower() == "readme.md":
|
|
continue
|
|
if not ensure_git_repo(child):
|
|
continue
|
|
worktrees.append(child)
|
|
|
|
if not worktrees:
|
|
console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
|
|
raise typer.Exit(0)
|
|
|
|
any_diffs = False
|
|
|
|
for wt in worktrees:
|
|
# Quick check: skip repos with no changes at all
|
|
changes = git_status_porcelain(wt)
|
|
if not changes:
|
|
continue
|
|
|
|
any_diffs = True
|
|
console.rule(f"[bold]{wt.name}[/bold]")
|
|
|
|
cmd = ["git", "diff"]
|
|
if staged:
|
|
cmd.append("--cached")
|
|
|
|
code, out, err = run_cmd(cmd, cwd=wt)
|
|
if code != 0:
|
|
console.print(
|
|
f"[red]Failed to get diff for {wt.name}:[/red]\n{err}"
|
|
)
|
|
continue
|
|
|
|
if not out.strip():
|
|
console.print("[dim]No diff output.[/dim]")
|
|
else:
|
|
# Use markup=False so diff characters like [ ] don't get eaten by Rich
|
|
console.print(out, markup=False)
|
|
|
|
if not any_diffs:
|
|
console.print("[green]No changes to diff in any repo.[/green]")
|
|
|
|
|
|
def not_in_tmux() -> bool:
|
|
"""Return True if not inside tmux or zellij."""
|
|
return not os.environ.get("TMUX") and not os.environ.get("ZELLIJ")
|
|
|
|
|
|
def run_tmux(args: list[str], *, clear_tmux_env: bool = False) -> subprocess.CompletedProcess:
|
|
"""Run a tmux command."""
|
|
env = os.environ.copy()
|
|
if clear_tmux_env:
|
|
env["TMUX"] = ""
|
|
return subprocess.run(
|
|
["tmux", *args],
|
|
env=env,
|
|
check=False,
|
|
)
|
|
|
|
|
|
def session_exists(session_name: str) -> bool:
|
|
"""Check if the tmux session exists."""
|
|
result = run_tmux(["has-session", "-t", f"={session_name}"])
|
|
return result.returncode == 0
|
|
|
|
|
|
def pick_project(base_dir: Path) -> Optional[str]:
|
|
"""Use fzf to pick a subdirectory (project) from base_dir."""
|
|
if not base_dir.is_dir():
|
|
typer.echo(f"[ta] {base_dir} is not a directory.", err=True)
|
|
return None
|
|
|
|
subdirs = sorted(
|
|
[p.name for p in base_dir.iterdir() if p.is_dir()]
|
|
)
|
|
if not subdirs:
|
|
typer.echo(f"[ta] No subdirectories found in {base_dir}.", err=True)
|
|
return None
|
|
|
|
if not shutil.which("fzf"):
|
|
typer.echo("[ta] fzf not found in PATH.", err=True)
|
|
return None
|
|
|
|
proc = subprocess.run(
|
|
["fzf", "--reverse", f"--header=Select project from {base_dir.name} >"],
|
|
input="\n".join(subdirs),
|
|
text=True,
|
|
capture_output=True,
|
|
)
|
|
|
|
if proc.returncode != 0:
|
|
# Cancelled or failed
|
|
return None
|
|
|
|
choice = proc.stdout.strip()
|
|
return choice or None
|
|
|
|
|
|
def create_detached_session(
|
|
session_name: str,
|
|
path_name: Path,
|
|
start_mode: bool,
|
|
) -> bool:
|
|
"""
|
|
Create a detached session.
|
|
|
|
- If start_mode: just a single window.
|
|
- Else: split layout with nvim on top.
|
|
"""
|
|
# Run tmux as if not in tmux (clear TMUX env)
|
|
if start_mode:
|
|
r = run_tmux(
|
|
["new-session", "-Ad", "-s", session_name, "-c", str(path_name)],
|
|
clear_tmux_env=True,
|
|
)
|
|
return r.returncode == 0
|
|
else:
|
|
r = run_tmux(
|
|
["new-session", "-Ad", "-s", session_name, "-c", str(path_name)],
|
|
clear_tmux_env=True,
|
|
)
|
|
if r.returncode != 0:
|
|
return False
|
|
|
|
r = run_tmux(
|
|
[
|
|
"split-window",
|
|
"-vb",
|
|
"-t",
|
|
session_name,
|
|
"-c",
|
|
str(path_name),
|
|
"-p",
|
|
"70",
|
|
],
|
|
clear_tmux_env=True,
|
|
)
|
|
if r.returncode != 0:
|
|
return False
|
|
|
|
r = run_tmux(
|
|
[
|
|
"send-keys",
|
|
"-t",
|
|
session_name,
|
|
"nvim '+Telescope find_files'",
|
|
"Enter",
|
|
],
|
|
clear_tmux_env=True,
|
|
)
|
|
return r.returncode == 0
|
|
|
|
|
|
def create_if_needed_and_attach(
|
|
session_name: str,
|
|
path_name: Path,
|
|
start_mode: bool,
|
|
) -> bool:
|
|
"""
|
|
Mimic the bash logic:
|
|
|
|
- If not in tmux: `tmux new-session -As ... -c path_name`
|
|
- Else:
|
|
- If session doesn't exist: create_detached_session()
|
|
- Then `tmux switch-client -t session_name`
|
|
"""
|
|
if not_in_tmux():
|
|
r = run_tmux(
|
|
["new-session", "-As", session_name, "-c", str(path_name)],
|
|
)
|
|
return r.returncode == 0
|
|
|
|
# Inside tmux
|
|
if not session_exists(session_name):
|
|
if not create_detached_session(session_name, path_name, start_mode):
|
|
return False
|
|
|
|
r = run_tmux(["switch-client", "-t", session_name])
|
|
return r.returncode == 0
|
|
|
|
|
|
def attach_to_first_session() -> None:
|
|
"""Fallback: attach to first tmux session and open choose-tree."""
|
|
# Attach to the first listed session
|
|
list_proc = subprocess.run(
|
|
["tmux", "list-sessions", "-F", "#{session_name}"],
|
|
text=True,
|
|
capture_output=True,
|
|
)
|
|
if list_proc.returncode != 0 or not list_proc.stdout.strip():
|
|
typer.echo("[ta] No tmux sessions found to attach to.", err=True)
|
|
raise typer.Exit(1)
|
|
|
|
first_session = list_proc.stdout.strip().splitlines()[0]
|
|
|
|
attach_proc = run_tmux(["attach-session", "-t", first_session])
|
|
if attach_proc.returncode != 0:
|
|
raise typer.Exit(attach_proc.returncode)
|
|
|
|
# After attach, show choose-tree (this will run in the attached session)
|
|
run_tmux(["choose-tree", "-Za"])
|
|
|
|
tmux_app = typer.Typer(
|
|
name="tmux",
|
|
help="tmux commands",
|
|
add_completion=False,
|
|
)
|
|
|
|
@tmux_app.command('attach')
|
|
def attach(
|
|
ctx: typer.Context,
|
|
workspace: Optional[str] = typer.Option(
|
|
None,
|
|
"--workspace",
|
|
"-w",
|
|
help=(
|
|
"Workspace directory name to show status for. "
|
|
"If omitted, uses the workspace containing the current directory."
|
|
),
|
|
),
|
|
):
|
|
"""
|
|
Attach or create a session for a repo in a workspace.
|
|
"""
|
|
# ws_dir = get_workspace_dir(workspace)
|
|
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
|
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
|
if not ws_dir:
|
|
console.print(f"[red]Workspace '{workspace}' does not exist at {ws_dir}[/red]")
|
|
raise typer.Exit(1)
|
|
# pick repo in workspace
|
|
repo = pick_repo_with_iterfzf([dir for dir in ws_dir.iterdir() if dir.is_dir()])
|
|
if not repo:
|
|
console.print("[red]No repo selected. Exiting.[/red]")
|
|
raise typer.Exit(1)
|
|
print(ws_dir, repo)
|
|
session_name = f'{ws_dir.name}|{repo.name}'
|
|
print(f'{ws_dir.name}|{repo.name}')
|
|
create_if_needed_and_attach(session_name, repo, False)
|
|
|
|
|
|
app.add_typer(tmux_app)
|
|
|
|
if __name__ == "__main__":
|
|
app()
|