Compare commits

...

5 commits

2 changed files with 94 additions and 144 deletions

View file

@ -48,7 +48,7 @@ select = [
"I", # isort "I", # isort
"N", # pep8-naming "N", # pep8-naming
# "D", # pydocstyle # "D", # pydocstyle
# "UP", # pyupgrade "UP", # pyupgrade
"YTT", # flake8-2020 "YTT", # flake8-2020
# "ANN", # flake8-annotations # "ANN", # flake8-annotations
# "S", # flake8-bandit # "S", # flake8-bandit
@ -66,7 +66,7 @@ select = [
"ISC", # flake8-implicit-str-concat "ISC", # flake8-implicit-str-concat
"ICN", # flake8-import-conventions "ICN", # flake8-import-conventions
"G", # flake8-logging-format "G", # flake8-logging-format
# "INP", # flake8-no-pep420 "INP", # flake8-no-pep420
"PIE", # flake8-pie "PIE", # flake8-pie
"T20", # flake8-print "T20", # flake8-print
"PYI", # flake8-pyi "PYI", # flake8-pyi
@ -74,23 +74,23 @@ select = [
"Q", # flake8-quotes "Q", # flake8-quotes
"RSE", # flake8-raise "RSE", # flake8-raise
"RET", # flake8-return "RET", # flake8-return
# "SLF", # flake8-self "SLF", # flake8-self
# "SIM", # flake8-simplify "SIM", # flake8-simplify
"TID", # flake8-tidy-imports "TID", # flake8-tidy-imports
"TCH", # flake8-type-checking "TCH", # flake8-type-checking
# "INT", # flake8-gettext "INT", # flake8-gettext
# "ARG", # flake8-unused-arguments "ARG", # flake8-unused-arguments
"PTH", # flake8-use-pathlib "PTH", # flake8-use-pathlib
# "ERA", # eradicate "ERA", # eradicate
"PD", # pandas-vet "PD", # pandas-vet
"PGH", # pygrep-hooks "PGH", # pygrep-hooks
# "PL", # Pylint # "PL", # Pylint
# "PLC", # Convention "PLC", # Convention
"PLE", # Error "PLE", # Error
# "PLR", # Refactor # "PLR", # Refactor
"PLW", # Warning "PLW", # Warning
# "TRY", # tryceratops "TRY", # tryceratops
"NPY", # NumPy-specific rules "NPY", # NumPy-specific rules
# "RUF", # Ruff-specific rules "RUF", # Ruff-specific rules
] ]

View file

@ -18,9 +18,6 @@ import shutil
import subprocess import subprocess
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import List
from typing import Optional
from typing import Tuple
import typer import typer
from iterfzf import iterfzf from iterfzf import iterfzf
@ -47,8 +44,7 @@ console = Console()
class Settings(BaseSettings): class Settings(BaseSettings):
""" """Global configuration for workspaces.
Global configuration for workspaces.
Resolution for workspaces_name: Resolution for workspaces_name:
1. Command-line flag --workspaces-name 1. Command-line flag --workspaces-name
@ -74,10 +70,9 @@ class Settings(BaseSettings):
@classmethod @classmethod
def from_env_and_override( def from_env_and_override(
cls, override_workspaces_name: Optional[str] cls, override_workspaces_name: str | None
) -> "Settings": ) -> Settings:
""" """Construct settings honoring:
Construct settings honoring:
1. CLI override 1. CLI override
2. WORKSPACES_NAME env 2. WORKSPACES_NAME env
3. default "git" 3. default "git"
@ -91,9 +86,8 @@ class Settings(BaseSettings):
return s return s
def resolve_paths(workspaces_name: Optional[str]) -> Tuple[Settings, Path, Path]: def resolve_paths(workspaces_name: str | None) -> tuple[Settings, Path, Path]:
""" """Build Settings and derived paths, honoring CLI override of workspaces_name.
Build Settings and derived paths, honoring CLI override of workspaces_name.
""" """
base_settings = Settings.from_env_and_override(workspaces_name) base_settings = Settings.from_env_and_override(workspaces_name)
name = base_settings.workspaces_name name = base_settings.workspaces_name
@ -115,15 +109,14 @@ class GitStatus:
@property @property
def indicator(self) -> str: def indicator(self) -> str:
""" """Build ASCII indicator:
Build ASCII indicator:
- clean: "·" - clean: "·"
- ahead 1: "↑1" - ahead 1: "↑1"
- behind 2: "↓2" - behind 2: "↓2"
- both ahead/behind: "↑1 ↓2" - both ahead/behind: "↑1 ↓2"
- add '*' when dirty, e.g. "↑1*" or "↑1 ↓2*" - add '*' when dirty, e.g. "↑1*" or "↑1 ↓2*"
""" """
parts: List[str] = [] parts: list[str] = []
if self.ahead: if self.ahead:
parts.append(f"{self.ahead}") parts.append(f"{self.ahead}")
if self.behind: if self.behind:
@ -135,8 +128,7 @@ class GitStatus:
def get_git_status(repo_path: Path) -> GitStatus: def get_git_status(repo_path: Path) -> GitStatus:
""" """Get ahead/behind and dirty info for a Git repo.
Get ahead/behind and dirty info for a Git repo.
Uses `git status --porcelain=v2 --branch` and parses: Uses `git status --porcelain=v2 --branch` and parses:
- '# branch.ab +A -B' for ahead/behind - '# branch.ab +A -B' for ahead/behind
@ -158,7 +150,8 @@ def get_git_status(repo_path: Path) -> GitStatus:
for line in out.splitlines(): for line in out.splitlines():
if line.startswith("# branch.ab"): if line.startswith("# branch.ab"):
# Example: "# branch.ab +1 -2" # Example:
# branch.ab +1 -2
m = re.search(r"\+(\d+)\s+-(\d+)", line) m = re.search(r"\+(\d+)\s+-(\d+)", line)
if m: if m:
ahead = int(m.group(1)) ahead = int(m.group(1))
@ -169,7 +162,7 @@ def get_git_status(repo_path: Path) -> GitStatus:
return GitStatus(ahead=ahead, behind=behind, dirty=dirty) return GitStatus(ahead=ahead, behind=behind, dirty=dirty)
def get_current_branch(repo_path: Path) -> Optional[str]: def get_current_branch(repo_path: Path) -> str | None:
try: try:
out = subprocess.check_output( out = subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"], ["git", "rev-parse", "--abbrev-ref", "HEAD"],
@ -187,7 +180,7 @@ def ensure_git_repo(path: Path) -> bool:
return (path / ".git").exists() return (path / ".git").exists()
def run_cmd(cmd: List[str], cwd: Optional[Path] = None) -> Tuple[int, str, str]: def run_cmd(cmd: list[str], cwd: Path | None = None) -> tuple[int, str, str]:
proc = subprocess.Popen( proc = subprocess.Popen(
cmd, cmd,
cwd=cwd, cwd=cwd,
@ -199,9 +192,8 @@ def run_cmd(cmd: List[str], cwd: Optional[Path] = None) -> Tuple[int, str, str]:
return proc.returncode, out, err return proc.returncode, out, err
def find_workspace_dir(workspaces_dir: Path, workspace_name: Optional[str]) -> Path: def find_workspace_dir(workspaces_dir: Path, workspace_name: str | None) -> Path:
""" """Resolve the directory of a workspace.
Resolve the directory of a workspace.
- If workspace_name given: use workspaces_dir / workspace_name - If workspace_name given: use workspaces_dir / workspace_name
- Else: use current working directory (must be inside workspaces_dir). - Else: use current working directory (must be inside workspaces_dir).
@ -226,9 +218,8 @@ def find_workspace_dir(workspaces_dir: Path, workspace_name: Optional[str]) -> P
return workspace_root return workspace_root
def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]: def read_workspace_readme(ws_dir: Path) -> tuple[str, str]:
""" """Return (name_from_h1, description_from_rest_of_file).
Return (name_from_h1, description_from_rest_of_file).
If file missing or malformed, fallback appropriately. If file missing or malformed, fallback appropriately.
""" """
readme = ws_dir / "readme.md" readme = ws_dir / "readme.md"
@ -269,8 +260,7 @@ def write_workspace_readme(ws_dir: Path, name: str, description: str) -> None:
def slugify_workspace_name(name: str) -> str: def slugify_workspace_name(name: str) -> str:
""" """Turn arbitrary workspace name into a safe directory/worktree name.
Turn arbitrary workspace name into a safe directory/worktree name.
- Lowercase - Lowercase
- Replace ':' with '-' FIRST (special case so 'fix:my issue' -> 'fix/my-issue' branch later) - Replace ':' with '-' FIRST (special case so 'fix:my issue' -> 'fix/my-issue' branch later)
@ -293,8 +283,7 @@ def slugify_workspace_name(name: str) -> str:
def branch_name_for_workspace(name: str) -> str: def branch_name_for_workspace(name: str) -> str:
""" """Compute branch name from workspace name.
Compute branch name from workspace name.
Rules: Rules:
- Start from slugified name (no spaces/specials). - Start from slugified name (no spaces/specials).
@ -316,7 +305,7 @@ def branch_name_for_workspace(name: str) -> str:
@app.callback() @app.callback()
def main( def main(
ctx: typer.Context, ctx: typer.Context,
workspaces_name: Optional[str] = typer.Option( workspaces_name: str | None = typer.Option(
None, None,
"--workspaces-name", "--workspaces-name",
"-W", "-W",
@ -326,8 +315,7 @@ def main(
), ),
), ),
): ):
""" """Manage workspaces and associated Git worktrees.
Manage workspaces and associated Git worktrees.
If no command is given, this will list workspaces. If no command is given, this will list workspaces.
""" """
@ -344,7 +332,7 @@ def main(
raise typer.Exit(0) raise typer.Exit(0)
def get_ctx_paths(ctx: typer.Context) -> Tuple[Settings, Path, Path]: def get_ctx_paths(ctx: typer.Context) -> tuple[Settings, Path, Path]:
obj = ctx.obj or {} obj = ctx.obj or {}
return obj["settings"], obj["repos_dir"], obj["workspaces_dir"] return obj["settings"], obj["repos_dir"], obj["workspaces_dir"]
@ -362,8 +350,7 @@ def list_workspaces(workspaces_dir: Path):
def cli_list_workspaces( def cli_list_workspaces(
ctx: typer.Context, ctx: typer.Context,
): ):
""" """List all workspaces.
List all workspaces.
Shows: Shows:
- workspace directory name - workspace directory name
@ -385,7 +372,7 @@ def cli_list_workspaces(
for ws in sorted(p for p in workspaces_dir.iterdir() if p.is_dir()): for ws in sorted(p for p in workspaces_dir.iterdir() if p.is_dir()):
title, desc = read_workspace_readme(ws) title, desc = read_workspace_readme(ws)
repos: List[str] = [] repos: list[str] = []
for child in sorted(p for p in ws.iterdir() if p.is_dir()): for child in sorted(p for p in ws.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md": if child.name.lower() == "readme.md":
continue continue
@ -408,21 +395,20 @@ def cli_list_workspaces(
@app.command("new", hidden=True) @app.command("new", hidden=True)
def create_workspace( def create_workspace(
ctx: typer.Context, ctx: typer.Context,
name: Optional[str] = typer.Option( name: str | None = typer.Option(
None, None,
"--name", "--name",
"-n", "-n",
help="Name of the new workspace (display name; can contain spaces).", help="Name of the new workspace (display name; can contain spaces).",
), ),
description: Optional[str] = typer.Option( description: str | None = typer.Option(
None, None,
"--description", "--description",
"-d", "-d",
help="Description of the workspace. Will be written into readme.md.", help="Description of the workspace. Will be written into readme.md.",
), ),
): ):
""" """Create a new workspace.
Create a new workspace.
- Asks for name and description if not provided. - Asks for name and description if not provided.
- Workspace directory uses a slugified version of the name. - Workspace directory uses a slugified version of the name.
@ -457,7 +443,7 @@ def create_workspace(
@app.command("list-repos") @app.command("list-repos")
def list_repos( def list_repos(
ctx: typer.Context, ctx: typer.Context,
workspace: Optional[str] = typer.Option( workspace: str | None = typer.Option(
None, None,
"--workspace", "--workspace",
"-w", "-w",
@ -467,8 +453,7 @@ def list_repos(
), ),
), ),
): ):
""" """List repos and branches in the current (or specified) workspace.
List repos and branches in the current (or specified) workspace.
Shows: Shows:
- repo directory name - repo directory name
@ -505,9 +490,8 @@ def list_repos(
# ---------------- add-repo ---------------- # ---------------- add-repo ----------------
def list_all_repos(repos_dir: Path) -> List[Path]: def list_all_repos(repos_dir: Path) -> list[Path]:
""" """List all directories in repos_dir that appear to be git repos.
List all directories in repos_dir that appear to be git repos.
""" """
if not repos_dir.exists(): if not repos_dir.exists():
return [] return []
@ -518,9 +502,8 @@ def list_all_repos(repos_dir: Path) -> List[Path]:
return repos return repos
def pick_repo_with_iterfzf(repos: List[Path]) -> Optional[Path]: def pick_repo_with_iterfzf(repos: list[Path]) -> Path | None:
""" """Use iterfzf (Python library) to pick a repo from a list of paths.
Use iterfzf (Python library) to pick a repo from a list of paths.
""" """
if not repos: if not repos:
return None return None
@ -537,9 +520,8 @@ def pick_repo_with_iterfzf(repos: List[Path]) -> Optional[Path]:
return None return None
def pick_workspace_with_iterfzf(workspaces: List[Path]) -> Optional[Path]: def pick_workspace_with_iterfzf(workspaces: list[Path]) -> Path | None:
""" """Use iterfzf (Python library) to pick a workspace from a list of paths.
Use iterfzf (Python library) to pick a workspace from a list of paths.
""" """
names = [w.name for w in workspaces] names = [w.name for w in workspaces]
choice = iterfzf(names, prompt="pick a workspace> ") choice = iterfzf(names, prompt="pick a workspace> ")
@ -557,7 +539,7 @@ def pick_workspace_with_iterfzf(workspaces: List[Path]) -> Optional[Path]:
@app.command("add", hidden=True) @app.command("add", hidden=True)
def add_repo( def add_repo(
ctx: typer.Context, ctx: typer.Context,
workspace: Optional[str] = typer.Option( workspace: str | None = typer.Option(
None, None,
"--workspace", "--workspace",
"-w", "-w",
@ -566,7 +548,7 @@ def add_repo(
"If omitted, uses the workspace containing the current directory." "If omitted, uses the workspace containing the current directory."
), ),
), ),
repo_name: Optional[str] = typer.Option( repo_name: str | None = typer.Option(
None, None,
"--repo", "--repo",
"-r", "-r",
@ -576,8 +558,7 @@ def add_repo(
), ),
), ),
): ):
""" """Add a repo to a workspace.
Add a repo to a workspace.
- Lists all directories in repos_dir as repos. - Lists all directories in repos_dir as repos.
- Uses iterfzf to pick repo if --repo not given. - Uses iterfzf to pick repo if --repo not given.
@ -601,7 +582,7 @@ def add_repo(
console.print(f"[red]No git repos found in {repos_dir}[/red]") console.print(f"[red]No git repos found in {repos_dir}[/red]")
raise typer.Exit(1) raise typer.Exit(1)
repo_path: Optional[Path] = None repo_path: Path | None = None
if repo_name: if repo_name:
for r in all_repos: for r in all_repos:
if r.name == repo_name: if r.name == repo_name:
@ -664,9 +645,8 @@ def add_repo(
# ---------------- rm-workspace ---------------- # ---------------- rm-workspace ----------------
def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Optional[Path]: def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Path | None:
""" """Try to find the parent repo for a worktree, assuming it lives in repos_dir
Try to find the parent repo for a worktree, assuming it lives in repos_dir
with the same directory name as the worktree. with the same directory name as the worktree.
""" """
candidate = repos_dir / worktree_path.name candidate = repos_dir / worktree_path.name
@ -675,9 +655,8 @@ def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Optional[Pat
return None return None
def has_unpushed_commits(repo_path: Path, branch: str) -> bool: def has_unpushed_commits(repo_path: Path) -> bool:
""" """Detect if branch has commits not on its upstream.
Detect if branch has commits not on its upstream.
Uses 'git rev-list @{u}..HEAD'; if non-empty, there are unpushed commits. Uses 'git rev-list @{u}..HEAD'; if non-empty, there are unpushed commits.
""" """
# Check if upstream exists # Check if upstream exists
@ -707,8 +686,7 @@ def is_branch_integrated_into_main(
branch: str, branch: str,
main_ref: str = "origin/main", main_ref: str = "origin/main",
) -> bool: ) -> bool:
""" """Heuristic: is `branch`'s content already in `main_ref`?
Heuristic: is `branch`'s content already in `main_ref`?
Returns True if: Returns True if:
- branch tip is an ancestor of main_ref (normal merge / FF / rebase-before-merge), OR - branch tip is an ancestor of main_ref (normal merge / FF / rebase-before-merge), OR
@ -760,7 +738,7 @@ def is_branch_integrated_into_main(
@app.command("rm") @app.command("rm")
def remove_workspace( def remove_workspace(
ctx: typer.Context, ctx: typer.Context,
workspace: Optional[str] = typer.Option( workspace: str | None = typer.Option(
None, None,
"--workspace", "--workspace",
"-w", "-w",
@ -781,8 +759,7 @@ def remove_workspace(
help="Ref to consider as the integration target (default: origin/main).", help="Ref to consider as the integration target (default: origin/main).",
), ),
): ):
""" """Remove a workspace:
Remove a workspace:
- For each repo worktree in the workspace: - For each repo worktree in the workspace:
* Check for dirty work or unpushed commits. * Check for dirty work or unpushed commits.
@ -802,7 +779,7 @@ def remove_workspace(
title, _desc = read_workspace_readme(ws_dir) title, _desc = read_workspace_readme(ws_dir)
# Collect worktrees (subdirs that look like git repos, excluding readme.md) # Collect worktrees (subdirs that look like git repos, excluding readme.md)
worktrees: List[Path] = [] worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md": if child.name.lower() == "readme.md":
continue continue
@ -823,7 +800,7 @@ def remove_workspace(
raise typer.Exit(0) raise typer.Exit(0)
# Check for dirty / unpushed changes # Check for dirty / unpushed changes
problems: List[str] = [] problems: list[str] = []
for wt in worktrees: for wt in worktrees:
status = get_git_status(wt) status = get_git_status(wt)
branch = get_current_branch(wt) or "?" branch = get_current_branch(wt) or "?"
@ -837,9 +814,8 @@ def remove_workspace(
problems.append(f"{wt.name}: dirty working tree on '{branch}'") problems.append(f"{wt.name}: dirty working tree on '{branch}'")
# Only care about "unpushed commits" if the branch is NOT integrated into main. # Only care about "unpushed commits" if the branch is NOT integrated into main.
if repo is not None and branch != "?" and not integrated: if repo is not None and branch != "?" and not integrated and has_unpushed_commits(wt):
if has_unpushed_commits(wt, branch): problems.append(f"{wt.name}: unpushed commits on '{branch}'")
problems.append(f"{wt.name}: unpushed commits on '{branch}'")
if problems and not force: if problems and not force:
console.print( console.print(
"[red]Refusing to remove workspace; found dirty or unpushed work:[/red]" "[red]Refusing to remove workspace; found dirty or unpushed work:[/red]"
@ -905,9 +881,8 @@ def remove_workspace(
# ---------------- wip (stage + commit) ---------------- # ---------------- wip (stage + commit) ----------------
def git_status_porcelain(repo_path: Path) -> List[Tuple[str, str]]: def git_status_porcelain(repo_path: Path) -> list[tuple[str, str]]:
""" """Return a list of (status_code, path) for changes in the repo/worktree.
Return a list of (status_code, path) for changes in the repo/worktree.
Uses 'git status --porcelain'. Uses 'git status --porcelain'.
""" """
@ -916,20 +891,18 @@ def git_status_porcelain(repo_path: Path) -> List[Tuple[str, str]]:
console.print(f"[red]Failed to get status for {repo_path.name}:[/red]\n{err}") console.print(f"[red]Failed to get status for {repo_path.name}:[/red]\n{err}")
return [] return []
changes: List[Tuple[str, str]] = [] changes: list[tuple[str, str]] = []
for line in out.splitlines(): for line in out.splitlines():
if not line.strip(): if not line.strip():
continue continue
# Format: 'XY path'
status = line[:2] status = line[:2]
path = line[3:] path = line[3:]
changes.append((status, path)) changes.append((status, path))
return changes return changes
def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> List[str]: def choose_files_for_wip(repo_path: Path, changes: list[tuple[str, str]]) -> list[str]:
""" """Interactively choose which files to stage for WIP.
Interactively choose which files to stage for WIP.
- Show list of changed files. - Show list of changed files.
- Ask user: - Ask user:
@ -968,18 +941,13 @@ def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> Lis
return [] return []
# iterfzf may return a single string or list depending on version; # iterfzf may return a single string or list depending on version;
# normalize to list of strings. # normalize to list of strings.
if isinstance(selected, str): return [selected] if isinstance(selected, str) else list(selected)
selected_files = [selected]
else:
selected_files = list(selected)
return selected_files
@app.command("wip") @app.command("wip")
def wip_workspace( def wip_workspace(
ctx: typer.Context, ctx: typer.Context,
workspace: Optional[str] = typer.Option( workspace: str | None = typer.Option(
None, None,
"--workspace", "--workspace",
"-w", "-w",
@ -989,8 +957,7 @@ def wip_workspace(
), ),
), ),
): ):
""" """For each repo in the workspace:
For each repo in the workspace:
- Show list of changed files. - Show list of changed files.
- Ask whether to stage all, none, or pick some files. - Ask whether to stage all, none, or pick some files.
@ -1012,7 +979,7 @@ def wip_workspace(
@app.command("commit") @app.command("commit")
def commit_workspace( def commit_workspace(
ctx: typer.Context, ctx: typer.Context,
workspace: Optional[str] = typer.Option( workspace: str | None = typer.Option(
None, None,
"--workspace", "--workspace",
"-w", "-w",
@ -1021,15 +988,14 @@ def commit_workspace(
"If omitted, uses the workspace containing the current directory." "If omitted, uses the workspace containing the current directory."
), ),
), ),
message: Optional[str] = typer.Option( message: str | None = typer.Option(
None, None,
"--message", "--message",
"-m", "-m",
help="Commit message to use.", help="Commit message to use.",
), ),
): ):
""" """For each repo in the workspace:
For each repo in the workspace:
- Show list of changed files. - Show list of changed files.
- Ask whether to stage all, none, or pick some files. - Ask whether to stage all, none, or pick some files.
@ -1047,9 +1013,8 @@ def commit_workspace(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
) )
raise typer.Exit(1) raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
worktrees: List[Path] = [] worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md": if child.name.lower() == "readme.md":
continue continue
@ -1099,7 +1064,7 @@ def commit_workspace(
@app.command("push") @app.command("push")
def push_workspace( def push_workspace(
ctx: typer.Context, ctx: typer.Context,
workspace: Optional[str] = typer.Option( workspace: str | None = typer.Option(
None, None,
"--workspace", "--workspace",
"-w", "-w",
@ -1115,8 +1080,7 @@ def push_workspace(
help="Remote name to push to (default: origin).", help="Remote name to push to (default: origin).",
), ),
): ):
""" """For each repo in the workspace, run 'git push <remote> <current-branch>'.
For each repo in the workspace, run 'git push <remote> <current-branch>'.
Skips repos with no current branch or when push fails. Skips repos with no current branch or when push fails.
""" """
@ -1134,7 +1098,7 @@ def push_workspace(
f"(dir: {ws_dir.name}) to remote '{remote}'" f"(dir: {ws_dir.name}) to remote '{remote}'"
) )
worktrees: List[Path] = [] worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md": if child.name.lower() == "readme.md":
continue continue
@ -1170,7 +1134,7 @@ def push_workspace(
@app.command("status") @app.command("status")
def status_workspace( def status_workspace(
ctx: typer.Context, ctx: typer.Context,
workspace: Optional[str] = typer.Option( workspace: str | None = typer.Option(
None, None,
"--workspace", "--workspace",
"-w", "-w",
@ -1180,8 +1144,7 @@ def status_workspace(
), ),
), ),
): ):
""" """Show detailed status for the current (or specified) workspace.
Show detailed status for the current (or specified) workspace.
For each repo in the workspace: For each repo in the workspace:
- repo directory name - repo directory name
@ -1199,8 +1162,6 @@ def status_workspace(
title, desc = read_workspace_readme(ws_dir) title, desc = read_workspace_readme(ws_dir)
# if desc:
# console.print(Panel(desc, title=title))
table = Table( table = Table(
title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}", title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}",
@ -1217,7 +1178,7 @@ def status_workspace(
files_table.add_column("File", justify="left") files_table.add_column("File", justify="left")
files_table.add_column("Status", justify="right") files_table.add_column("Status", justify="right")
worktrees: List[Path] = [] worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md": if child.name.lower() == "readme.md":
continue continue
@ -1254,7 +1215,7 @@ def status_workspace(
@app.command("diff") @app.command("diff")
def diff_workspace( def diff_workspace(
ctx: typer.Context, ctx: typer.Context,
workspace: Optional[str] = typer.Option( workspace: str | None = typer.Option(
None, None,
"--workspace", "--workspace",
"-w", "-w",
@ -1270,8 +1231,7 @@ def diff_workspace(
help="Show only staged changes (git diff --cached).", help="Show only staged changes (git diff --cached).",
), ),
): ):
""" """Show git diff for all repos in the workspace.
Show git diff for all repos in the workspace.
Uses rich Syntax highlighter for diffs. Uses rich Syntax highlighter for diffs.
""" """
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
@ -1289,7 +1249,7 @@ def diff_workspace(
) )
# Collect repos # Collect repos
worktrees: List[Path] = [] worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md": if child.name.lower() == "readme.md":
continue continue
@ -1366,7 +1326,7 @@ def run_tmux(
) )
def get_tmux_sessions() -> List[str]: def get_tmux_sessions() -> list[str]:
"""List all tmux sessions.""" """List all tmux sessions."""
result = run_tmux(["list-sessions", "-F", "#{session_name}"]) result = run_tmux(["list-sessions", "-F", "#{session_name}"])
return result.stdout.decode().splitlines() return result.stdout.decode().splitlines()
@ -1378,7 +1338,7 @@ def session_exists(session_name: str) -> bool:
return result.returncode == 0 return result.returncode == 0
def pick_project(base_dir: Path) -> Optional[str]: def pick_project(base_dir: Path) -> str | None:
"""Use fzf to pick a subdirectory (project) from base_dir.""" """Use fzf to pick a subdirectory (project) from base_dir."""
if not base_dir.is_dir(): if not base_dir.is_dir():
typer.echo(f"[ta] {base_dir} is not a directory.", err=True) typer.echo(f"[ta] {base_dir} is not a directory.", err=True)
@ -1414,8 +1374,7 @@ def create_detached_session(
path_name: Path, path_name: Path,
start_mode: bool, start_mode: bool,
) -> bool: ) -> bool:
""" """Create a detached session.
Create a detached session.
- If start_mode: just a single window. - If start_mode: just a single window.
- Else: split layout with nvim on top. - Else: split layout with nvim on top.
@ -1468,8 +1427,7 @@ def create_if_needed_and_attach(
path_name: Path, path_name: Path,
start_mode: bool, start_mode: bool,
) -> bool: ) -> bool:
""" """Mimic the bash logic:
Mimic the bash logic:
- If not in tmux: `tmux new-session -As ... -c path_name` - If not in tmux: `tmux new-session -As ... -c path_name`
- Else: - Else:
@ -1483,9 +1441,8 @@ def create_if_needed_and_attach(
return r.returncode == 0 return r.returncode == 0
# Inside tmux # Inside tmux
if not session_exists(session_name): if not session_exists(session_name) and not create_detached_session(session_name, path_name, start_mode):
if not create_detached_session(session_name, path_name, start_mode): return False
return False
r = run_tmux(["switch-client", "-t", session_name]) r = run_tmux(["switch-client", "-t", session_name])
return r.returncode == 0 return r.returncode == 0
@ -1536,7 +1493,7 @@ tmux_app = typer.Typer(
@tmux_app.command("attach") @tmux_app.command("attach")
def tmux_cli_attach( def tmux_cli_attach(
ctx: typer.Context, ctx: typer.Context,
workspace: Optional[str] = typer.Option( workspace: str | None = typer.Option(
None, None,
"--workspace", "--workspace",
"-w", "-w",
@ -1546,10 +1503,8 @@ def tmux_cli_attach(
), ),
), ),
): ):
"""Attach or create a session for a repo in a workspace.
""" """
Attach or create a session for a repo in a workspace.
"""
# ws_dir = get_workspace_dir(workspace)
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace) ws_dir = find_workspace_dir(workspaces_dir, workspace)
if not ws_dir: if not ws_dir:
@ -1567,8 +1522,7 @@ def tmux_cli_attach(
@tmux_app.command("list-sessions") @tmux_app.command("list-sessions")
def tmux_cli_list_sessions( def tmux_cli_list_sessions(
ctx: typer.Context, workspace: str | None = typer.Option(
workspace: Optional[str] = typer.Option(
None, None,
"--workspace", "--workspace",
"-w", "-w",
@ -1595,8 +1549,7 @@ def tmux_cli_list_sessions(
@tmux_app.command("remove") @tmux_app.command("remove")
def tmux_cli_remove( def tmux_cli_remove(
ctx: typer.Context, workspace: str | None = typer.Option(
workspace: Optional[str] = typer.Option(
None, None,
"--workspace", "--workspace",
"-w", "-w",
@ -1624,12 +1577,11 @@ def tmux_cli_remove(
tmux_sessions = [ tmux_sessions = [
session for session in get_tmux_sessions() if session.startswith("ω") session for session in get_tmux_sessions() if session.startswith("ω")
] ]
if not dry_run: if not dry_run and not Confirm.ask(
if not Confirm.ask(
f"Are you sure you want to remove all tmux sessions?\n* {'\n* '.join(tmux_sessions)}\n", f"Are you sure you want to remove all tmux sessions?\n* {'\n* '.join(tmux_sessions)}\n",
default=False, default=False,
): ):
raise typer.Exit(1) raise typer.Exit(1)
if dry_run: if dry_run:
console.print(tmux_sessions) console.print(tmux_sessions)
@ -1638,8 +1590,6 @@ def tmux_cli_remove(
for session_name in tmux_sessions: for session_name in tmux_sessions:
remove_session(session_name) remove_session(session_name)
#
# remove_session_if_exists(session_name)
app.add_typer(tmux_app) app.add_typer(tmux_app)
@ -1648,7 +1598,7 @@ app.add_typer(tmux_app)
@app.command("attach") @app.command("attach")
def attach( def attach(
ctx: typer.Context, ctx: typer.Context,
workspace: Optional[str] = typer.Option( workspace: str | None = typer.Option(
None, None,
"--workspace", "--workspace",
"-w", "-w",