1506 lines
44 KiB
Python
Executable file
1506 lines
44 KiB
Python
Executable file
#!/usr/bin/env -S uv run --quiet --script
|
||
# /// script
|
||
# requires-python = ">=3.12"
|
||
# dependencies = [
|
||
# "typer",
|
||
# "rich",
|
||
# "pydantic",
|
||
# "pydantic-settings",
|
||
# "iterfzf",
|
||
# ]
|
||
# ///
|
||
|
||
from __future__ import annotations
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
from typing import List, Optional, Tuple
|
||
|
||
|
||
import typer
|
||
|
||
from pydantic import Field
|
||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||
from rich.console import Console
|
||
from rich.table import Table
|
||
from rich.prompt import Prompt, Confirm
|
||
from rich.panel import Panel
|
||
from rich.syntax import Syntax
|
||
from rich.panel import Panel
|
||
from iterfzf import iterfzf
|
||
|
||
# Typer application object. invoke_without_command=True lets the callback run
# even when the user gives no subcommand.
app = typer.Typer(help="Workspace management tool", invoke_without_command=True)

# Shared Rich console used for terminal output throughout this file.
console = Console()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Settings
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class Settings(BaseSettings):
    """
    Global configuration for workspaces.

    ``workspaces_name`` is resolved in this order:

    1. Command-line flag --workspaces-name
    2. Environment variable WORKSPACES_NAME
    3. Default "git"

    Derived directories (computed by ``resolve_paths``):

    repos_dir      = ~/<workspaces_name>
    workspaces_dir = ~/<workspaces_name>.workspaces
    """

    workspaces_name: str = Field(
        default="git",
        description="Logical name of the workspace group (e.g. 'git', 'work', 'personal').",
    )

    # pydantic v2-style settings configuration
    model_config = SettingsConfigDict(
        env_prefix="",
        env_file=None,
        env_nested_delimiter="__",
        extra="ignore",
    )

    @classmethod
    def from_env_and_override(cls, override_workspaces_name: Optional[str]) -> "Settings":
        """
        Build a Settings instance.

        An explicit override (anything other than None) wins; otherwise a
        non-empty WORKSPACES_NAME environment variable is applied; otherwise
        the value produced by the default constructor is kept.
        """
        settings = cls()
        if override_workspaces_name is not None:
            settings.workspaces_name = override_workspaces_name
            return settings

        from_env = os.getenv("WORKSPACES_NAME")
        if from_env:
            settings.workspaces_name = from_env
        return settings
|
||
|
||
|
||
def resolve_paths(workspaces_name: Optional[str]) -> Tuple[Settings, Path, Path]:
    """
    Create the Settings object and the two directories derived from it.

    Returns ``(settings, repos_dir, workspaces_dir)`` where both paths are
    user-expanded and resolved. ``workspaces_name`` is the optional CLI
    override passed on to ``Settings.from_env_and_override``.
    """
    settings = Settings.from_env_and_override(workspaces_name)
    home_relative = f"~/{settings.workspaces_name}"
    repos_dir = Path(os.path.expanduser(home_relative)).resolve()
    workspaces_dir = Path(os.path.expanduser(f"{home_relative}.workspaces")).resolve()
    return settings, repos_dir, workspaces_dir
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Models / helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@dataclass
class GitStatus:
    """Ahead/behind commit counts and dirty flag for one repository."""

    ahead: int = 0
    behind: int = 0
    dirty: bool = False

    @property
    def indicator(self) -> str:
        """
        Compact text indicator for this status:

        - clean: "·"
        - ahead 1: "↑1"
        - behind 2: "↓2"
        - both ahead/behind: "↑1 ↓2"
        - a trailing '*' is appended when dirty, e.g. "↑1*" or "↑1 ↓2*"
        """
        arrows = [
            f"{symbol}{count}"
            for symbol, count in (("↑", self.ahead), ("↓", self.behind))
            if count
        ]
        text = " ".join(arrows) or "·"
        return f"{text}*" if self.dirty else text
|
||
|
||
|
||
def get_git_status(repo_path: Path) -> GitStatus:
    """
    Collect ahead/behind counts and the dirty flag for a Git repo.

    Runs `git status --porcelain=v2 --branch` in ``repo_path`` and reads:

    - the '# branch.ab +A -B' header for ahead/behind
    - any line that is not a '#' header as evidence of a dirty tree

    If the git invocation raises for any reason, a default (all-zero, clean)
    GitStatus is returned.
    """
    try:
        porcelain = subprocess.check_output(
            ["git", "status", "--porcelain=v2", "--branch"],
            cwd=repo_path,
            stderr=subprocess.DEVNULL,
            text=True,
        )
    except Exception:
        return GitStatus()

    result = GitStatus()
    for entry in porcelain.splitlines():
        if entry.startswith("# branch.ab"):
            # Example header: "# branch.ab +1 -2"
            counts = re.search(r"\+(\d+)\s+-(\d+)", entry)
            if counts is not None:
                result.ahead = int(counts.group(1))
                result.behind = int(counts.group(2))
        elif not entry.startswith("#"):
            result.dirty = True
    return result
|
||
|
||
|
||
def get_current_branch(repo_path: Path) -> Optional[str]:
    """
    Return the stripped output of `git rev-parse --abbrev-ref HEAD` run in
    ``repo_path``, or None if the command raises for any reason.
    """
    command = ["git", "rev-parse", "--abbrev-ref", "HEAD"]
    try:
        raw = subprocess.check_output(
            command,
            cwd=repo_path,
            stderr=subprocess.DEVNULL,
            text=True,
        )
    except Exception:
        return None
    return raw.strip()
|
||
|
||
|
||
def ensure_git_repo(path: Path) -> bool:
    """
    Report whether ``path`` has a ``.git`` entry.

    The entry may be a directory (regular repo) or a file (worktree), so a
    plain existence check covers both.
    """
    git_marker = path.joinpath(".git")
    return git_marker.exists()
|
||
|
||
|
||
def run_cmd(cmd: List[str], cwd: Optional[Path] = None) -> Tuple[int, str, str]:
    """
    Run ``cmd`` (optionally in ``cwd``), capturing stdout and stderr as text.

    Returns ``(returncode, stdout, stderr)``. A non-zero exit code is returned
    to the caller rather than raised.
    """
    completed = subprocess.run(
        cmd,
        cwd=cwd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    return completed.returncode, completed.stdout, completed.stderr
|
||
|
||
|
||
def find_workspace_dir(workspaces_dir: Path, workspace_name: Optional[str]) -> Path:
    """
    Resolve the directory of a workspace.

    - If ``workspace_name`` is given: return ``workspaces_dir / workspace_name``
      (existence is not checked here).
    - Otherwise: use the current working directory, which must be inside a
      workspace under ``workspaces_dir``. The workspace root is the first path
      component below ``workspaces_dir``.

    Prints an error and raises ``typer.Exit(1)`` when the current directory is
    outside ``workspaces_dir`` or is ``workspaces_dir`` itself.
    """
    if workspace_name:
        return workspaces_dir / workspace_name

    cwd = Path.cwd().resolve()
    try:
        rel = cwd.relative_to(workspaces_dir)
    except ValueError:
        console.print(
            f"[red]Not inside workspaces_dir ({workspaces_dir}). "
            "Please use --workspace to specify a workspace.[/red]"
        )
        raise typer.Exit(1)

    # When cwd is workspaces_dir itself the relative path has no components,
    # so there is no workspace to pick; report it instead of indexing into
    # an empty tuple.
    if not rel.parts:
        console.print(
            f"[red]Not inside a workspace under {workspaces_dir}. "
            "Please use --workspace to specify a workspace.[/red]"
        )
        raise typer.Exit(1)

    return workspaces_dir / rel.parts[0]
|
||
|
||
|
||
def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]:
    """
    Read ``ws_dir / "readme.md"`` and return ``(name, description)``.

    The first non-blank line is expected to be an H1 ('# ...'); its text is
    the name and everything after that line (stripped) is the description.
    When the file is missing, empty or blank, the directory name and an empty
    description are returned. When the first non-blank line is not an H1, the
    directory name is used and the description is the text after that line.
    """
    fallback_name = ws_dir.name
    readme_path = ws_dir / "readme.md"
    if not readme_path.exists():
        return fallback_name, ""

    content_lines = readme_path.read_text(encoding="utf-8").splitlines()
    for idx, raw in enumerate(content_lines):
        heading = raw.strip()
        if not heading:
            continue
        # First non-blank line found: everything below it is the body.
        body = "\n".join(content_lines[idx + 1 :]).strip()
        if heading.startswith("# "):
            return heading[2:].strip(), body
        return fallback_name, body

    # No non-blank line at all.
    return fallback_name, ""
|
||
|
||
|
||
def write_workspace_readme(ws_dir: Path, name: str, description: str) -> None:
    """
    Create ``ws_dir`` if needed and write its ``readme.md``.

    The file consists of an H1 with ``name``, a blank line, then
    ``description`` followed by a newline.
    """
    ws_dir.mkdir(parents=True, exist_ok=True)
    readme_path = ws_dir.joinpath("readme.md")
    lines = [f"# {name}", "", f"{description}", ""]
    readme_path.write_text("\n".join(lines), encoding="utf-8")
|
||
|
||
|
||
# --- name/branch helpers ----------------------------------------------------
|
||
|
||
|
||
def slugify_workspace_name(name: str) -> str:
|
||
"""
|
||
Turn arbitrary workspace name into a safe directory/worktree name.
|
||
|
||
- Lowercase
|
||
- Replace ':' with '-' FIRST (special case so 'fix:my issue' -> 'fix/my-issue' branch later)
|
||
- Replace spaces with '-'
|
||
- Replace any char not [a-z0-9._/-] with '-'
|
||
- Collapse multiple '-' and strip leading/trailing '-' and '/'
|
||
"""
|
||
name = name.replace(":", "-")
|
||
name = name.replace(" ", "-")
|
||
safe = []
|
||
for ch in name:
|
||
if re.match(r"[a-zA-Z0-9._/-]", ch):
|
||
safe.append(ch.lower())
|
||
else:
|
||
safe.append("-")
|
||
s = "".join(safe)
|
||
s = re.sub(r"-+", "-", s)
|
||
s = s.strip("-/")
|
||
return s or "workspace"
|
||
|
||
|
||
def branch_name_for_workspace(name: str) -> str:
    """
    Derive a branch name from a workspace name.

    The name is slugified first, then only the *first* '-' is turned into '/':

        'fix:my issue' -> slug 'fix-my-issue' -> branch 'fix/my-issue'

    A slug without any '-' is returned unchanged.
    """
    # str.replace with count=1 is a no-op when the slug contains no '-'.
    return slugify_workspace_name(name).replace("-", "/", 1)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Commands / main callback
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@app.callback()
def main(
    ctx: typer.Context,
    workspaces_name: Optional[str] = typer.Option(
        None,
        "--workspaces-name",
        "-W",
        help=(
            "Logical name for this workspace set (e.g. 'git', 'work', 'personal'). "
            "Overrides WORKSPACES_NAME env. Defaults to 'git'."
        ),
    ),
):
    """
    Manage workspaces and associated Git worktrees.

    If no command is given, this will list workspaces.
    """
    settings, repos_dir, workspaces_dir = resolve_paths(workspaces_name)

    # Stash the resolved configuration on the context for the subcommands.
    ctx.obj = dict(
        settings=settings,
        repos_dir=repos_dir,
        workspaces_dir=workspaces_dir,
    )

    if ctx.invoked_subcommand is not None:
        return

    # No subcommand: fall back to listing workspaces, then exit cleanly.
    list_workspaces(ctx)
    raise typer.Exit(0)
|
||
|
||
|
||
def get_ctx_paths(ctx: typer.Context) -> Tuple[Settings, Path, Path]:
    """
    Unpack ``(settings, repos_dir, workspaces_dir)`` from ``ctx.obj``.

    Raises KeyError if ``ctx.obj`` is empty or lacks one of those keys.
    """
    state = ctx.obj or {}
    return tuple(state[key] for key in ("settings", "repos_dir", "workspaces_dir"))
|
||
|
||
|
||
# ---------------- list-workspaces ----------------
|
||
|
||
|
||
@app.command("list")
def list_workspaces(
    ctx: typer.Context,
):
    """
    List all workspaces.

    Shows:

    - workspace directory name
    - workspace description (from README markdown, everything after H1)
    - included repos with git status indicators
    """
    # Imported locally so the block is self-contained; rich is already a
    # dependency of this script.
    from rich.markup import escape

    _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
    # mkdir guarantees the directory exists, so no later existence check is
    # needed before iterating it.
    workspaces_dir.mkdir(parents=True, exist_ok=True)

    table = Table(title=f"Workspaces ({workspaces_dir})")
    table.add_column("Workspace (dir)", style="bold")
    table.add_column("Title", overflow="fold")
    table.add_column("Description", overflow="fold")
    table.add_column("Repos", overflow="fold")

    for ws in sorted(p for p in workspaces_dir.iterdir() if p.is_dir()):
        title, desc = read_workspace_readme(ws)
        repos: List[str] = []
        for child in sorted(p for p in ws.iterdir() if p.is_dir()):
            if child.name.lower() == "readme.md":
                continue
            if not ensure_git_repo(child):
                continue
            status = get_git_status(child)
            branch = get_current_branch(child) or "?"
            # Rich treats "[main]" as a markup tag and drops it from the
            # output; escape so the branch name is shown literally.
            repos.append(escape(f"{child.name} [{branch}] {status.indicator}"))

        repos_str = "\n".join(repos) if repos else "-"
        # Title/description come from a user-written markdown file and may
        # contain square brackets (e.g. links), so escape them as well.
        table.add_row(
            escape(ws.name),
            escape(title) if title else "-",
            escape(desc) if desc else "-",
            repos_str,
        )

    console.print(table)
|
||
|
||
|
||
# ---------------- create-workspace ----------------
|
||
|
||
|
||
@app.command("create")
@app.command("new", hidden=True)
def create_workspace(
    ctx: typer.Context,
    name: Optional[str] = typer.Option(
        None,
        "--name",
        "-n",
        help="Name of the new workspace (display name; can contain spaces).",
    ),
    description: Optional[str] = typer.Option(
        None,
        "--description",
        "-d",
        help="Description of the workspace. Will be written into readme.md.",
    ),
):
    """
    Create a new workspace.

    - Asks for name and description if not provided.
    - Workspace directory uses a slugified version of the name.
    - README uses the original name as '# <name>\\n\\n<description>'.
    """
    _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
    workspaces_dir.mkdir(parents=True, exist_ok=True)

    # Prompt interactively for whatever was not supplied on the command line.
    name = name or Prompt.ask("Workspace name (can contain spaces)")
    description = description or Prompt.ask("Workspace description", default="")

    dir_name = slugify_workspace_name(name)
    ws_dir = workspaces_dir / dir_name

    # Refuse to overwrite an existing workspace directory.
    if ws_dir.exists():
        console.print(f"[red]Workspace dir '{dir_name}' already exists at {ws_dir}[/red]")
        raise typer.Exit(1)

    write_workspace_readme(ws_dir, name, description)
    console.print(
        f"[green]Created workspace[/green] title='{name}' dir='{ws_dir.name}' at {ws_dir}"
    )
|
||
|
||
|
||
# ---------------- list-repos ----------------
|
||
|
||
|
||
@app.command("list-repos")
def list_repos(
    ctx: typer.Context,
    workspace: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Workspace directory name to inspect. "
            "If omitted, uses the workspace containing the current directory."
        ),
    ),
):
    """
    List repos and branches in the current (or specified) workspace.

    Shows:

    - repo directory name
    - current branch
    - ahead/behind/dirty indicators
    """
    _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
    ws_dir = find_workspace_dir(workspaces_dir, workspace)

    if not ws_dir.exists():
        console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
        raise typer.Exit(1)

    title, _desc = read_workspace_readme(ws_dir)

    table = Table(title=f"Repos in workspace '{title}' (dir: {ws_dir.name})")
    table.add_column("Repo (dir)", style="bold")
    table.add_column("Branch")
    table.add_column("Status")

    subdirs = sorted(entry for entry in ws_dir.iterdir() if entry.is_dir())
    for repo_dir in subdirs:
        # Only sub-directories that carry a .git entry count as repos.
        if repo_dir.name.lower() == "readme.md" or not ensure_git_repo(repo_dir):
            continue
        branch = get_current_branch(repo_dir) or "?"
        status = get_git_status(repo_dir)
        table.add_row(repo_dir.name, branch, status.indicator)

    console.print(table)
|
||
|
||
|
||
# ---------------- add-repo ----------------
|
||
|
||
|
||
def list_all_repos(repos_dir: Path) -> List[Path]:
    """
    Return the sorted sub-directories of ``repos_dir`` that look like git
    repos (per ``ensure_git_repo``); an empty list if ``repos_dir`` is missing.
    """
    if not repos_dir.exists():
        return []
    return [
        entry
        for entry in sorted(repos_dir.iterdir())
        if entry.is_dir() and ensure_git_repo(entry)
    ]
|
||
|
||
|
||
def pick_repo_with_iterfzf(repos: List[Path]) -> Optional[Path]:
    """
    Let the user pick one repo via iterfzf, matching by directory name.

    Returns the chosen path, or None when ``repos`` is empty, nothing was
    selected, or the selection matches no repo name.
    """
    if not repos:
        return None

    names = [repo.name for repo in repos]
    picked = iterfzf(names, prompt="pick a repo> ")
    if not picked:
        return None

    # First repo whose directory name equals the selection.
    return next((repo for repo in repos if repo.name == picked), None)
|
||
|
||
|
||
@app.command("add-repo")
@app.command("add", hidden=True)
def add_repo(
    ctx: typer.Context,
    workspace: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Workspace directory name to add repo to. "
            "If omitted, uses the workspace containing the current directory."
        ),
    ),
    repo_name: Optional[str] = typer.Option(
        None,
        "--repo",
        "-r",
        help=(
            "Name of repo (directory under repos_dir). "
            "If omitted, uses iterfzf to pick from repos_dir."
        ),
    ),
):
    """
    Add a repo to a workspace.

    - Lists all directories in repos_dir as repos.
    - Uses iterfzf to pick repo if --repo not given.
    - Creates a worktree for a branch derived from the workspace name
      into workspace_dir / repo_name.
    """
    _settings, repos_dir, workspaces_dir = get_ctx_paths(ctx)
    ws_dir = find_workspace_dir(workspaces_dir, workspace)
    if not ws_dir.exists():
        console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
        raise typer.Exit(1)

    # Use display title from README to derive branch name
    title, _desc = read_workspace_readme(ws_dir)
    branch = branch_name_for_workspace(title or ws_dir.name)

    all_repos = list_all_repos(repos_dir)
    if not all_repos:
        console.print(f"[red]No git repos found in {repos_dir}[/red]")
        raise typer.Exit(1)

    repo_path: Optional[Path] = None
    if repo_name:
        repo_path = next((r for r in all_repos if r.name == repo_name), None)
        if repo_path is None:
            console.print(
                f"[red]Repo '{repo_name}' not found in {repos_dir}. "
                "Use --repo with a valid name or omit to use iterfzf.[/red]"
            )
            raise typer.Exit(1)
    else:
        repo_path = pick_repo_with_iterfzf(all_repos)
        if repo_path is None:
            console.print("[yellow]No repo selected.[/yellow]")
            raise typer.Exit(0)

    target_dir = ws_dir / repo_path.name
    if target_dir.exists():
        console.print(
            f"[yellow]Directory {target_dir} already exists. "
            "Assuming repo already added.[/yellow]"
        )
        raise typer.Exit(0)

    # Ensure the *local branch* exists or create it. Checking the fully
    # qualified ref avoids false positives: a bare name would also resolve
    # for a same-named tag or an abbreviated object id.
    code, _out, err = run_cmd(
        ["git", "show-ref", "--verify", "--quiet", f"refs/heads/{branch}"],
        cwd=repo_path,
    )
    if code != 0:
        # create branch from current HEAD
        console.print(
            f"[yellow]Branch '{branch}' does not exist in {repo_path.name}; creating it.[/yellow]"
        )
        code, _out, err = run_cmd(["git", "branch", branch], cwd=repo_path)
        if code != 0:
            console.print(
                f"[red]Failed to create branch '{branch}' in {repo_path.name}:[/red]\n{err}"
            )
            raise typer.Exit(1)

    # Create worktree (dir name is safe because repo_path.name is)
    ws_dir.mkdir(parents=True, exist_ok=True)
    code, _out, err = run_cmd(
        ["git", "worktree", "add", str(target_dir), branch],
        cwd=repo_path,
    )
    if code != 0:
        console.print(
            f"[red]Failed to create worktree for repo {repo_path.name} "
            f"on branch '{branch}' into {target_dir}:[/red]\n{err}"
        )
        raise typer.Exit(1)

    console.print(
        f"[green]Added repo[/green] {repo_path.name} "
        f"to workspace [bold]{title or ws_dir.name}[/bold] "
        f"(dir: {ws_dir.name}) on branch '{branch}' at {target_dir}"
    )
|
||
|
||
|
||
# ---------------- rm-workspace ----------------
|
||
|
||
|
||
def find_repo_for_worktree(
    worktree_path: Path, repos_dir: Path
) -> Optional[Path]:
    """
    Locate the parent repo of a worktree by name.

    Assumes the parent lives at ``repos_dir / <worktree directory name>``;
    returns that path when it exists and looks like a git repo, else None.
    """
    candidate = repos_dir / worktree_path.name
    if not candidate.exists():
        return None
    return candidate if ensure_git_repo(candidate) else None
|
||
|
||
|
||
def has_unpushed_commits(repo_path: Path, branch: str) -> bool:
    """
    Report whether HEAD in ``repo_path`` has commits missing from its upstream.

    Counts commits with 'git rev-list @{u}..HEAD --count'. Any uncertainty
    (no upstream configured, git failure, unparsable output) is reported as
    True, i.e. treated as unpushed work. ``branch`` is accepted but not used
    by the git commands, which operate on HEAD.
    """
    # Does an upstream exist at all?
    upstream_code, _, _ = run_cmd(
        ["git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"],
        cwd=repo_path,
    )
    if upstream_code != 0:
        # No upstream configured; treat as unpushed work.
        return True

    count_code, count_out, _ = run_cmd(
        ["git", "rev-list", "@{u}..HEAD", "--count"],
        cwd=repo_path,
    )
    if count_code != 0:
        return True

    try:
        pending = int(count_out.strip() or "0")
    except ValueError:
        return True
    return pending > 0
|
||
|
||
|
||
def is_branch_integrated_into_main(
    repo_path: Path,
    branch: str,
    main_ref: str = "origin/main",
) -> bool:
    """
    Heuristic check that the content of ``branch`` is already in ``main_ref``.

    True when either:

    - the branch tip is an ancestor of ``main_ref``, or
    - the tree hash of the branch tip equals the tree hash of some commit
      reachable from ``main_ref`` (typical after a squash merge).

    Only Git history is consulted. A failing git command in the tree
    comparison prints an error and yields False.
    """
    # Best-effort refresh of origin; the result is deliberately ignored.
    run_cmd(["git", "fetch", "--quiet", "origin"], cwd=repo_path)

    # 1) Simple case: branch is ancestor of main_ref
    ancestor_code, _, _ = run_cmd(
        ["git", "merge-base", "--is-ancestor", branch, main_ref],
        cwd=repo_path,
    )
    if ancestor_code == 0:
        return True

    # 2) Squash / cherry-pick heuristic: same tree exists on main_ref
    tree_code, tree_out, tree_err = run_cmd(
        ["git", "show", "-s", "--format=%T", branch],
        cwd=repo_path,
    )
    if tree_code != 0:
        console.print(
            f"[red]Failed to get tree for branch {branch} in {repo_path.name}:[/red]\n{tree_err}"
        )
        return False

    branch_tree = tree_out.strip()
    if not branch_tree:
        return False

    log_code, log_out, log_err = run_cmd(
        ["git", "log", "--format=%T", main_ref],
        cwd=repo_path,
    )
    if log_code != 0:
        console.print(
            f"[red]Failed to walk {main_ref} in {repo_path.name}:[/red]\n{log_err}"
        )
        return False

    # Any commit on main_ref with the same tree hash counts as a match.
    return any(entry.strip() == branch_tree for entry in log_out.splitlines())
|
||
|
||
@app.command("rm")
def remove_workspace(
    ctx: typer.Context,
    workspace: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Workspace directory name to remove. "
            "If omitted, uses the workspace containing the current directory."
        ),
    ),
    force: bool = typer.Option(
        False,
        "--force",
        "-f",
        help="Force removal even if there is dirty or unpushed work.",
    ),
    main_ref: str = typer.Option(
        "origin/main",
        "--main-ref",
        help="Ref to consider as the integration target (default: origin/main).",
    ),
):
    """
    Remove a workspace:

    - For each repo worktree in the workspace:
        * Check for dirty work or unpushed commits.
        * If any found and not --force, abort.
        * Otherwise, run 'git worktree remove'.
    - Finally, delete the workspace directory.

    If git refuses to remove a worktree and --force is not given, the
    workspace directory is kept and the command exits with an error.
    """
    _settings, repos_dir, workspaces_dir = get_ctx_paths(ctx)
    ws_dir = find_workspace_dir(workspaces_dir, workspace)

    if not ws_dir.exists():
        console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
        raise typer.Exit(1)

    title, _desc = read_workspace_readme(ws_dir)

    # Collect worktrees (subdirs that look like git repos, excluding readme.md)
    worktrees: List[Path] = []
    for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
        if child.name.lower() == "readme.md":
            continue
        if not ensure_git_repo(child):
            continue
        worktrees.append(child)

    if not worktrees:
        # No worktrees, just remove workspace dir
        if not Confirm.ask(
            f"[red]No worktrees found.[/red] Remove empty workspace "
            f"[bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})?",
            default=False,
        ):
            raise typer.Exit(0)
        shutil.rmtree(ws_dir)
        console.print(f"[green]Removed workspace[/green] {ws_dir}")
        raise typer.Exit(0)

    # Check for dirty / unpushed changes
    problems: List[str] = []
    for wt in worktrees:
        status = get_git_status(wt)
        branch = get_current_branch(wt) or "?"
        repo = find_repo_for_worktree(wt, repos_dir)
        checkable = repo is not None and branch != "?"

        integrated = False
        if checkable:
            integrated = is_branch_integrated_into_main(wt, branch, main_ref=main_ref)

        if status.dirty:
            problems.append(f"{wt.name}: dirty working tree on '{branch}'")

        # Only care about "unpushed commits" if the branch is NOT integrated into main.
        if checkable and not integrated and has_unpushed_commits(wt, branch):
            problems.append(f"{wt.name}: unpushed commits on '{branch}'")

    if problems and not force:
        console.print(
            "[red]Refusing to remove workspace; found dirty or unpushed work:[/red]"
        )
        for p in problems:
            console.print(f"  - {p}")
        console.print(
            "\nUse [bold]--force[/bold] to remove the workspace and its worktrees anyway."
        )
        raise typer.Exit(1)

    if not Confirm.ask(
        f"Remove workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name}) "
        "and clean up its worktrees?",
        default=False,
    ):
        raise typer.Exit(0)

    # Remove worktrees via git, remembering the ones git refused to remove.
    failed: List[str] = []
    for wt in worktrees:
        repo = find_repo_for_worktree(wt, repos_dir)
        display_name = wt.name
        if repo is None:
            console.print(
                f"[yellow]Could not find parent repo for worktree {display_name}; "
                "deleting directory directly.[/yellow]"
            )
            shutil.rmtree(wt)
            continue

        console.print(f"Removing worktree for [bold]{display_name}[/bold]...")

        args = ["git", "worktree", "remove"]
        if force:
            args.append("--force")
        args.append(str(wt))

        code, _out, err = run_cmd(args, cwd=repo)
        if code != 0:
            console.print(
                f"[red]Failed to remove worktree {display_name} via git:[/red]\n{err}"
            )
            if force:
                # As a last resort, if force is given, nuke the dir.
                console.print(
                    f"[yellow]Forcing directory removal of {wt} despite git error.[/yellow]"
                )
                shutil.rmtree(wt)
            else:
                failed.append(display_name)
        elif wt.exists():
            # git worktree remove *should* delete the directory; if not, clean up.
            shutil.rmtree(wt)

    # Deleting the workspace directory now would also delete the worktrees git
    # refused to remove, bypassing its safety check; stop instead.
    if failed:
        console.print(
            "[red]Workspace directory kept; these worktrees could not be removed:[/red] "
            + ", ".join(failed)
        )
        console.print("Use [bold]--force[/bold] to remove them anyway.")
        raise typer.Exit(1)

    # Finally, remove the workspace directory itself
    shutil.rmtree(ws_dir)
    console.print(
        f"[green]Removed workspace[/green] title='{title or ws_dir.name}' dir='{ws_dir.name}'"
    )
|
||
|
||
|
||
# ---------------- wip (stage + commit) ----------------
|
||
|
||
|
||
def git_status_porcelain(repo_path: Path) -> List[Tuple[str, str]]:
    """
    Return a list of (status_code, path) for changes in the repo/worktree.

    Uses 'git status --porcelain -z'. The NUL-separated form reports paths
    verbatim (no quoting or escaping) and puts the original path of a
    rename/copy in a separate field, so every returned path can be passed
    straight to 'git add'. For renames and copies the new path is returned.

    On git failure an error is printed and an empty list is returned.
    """
    code, out, err = run_cmd(["git", "status", "--porcelain", "-z"], cwd=repo_path)
    if code != 0:
        console.print(
            f"[red]Failed to get status for {repo_path.name}:[/red]\n{err}"
        )
        return []

    changes: List[Tuple[str, str]] = []
    fields = iter(out.split("\0"))
    for entry in fields:
        # Entry format: 'XY path'. Shorter fields are the empty tail after
        # the final NUL (or malformed) and carry no change.
        if len(entry) < 4:
            continue
        status = entry[:2]
        path = entry[3:]
        if "R" in status or "C" in status:
            # In -z output the next field is the rename/copy source; consume
            # it so it is not mistaken for a separate entry.
            next(fields, None)
        changes.append((status, path))
    return changes
|
||
|
||
|
||
def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> List[str]:
    """
    Ask the user which changed files should be staged.

    Prints the changed paths in a panel, then prompts for one of:

    - "all"  -> every changed path
    - "none" -> nothing
    - "pick" -> multi-select through iterfzf

    Returns the chosen paths; an empty list when ``changes`` is empty or
    nothing was selected.
    """
    if not changes:
        return []

    files = [path for _status, path in changes]

    listing = Panel(
        "\n".join(files),
        title=f"Changed files in {repo_path.name}",
        subtitle="Use 'all', 'none', or 'pick' when prompted",
    )
    console.print(listing)

    answer = Prompt.ask(
        "Stage which files?",
        choices=["all", "none", "pick"],
        default="all",
    )
    if answer == "none":
        return []
    if answer == "all":
        return files

    # "pick": multi-select via iterfzf
    picked = iterfzf(files, multi=True, prompt="pick files> ")
    if not picked:
        return []

    # The original author notes iterfzf may hand back either a single string
    # or a sequence; normalize to a list of strings.
    return [picked] if isinstance(picked, str) else list(picked)
|
||
|
||
@app.command("wip")
def wip_workspace(
    ctx: typer.Context,
    workspace: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Workspace directory name to WIP-commit. "
            "If omitted, uses the workspace containing the current directory."
        ),
    ),
):
    """
    For each repo in the workspace:

    - Show list of changed files.
    - Ask whether to stage all, none, or pick some files.
    - Stage chosen files.
    - Make commit: 'wip: <workspace display name>'.
    """
    _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
    ws_dir = find_workspace_dir(workspaces_dir, workspace)
    if not ws_dir.exists():
        console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
        raise typer.Exit(1)

    # The commit message uses the README title, falling back to the dir name.
    title, _desc = read_workspace_readme(ws_dir)
    display_name = title or ws_dir.name

    # Staging and committing are delegated to the regular commit command.
    commit_workspace(ctx, workspace, message=f"wip: {display_name}")
|
||
|
||
@app.command("commit")
def commit_workspace(
    ctx: typer.Context,
    workspace: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Workspace directory name to WIP-commit. "
            "If omitted, uses the workspace containing the current directory."
        ),
    ),
    message: Optional[str] = typer.Option(
        None,
        "--message",
        "-m",
        help="Commit message to use.",
    ),
):
    """
    For each repo in the workspace:

    - Show list of changed files.
    - Ask whether to stage all, none, or pick some files.
    - Stage chosen files.
    - Commit the staged changes with the given message.
    """
    if not message:
        # Console.print takes no `file` argument, so print through the shared
        # console and signal the failure via the exit code.
        console.print("[red]No commit message provided. Exiting.[/red]")
        raise typer.Exit(1)

    _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
    ws_dir = find_workspace_dir(workspaces_dir, workspace)
    if not ws_dir.exists():
        console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
        raise typer.Exit(1)

    # Sub-directories carrying a .git entry are the repos to process.
    worktrees: List[Path] = [
        child
        for child in sorted(p for p in ws_dir.iterdir() if p.is_dir())
        if child.name.lower() != "readme.md" and ensure_git_repo(child)
    ]

    if not worktrees:
        console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
        raise typer.Exit(0)

    for wt in worktrees:
        console.print(f"\n[bold]Repo:[/bold] {wt.name}")
        changes = git_status_porcelain(wt)
        if not changes:
            console.print("  [green]No changes.[/green]")
            continue

        files_to_stage = choose_files_for_wip(wt, changes)
        if not files_to_stage:
            console.print("  [yellow]Nothing selected to stage.[/yellow]")
            continue

        # Stage selected files. '--' keeps a path that starts with '-' from
        # being parsed as an option.
        for f in files_to_stage:
            code, _out, err = run_cmd(["git", "add", "--", f], cwd=wt)
            if code != 0:
                console.print(
                    f"[red]Failed to add {f} in {wt.name}:[/red]\n{err}"
                )

        # Check if there is anything to commit
        code, out, _err = run_cmd(["git", "diff", "--cached", "--name-only"], cwd=wt)
        if code != 0 or not out.strip():
            console.print("  [yellow]No staged changes to commit.[/yellow]")
            continue

        # Commit
        code, _out, err = run_cmd(["git", "commit", "-m", message], cwd=wt)
        if code != 0:
            console.print(
                f"[red]Failed to commit in {wt.name}:[/red]\n{err}"
            )
        else:
            console.print(
                f"  [green]Created commit in {wt.name}:[/green] '{message}'"
            )
|
||
|
||
|
||
# ---------------- push ----------------
|
||
|
||
|
||
@app.command("push")
def push_workspace(
    ctx: typer.Context,
    workspace: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Workspace directory name to push. "
            "If omitted, uses the workspace containing the current directory."
        ),
    ),
    remote: str = typer.Option(
        "origin",
        "--remote",
        "-r",
        help="Remote name to push to (default: origin).",
    ),
):
    """
    For each repo in the workspace, run 'git push <remote> <current-branch>'.

    Skips repos with no current branch or when push fails.
    """
    # Typer shows the docstring above as the command's --help text, so
    # implementation notes are kept in comments.

    # Local import: keeps this command self-contained (rich is already a
    # dependency of the script).
    from rich.markup import escape

    _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
    ws_dir = find_workspace_dir(workspaces_dir, workspace)
    if not ws_dir.exists():
        console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
        raise typer.Exit(1)

    title, _desc = read_workspace_readme(ws_dir)
    console.print(
        f"Pushing all repos in workspace [bold]{title or ws_dir.name}[/bold] "
        f"(dir: {ws_dir.name}) to remote '{remote}'"
    )

    # Collect the git-backed sub-directories of the workspace, in sorted order.
    worktrees: List[Path] = []
    for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
        if child.name.lower() == "readme.md":
            continue
        if not ensure_git_repo(child):
            continue
        worktrees.append(child)

    if not worktrees:
        console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
        raise typer.Exit(0)

    for wt in worktrees:
        branch = get_current_branch(wt)
        if not branch or branch == "HEAD":
            console.print(
                f"[yellow]Skipping {wt.name}: no current branch (detached HEAD?).[/yellow]"
            )
            continue

        console.print(f"\n[bold]Repo:[/bold] {wt.name}  [bold]Branch:[/bold] {branch}")
        code, _out, err = run_cmd(["git", "push", remote, branch], cwd=wt)
        if code != 0:
            # git's push diagnostics contain bracketed tokens such as
            # "[rejected]" or "[remote rejected]". Unescaped, Rich would parse
            # them as markup tags and drop them from the message.
            console.print(
                f"[red]Failed to push {wt.name} ({branch}) to {remote}:[/red]\n"
                f"{escape(str(err))}"
            )
        else:
            console.print(
                f"[green]Pushed {wt.name} ({branch}) to {remote}[/green]"
            )
|
||
|
||
|
||
# ---------------- status (current workspace) ----------------
|
||
|
||
|
||
@app.command("status")
def status_workspace(
    ctx: typer.Context,
    workspace: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Workspace directory name to show status for. "
            "If omitted, uses the workspace containing the current directory."
        ),
    ),
):
    """
    Show detailed status for the current (or specified) workspace.

    For each repo in the workspace:
    - repo directory name
    - branch
    - ahead/behind/dirty indicator
    - number of changed files
    """
    # Typer shows the docstring above as the command's --help text, so
    # implementation notes are kept in comments.
    _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
    ws_dir = find_workspace_dir(workspaces_dir, workspace)
    if not ws_dir.exists():
        console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
        raise typer.Exit(1)

    title, desc = read_workspace_readme(ws_dir)

    # Fall back to the directory name when the README yields no title (the
    # push/diff commands do the same), and append the description only when
    # there is one, so the heading never shows a literal "None".
    heading = f"Status for workspace:'{title or ws_dir.name}'\n(dir: {ws_dir.name})"
    if desc:
        heading += f"\n\n{desc}"

    # Per-repo summary table.
    table = Table(
        title=heading,
        title_justify="left",
        show_lines=False,
    )
    table.add_column("Repo (dir)", style="bold")
    table.add_column("Branch")
    table.add_column("Ahead/Behind/Dirty")
    table.add_column("Changed Files", justify="right")

    # Per-file detail table, filled only for repos that have changes.
    files_table = Table(show_lines=False)
    files_table.add_column("Repo (dir)", style="bold")
    files_table.add_column("File", justify="left")
    files_table.add_column("Status", justify="right")

    # Collect the git-backed sub-directories of the workspace, in sorted order.
    worktrees: List[Path] = []
    for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
        if child.name.lower() == "readme.md":
            continue
        if not ensure_git_repo(child):
            continue
        worktrees.append(child)

    if not worktrees:
        console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
        raise typer.Exit(0)

    for wt in worktrees:
        branch = get_current_branch(wt) or "?"
        status = get_git_status(wt)
        changes = git_status_porcelain(wt)
        table.add_row(
            wt.name,
            branch,
            status.indicator,
            str(len(changes)),
        )
        for f, s in changes:
            files_table.add_row(wt.name, f, s)

    console.print(table)

    # Print the detail section only when there is something to list;
    # otherwise the header would dangle above nothing.
    if files_table.rows:
        console.print("\nFiles with changes:")
        console.print(files_table)
|
||
|
||
@app.command("diff")
def diff_workspace(
    ctx: typer.Context,
    workspace: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Workspace directory name to diff. "
            "If omitted, uses the workspace containing the current directory."
        ),
    ),
    staged: bool = typer.Option(
        False,
        "--staged",
        "-s",
        help="Show only staged changes (git diff --cached).",
    ),
):
    """
    Show git diff for all repos in the workspace.
    Uses rich Syntax highlighter for diffs.
    """
    # Typer shows the docstring above as the command's --help text, so
    # implementation notes are kept in comments.
    _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
    ws_dir = find_workspace_dir(workspaces_dir, workspace)

    if not ws_dir.exists():
        console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
        raise typer.Exit(1)

    title, _desc = read_workspace_readme(ws_dir)
    console.print(
        f"Showing diffs for workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})"
    )

    # Git-backed sub-directories of the workspace, visited in sorted order.
    repos: List[Path] = [
        entry
        for entry in sorted(p for p in ws_dir.iterdir() if p.is_dir())
        if entry.name.lower() != "readme.md" and ensure_git_repo(entry)
    ]
    if not repos:
        console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
        raise typer.Exit(0)

    dirty_repo_seen = False
    for repo in repos:
        # Repos with a clean porcelain status are skipped silently.
        if not git_status_porcelain(repo):
            continue
        dirty_repo_seen = True

        console.rule(f"[bold]{repo.name}[/bold]")

        diff_cmd = ["git", "diff", "--cached"] if staged else ["git", "diff"]
        code, out, err = run_cmd(diff_cmd, cwd=repo)

        if code != 0:
            console.print(f"[red]Failed to get diff for {repo.name}:[/red]\n{err}")
        elif not out.strip():
            console.print("[dim]No diff output.[/dim]")
        else:
            highlighted = Syntax(
                out,
                "diff",
                theme="nord",
                line_numbers=False,
                word_wrap=False,
            )
            # One panel per repo keeps the diffs visually separated.
            console.print(Panel(highlighted, title=repo.name, border_style="cyan"))

    if not dirty_repo_seen:
        console.print("[green]No changes to diff in any repo.[/green]")
|
||
|
||
def not_in_tmux() -> bool:
    """Tell whether the process runs outside of both tmux and zellij.

    Returns:
        True when neither the ``TMUX`` nor the ``ZELLIJ`` environment
        variable holds a non-empty value, False otherwise.
    """
    return not any(os.environ.get(var) for var in ("TMUX", "ZELLIJ"))
|
||
|
||
|
||
def run_tmux(args: list[str], *, clear_tmux_env: bool = False) -> subprocess.CompletedProcess:
    """Invoke ``tmux`` with the given arguments and return the finished process.

    Args:
        args: Arguments placed after the ``tmux`` executable name.
        clear_tmux_env: When true, the child gets an empty ``TMUX`` variable
            so the command runs as if it had not been started from inside
            tmux.

    Returns:
        The ``CompletedProcess`` from ``subprocess.run``. A non-zero exit
        status does not raise (``check=False``); callers inspect
        ``returncode``.
    """
    child_env = dict(os.environ)
    if clear_tmux_env:
        child_env["TMUX"] = ""
    command = ["tmux"] + list(args)
    return subprocess.run(command, env=child_env, check=False)
|
||
|
||
|
||
def session_exists(session_name: str) -> bool:
    """Check whether a tmux session with exactly this name exists.

    Args:
        session_name: Session name to look for. It is passed to tmux with a
            leading ``=`` so that only an exact match counts.

    Returns:
        True if ``tmux has-session`` exits with status 0. False otherwise,
        including when the ``tmux`` executable cannot be found.
    """
    try:
        result = subprocess.run(
            ["tmux", "has-session", "-t", f"={session_name}"],
            # This is only a probe: discard tmux's "can't find session"
            # message so it does not reach the user's terminal.
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
    except FileNotFoundError:
        # Without a tmux binary there cannot be a session.
        return False
    return result.returncode == 0
|
||
|
||
|
||
def pick_project(base_dir: Path) -> Optional[str]:
    """Let the user pick a sub-directory (project) of ``base_dir`` with fzf.

    Args:
        base_dir: Directory whose immediate sub-directories are offered.

    Returns:
        The name of the chosen sub-directory, or None when ``base_dir`` is
        not a directory, has no sub-directories, ``fzf`` is not on PATH, or
        fzf exits non-zero / returns an empty selection.
    """
    if not base_dir.is_dir():
        typer.echo(f"[ta] {base_dir} is not a directory.", err=True)
        return None

    subdirs = sorted(p.name for p in base_dir.iterdir() if p.is_dir())
    if not subdirs:
        typer.echo(f"[ta] No subdirectories found in {base_dir}.", err=True)
        return None

    if not shutil.which("fzf"):
        typer.echo("[ta] fzf not found in PATH.", err=True)
        return None

    proc = subprocess.run(
        ["fzf", "--reverse", f"--header=Select project from {base_dir.name} >"],
        input="\n".join(subdirs),
        text=True,
        # Capture stdout only, since that is where the selection is read
        # from. stderr stays on the terminal: it was never read here, and
        # fzf may use it to draw its interface and report errors.
        stdout=subprocess.PIPE,
    )

    if proc.returncode != 0:
        # Cancelled or failed
        return None

    choice = proc.stdout.strip()
    return choice or None
|
||
|
||
|
||
def create_detached_session(
    session_name: str,
    path_name: Path,
    start_mode: bool,
) -> bool:
    """
    Build a detached tmux session rooted at ``path_name``.

    - With ``start_mode`` the session consists of a single window.
    - Without it, a pane is additionally split off above (70%) and
      ``nvim '+Telescope find_files'`` is typed into the session.

    Every tmux call is made with the TMUX variable blanked, i.e. as if it
    were issued from outside tmux.

    Returns True only if every tmux command that was run exited with 0; the
    first failing command stops the sequence.
    """
    start_dir = str(path_name)

    # Commands are executed strictly in this order.
    steps = [["new-session", "-Ad", "-s", session_name, "-c", start_dir]]
    if not start_mode:
        steps.append(
            ["split-window", "-vb", "-t", session_name, "-c", start_dir, "-p", "70"]
        )
        steps.append(
            ["send-keys", "-t", session_name, "nvim '+Telescope find_files'", "Enter"]
        )

    # all() over a generator stops at the first failure, so later steps are
    # not attempted once one command fails.
    return all(
        run_tmux(step, clear_tmux_env=True).returncode == 0 for step in steps
    )
|
||
|
||
|
||
def create_if_needed_and_attach(
    session_name: str,
    path_name: Path,
    start_mode: bool,
) -> bool:
    """
    Bring the user into ``session_name``, creating the session when missing.

    Follows the logic of the original bash helper:

    - Outside a multiplexer: ``tmux new-session -As <name> -c <path>``.
    - Otherwise: create the session detached if it does not exist yet
      (``start_mode`` is forwarded to ``create_detached_session``), then
      ``tmux switch-client -t <name>``.

    Returns True when the final tmux command exits with 0, and False when
    it, or the session creation before it, fails.
    """
    if not_in_tmux():
        attached = run_tmux(["new-session", "-As", session_name, "-c", str(path_name)])
        return attached.returncode == 0

    # Already inside a multiplexer: make sure the target exists, then switch.
    needs_creation = not session_exists(session_name)
    if needs_creation and not create_detached_session(session_name, path_name, start_mode):
        return False

    switched = run_tmux(["switch-client", "-t", session_name])
    return switched.returncode == 0
|
||
|
||
|
||
def attach_to_first_session() -> None:
    """Fallback: attach to the first tmux session and open choose-tree in it.

    Raises:
        typer.Exit: with status 1 when no session can be listed, or with
            tmux's exit status when attaching fails.
    """
    list_proc = subprocess.run(
        ["tmux", "list-sessions", "-F", "#{session_name}"],
        text=True,
        capture_output=True,
    )
    session_names = list_proc.stdout.strip().splitlines()
    if list_proc.returncode != 0 or not session_names:
        typer.echo("[ta] No tmux sessions found to attach to.", err=True)
        raise typer.Exit(1)

    first_session = session_names[0]

    # attach-session blocks until the client detaches, so a separate
    # `tmux choose-tree` call afterwards would run too late and outside the
    # attached client. A standalone ";" argument makes tmux treat both as
    # one command sequence, so choose-tree opens right after attaching.
    attach_proc = run_tmux(
        ["attach-session", "-t", first_session, ";", "choose-tree", "-Za"]
    )
    if attach_proc.returncode != 0:
        raise typer.Exit(attach_proc.returncode)
|
||
|
||
# Typer sub-application that groups the tmux-related commands under the
# name "tmux"; shell-completion installation options are disabled for it.
tmux_app = typer.Typer(name="tmux", help="tmux commands", add_completion=False)
|
||
|
||
@tmux_app.command('attach')
def attach(
    ctx: typer.Context,
    workspace: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Workspace directory name whose repos can be attached to. "
            "If omitted, uses the workspace containing the current directory."
        ),
    ),
):
    """
    Attach or create a session for a repo in a workspace.
    """
    # Typer shows the docstring above as the command's --help text, so
    # implementation notes are kept in comments.
    _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
    ws_dir = find_workspace_dir(workspaces_dir, workspace)
    # A Path object is always truthy, so the directory must be checked with
    # exists(), as the other workspace commands do.
    if not ws_dir or not ws_dir.exists():
        shown_name = ws_dir.name if ws_dir else workspace
        console.print(f"[red]Workspace '{shown_name}' does not exist at {ws_dir}[/red]")
        raise typer.Exit(1)

    # Offer every sub-directory of the workspace as a candidate repo.
    candidates = [entry for entry in ws_dir.iterdir() if entry.is_dir()]
    repo = pick_repo_with_iterfzf(candidates)
    if not repo:
        console.print("[red]No repo selected. Exiting.[/red]")
        raise typer.Exit(1)

    # "." and ":" are separators in tmux target syntax (session:window.pane),
    # so a name containing them cannot be addressed reliably with -t.
    session_name = f"{ws_dir.name}|{repo.name}".replace(".", "_").replace(":", "_")

    if not create_if_needed_and_attach(session_name, repo, False):
        console.print(f"[red]Could not attach to tmux session '{session_name}'.[/red]")
        raise typer.Exit(1)
|
||
|
||
|
||
# Register the tmux sub-application on the root CLI.
app.add_typer(tmux_app)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    app()
|
||
|
||
|