wip: feat: workspace tmux remove

This commit is contained in:
Waylon Walker 2025-11-26 10:59:52 -06:00
parent a0d308c19f
commit 7bb9b49e9b
2 changed files with 75 additions and 109 deletions

View file

@ -48,7 +48,7 @@ select = [
"I", # isort
"N", # pep8-naming
# "D", # pydocstyle
# "UP", # pyupgrade
"UP", # pyupgrade
"YTT", # flake8-2020
# "ANN", # flake8-annotations
# "S", # flake8-bandit

View file

@ -18,9 +18,6 @@ import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import List
from typing import Optional
from typing import Tuple
import typer
from iterfzf import iterfzf
@ -47,8 +44,7 @@ console = Console()
class Settings(BaseSettings):
"""
Global configuration for workspaces.
"""Global configuration for workspaces.
Resolution for workspaces_name:
1. Command-line flag --workspaces-name
@ -74,10 +70,9 @@ class Settings(BaseSettings):
@classmethod
def from_env_and_override(
cls, override_workspaces_name: Optional[str]
) -> "Settings":
"""
Construct settings honoring:
cls, override_workspaces_name: str | None
) -> Settings:
"""Construct settings honoring:
1. CLI override
2. WORKSPACES_NAME env
3. default "git"
@ -91,9 +86,8 @@ class Settings(BaseSettings):
return s
def resolve_paths(workspaces_name: Optional[str]) -> Tuple[Settings, Path, Path]:
"""
Build Settings and derived paths, honoring CLI override of workspaces_name.
def resolve_paths(workspaces_name: str | None) -> tuple[Settings, Path, Path]:
"""Build Settings and derived paths, honoring CLI override of workspaces_name.
"""
base_settings = Settings.from_env_and_override(workspaces_name)
name = base_settings.workspaces_name
@ -115,15 +109,14 @@ class GitStatus:
@property
def indicator(self) -> str:
"""
Build ASCII indicator:
"""Build ASCII indicator:
- clean: "·"
- ahead 1: "↑1"
- behind 2: "↓2"
- both ahead/behind: "↑1 ↓2"
- add '*' when dirty, e.g. "↑1*" or "↑1 ↓2*"
"""
parts: List[str] = []
parts: list[str] = []
if self.ahead:
parts.append(f"{self.ahead}")
if self.behind:
@ -135,8 +128,7 @@ class GitStatus:
def get_git_status(repo_path: Path) -> GitStatus:
"""
Get ahead/behind and dirty info for a Git repo.
"""Get ahead/behind and dirty info for a Git repo.
Uses `git status --porcelain=v2 --branch` and parses:
- '# branch.ab +A -B' for ahead/behind
@ -170,7 +162,7 @@ def get_git_status(repo_path: Path) -> GitStatus:
return GitStatus(ahead=ahead, behind=behind, dirty=dirty)
def get_current_branch(repo_path: Path) -> Optional[str]:
def get_current_branch(repo_path: Path) -> str | None:
try:
out = subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
@ -188,7 +180,7 @@ def ensure_git_repo(path: Path) -> bool:
return (path / ".git").exists()
def run_cmd(cmd: List[str], cwd: Optional[Path] = None) -> Tuple[int, str, str]:
def run_cmd(cmd: list[str], cwd: Path | None = None) -> tuple[int, str, str]:
proc = subprocess.Popen(
cmd,
cwd=cwd,
@ -200,9 +192,8 @@ def run_cmd(cmd: List[str], cwd: Optional[Path] = None) -> Tuple[int, str, str]:
return proc.returncode, out, err
def find_workspace_dir(workspaces_dir: Path, workspace_name: Optional[str]) -> Path:
"""
Resolve the directory of a workspace.
def find_workspace_dir(workspaces_dir: Path, workspace_name: str | None) -> Path:
"""Resolve the directory of a workspace.
- If workspace_name given: use workspaces_dir / workspace_name
- Else: use current working directory (must be inside workspaces_dir).
@ -227,9 +218,8 @@ def find_workspace_dir(workspaces_dir: Path, workspace_name: Optional[str]) -> P
return workspace_root
def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]:
"""
Return (name_from_h1, description_from_rest_of_file).
def read_workspace_readme(ws_dir: Path) -> tuple[str, str]:
"""Return (name_from_h1, description_from_rest_of_file).
If file missing or malformed, fallback appropriately.
"""
readme = ws_dir / "readme.md"
@ -270,8 +260,7 @@ def write_workspace_readme(ws_dir: Path, name: str, description: str) -> None:
def slugify_workspace_name(name: str) -> str:
"""
Turn arbitrary workspace name into a safe directory/worktree name.
"""Turn arbitrary workspace name into a safe directory/worktree name.
- Lowercase
- Replace ':' with '-' FIRST (special case so 'fix:my issue' -> 'fix/my-issue' branch later)
@ -294,8 +283,7 @@ def slugify_workspace_name(name: str) -> str:
def branch_name_for_workspace(name: str) -> str:
"""
Compute branch name from workspace name.
"""Compute branch name from workspace name.
Rules:
- Start from slugified name (no spaces/specials).
@ -317,7 +305,7 @@ def branch_name_for_workspace(name: str) -> str:
@app.callback()
def main(
ctx: typer.Context,
workspaces_name: Optional[str] = typer.Option(
workspaces_name: str | None = typer.Option(
None,
"--workspaces-name",
"-W",
@ -327,8 +315,7 @@ def main(
),
),
):
"""
Manage workspaces and associated Git worktrees.
"""Manage workspaces and associated Git worktrees.
If no command is given, this will list workspaces.
"""
@ -345,7 +332,7 @@ def main(
raise typer.Exit(0)
def get_ctx_paths(ctx: typer.Context) -> Tuple[Settings, Path, Path]:
def get_ctx_paths(ctx: typer.Context) -> tuple[Settings, Path, Path]:
obj = ctx.obj or {}
return obj["settings"], obj["repos_dir"], obj["workspaces_dir"]
@ -363,8 +350,7 @@ def list_workspaces(workspaces_dir: Path):
def cli_list_workspaces(
ctx: typer.Context,
):
"""
List all workspaces.
"""List all workspaces.
Shows:
- workspace directory name
@ -386,7 +372,7 @@ def cli_list_workspaces(
for ws in sorted(p for p in workspaces_dir.iterdir() if p.is_dir()):
title, desc = read_workspace_readme(ws)
repos: List[str] = []
repos: list[str] = []
for child in sorted(p for p in ws.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
@ -409,21 +395,20 @@ def cli_list_workspaces(
@app.command("new", hidden=True)
def create_workspace(
ctx: typer.Context,
name: Optional[str] = typer.Option(
name: str | None = typer.Option(
None,
"--name",
"-n",
help="Name of the new workspace (display name; can contain spaces).",
),
description: Optional[str] = typer.Option(
description: str | None = typer.Option(
None,
"--description",
"-d",
help="Description of the workspace. Will be written into readme.md.",
),
):
"""
Create a new workspace.
"""Create a new workspace.
- Asks for name and description if not provided.
- Workspace directory uses a slugified version of the name.
@ -458,7 +443,7 @@ def create_workspace(
@app.command("list-repos")
def list_repos(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -468,8 +453,7 @@ def list_repos(
),
),
):
"""
List repos and branches in the current (or specified) workspace.
"""List repos and branches in the current (or specified) workspace.
Shows:
- repo directory name
@ -506,9 +490,8 @@ def list_repos(
# ---------------- add-repo ----------------
def list_all_repos(repos_dir: Path) -> List[Path]:
"""
List all directories in repos_dir that appear to be git repos.
def list_all_repos(repos_dir: Path) -> list[Path]:
"""List all directories in repos_dir that appear to be git repos.
"""
if not repos_dir.exists():
return []
@ -519,9 +502,8 @@ def list_all_repos(repos_dir: Path) -> List[Path]:
return repos
def pick_repo_with_iterfzf(repos: List[Path]) -> Optional[Path]:
"""
Use iterfzf (Python library) to pick a repo from a list of paths.
def pick_repo_with_iterfzf(repos: list[Path]) -> Path | None:
"""Use iterfzf (Python library) to pick a repo from a list of paths.
"""
if not repos:
return None
@ -538,9 +520,8 @@ def pick_repo_with_iterfzf(repos: List[Path]) -> Optional[Path]:
return None
def pick_workspace_with_iterfzf(workspaces: List[Path]) -> Optional[Path]:
"""
Use iterfzf (Python library) to pick a workspace from a list of paths.
def pick_workspace_with_iterfzf(workspaces: list[Path]) -> Path | None:
"""Use iterfzf (Python library) to pick a workspace from a list of paths.
"""
names = [w.name for w in workspaces]
choice = iterfzf(names, prompt="pick a workspace> ")
@ -558,7 +539,7 @@ def pick_workspace_with_iterfzf(workspaces: List[Path]) -> Optional[Path]:
@app.command("add", hidden=True)
def add_repo(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -567,7 +548,7 @@ def add_repo(
"If omitted, uses the workspace containing the current directory."
),
),
repo_name: Optional[str] = typer.Option(
repo_name: str | None = typer.Option(
None,
"--repo",
"-r",
@ -577,8 +558,7 @@ def add_repo(
),
),
):
"""
Add a repo to a workspace.
"""Add a repo to a workspace.
- Lists all directories in repos_dir as repos.
- Uses iterfzf to pick repo if --repo not given.
@ -602,7 +582,7 @@ def add_repo(
console.print(f"[red]No git repos found in {repos_dir}[/red]")
raise typer.Exit(1)
repo_path: Optional[Path] = None
repo_path: Path | None = None
if repo_name:
for r in all_repos:
if r.name == repo_name:
@ -665,9 +645,8 @@ def add_repo(
# ---------------- rm-workspace ----------------
def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Optional[Path]:
"""
Try to find the parent repo for a worktree, assuming it lives in repos_dir
def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Path | None:
"""Try to find the parent repo for a worktree, assuming it lives in repos_dir
with the same directory name as the worktree.
"""
candidate = repos_dir / worktree_path.name
@ -677,8 +656,7 @@ def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Optional[Pat
def has_unpushed_commits(repo_path: Path) -> bool:
"""
Detect if branch has commits not on its upstream.
"""Detect if branch has commits not on its upstream.
Uses 'git rev-list @{u}..HEAD'; if non-empty, there are unpushed commits.
"""
# Check if upstream exists
@ -708,8 +686,7 @@ def is_branch_integrated_into_main(
branch: str,
main_ref: str = "origin/main",
) -> bool:
"""
Heuristic: is `branch`'s content already in `main_ref`?
"""Heuristic: is `branch`'s content already in `main_ref`?
Returns True if:
- branch tip is an ancestor of main_ref (normal merge / FF / rebase-before-merge), OR
@ -761,7 +738,7 @@ def is_branch_integrated_into_main(
@app.command("rm")
def remove_workspace(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -782,8 +759,7 @@ def remove_workspace(
help="Ref to consider as the integration target (default: origin/main).",
),
):
"""
Remove a workspace:
"""Remove a workspace:
- For each repo worktree in the workspace:
* Check for dirty work or unpushed commits.
@ -803,7 +779,7 @@ def remove_workspace(
title, _desc = read_workspace_readme(ws_dir)
# Collect worktrees (subdirs that look like git repos, excluding readme.md)
worktrees: List[Path] = []
worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
@ -824,7 +800,7 @@ def remove_workspace(
raise typer.Exit(0)
# Check for dirty / unpushed changes
problems: List[str] = []
problems: list[str] = []
for wt in worktrees:
status = get_git_status(wt)
branch = get_current_branch(wt) or "?"
@ -905,9 +881,8 @@ def remove_workspace(
# ---------------- wip (stage + commit) ----------------
def git_status_porcelain(repo_path: Path) -> List[Tuple[str, str]]:
"""
Return a list of (status_code, path) for changes in the repo/worktree.
def git_status_porcelain(repo_path: Path) -> list[tuple[str, str]]:
"""Return a list of (status_code, path) for changes in the repo/worktree.
Uses 'git status --porcelain'.
"""
@ -916,7 +891,7 @@ def git_status_porcelain(repo_path: Path) -> List[Tuple[str, str]]:
console.print(f"[red]Failed to get status for {repo_path.name}:[/red]\n{err}")
return []
changes: List[Tuple[str, str]] = []
changes: list[tuple[str, str]] = []
for line in out.splitlines():
if not line.strip():
continue
@ -926,9 +901,8 @@ def git_status_porcelain(repo_path: Path) -> List[Tuple[str, str]]:
return changes
def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> List[str]:
"""
Interactively choose which files to stage for WIP.
def choose_files_for_wip(repo_path: Path, changes: list[tuple[str, str]]) -> list[str]:
"""Interactively choose which files to stage for WIP.
- Show list of changed files.
- Ask user:
@ -973,7 +947,7 @@ def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> Lis
@app.command("wip")
def wip_workspace(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -983,8 +957,7 @@ def wip_workspace(
),
),
):
"""
For each repo in the workspace:
"""For each repo in the workspace:
- Show list of changed files.
- Ask whether to stage all, none, or pick some files.
@ -1006,7 +979,7 @@ def wip_workspace(
@app.command("commit")
def commit_workspace(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -1015,15 +988,14 @@ def commit_workspace(
"If omitted, uses the workspace containing the current directory."
),
),
message: Optional[str] = typer.Option(
message: str | None = typer.Option(
None,
"--message",
"-m",
help="Commit message to use.",
),
):
"""
For each repo in the workspace:
"""For each repo in the workspace:
- Show list of changed files.
- Ask whether to stage all, none, or pick some files.
@ -1042,7 +1014,7 @@ def commit_workspace(
)
raise typer.Exit(1)
worktrees: List[Path] = []
worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
@ -1092,7 +1064,7 @@ def commit_workspace(
@app.command("push")
def push_workspace(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -1108,8 +1080,7 @@ def push_workspace(
help="Remote name to push to (default: origin).",
),
):
"""
For each repo in the workspace, run 'git push <remote> <current-branch>'.
"""For each repo in the workspace, run 'git push <remote> <current-branch>'.
Skips repos with no current branch or when push fails.
"""
@ -1127,7 +1098,7 @@ def push_workspace(
f"(dir: {ws_dir.name}) to remote '{remote}'"
)
worktrees: List[Path] = []
worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
@ -1163,7 +1134,7 @@ def push_workspace(
@app.command("status")
def status_workspace(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -1173,8 +1144,7 @@ def status_workspace(
),
),
):
"""
Show detailed status for the current (or specified) workspace.
"""Show detailed status for the current (or specified) workspace.
For each repo in the workspace:
- repo directory name
@ -1208,7 +1178,7 @@ def status_workspace(
files_table.add_column("File", justify="left")
files_table.add_column("Status", justify="right")
worktrees: List[Path] = []
worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
@ -1245,7 +1215,7 @@ def status_workspace(
@app.command("diff")
def diff_workspace(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -1261,8 +1231,7 @@ def diff_workspace(
help="Show only staged changes (git diff --cached).",
),
):
"""
Show git diff for all repos in the workspace.
"""Show git diff for all repos in the workspace.
Uses rich Syntax highlighter for diffs.
"""
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
@ -1280,7 +1249,7 @@ def diff_workspace(
)
# Collect repos
worktrees: List[Path] = []
worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
@ -1357,7 +1326,7 @@ def run_tmux(
)
def get_tmux_sessions() -> List[str]:
def get_tmux_sessions() -> list[str]:
"""List all tmux sessions."""
result = run_tmux(["list-sessions", "-F", "#{session_name}"])
return result.stdout.decode().splitlines()
@ -1369,7 +1338,7 @@ def session_exists(session_name: str) -> bool:
return result.returncode == 0
def pick_project(base_dir: Path) -> Optional[str]:
def pick_project(base_dir: Path) -> str | None:
"""Use fzf to pick a subdirectory (project) from base_dir."""
if not base_dir.is_dir():
typer.echo(f"[ta] {base_dir} is not a directory.", err=True)
@ -1405,8 +1374,7 @@ def create_detached_session(
path_name: Path,
start_mode: bool,
) -> bool:
"""
Create a detached session.
"""Create a detached session.
- If start_mode: just a single window.
- Else: split layout with nvim on top.
@ -1459,8 +1427,7 @@ def create_if_needed_and_attach(
path_name: Path,
start_mode: bool,
) -> bool:
"""
Mimic the bash logic:
"""Mimic the bash logic:
- If not in tmux: `tmux new-session -As ... -c path_name`
- Else:
@ -1526,7 +1493,7 @@ tmux_app = typer.Typer(
@tmux_app.command("attach")
def tmux_cli_attach(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -1536,8 +1503,7 @@ def tmux_cli_attach(
),
),
):
"""
Attach or create a session for a repo in a workspace.
"""Attach or create a session for a repo in a workspace.
"""
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
@ -1556,7 +1522,7 @@ def tmux_cli_attach(
@tmux_app.command("list-sessions")
def tmux_cli_list_sessions(
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -1583,7 +1549,7 @@ def tmux_cli_list_sessions(
@tmux_app.command("remove")
def tmux_cli_remove(
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -1632,7 +1598,7 @@ app.add_typer(tmux_app)
@app.command("attach")
def attach(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",