Compare commits
5 commits
7ec81ffdc7
...
7bb9b49e9b
| Author | SHA1 | Date | |
|---|---|---|---|
| 7bb9b49e9b | |||
| a0d308c19f | |||
| f51701818a | |||
| 7b1a33414a | |||
| 0bdfb253ef |
2 changed files with 94 additions and 144 deletions
|
|
@ -48,7 +48,7 @@ select = [
|
|||
"I", # isort
|
||||
"N", # pep8-naming
|
||||
# "D", # pydocstyle
|
||||
# "UP", # pyupgrade
|
||||
"UP", # pyupgrade
|
||||
"YTT", # flake8-2020
|
||||
# "ANN", # flake8-annotations
|
||||
# "S", # flake8-bandit
|
||||
|
|
@ -66,7 +66,7 @@ select = [
|
|||
"ISC", # flake8-implicit-str-concat
|
||||
"ICN", # flake8-import-conventions
|
||||
"G", # flake8-logging-format
|
||||
# "INP", # flake8-no-pep420
|
||||
"INP", # flake8-no-pep420
|
||||
"PIE", # flake8-pie
|
||||
"T20", # flake8-print
|
||||
"PYI", # flake8-pyi
|
||||
|
|
@ -74,23 +74,23 @@ select = [
|
|||
"Q", # flake8-quotes
|
||||
"RSE", # flake8-raise
|
||||
"RET", # flake8-return
|
||||
# "SLF", # flake8-self
|
||||
# "SIM", # flake8-simplify
|
||||
"SLF", # flake8-self
|
||||
"SIM", # flake8-simplify
|
||||
"TID", # flake8-tidy-imports
|
||||
"TCH", # flake8-type-checking
|
||||
# "INT", # flake8-gettext
|
||||
# "ARG", # flake8-unused-arguments
|
||||
"INT", # flake8-gettext
|
||||
"ARG", # flake8-unused-arguments
|
||||
"PTH", # flake8-use-pathlib
|
||||
# "ERA", # eradicate
|
||||
"ERA", # eradicate
|
||||
"PD", # pandas-vet
|
||||
"PGH", # pygrep-hooks
|
||||
# "PL", # Pylint
|
||||
# "PLC", # Convention
|
||||
"PLC", # Convention
|
||||
"PLE", # Error
|
||||
# "PLR", # Refactor
|
||||
"PLW", # Warning
|
||||
# "TRY", # tryceratops
|
||||
"TRY", # tryceratops
|
||||
"NPY", # NumPy-specific rules
|
||||
# "RUF", # Ruff-specific rules
|
||||
"RUF", # Ruff-specific rules
|
||||
]
|
||||
|
||||
|
|
|
|||
218
workspaces.py
218
workspaces.py
|
|
@ -18,9 +18,6 @@ import shutil
|
|||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
|
||||
import typer
|
||||
from iterfzf import iterfzf
|
||||
|
|
@ -47,8 +44,7 @@ console = Console()
|
|||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
"""
|
||||
Global configuration for workspaces.
|
||||
"""Global configuration for workspaces.
|
||||
|
||||
Resolution for workspaces_name:
|
||||
1. Command-line flag --workspaces-name
|
||||
|
|
@ -74,10 +70,9 @@ class Settings(BaseSettings):
|
|||
|
||||
@classmethod
|
||||
def from_env_and_override(
|
||||
cls, override_workspaces_name: Optional[str]
|
||||
) -> "Settings":
|
||||
"""
|
||||
Construct settings honoring:
|
||||
cls, override_workspaces_name: str | None
|
||||
) -> Settings:
|
||||
"""Construct settings honoring:
|
||||
1. CLI override
|
||||
2. WORKSPACES_NAME env
|
||||
3. default "git"
|
||||
|
|
@ -91,9 +86,8 @@ class Settings(BaseSettings):
|
|||
return s
|
||||
|
||||
|
||||
def resolve_paths(workspaces_name: Optional[str]) -> Tuple[Settings, Path, Path]:
|
||||
"""
|
||||
Build Settings and derived paths, honoring CLI override of workspaces_name.
|
||||
def resolve_paths(workspaces_name: str | None) -> tuple[Settings, Path, Path]:
|
||||
"""Build Settings and derived paths, honoring CLI override of workspaces_name.
|
||||
"""
|
||||
base_settings = Settings.from_env_and_override(workspaces_name)
|
||||
name = base_settings.workspaces_name
|
||||
|
|
@ -115,15 +109,14 @@ class GitStatus:
|
|||
|
||||
@property
|
||||
def indicator(self) -> str:
|
||||
"""
|
||||
Build ASCII indicator:
|
||||
"""Build ASCII indicator:
|
||||
- clean: "·"
|
||||
- ahead 1: "↑1"
|
||||
- behind 2: "↓2"
|
||||
- both ahead/behind: "↑1 ↓2"
|
||||
- add '*' when dirty, e.g. "↑1*" or "↑1 ↓2*"
|
||||
"""
|
||||
parts: List[str] = []
|
||||
parts: list[str] = []
|
||||
if self.ahead:
|
||||
parts.append(f"↑{self.ahead}")
|
||||
if self.behind:
|
||||
|
|
@ -135,8 +128,7 @@ class GitStatus:
|
|||
|
||||
|
||||
def get_git_status(repo_path: Path) -> GitStatus:
|
||||
"""
|
||||
Get ahead/behind and dirty info for a Git repo.
|
||||
"""Get ahead/behind and dirty info for a Git repo.
|
||||
|
||||
Uses `git status --porcelain=v2 --branch` and parses:
|
||||
- '# branch.ab +A -B' for ahead/behind
|
||||
|
|
@ -158,7 +150,8 @@ def get_git_status(repo_path: Path) -> GitStatus:
|
|||
|
||||
for line in out.splitlines():
|
||||
if line.startswith("# branch.ab"):
|
||||
# Example: "# branch.ab +1 -2"
|
||||
# Example:
|
||||
# branch.ab +1 -2
|
||||
m = re.search(r"\+(\d+)\s+-(\d+)", line)
|
||||
if m:
|
||||
ahead = int(m.group(1))
|
||||
|
|
@ -169,7 +162,7 @@ def get_git_status(repo_path: Path) -> GitStatus:
|
|||
return GitStatus(ahead=ahead, behind=behind, dirty=dirty)
|
||||
|
||||
|
||||
def get_current_branch(repo_path: Path) -> Optional[str]:
|
||||
def get_current_branch(repo_path: Path) -> str | None:
|
||||
try:
|
||||
out = subprocess.check_output(
|
||||
["git", "rev-parse", "--abbrev-ref", "HEAD"],
|
||||
|
|
@ -187,7 +180,7 @@ def ensure_git_repo(path: Path) -> bool:
|
|||
return (path / ".git").exists()
|
||||
|
||||
|
||||
def run_cmd(cmd: List[str], cwd: Optional[Path] = None) -> Tuple[int, str, str]:
|
||||
def run_cmd(cmd: list[str], cwd: Path | None = None) -> tuple[int, str, str]:
|
||||
proc = subprocess.Popen(
|
||||
cmd,
|
||||
cwd=cwd,
|
||||
|
|
@ -199,9 +192,8 @@ def run_cmd(cmd: List[str], cwd: Optional[Path] = None) -> Tuple[int, str, str]:
|
|||
return proc.returncode, out, err
|
||||
|
||||
|
||||
def find_workspace_dir(workspaces_dir: Path, workspace_name: Optional[str]) -> Path:
|
||||
"""
|
||||
Resolve the directory of a workspace.
|
||||
def find_workspace_dir(workspaces_dir: Path, workspace_name: str | None) -> Path:
|
||||
"""Resolve the directory of a workspace.
|
||||
|
||||
- If workspace_name given: use workspaces_dir / workspace_name
|
||||
- Else: use current working directory (must be inside workspaces_dir).
|
||||
|
|
@ -226,9 +218,8 @@ def find_workspace_dir(workspaces_dir: Path, workspace_name: Optional[str]) -> P
|
|||
return workspace_root
|
||||
|
||||
|
||||
def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]:
|
||||
"""
|
||||
Return (name_from_h1, description_from_rest_of_file).
|
||||
def read_workspace_readme(ws_dir: Path) -> tuple[str, str]:
|
||||
"""Return (name_from_h1, description_from_rest_of_file).
|
||||
If file missing or malformed, fallback appropriately.
|
||||
"""
|
||||
readme = ws_dir / "readme.md"
|
||||
|
|
@ -269,8 +260,7 @@ def write_workspace_readme(ws_dir: Path, name: str, description: str) -> None:
|
|||
|
||||
|
||||
def slugify_workspace_name(name: str) -> str:
|
||||
"""
|
||||
Turn arbitrary workspace name into a safe directory/worktree name.
|
||||
"""Turn arbitrary workspace name into a safe directory/worktree name.
|
||||
|
||||
- Lowercase
|
||||
- Replace ':' with '-' FIRST (special case so 'fix:my issue' -> 'fix/my-issue' branch later)
|
||||
|
|
@ -293,8 +283,7 @@ def slugify_workspace_name(name: str) -> str:
|
|||
|
||||
|
||||
def branch_name_for_workspace(name: str) -> str:
|
||||
"""
|
||||
Compute branch name from workspace name.
|
||||
"""Compute branch name from workspace name.
|
||||
|
||||
Rules:
|
||||
- Start from slugified name (no spaces/specials).
|
||||
|
|
@ -316,7 +305,7 @@ def branch_name_for_workspace(name: str) -> str:
|
|||
@app.callback()
|
||||
def main(
|
||||
ctx: typer.Context,
|
||||
workspaces_name: Optional[str] = typer.Option(
|
||||
workspaces_name: str | None = typer.Option(
|
||||
None,
|
||||
"--workspaces-name",
|
||||
"-W",
|
||||
|
|
@ -326,8 +315,7 @@ def main(
|
|||
),
|
||||
),
|
||||
):
|
||||
"""
|
||||
Manage workspaces and associated Git worktrees.
|
||||
"""Manage workspaces and associated Git worktrees.
|
||||
|
||||
If no command is given, this will list workspaces.
|
||||
"""
|
||||
|
|
@ -344,7 +332,7 @@ def main(
|
|||
raise typer.Exit(0)
|
||||
|
||||
|
||||
def get_ctx_paths(ctx: typer.Context) -> Tuple[Settings, Path, Path]:
|
||||
def get_ctx_paths(ctx: typer.Context) -> tuple[Settings, Path, Path]:
|
||||
obj = ctx.obj or {}
|
||||
return obj["settings"], obj["repos_dir"], obj["workspaces_dir"]
|
||||
|
||||
|
|
@ -362,8 +350,7 @@ def list_workspaces(workspaces_dir: Path):
|
|||
def cli_list_workspaces(
|
||||
ctx: typer.Context,
|
||||
):
|
||||
"""
|
||||
List all workspaces.
|
||||
"""List all workspaces.
|
||||
|
||||
Shows:
|
||||
- workspace directory name
|
||||
|
|
@ -385,7 +372,7 @@ def cli_list_workspaces(
|
|||
|
||||
for ws in sorted(p for p in workspaces_dir.iterdir() if p.is_dir()):
|
||||
title, desc = read_workspace_readme(ws)
|
||||
repos: List[str] = []
|
||||
repos: list[str] = []
|
||||
for child in sorted(p for p in ws.iterdir() if p.is_dir()):
|
||||
if child.name.lower() == "readme.md":
|
||||
continue
|
||||
|
|
@ -408,21 +395,20 @@ def cli_list_workspaces(
|
|||
@app.command("new", hidden=True)
|
||||
def create_workspace(
|
||||
ctx: typer.Context,
|
||||
name: Optional[str] = typer.Option(
|
||||
name: str | None = typer.Option(
|
||||
None,
|
||||
"--name",
|
||||
"-n",
|
||||
help="Name of the new workspace (display name; can contain spaces).",
|
||||
),
|
||||
description: Optional[str] = typer.Option(
|
||||
description: str | None = typer.Option(
|
||||
None,
|
||||
"--description",
|
||||
"-d",
|
||||
help="Description of the workspace. Will be written into readme.md.",
|
||||
),
|
||||
):
|
||||
"""
|
||||
Create a new workspace.
|
||||
"""Create a new workspace.
|
||||
|
||||
- Asks for name and description if not provided.
|
||||
- Workspace directory uses a slugified version of the name.
|
||||
|
|
@ -457,7 +443,7 @@ def create_workspace(
|
|||
@app.command("list-repos")
|
||||
def list_repos(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
workspace: str | None = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
|
|
@ -467,8 +453,7 @@ def list_repos(
|
|||
),
|
||||
),
|
||||
):
|
||||
"""
|
||||
List repos and branches in the current (or specified) workspace.
|
||||
"""List repos and branches in the current (or specified) workspace.
|
||||
|
||||
Shows:
|
||||
- repo directory name
|
||||
|
|
@ -505,9 +490,8 @@ def list_repos(
|
|||
# ---------------- add-repo ----------------
|
||||
|
||||
|
||||
def list_all_repos(repos_dir: Path) -> List[Path]:
|
||||
"""
|
||||
List all directories in repos_dir that appear to be git repos.
|
||||
def list_all_repos(repos_dir: Path) -> list[Path]:
|
||||
"""List all directories in repos_dir that appear to be git repos.
|
||||
"""
|
||||
if not repos_dir.exists():
|
||||
return []
|
||||
|
|
@ -518,9 +502,8 @@ def list_all_repos(repos_dir: Path) -> List[Path]:
|
|||
return repos
|
||||
|
||||
|
||||
def pick_repo_with_iterfzf(repos: List[Path]) -> Optional[Path]:
|
||||
"""
|
||||
Use iterfzf (Python library) to pick a repo from a list of paths.
|
||||
def pick_repo_with_iterfzf(repos: list[Path]) -> Path | None:
|
||||
"""Use iterfzf (Python library) to pick a repo from a list of paths.
|
||||
"""
|
||||
if not repos:
|
||||
return None
|
||||
|
|
@ -537,9 +520,8 @@ def pick_repo_with_iterfzf(repos: List[Path]) -> Optional[Path]:
|
|||
return None
|
||||
|
||||
|
||||
def pick_workspace_with_iterfzf(workspaces: List[Path]) -> Optional[Path]:
|
||||
"""
|
||||
Use iterfzf (Python library) to pick a workspace from a list of paths.
|
||||
def pick_workspace_with_iterfzf(workspaces: list[Path]) -> Path | None:
|
||||
"""Use iterfzf (Python library) to pick a workspace from a list of paths.
|
||||
"""
|
||||
names = [w.name for w in workspaces]
|
||||
choice = iterfzf(names, prompt="pick a workspace> ")
|
||||
|
|
@ -557,7 +539,7 @@ def pick_workspace_with_iterfzf(workspaces: List[Path]) -> Optional[Path]:
|
|||
@app.command("add", hidden=True)
|
||||
def add_repo(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
workspace: str | None = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
|
|
@ -566,7 +548,7 @@ def add_repo(
|
|||
"If omitted, uses the workspace containing the current directory."
|
||||
),
|
||||
),
|
||||
repo_name: Optional[str] = typer.Option(
|
||||
repo_name: str | None = typer.Option(
|
||||
None,
|
||||
"--repo",
|
||||
"-r",
|
||||
|
|
@ -576,8 +558,7 @@ def add_repo(
|
|||
),
|
||||
),
|
||||
):
|
||||
"""
|
||||
Add a repo to a workspace.
|
||||
"""Add a repo to a workspace.
|
||||
|
||||
- Lists all directories in repos_dir as repos.
|
||||
- Uses iterfzf to pick repo if --repo not given.
|
||||
|
|
@ -601,7 +582,7 @@ def add_repo(
|
|||
console.print(f"[red]No git repos found in {repos_dir}[/red]")
|
||||
raise typer.Exit(1)
|
||||
|
||||
repo_path: Optional[Path] = None
|
||||
repo_path: Path | None = None
|
||||
if repo_name:
|
||||
for r in all_repos:
|
||||
if r.name == repo_name:
|
||||
|
|
@ -664,9 +645,8 @@ def add_repo(
|
|||
# ---------------- rm-workspace ----------------
|
||||
|
||||
|
||||
def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Optional[Path]:
|
||||
"""
|
||||
Try to find the parent repo for a worktree, assuming it lives in repos_dir
|
||||
def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Path | None:
|
||||
"""Try to find the parent repo for a worktree, assuming it lives in repos_dir
|
||||
with the same directory name as the worktree.
|
||||
"""
|
||||
candidate = repos_dir / worktree_path.name
|
||||
|
|
@ -675,9 +655,8 @@ def find_repo_for_worktree(worktree_path: Path, repos_dir: Path) -> Optional[Pat
|
|||
return None
|
||||
|
||||
|
||||
def has_unpushed_commits(repo_path: Path, branch: str) -> bool:
|
||||
"""
|
||||
Detect if branch has commits not on its upstream.
|
||||
def has_unpushed_commits(repo_path: Path) -> bool:
|
||||
"""Detect if branch has commits not on its upstream.
|
||||
Uses 'git rev-list @{u}..HEAD'; if non-empty, there are unpushed commits.
|
||||
"""
|
||||
# Check if upstream exists
|
||||
|
|
@ -707,8 +686,7 @@ def is_branch_integrated_into_main(
|
|||
branch: str,
|
||||
main_ref: str = "origin/main",
|
||||
) -> bool:
|
||||
"""
|
||||
Heuristic: is `branch`'s content already in `main_ref`?
|
||||
"""Heuristic: is `branch`'s content already in `main_ref`?
|
||||
|
||||
Returns True if:
|
||||
- branch tip is an ancestor of main_ref (normal merge / FF / rebase-before-merge), OR
|
||||
|
|
@ -760,7 +738,7 @@ def is_branch_integrated_into_main(
|
|||
@app.command("rm")
|
||||
def remove_workspace(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
workspace: str | None = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
|
|
@ -781,8 +759,7 @@ def remove_workspace(
|
|||
help="Ref to consider as the integration target (default: origin/main).",
|
||||
),
|
||||
):
|
||||
"""
|
||||
Remove a workspace:
|
||||
"""Remove a workspace:
|
||||
|
||||
- For each repo worktree in the workspace:
|
||||
* Check for dirty work or unpushed commits.
|
||||
|
|
@ -802,7 +779,7 @@ def remove_workspace(
|
|||
title, _desc = read_workspace_readme(ws_dir)
|
||||
|
||||
# Collect worktrees (subdirs that look like git repos, excluding readme.md)
|
||||
worktrees: List[Path] = []
|
||||
worktrees: list[Path] = []
|
||||
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
||||
if child.name.lower() == "readme.md":
|
||||
continue
|
||||
|
|
@ -823,7 +800,7 @@ def remove_workspace(
|
|||
raise typer.Exit(0)
|
||||
|
||||
# Check for dirty / unpushed changes
|
||||
problems: List[str] = []
|
||||
problems: list[str] = []
|
||||
for wt in worktrees:
|
||||
status = get_git_status(wt)
|
||||
branch = get_current_branch(wt) or "?"
|
||||
|
|
@ -837,9 +814,8 @@ def remove_workspace(
|
|||
problems.append(f"{wt.name}: dirty working tree on '{branch}'")
|
||||
|
||||
# Only care about "unpushed commits" if the branch is NOT integrated into main.
|
||||
if repo is not None and branch != "?" and not integrated:
|
||||
if has_unpushed_commits(wt, branch):
|
||||
problems.append(f"{wt.name}: unpushed commits on '{branch}'")
|
||||
if repo is not None and branch != "?" and not integrated and has_unpushed_commits(wt):
|
||||
problems.append(f"{wt.name}: unpushed commits on '{branch}'")
|
||||
if problems and not force:
|
||||
console.print(
|
||||
"[red]Refusing to remove workspace; found dirty or unpushed work:[/red]"
|
||||
|
|
@ -905,9 +881,8 @@ def remove_workspace(
|
|||
# ---------------- wip (stage + commit) ----------------
|
||||
|
||||
|
||||
def git_status_porcelain(repo_path: Path) -> List[Tuple[str, str]]:
|
||||
"""
|
||||
Return a list of (status_code, path) for changes in the repo/worktree.
|
||||
def git_status_porcelain(repo_path: Path) -> list[tuple[str, str]]:
|
||||
"""Return a list of (status_code, path) for changes in the repo/worktree.
|
||||
|
||||
Uses 'git status --porcelain'.
|
||||
"""
|
||||
|
|
@ -916,20 +891,18 @@ def git_status_porcelain(repo_path: Path) -> List[Tuple[str, str]]:
|
|||
console.print(f"[red]Failed to get status for {repo_path.name}:[/red]\n{err}")
|
||||
return []
|
||||
|
||||
changes: List[Tuple[str, str]] = []
|
||||
changes: list[tuple[str, str]] = []
|
||||
for line in out.splitlines():
|
||||
if not line.strip():
|
||||
continue
|
||||
# Format: 'XY path'
|
||||
status = line[:2]
|
||||
path = line[3:]
|
||||
changes.append((status, path))
|
||||
return changes
|
||||
|
||||
|
||||
def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> List[str]:
|
||||
"""
|
||||
Interactively choose which files to stage for WIP.
|
||||
def choose_files_for_wip(repo_path: Path, changes: list[tuple[str, str]]) -> list[str]:
|
||||
"""Interactively choose which files to stage for WIP.
|
||||
|
||||
- Show list of changed files.
|
||||
- Ask user:
|
||||
|
|
@ -968,18 +941,13 @@ def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> Lis
|
|||
return []
|
||||
# iterfzf may return a single string or list depending on version;
|
||||
# normalize to list of strings.
|
||||
if isinstance(selected, str):
|
||||
selected_files = [selected]
|
||||
else:
|
||||
selected_files = list(selected)
|
||||
|
||||
return selected_files
|
||||
return [selected] if isinstance(selected, str) else list(selected)
|
||||
|
||||
|
||||
@app.command("wip")
|
||||
def wip_workspace(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
workspace: str | None = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
|
|
@ -989,8 +957,7 @@ def wip_workspace(
|
|||
),
|
||||
),
|
||||
):
|
||||
"""
|
||||
For each repo in the workspace:
|
||||
"""For each repo in the workspace:
|
||||
|
||||
- Show list of changed files.
|
||||
- Ask whether to stage all, none, or pick some files.
|
||||
|
|
@ -1012,7 +979,7 @@ def wip_workspace(
|
|||
@app.command("commit")
|
||||
def commit_workspace(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
workspace: str | None = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
|
|
@ -1021,15 +988,14 @@ def commit_workspace(
|
|||
"If omitted, uses the workspace containing the current directory."
|
||||
),
|
||||
),
|
||||
message: Optional[str] = typer.Option(
|
||||
message: str | None = typer.Option(
|
||||
None,
|
||||
"--message",
|
||||
"-m",
|
||||
help="Commit message to use.",
|
||||
),
|
||||
):
|
||||
"""
|
||||
For each repo in the workspace:
|
||||
"""For each repo in the workspace:
|
||||
|
||||
- Show list of changed files.
|
||||
- Ask whether to stage all, none, or pick some files.
|
||||
|
|
@ -1047,9 +1013,8 @@ def commit_workspace(
|
|||
f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]"
|
||||
)
|
||||
raise typer.Exit(1)
|
||||
title, _desc = read_workspace_readme(ws_dir)
|
||||
|
||||
worktrees: List[Path] = []
|
||||
worktrees: list[Path] = []
|
||||
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
||||
if child.name.lower() == "readme.md":
|
||||
continue
|
||||
|
|
@ -1099,7 +1064,7 @@ def commit_workspace(
|
|||
@app.command("push")
|
||||
def push_workspace(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
workspace: str | None = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
|
|
@ -1115,8 +1080,7 @@ def push_workspace(
|
|||
help="Remote name to push to (default: origin).",
|
||||
),
|
||||
):
|
||||
"""
|
||||
For each repo in the workspace, run 'git push <remote> <current-branch>'.
|
||||
"""For each repo in the workspace, run 'git push <remote> <current-branch>'.
|
||||
|
||||
Skips repos with no current branch or when push fails.
|
||||
"""
|
||||
|
|
@ -1134,7 +1098,7 @@ def push_workspace(
|
|||
f"(dir: {ws_dir.name}) to remote '{remote}'"
|
||||
)
|
||||
|
||||
worktrees: List[Path] = []
|
||||
worktrees: list[Path] = []
|
||||
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
||||
if child.name.lower() == "readme.md":
|
||||
continue
|
||||
|
|
@ -1170,7 +1134,7 @@ def push_workspace(
|
|||
@app.command("status")
|
||||
def status_workspace(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
workspace: str | None = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
|
|
@ -1180,8 +1144,7 @@ def status_workspace(
|
|||
),
|
||||
),
|
||||
):
|
||||
"""
|
||||
Show detailed status for the current (or specified) workspace.
|
||||
"""Show detailed status for the current (or specified) workspace.
|
||||
|
||||
For each repo in the workspace:
|
||||
- repo directory name
|
||||
|
|
@ -1199,8 +1162,6 @@ def status_workspace(
|
|||
|
||||
title, desc = read_workspace_readme(ws_dir)
|
||||
|
||||
# if desc:
|
||||
# console.print(Panel(desc, title=title))
|
||||
|
||||
table = Table(
|
||||
title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}",
|
||||
|
|
@ -1217,7 +1178,7 @@ def status_workspace(
|
|||
files_table.add_column("File", justify="left")
|
||||
files_table.add_column("Status", justify="right")
|
||||
|
||||
worktrees: List[Path] = []
|
||||
worktrees: list[Path] = []
|
||||
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
||||
if child.name.lower() == "readme.md":
|
||||
continue
|
||||
|
|
@ -1254,7 +1215,7 @@ def status_workspace(
|
|||
@app.command("diff")
|
||||
def diff_workspace(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
workspace: str | None = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
|
|
@ -1270,8 +1231,7 @@ def diff_workspace(
|
|||
help="Show only staged changes (git diff --cached).",
|
||||
),
|
||||
):
|
||||
"""
|
||||
Show git diff for all repos in the workspace.
|
||||
"""Show git diff for all repos in the workspace.
|
||||
Uses rich Syntax highlighter for diffs.
|
||||
"""
|
||||
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
||||
|
|
@ -1289,7 +1249,7 @@ def diff_workspace(
|
|||
)
|
||||
|
||||
# Collect repos
|
||||
worktrees: List[Path] = []
|
||||
worktrees: list[Path] = []
|
||||
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
||||
if child.name.lower() == "readme.md":
|
||||
continue
|
||||
|
|
@ -1366,7 +1326,7 @@ def run_tmux(
|
|||
)
|
||||
|
||||
|
||||
def get_tmux_sessions() -> List[str]:
|
||||
def get_tmux_sessions() -> list[str]:
|
||||
"""List all tmux sessions."""
|
||||
result = run_tmux(["list-sessions", "-F", "#{session_name}"])
|
||||
return result.stdout.decode().splitlines()
|
||||
|
|
@ -1378,7 +1338,7 @@ def session_exists(session_name: str) -> bool:
|
|||
return result.returncode == 0
|
||||
|
||||
|
||||
def pick_project(base_dir: Path) -> Optional[str]:
|
||||
def pick_project(base_dir: Path) -> str | None:
|
||||
"""Use fzf to pick a subdirectory (project) from base_dir."""
|
||||
if not base_dir.is_dir():
|
||||
typer.echo(f"[ta] {base_dir} is not a directory.", err=True)
|
||||
|
|
@ -1414,8 +1374,7 @@ def create_detached_session(
|
|||
path_name: Path,
|
||||
start_mode: bool,
|
||||
) -> bool:
|
||||
"""
|
||||
Create a detached session.
|
||||
"""Create a detached session.
|
||||
|
||||
- If start_mode: just a single window.
|
||||
- Else: split layout with nvim on top.
|
||||
|
|
@ -1468,8 +1427,7 @@ def create_if_needed_and_attach(
|
|||
path_name: Path,
|
||||
start_mode: bool,
|
||||
) -> bool:
|
||||
"""
|
||||
Mimic the bash logic:
|
||||
"""Mimic the bash logic:
|
||||
|
||||
- If not in tmux: `tmux new-session -As ... -c path_name`
|
||||
- Else:
|
||||
|
|
@ -1483,9 +1441,8 @@ def create_if_needed_and_attach(
|
|||
return r.returncode == 0
|
||||
|
||||
# Inside tmux
|
||||
if not session_exists(session_name):
|
||||
if not create_detached_session(session_name, path_name, start_mode):
|
||||
return False
|
||||
if not session_exists(session_name) and not create_detached_session(session_name, path_name, start_mode):
|
||||
return False
|
||||
|
||||
r = run_tmux(["switch-client", "-t", session_name])
|
||||
return r.returncode == 0
|
||||
|
|
@ -1536,7 +1493,7 @@ tmux_app = typer.Typer(
|
|||
@tmux_app.command("attach")
|
||||
def tmux_cli_attach(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
workspace: str | None = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
|
|
@ -1546,10 +1503,8 @@ def tmux_cli_attach(
|
|||
),
|
||||
),
|
||||
):
|
||||
"""Attach or create a session for a repo in a workspace.
|
||||
"""
|
||||
Attach or create a session for a repo in a workspace.
|
||||
"""
|
||||
# ws_dir = get_workspace_dir(workspace)
|
||||
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
||||
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
||||
if not ws_dir:
|
||||
|
|
@ -1567,8 +1522,7 @@ def tmux_cli_attach(
|
|||
|
||||
@tmux_app.command("list-sessions")
|
||||
def tmux_cli_list_sessions(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
workspace: str | None = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
|
|
@ -1595,8 +1549,7 @@ def tmux_cli_list_sessions(
|
|||
|
||||
@tmux_app.command("remove")
|
||||
def tmux_cli_remove(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
workspace: str | None = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
|
|
@ -1624,12 +1577,11 @@ def tmux_cli_remove(
|
|||
tmux_sessions = [
|
||||
session for session in get_tmux_sessions() if session.startswith("ω")
|
||||
]
|
||||
if not dry_run:
|
||||
if not Confirm.ask(
|
||||
if not dry_run and not Confirm.ask(
|
||||
f"Are you sure you want to remove all tmux sessions?\n* {'\n* '.join(tmux_sessions)}\n",
|
||||
default=False,
|
||||
):
|
||||
raise typer.Exit(1)
|
||||
raise typer.Exit(1)
|
||||
|
||||
if dry_run:
|
||||
console.print(tmux_sessions)
|
||||
|
|
@ -1638,8 +1590,6 @@ def tmux_cli_remove(
|
|||
for session_name in tmux_sessions:
|
||||
remove_session(session_name)
|
||||
|
||||
#
|
||||
# remove_session_if_exists(session_name)
|
||||
|
||||
|
||||
app.add_typer(tmux_app)
|
||||
|
|
@ -1648,7 +1598,7 @@ app.add_typer(tmux_app)
|
|||
@app.command("attach")
|
||||
def attach(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
workspace: str | None = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue