diff --git a/pyproject.toml b/pyproject.toml index bb1c52b..e453d48 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,7 @@ select = [ "I", # isort "N", # pep8-naming # "D", # pydocstyle - "UP", # pyupgrade +# "UP", # pyupgrade "YTT", # flake8-2020 # "ANN", # flake8-annotations # "S", # flake8-bandit @@ -66,7 +66,7 @@ select = [ "ISC", # flake8-implicit-str-concat "ICN", # flake8-import-conventions "G", # flake8-logging-format - "INP", # flake8-no-pep420 +# "INP", # flake8-no-pep420 "PIE", # flake8-pie "T20", # flake8-print "PYI", # flake8-pyi @@ -74,23 +74,23 @@ select = [ "Q", # flake8-quotes "RSE", # flake8-raise "RET", # flake8-return - "SLF", # flake8-self - "SIM", # flake8-simplify +# "SLF", # flake8-self +# "SIM", # flake8-simplify "TID", # flake8-tidy-imports "TCH", # flake8-type-checking - "INT", # flake8-gettext - "ARG", # flake8-unused-arguments +# "INT", # flake8-gettext +# "ARG", # flake8-unused-arguments "PTH", # flake8-use-pathlib - "ERA", # eradicate +# "ERA", # eradicate "PD", # pandas-vet "PGH", # pygrep-hooks # "PL", # Pylint - "PLC", # Convention +# "PLC", # Convention "PLE", # Error # "PLR", # Refactor "PLW", # Warning - "TRY", # tryceratops +# "TRY", # tryceratops "NPY", # NumPy-specific rules - "RUF", # Ruff-specific rules +# "RUF", # Ruff-specific rules ] diff --git a/workspaces.py b/workspaces.py index 2a332b8..33dcdeb 100755 --- a/workspaces.py +++ b/workspaces.py @@ -18,6 +18,9 @@ import shutil import subprocess from dataclasses import dataclass from pathlib import Path +from typing import List +from typing import Optional +from typing import Tuple import typer from iterfzf import iterfzf @@ -44,7 +47,8 @@ console = Console() class Settings(BaseSettings): - """Global configuration for workspaces. + """ + Global configuration for workspaces. Resolution for workspaces_name: 1. Command-line flag --workspaces-name @@ -70,9 +74,10 @@ class Settings(BaseSettings): @classmethod def from_env_and_override( - cls, override_workspaces_name: str | None - ) -> Settings: - """Construct settings honoring: + cls, override_workspaces_name: Optional[str] + ) -> "Settings": + """ + Construct settings honoring: 1. CLI override 2. WORKSPACES_NAME env 3. default "git" @@ -86,8 +91,9 @@ class Settings(BaseSettings): return s -def resolve_paths(workspaces_name: str | None) -> tuple[Settings, Path, Path]: - """Build Settings and derived paths, honoring CLI override of workspaces_name. +def resolve_paths(workspaces_name: Optional[str]) -> Tuple[Settings, Path, Path]: + """ + Build Settings and derived paths, honoring CLI override of workspaces_name. """ base_settings = Settings.from_env_and_override(workspaces_name) name = base_settings.workspaces_name @@ -109,14 +115,15 @@ class GitStatus: @property def indicator(self) -> str: - """Build ASCII indicator: + """ + Build ASCII indicator: - clean: "·" - ahead 1: "↑1" - behind 2: "↓2" - both ahead/behind: "↑1 ↓2" - add '*' when dirty, e.g. "↑1*" or "↑1 ↓2*" """ - parts: list[str] = [] + parts: List[str] = [] if self.ahead: parts.append(f"↑{self.ahead}") if self.behind: @@ -128,7 +135,8 @@ class GitStatus: def get_git_status(repo_path: Path) -> GitStatus: - """Get ahead/behind and dirty info for a Git repo. + """ + Get ahead/behind and dirty info for a Git repo. Uses `git status --porcelain=v2 --branch` and parses: - '# branch.ab +A -B' for ahead/behind @@ -150,8 +158,7 @@ def get_git_status(repo_path: Path) -> GitStatus: for line in out.splitlines(): if line.startswith("# branch.ab"): - # Example: - # branch.ab +1 -2 + # Example: "# branch.ab +1 -2" m = re.search(r"\+(\d+)\s+-(\d+)", line) if m: ahead = int(m.group(1)) @@ -162,7 +169,7 @@ def get_git_status(repo_path: Path) -> GitStatus: return GitStatus(ahead=ahead, behind=behind, dirty=dirty) -def get_current_branch(repo_path: Path) -> str | None: +def get_current_branch(repo_path: Path) -> Optional[str]: try: out = subprocess.check_output( ["git", "rev-parse", "--abbrev-ref", "HEAD"], @@ -180,7 +187,7 @@ def ensure_git_repo(path: Path) -> bool: return (path / ".git").exists() -def run_cmd(cmd: list[str], cwd: Path | None = None) -> tuple[int, str, str]: +def run_cmd(cmd: List[str], cwd: Optional[Path] = None) -> Tuple[int, str, str]: proc = subprocess.Popen( cmd, cwd=cwd, @@ -192,8 +199,9 @@ def run_cmd(cmd: list[str], cwd: Path | None = None) -> tuple[int, str, str]: return proc.returncode, out, err -def find_workspace_dir(workspaces_dir: Path, workspace_name: str | None) -> Path: - """Resolve the directory of a workspace. +def find_workspace_dir(workspaces_dir: Path, workspace_name: Optional[str]) -> Path: + """ + Resolve the directory of a workspace. - If workspace_name given: use workspaces_dir / workspace_name - Else: use current working directory (must be inside workspaces_dir). @@ -218,8 +226,9 @@ def find_workspace_dir(workspaces_dir: Path, workspace_name: str | None) -> Path return workspace_root -def read_workspace_readme(ws_dir: Path) -> tuple[str, str]: - """Return (name_from_h1, description_from_rest_of_file). +def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]: + """ + Return (name_from_h1, description_from_rest_of_file). If file missing or malformed, fallback appropriately. """ readme = ws_dir / "readme.md" @@ -260,7 +269,8 @@ def write_workspace_readme(ws_dir: Path, name: str, description: str) -> None: def slugify_workspace_name(name: str) -> str: - """Turn arbitrary workspace name into a safe directory/worktree name. + """ + Turn arbitrary workspace name into a safe directory/worktree name. - Lowercase - Replace ':' with '-' FIRST (special case so 'fix:my issue' -> 'fix/my-issue' branch later) @@ -283,7 +293,8 @@ def slugify_workspace_name(name: str) -> str: def branch_name_for_workspace(name: str) -> str: - """Compute branch name from workspace name. + """ + Compute branch name from workspace name. Rules: - Start from slugified name (no spaces/specials). @@ -305,7 +316,7 @@ def branch_name_for_workspace(name: str) -> str: @app.callback() def main( ctx: typer.Context, - workspaces_name: str | None = typer.Option( + workspaces_name: Optional[str] = typer.Option( None, "--workspaces-name", "-W", @@ -315,7 +326,8 @@ def main( ), ), ): - """Manage workspaces and associated Git worktrees. + """ + Manage workspaces and associated Git worktrees. If no command is given, this will list workspaces. """ @@ -332,7 +344,7 @@ def main( raise typer.Exit(0) -def get_ctx_paths(ctx: typer.Context) -> tuple[Settings, Path, Path]: +def get_ctx_paths(ctx: typer.Context) -> Tuple[Settings, Path, Path]: obj = ctx.obj or {} return obj["settings"], obj["repos_dir"], obj["workspaces_dir"] @@ -350,7 +362,8 @@ def list_workspaces(workspaces_dir: Path): def cli_list_workspaces( ctx: typer.Context, ): - """List all workspaces. + """ + List all workspaces. Shows: - workspace directory name @@ -372,7 +385,7 @@ def cli_list_workspaces( for ws in sorted(p for p in workspaces_dir.iterdir() if p.is_dir()): title, desc = read_workspace_readme(ws) - repos: list[str] = [] + repos: List[str] = [] for child in sorted(p for p in ws.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": continue @@ -395,20 +408,21 @@ def cli_list_workspaces( @app.command("new", hidden=True) def create_workspace( ctx: typer.Context, - name: str | None = typer.Option( + name: Optional[str] = typer.Option( None, "--name", "-n", help="Name of the new workspace (display name; can contain spaces).", ), - description: str | None = typer.Option( + description: Optional[str] = typer.Option( None, "--description", "-d", help="Description of the workspace. Will be written into readme.md.", ), ): - """Create a new workspace. + """ + Create a new workspace. - Asks for name and description if not provided. - Workspace directory uses a slugified version of the name. @@ -443,7 +457,7 @@ def create_workspace( @app.command("list-repos") def list_repos( ctx: typer.Context, - workspace: str | None = typer.Option( + workspace: Optional[str] = typer.Option( None, "--workspace", "-w", @@ -453,7 +467,8 @@ def list_repos( ), ), ): - """List repos and branches in the current (or specified) workspace. + """ + List repos and branches in the current (or specified) workspace. Shows: - repo directory name @@ -490,8 +505,9 @@ def list_repos( # ---------------- add-repo ---------------- -def list_all_repos(repos_dir: Path) -> list[Path]: - """List all directories in repos_dir that appear to be git repos. +def list_all_repos(repos_dir: Path) -> List[Path]: + """ + List all directories in repos_dir that appear to be git repos. """ if not repos_dir.exists(): return [] @@ -502,8 +518,9 @@ def list_all_repos(repos_dir: Path) -> list[Path]: return repos -def pick_repo_with_iterfzf(repos: list[Path]) -> Path | None: - """Use iterfzf (Python library) to pick a repo from a list of paths. +def pick_repo_with_iterfzf(repos: List[Path]) -> Optional[Path]: + """ + Use iterfzf (Python library) to pick a repo from a list of paths. """ if not repos: return None @@ -520,8 +537,9 @@ def pick_repo_with_iterfzf(repos: list[Path]) -> Path | None: return None -def pick_workspace_with_iterfzf(workspaces: list[Path]) -> Path | None: - """Use iterfzf (Python library) to pick a workspace from a list of paths. +def pick_workspace_with_iterfzf(workspaces: List[Path]) -> Optional[Path]: + """ + Use iterfzf (Python library) to pick a workspace from a list of paths. """ names = [w.name for w in workspaces] choice = iterfzf(names, prompt="pick a workspace> ") @@ -539,7 +557,7 @@ def pick_workspace_with_iterfzf(workspaces: list[Path]) -> Path | None: @app.command("add", hidden=True) def add_repo( ctx: typer.Context, - workspace: str | None = typer.Option( + workspace: Optional[str] = typer.Option( None, "--workspace", "-w", @@ -548,7 +566,7 @@ def add_repo( "If omitted, uses the workspace containing the current directory." ), ), - repo_name: str | None = typer.Option( + repo_name: Optional[str] = typer.Option( None, "--repo", "-r", @@ -558,7 +576,8 @@ def add_repo( ), ), ): - """Add a repo to a workspace. + """ + Add a repo to a workspace. - Lists all directories in repos_dir as repos. - Uses iterfzf to pick repo if --repo not given. @@ -582,7 +601,7 @@ def add_repo( console.print(f"[red]No git repos found in {repos_dir}[/red]") raise typer.Exit(1) - repo_path: Path | None = None + repo_path: Optional[Path] = None if repo_name: for r in all_repos: if r.name == repo_name: @@ -645,8 +664,9 @@ def add_repo( # ---------------- rm-workspace ---------------- -def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Path | None: - """Try to find the parent repo for a worktree, assuming it lives in repos_dir +def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Optional[Path]: + """ + Try to find the parent repo for a worktree, assuming it lives in repos_dir with the same directory name as the worktree. """ candidate = repos_dir / worktree_path.name @@ -655,8 +675,9 @@ def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Path | None: return None -def has_unpushed_commits(repo_path: Path) -> bool: - """Detect if branch has commits not on its upstream. +def has_unpushed_commits(repo_path: Path, branch: str) -> bool: + """ + Detect if branch has commits not on its upstream. Uses 'git rev-list @{u}..HEAD'; if non-empty, there are unpushed commits. """ # Check if upstream exists @@ -686,7 +707,8 @@ def is_branch_integrated_into_main( branch: str, main_ref: str = "origin/main", ) -> bool: - """Heuristic: is `branch`'s content already in `main_ref`? + """ + Heuristic: is `branch`'s content already in `main_ref`? Returns True if: - branch tip is an ancestor of main_ref (normal merge / FF / rebase-before-merge), OR @@ -738,7 +760,7 @@ def is_branch_integrated_into_main( @app.command("rm") def remove_workspace( ctx: typer.Context, - workspace: str | None = typer.Option( + workspace: Optional[str] = typer.Option( None, "--workspace", "-w", @@ -759,7 +781,8 @@ def remove_workspace( help="Ref to consider as the integration target (default: origin/main).", ), ): - """Remove a workspace: + """ + Remove a workspace: - For each repo worktree in the workspace: * Check for dirty work or unpushed commits. @@ -779,7 +802,7 @@ def remove_workspace( title, _desc = read_workspace_readme(ws_dir) # Collect worktrees (subdirs that look like git repos, excluding readme.md) - worktrees: list[Path] = [] + worktrees: List[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": continue @@ -800,7 +823,7 @@ def remove_workspace( raise typer.Exit(0) # Check for dirty / unpushed changes - problems: list[str] = [] + problems: List[str] = [] for wt in worktrees: status = get_git_status(wt) branch = get_current_branch(wt) or "?" @@ -814,8 +837,9 @@ def remove_workspace( problems.append(f"{wt.name}: dirty working tree on '{branch}'") # Only care about "unpushed commits" if the branch is NOT integrated into main. - if repo is not None and branch != "?" and not integrated and has_unpushed_commits(wt): - problems.append(f"{wt.name}: unpushed commits on '{branch}'") + if repo is not None and branch != "?" and not integrated: + if has_unpushed_commits(wt, branch): + problems.append(f"{wt.name}: unpushed commits on '{branch}'") if problems and not force: console.print( "[red]Refusing to remove workspace; found dirty or unpushed work:[/red]" @@ -881,8 +905,9 @@ def remove_workspace( # ---------------- wip (stage + commit) ---------------- -def git_status_porcelain(repo_path: Path) -> list[tuple[str, str]]: - """Return a list of (status_code, path) for changes in the repo/worktree. +def git_status_porcelain(repo_path: Path) -> List[Tuple[str, str]]: + """ + Return a list of (status_code, path) for changes in the repo/worktree. Uses 'git status --porcelain'. """ @@ -891,18 +916,20 @@ def git_status_porcelain(repo_path: Path) -> list[tuple[str, str]]: console.print(f"[red]Failed to get status for {repo_path.name}:[/red]\n{err}") return [] - changes: list[tuple[str, str]] = [] + changes: List[Tuple[str, str]] = [] for line in out.splitlines(): if not line.strip(): continue + # Format: 'XY path' status = line[:2] path = line[3:] changes.append((status, path)) return changes -def choose_files_for_wip(repo_path: Path, changes: list[tuple[str, str]]) -> list[str]: - """Interactively choose which files to stage for WIP. +def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> List[str]: + """ + Interactively choose which files to stage for WIP. - Show list of changed files. - Ask user: @@ -941,13 +968,18 @@ def choose_files_for_wip(repo_path: Path, changes: list[tuple[str, str]]) -> lis return [] # iterfzf may return a single string or list depending on version; # normalize to list of strings. - return [selected] if isinstance(selected, str) else list(selected) + if isinstance(selected, str): + selected_files = [selected] + else: + selected_files = list(selected) + + return selected_files @app.command("wip") def wip_workspace( ctx: typer.Context, - workspace: str | None = typer.Option( + workspace: Optional[str] = typer.Option( None, "--workspace", "-w", @@ -957,7 +989,8 @@ def wip_workspace( ), ), ): - """For each repo in the workspace: + """ + For each repo in the workspace: - Show list of changed files. - Ask whether to stage all, none, or pick some files. @@ -979,7 +1012,7 @@ def wip_workspace( @app.command("commit") def commit_workspace( ctx: typer.Context, - workspace: str | None = typer.Option( + workspace: Optional[str] = typer.Option( None, "--workspace", "-w", @@ -988,14 +1021,15 @@ def commit_workspace( "If omitted, uses the workspace containing the current directory." ), ), - message: str | None = typer.Option( + message: Optional[str] = typer.Option( None, "--message", "-m", help="Commit message to use.", ), ): - """For each repo in the workspace: + """ + For each repo in the workspace: - Show list of changed files. - Ask whether to stage all, none, or pick some files. @@ -1013,8 +1047,9 @@ def commit_workspace( f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]" ) raise typer.Exit(1) + title, _desc = read_workspace_readme(ws_dir) - worktrees: list[Path] = [] + worktrees: List[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": continue @@ -1064,7 +1099,7 @@ def commit_workspace( @app.command("push") def push_workspace( ctx: typer.Context, - workspace: str | None = typer.Option( + workspace: Optional[str] = typer.Option( None, "--workspace", "-w", @@ -1080,7 +1115,8 @@ def push_workspace( help="Remote name to push to (default: origin).", ), ): - """For each repo in the workspace, run 'git push '. + """ + For each repo in the workspace, run 'git push '. Skips repos with no current branch or when push fails. """ @@ -1098,7 +1134,7 @@ def push_workspace( f"(dir: {ws_dir.name}) to remote '{remote}'" ) - worktrees: list[Path] = [] + worktrees: List[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": continue @@ -1134,7 +1170,7 @@ def push_workspace( @app.command("status") def status_workspace( ctx: typer.Context, - workspace: str | None = typer.Option( + workspace: Optional[str] = typer.Option( None, "--workspace", "-w", @@ -1144,7 +1180,8 @@ def status_workspace( ), ), ): - """Show detailed status for the current (or specified) workspace. + """ + Show detailed status for the current (or specified) workspace. For each repo in the workspace: - repo directory name @@ -1162,6 +1199,8 @@ def status_workspace( title, desc = read_workspace_readme(ws_dir) + # if desc: + # console.print(Panel(desc, title=title)) table = Table( title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}", @@ -1178,7 +1217,7 @@ def status_workspace( files_table.add_column("File", justify="left") files_table.add_column("Status", justify="right") - worktrees: list[Path] = [] + worktrees: List[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": continue @@ -1215,7 +1254,7 @@ def status_workspace( @app.command("diff") def diff_workspace( ctx: typer.Context, - workspace: str | None = typer.Option( + workspace: Optional[str] = typer.Option( None, "--workspace", "-w", @@ -1231,7 +1270,8 @@ def diff_workspace( help="Show only staged changes (git diff --cached).", ), ): - """Show git diff for all repos in the workspace. + """ + Show git diff for all repos in the workspace. Uses rich Syntax highlighter for diffs. """ _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) @@ -1249,7 +1289,7 @@ def diff_workspace( ) # Collect repos - worktrees: list[Path] = [] + worktrees: List[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": continue @@ -1326,7 +1366,7 @@ def run_tmux( ) -def get_tmux_sessions() -> list[str]: +def get_tmux_sessions() -> List[str]: """List all tmux sessions.""" result = run_tmux(["list-sessions", "-F", "#{session_name}"]) return result.stdout.decode().splitlines() @@ -1338,7 +1378,7 @@ def session_exists(session_name: str) -> bool: return result.returncode == 0 -def pick_project(base_dir: Path) -> str | None: +def pick_project(base_dir: Path) -> Optional[str]: """Use fzf to pick a subdirectory (project) from base_dir.""" if not base_dir.is_dir(): typer.echo(f"[ta] {base_dir} is not a directory.", err=True) @@ -1374,7 +1414,8 @@ def create_detached_session( path_name: Path, start_mode: bool, ) -> bool: - """Create a detached session. + """ + Create a detached session. - If start_mode: just a single window. - Else: split layout with nvim on top. @@ -1427,7 +1468,8 @@ def create_if_needed_and_attach( path_name: Path, start_mode: bool, ) -> bool: - """Mimic the bash logic: + """ + Mimic the bash logic: - If not in tmux: `tmux new-session -As ... -c path_name` - Else: @@ -1441,8 +1483,9 @@ def create_if_needed_and_attach( return r.returncode == 0 # Inside tmux - if not session_exists(session_name) and not create_detached_session(session_name, path_name, start_mode): - return False + if not session_exists(session_name): + if not create_detached_session(session_name, path_name, start_mode): + return False r = run_tmux(["switch-client", "-t", session_name]) return r.returncode == 0 @@ -1493,7 +1536,7 @@ tmux_app = typer.Typer( @tmux_app.command("attach") def tmux_cli_attach( ctx: typer.Context, - workspace: str | None = typer.Option( + workspace: Optional[str] = typer.Option( None, "--workspace", "-w", @@ -1503,8 +1546,10 @@ def tmux_cli_attach( ), ), ): - """Attach or create a session for a repo in a workspace. """ + Attach or create a session for a repo in a workspace. + """ + # ws_dir = get_workspace_dir(workspace) _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) ws_dir = find_workspace_dir(workspaces_dir, workspace) if not ws_dir: @@ -1522,7 +1567,8 @@ def tmux_cli_attach( @tmux_app.command("list-sessions") def tmux_cli_list_sessions( - workspace: str | None = typer.Option( + ctx: typer.Context, + workspace: Optional[str] = typer.Option( None, "--workspace", "-w", @@ -1549,7 +1595,8 @@ def tmux_cli_list_sessions( @tmux_app.command("remove") def tmux_cli_remove( - workspace: str | None = typer.Option( + ctx: typer.Context, + workspace: Optional[str] = typer.Option( None, "--workspace", "-w", @@ -1577,11 +1624,12 @@ def tmux_cli_remove( tmux_sessions = [ session for session in get_tmux_sessions() if session.startswith("ω") ] - if not dry_run and not Confirm.ask( + if not dry_run: + if not Confirm.ask( f"Are you sure you want to remove all tmux sessions?\n* {'\n* '.join(tmux_sessions)}\n", default=False, ): - raise typer.Exit(1) + raise typer.Exit(1) if dry_run: console.print(tmux_sessions) @@ -1590,6 +1638,8 @@ def tmux_cli_remove( for session_name in tmux_sessions: remove_session(session_name) + # + # remove_session_if_exists(session_name) app.add_typer(tmux_app) @@ -1598,7 +1648,7 @@ app.add_typer(tmux_app) @app.command("attach") def attach( ctx: typer.Context, - workspace: str | None = typer.Option( + workspace: Optional[str] = typer.Option( None, "--workspace", "-w",