Compare commits

...

5 commits

2 changed files with 94 additions and 144 deletions

View file

@ -48,7 +48,7 @@ select = [
"I", # isort
"N", # pep8-naming
# "D", # pydocstyle
# "UP", # pyupgrade
"UP", # pyupgrade
"YTT", # flake8-2020
# "ANN", # flake8-annotations
# "S", # flake8-bandit
@ -66,7 +66,7 @@ select = [
"ISC", # flake8-implicit-str-concat
"ICN", # flake8-import-conventions
"G", # flake8-logging-format
# "INP", # flake8-no-pep420
"INP", # flake8-no-pep420
"PIE", # flake8-pie
"T20", # flake8-print
"PYI", # flake8-pyi
@ -74,23 +74,23 @@ select = [
"Q", # flake8-quotes
"RSE", # flake8-raise
"RET", # flake8-return
# "SLF", # flake8-self
# "SIM", # flake8-simplify
"SLF", # flake8-self
"SIM", # flake8-simplify
"TID", # flake8-tidy-imports
"TCH", # flake8-type-checking
# "INT", # flake8-gettext
# "ARG", # flake8-unused-arguments
"INT", # flake8-gettext
"ARG", # flake8-unused-arguments
"PTH", # flake8-use-pathlib
# "ERA", # eradicate
"ERA", # eradicate
"PD", # pandas-vet
"PGH", # pygrep-hooks
# "PL", # Pylint
# "PLC", # Convention
"PLC", # Convention
"PLE", # Error
# "PLR", # Refactor
"PLW", # Warning
# "TRY", # tryceratops
"TRY", # tryceratops
"NPY", # NumPy-specific rules
# "RUF", # Ruff-specific rules
"RUF", # Ruff-specific rules
]

View file

@ -18,9 +18,6 @@ import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import List
from typing import Optional
from typing import Tuple
import typer
from iterfzf import iterfzf
@ -47,8 +44,7 @@ console = Console()
class Settings(BaseSettings):
"""
Global configuration for workspaces.
"""Global configuration for workspaces.
Resolution for workspaces_name:
1. Command-line flag --workspaces-name
@ -74,10 +70,9 @@ class Settings(BaseSettings):
@classmethod
def from_env_and_override(
cls, override_workspaces_name: Optional[str]
) -> "Settings":
"""
Construct settings honoring:
cls, override_workspaces_name: str | None
) -> Settings:
"""Construct settings honoring:
1. CLI override
2. WORKSPACES_NAME env
3. default "git"
@ -91,9 +86,8 @@ class Settings(BaseSettings):
return s
def resolve_paths(workspaces_name: Optional[str]) -> Tuple[Settings, Path, Path]:
"""
Build Settings and derived paths, honoring CLI override of workspaces_name.
def resolve_paths(workspaces_name: str | None) -> tuple[Settings, Path, Path]:
"""Build Settings and derived paths, honoring CLI override of workspaces_name.
"""
base_settings = Settings.from_env_and_override(workspaces_name)
name = base_settings.workspaces_name
@ -115,15 +109,14 @@ class GitStatus:
@property
def indicator(self) -> str:
"""
Build ASCII indicator:
"""Build ASCII indicator:
- clean: "·"
- ahead 1: "↑1"
- behind 2: "↓2"
- both ahead/behind: "↑1 ↓2"
- add '*' when dirty, e.g. "↑1*" or "↑1 ↓2*"
"""
parts: List[str] = []
parts: list[str] = []
if self.ahead:
parts.append(f"{self.ahead}")
if self.behind:
@ -135,8 +128,7 @@ class GitStatus:
def get_git_status(repo_path: Path) -> GitStatus:
"""
Get ahead/behind and dirty info for a Git repo.
"""Get ahead/behind and dirty info for a Git repo.
Uses `git status --porcelain=v2 --branch` and parses:
- '# branch.ab +A -B' for ahead/behind
@ -158,7 +150,8 @@ def get_git_status(repo_path: Path) -> GitStatus:
for line in out.splitlines():
if line.startswith("# branch.ab"):
# Example: "# branch.ab +1 -2"
# Example:
# branch.ab +1 -2
m = re.search(r"\+(\d+)\s+-(\d+)", line)
if m:
ahead = int(m.group(1))
@ -169,7 +162,7 @@ def get_git_status(repo_path: Path) -> GitStatus:
return GitStatus(ahead=ahead, behind=behind, dirty=dirty)
def get_current_branch(repo_path: Path) -> Optional[str]:
def get_current_branch(repo_path: Path) -> str | None:
try:
out = subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
@ -187,7 +180,7 @@ def ensure_git_repo(path: Path) -> bool:
return (path / ".git").exists()
def run_cmd(cmd: List[str], cwd: Optional[Path] = None) -> Tuple[int, str, str]:
def run_cmd(cmd: list[str], cwd: Path | None = None) -> tuple[int, str, str]:
proc = subprocess.Popen(
cmd,
cwd=cwd,
@ -199,9 +192,8 @@ def run_cmd(cmd: List[str], cwd: Optional[Path] = None) -> Tuple[int, str, str]:
return proc.returncode, out, err
def find_workspace_dir(workspaces_dir: Path, workspace_name: Optional[str]) -> Path:
"""
Resolve the directory of a workspace.
def find_workspace_dir(workspaces_dir: Path, workspace_name: str | None) -> Path:
"""Resolve the directory of a workspace.
- If workspace_name given: use workspaces_dir / workspace_name
- Else: use current working directory (must be inside workspaces_dir).
@ -226,9 +218,8 @@ def find_workspace_dir(workspaces_dir: Path, workspace_name: Optional[str]) -> P
return workspace_root
def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]:
"""
Return (name_from_h1, description_from_rest_of_file).
def read_workspace_readme(ws_dir: Path) -> tuple[str, str]:
"""Return (name_from_h1, description_from_rest_of_file).
If file missing or malformed, fallback appropriately.
"""
readme = ws_dir / "readme.md"
@ -269,8 +260,7 @@ def write_workspace_readme(ws_dir: Path, name: str, description: str) -> None:
def slugify_workspace_name(name: str) -> str:
"""
Turn arbitrary workspace name into a safe directory/worktree name.
"""Turn arbitrary workspace name into a safe directory/worktree name.
- Lowercase
- Replace ':' with '-' FIRST (special case so 'fix:my issue' -> 'fix/my-issue' branch later)
@ -293,8 +283,7 @@ def slugify_workspace_name(name: str) -> str:
def branch_name_for_workspace(name: str) -> str:
"""
Compute branch name from workspace name.
"""Compute branch name from workspace name.
Rules:
- Start from slugified name (no spaces/specials).
@ -316,7 +305,7 @@ def branch_name_for_workspace(name: str) -> str:
@app.callback()
def main(
ctx: typer.Context,
workspaces_name: Optional[str] = typer.Option(
workspaces_name: str | None = typer.Option(
None,
"--workspaces-name",
"-W",
@ -326,8 +315,7 @@ def main(
),
),
):
"""
Manage workspaces and associated Git worktrees.
"""Manage workspaces and associated Git worktrees.
If no command is given, this will list workspaces.
"""
@ -344,7 +332,7 @@ def main(
raise typer.Exit(0)
def get_ctx_paths(ctx: typer.Context) -> Tuple[Settings, Path, Path]:
def get_ctx_paths(ctx: typer.Context) -> tuple[Settings, Path, Path]:
obj = ctx.obj or {}
return obj["settings"], obj["repos_dir"], obj["workspaces_dir"]
@ -362,8 +350,7 @@ def list_workspaces(workspaces_dir: Path):
def cli_list_workspaces(
ctx: typer.Context,
):
"""
List all workspaces.
"""List all workspaces.
Shows:
- workspace directory name
@ -385,7 +372,7 @@ def cli_list_workspaces(
for ws in sorted(p for p in workspaces_dir.iterdir() if p.is_dir()):
title, desc = read_workspace_readme(ws)
repos: List[str] = []
repos: list[str] = []
for child in sorted(p for p in ws.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
@ -408,21 +395,20 @@ def cli_list_workspaces(
@app.command("new", hidden=True)
def create_workspace(
ctx: typer.Context,
name: Optional[str] = typer.Option(
name: str | None = typer.Option(
None,
"--name",
"-n",
help="Name of the new workspace (display name; can contain spaces).",
),
description: Optional[str] = typer.Option(
description: str | None = typer.Option(
None,
"--description",
"-d",
help="Description of the workspace. Will be written into readme.md.",
),
):
"""
Create a new workspace.
"""Create a new workspace.
- Asks for name and description if not provided.
- Workspace directory uses a slugified version of the name.
@ -457,7 +443,7 @@ def create_workspace(
@app.command("list-repos")
def list_repos(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -467,8 +453,7 @@ def list_repos(
),
),
):
"""
List repos and branches in the current (or specified) workspace.
"""List repos and branches in the current (or specified) workspace.
Shows:
- repo directory name
@ -505,9 +490,8 @@ def list_repos(
# ---------------- add-repo ----------------
def list_all_repos(repos_dir: Path) -> List[Path]:
"""
List all directories in repos_dir that appear to be git repos.
def list_all_repos(repos_dir: Path) -> list[Path]:
"""List all directories in repos_dir that appear to be git repos.
"""
if not repos_dir.exists():
return []
@ -518,9 +502,8 @@ def list_all_repos(repos_dir: Path) -> List[Path]:
return repos
def pick_repo_with_iterfzf(repos: List[Path]) -> Optional[Path]:
"""
Use iterfzf (Python library) to pick a repo from a list of paths.
def pick_repo_with_iterfzf(repos: list[Path]) -> Path | None:
"""Use iterfzf (Python library) to pick a repo from a list of paths.
"""
if not repos:
return None
@ -537,9 +520,8 @@ def pick_repo_with_iterfzf(repos: List[Path]) -> Optional[Path]:
return None
def pick_workspace_with_iterfzf(workspaces: List[Path]) -> Optional[Path]:
"""
Use iterfzf (Python library) to pick a workspace from a list of paths.
def pick_workspace_with_iterfzf(workspaces: list[Path]) -> Path | None:
"""Use iterfzf (Python library) to pick a workspace from a list of paths.
"""
names = [w.name for w in workspaces]
choice = iterfzf(names, prompt="pick a workspace> ")
@ -557,7 +539,7 @@ def pick_workspace_with_iterfzf(workspaces: List[Path]) -> Optional[Path]:
@app.command("add", hidden=True)
def add_repo(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -566,7 +548,7 @@ def add_repo(
"If omitted, uses the workspace containing the current directory."
),
),
repo_name: Optional[str] = typer.Option(
repo_name: str | None = typer.Option(
None,
"--repo",
"-r",
@ -576,8 +558,7 @@ def add_repo(
),
),
):
"""
Add a repo to a workspace.
"""Add a repo to a workspace.
- Lists all directories in repos_dir as repos.
- Uses iterfzf to pick repo if --repo not given.
@ -601,7 +582,7 @@ def add_repo(
console.print(f"[red]No git repos found in {repos_dir}[/red]")
raise typer.Exit(1)
repo_path: Optional[Path] = None
repo_path: Path | None = None
if repo_name:
for r in all_repos:
if r.name == repo_name:
@ -664,9 +645,8 @@ def add_repo(
# ---------------- rm-workspace ----------------
def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Optional[Path]:
"""
Try to find the parent repo for a worktree, assuming it lives in repos_dir
def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Path | None:
"""Try to find the parent repo for a worktree, assuming it lives in repos_dir
with the same directory name as the worktree.
"""
candidate = repos_dir / worktree_path.name
@ -675,9 +655,8 @@ def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Optional[Pat
return None
def has_unpushed_commits(repo_path: Path, branch: str) -> bool:
"""
Detect if branch has commits not on its upstream.
def has_unpushed_commits(repo_path: Path) -> bool:
"""Detect if branch has commits not on its upstream.
Uses 'git rev-list @{u}..HEAD'; if non-empty, there are unpushed commits.
"""
# Check if upstream exists
@ -707,8 +686,7 @@ def is_branch_integrated_into_main(
branch: str,
main_ref: str = "origin/main",
) -> bool:
"""
Heuristic: is `branch`'s content already in `main_ref`?
"""Heuristic: is `branch`'s content already in `main_ref`?
Returns True if:
- branch tip is an ancestor of main_ref (normal merge / FF / rebase-before-merge), OR
@ -760,7 +738,7 @@ def is_branch_integrated_into_main(
@app.command("rm")
def remove_workspace(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -781,8 +759,7 @@ def remove_workspace(
help="Ref to consider as the integration target (default: origin/main).",
),
):
"""
Remove a workspace:
"""Remove a workspace:
- For each repo worktree in the workspace:
* Check for dirty work or unpushed commits.
@ -802,7 +779,7 @@ def remove_workspace(
title, _desc = read_workspace_readme(ws_dir)
# Collect worktrees (subdirs that look like git repos, excluding readme.md)
worktrees: List[Path] = []
worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
@ -823,7 +800,7 @@ def remove_workspace(
raise typer.Exit(0)
# Check for dirty / unpushed changes
problems: List[str] = []
problems: list[str] = []
for wt in worktrees:
status = get_git_status(wt)
branch = get_current_branch(wt) or "?"
@ -837,8 +814,7 @@ def remove_workspace(
problems.append(f"{wt.name}: dirty working tree on '{branch}'")
# Only care about "unpushed commits" if the branch is NOT integrated into main.
if repo is not None and branch != "?" and not integrated:
if has_unpushed_commits(wt, branch):
if repo is not None and branch != "?" and not integrated and has_unpushed_commits(wt):
problems.append(f"{wt.name}: unpushed commits on '{branch}'")
if problems and not force:
console.print(
@ -905,9 +881,8 @@ def remove_workspace(
# ---------------- wip (stage + commit) ----------------
def git_status_porcelain(repo_path: Path) -> List[Tuple[str, str]]:
"""
Return a list of (status_code, path) for changes in the repo/worktree.
def git_status_porcelain(repo_path: Path) -> list[tuple[str, str]]:
"""Return a list of (status_code, path) for changes in the repo/worktree.
Uses 'git status --porcelain'.
"""
@ -916,20 +891,18 @@ def git_status_porcelain(repo_path: Path) -> List[Tuple[str, str]]:
console.print(f"[red]Failed to get status for {repo_path.name}:[/red]\n{err}")
return []
changes: List[Tuple[str, str]] = []
changes: list[tuple[str, str]] = []
for line in out.splitlines():
if not line.strip():
continue
# Format: 'XY path'
status = line[:2]
path = line[3:]
changes.append((status, path))
return changes
def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> List[str]:
"""
Interactively choose which files to stage for WIP.
def choose_files_for_wip(repo_path: Path, changes: list[tuple[str, str]]) -> list[str]:
"""Interactively choose which files to stage for WIP.
- Show list of changed files.
- Ask user:
@ -968,18 +941,13 @@ def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> Lis
return []
# iterfzf may return a single string or list depending on version;
# normalize to list of strings.
if isinstance(selected, str):
selected_files = [selected]
else:
selected_files = list(selected)
return selected_files
return [selected] if isinstance(selected, str) else list(selected)
@app.command("wip")
def wip_workspace(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -989,8 +957,7 @@ def wip_workspace(
),
),
):
"""
For each repo in the workspace:
"""For each repo in the workspace:
- Show list of changed files.
- Ask whether to stage all, none, or pick some files.
@ -1012,7 +979,7 @@ def wip_workspace(
@app.command("commit")
def commit_workspace(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -1021,15 +988,14 @@ def commit_workspace(
"If omitted, uses the workspace containing the current directory."
),
),
message: Optional[str] = typer.Option(
message: str | None = typer.Option(
None,
"--message",
"-m",
help="Commit message to use.",
),
):
"""
For each repo in the workspace:
"""For each repo in the workspace:
- Show list of changed files.
- Ask whether to stage all, none, or pick some files.
@ -1047,9 +1013,8 @@ def commit_workspace(
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
)
raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
worktrees: List[Path] = []
worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
@ -1099,7 +1064,7 @@ def commit_workspace(
@app.command("push")
def push_workspace(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -1115,8 +1080,7 @@ def push_workspace(
help="Remote name to push to (default: origin).",
),
):
"""
For each repo in the workspace, run 'git push <remote> <current-branch>'.
"""For each repo in the workspace, run 'git push <remote> <current-branch>'.
Skips repos with no current branch or when push fails.
"""
@ -1134,7 +1098,7 @@ def push_workspace(
f"(dir: {ws_dir.name}) to remote '{remote}'"
)
worktrees: List[Path] = []
worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
@ -1170,7 +1134,7 @@ def push_workspace(
@app.command("status")
def status_workspace(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -1180,8 +1144,7 @@ def status_workspace(
),
),
):
"""
Show detailed status for the current (or specified) workspace.
"""Show detailed status for the current (or specified) workspace.
For each repo in the workspace:
- repo directory name
@ -1199,8 +1162,6 @@ def status_workspace(
title, desc = read_workspace_readme(ws_dir)
# if desc:
# console.print(Panel(desc, title=title))
table = Table(
title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}",
@ -1217,7 +1178,7 @@ def status_workspace(
files_table.add_column("File", justify="left")
files_table.add_column("Status", justify="right")
worktrees: List[Path] = []
worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
@ -1254,7 +1215,7 @@ def status_workspace(
@app.command("diff")
def diff_workspace(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -1270,8 +1231,7 @@ def diff_workspace(
help="Show only staged changes (git diff --cached).",
),
):
"""
Show git diff for all repos in the workspace.
"""Show git diff for all repos in the workspace.
Uses rich Syntax highlighter for diffs.
"""
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
@ -1289,7 +1249,7 @@ def diff_workspace(
)
# Collect repos
worktrees: List[Path] = []
worktrees: list[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
@ -1366,7 +1326,7 @@ def run_tmux(
)
def get_tmux_sessions() -> List[str]:
def get_tmux_sessions() -> list[str]:
"""List all tmux sessions."""
result = run_tmux(["list-sessions", "-F", "#{session_name}"])
return result.stdout.decode().splitlines()
@ -1378,7 +1338,7 @@ def session_exists(session_name: str) -> bool:
return result.returncode == 0
def pick_project(base_dir: Path) -> Optional[str]:
def pick_project(base_dir: Path) -> str | None:
"""Use fzf to pick a subdirectory (project) from base_dir."""
if not base_dir.is_dir():
typer.echo(f"[ta] {base_dir} is not a directory.", err=True)
@ -1414,8 +1374,7 @@ def create_detached_session(
path_name: Path,
start_mode: bool,
) -> bool:
"""
Create a detached session.
"""Create a detached session.
- If start_mode: just a single window.
- Else: split layout with nvim on top.
@ -1468,8 +1427,7 @@ def create_if_needed_and_attach(
path_name: Path,
start_mode: bool,
) -> bool:
"""
Mimic the bash logic:
"""Mimic the bash logic:
- If not in tmux: `tmux new-session -As ... -c path_name`
- Else:
@ -1483,8 +1441,7 @@ def create_if_needed_and_attach(
return r.returncode == 0
# Inside tmux
if not session_exists(session_name):
if not create_detached_session(session_name, path_name, start_mode):
if not session_exists(session_name) and not create_detached_session(session_name, path_name, start_mode):
return False
r = run_tmux(["switch-client", "-t", session_name])
@ -1536,7 +1493,7 @@ tmux_app = typer.Typer(
@tmux_app.command("attach")
def tmux_cli_attach(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -1546,10 +1503,8 @@ def tmux_cli_attach(
),
),
):
"""Attach or create a session for a repo in a workspace.
"""
Attach or create a session for a repo in a workspace.
"""
# ws_dir = get_workspace_dir(workspace)
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
if not ws_dir:
@ -1567,8 +1522,7 @@ def tmux_cli_attach(
@tmux_app.command("list-sessions")
def tmux_cli_list_sessions(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -1595,8 +1549,7 @@ def tmux_cli_list_sessions(
@tmux_app.command("remove")
def tmux_cli_remove(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",
@ -1624,8 +1577,7 @@ def tmux_cli_remove(
tmux_sessions = [
session for session in get_tmux_sessions() if session.startswith("ω")
]
if not dry_run:
if not Confirm.ask(
if not dry_run and not Confirm.ask(
f"Are you sure you want to remove all tmux sessions?\n* {'\n* '.join(tmux_sessions)}\n",
default=False,
):
@ -1638,8 +1590,6 @@ def tmux_cli_remove(
for session_name in tmux_sessions:
remove_session(session_name)
#
# remove_session_if_exists(session_name)
app.add_typer(tmux_app)
@ -1648,7 +1598,7 @@ app.add_typer(tmux_app)
@app.command("attach")
def attach(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
workspace: str | None = typer.Option(
None,
"--workspace",
"-w",