add wip and push commands
This commit is contained in:
parent
5c5d6a6bc2
commit
7455e540e3
1 changed files with 228 additions and 0 deletions
228
workspaces.py
228
workspaces.py
|
|
@ -24,6 +24,7 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
|
|||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
from rich.prompt import Prompt, Confirm
|
||||
from rich.panel import Panel
|
||||
from iterfzf import iterfzf
|
||||
|
||||
app = typer.Typer(
|
||||
|
|
@ -787,5 +788,232 @@ def remove_workspace(
|
|||
)
|
||||
|
||||
|
||||
# ---------------- wip (stage + commit) ----------------
|
||||
|
||||
|
||||
def git_status_porcelain(repo_path: Path) -> List[Tuple[str, str]]:
|
||||
"""
|
||||
Return a list of (status_code, path) for changes in the repo/worktree.
|
||||
|
||||
Uses 'git status --porcelain'.
|
||||
"""
|
||||
code, out, err = run_cmd(["git", "status", "--porcelain"], cwd=repo_path)
|
||||
if code != 0:
|
||||
console.print(
|
||||
f"[red]Failed to get status for {repo_path.name}:[/red]\n{err}"
|
||||
)
|
||||
return []
|
||||
|
||||
changes: List[Tuple[str, str]] = []
|
||||
for line in out.splitlines():
|
||||
if not line.strip():
|
||||
continue
|
||||
# Format: 'XY path'
|
||||
status = line[:2]
|
||||
path = line[3:]
|
||||
changes.append((status, path))
|
||||
return changes
|
||||
|
||||
|
||||
def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> List[str]:
|
||||
"""
|
||||
Interactively choose which files to stage for WIP.
|
||||
|
||||
- Show list of changed files.
|
||||
- Ask user:
|
||||
* stage all
|
||||
* stage none
|
||||
* pick some using iterfzf (multi-select)
|
||||
"""
|
||||
if not changes:
|
||||
return []
|
||||
|
||||
files = [p for _s, p in changes]
|
||||
|
||||
console.print(
|
||||
Panel(
|
||||
"\n".join(files),
|
||||
title=f"Changed files in {repo_path.name}",
|
||||
subtitle="Use 'all', 'none', or 'pick' when prompted",
|
||||
)
|
||||
)
|
||||
|
||||
choice = Prompt.ask(
|
||||
"Stage which files?",
|
||||
choices=["all", "none", "pick"],
|
||||
default="all",
|
||||
)
|
||||
|
||||
if choice == "none":
|
||||
return []
|
||||
|
||||
if choice == "all":
|
||||
return files
|
||||
|
||||
# Multi-select via iterfzf
|
||||
selected = iterfzf(files, multi=True, prompt="pick files> ")
|
||||
if not selected:
|
||||
return []
|
||||
# iterfzf may return a single string or list depending on version;
|
||||
# normalize to list of strings.
|
||||
if isinstance(selected, str):
|
||||
selected_files = [selected]
|
||||
else:
|
||||
selected_files = list(selected)
|
||||
|
||||
return selected_files
|
||||
|
||||
|
||||
@app.command("wip")
|
||||
def wip_workspace(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
help=(
|
||||
"Workspace directory name to WIP-commit. "
|
||||
"If omitted, uses the workspace containing the current directory."
|
||||
),
|
||||
),
|
||||
):
|
||||
"""
|
||||
For each repo in the workspace:
|
||||
|
||||
- Show list of changed files.
|
||||
- Ask whether to stage all, none, or pick some files.
|
||||
- Stage chosen files.
|
||||
- Make commit: 'wip: <workspace display name>'.
|
||||
"""
|
||||
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
||||
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
||||
if not ws_dir.exists():
|
||||
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
||||
raise typer.Exit(1)
|
||||
|
||||
title, _desc = read_workspace_readme(ws_dir)
|
||||
commit_message = f"wip: {title or ws_dir.name}"
|
||||
|
||||
worktrees: List[Path] = []
|
||||
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
||||
if child.name.lower() == "readme.md":
|
||||
continue
|
||||
if not ensure_git_repo(child):
|
||||
continue
|
||||
worktrees.append(child)
|
||||
|
||||
if not worktrees:
|
||||
console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
|
||||
raise typer.Exit(0)
|
||||
|
||||
for wt in worktrees:
|
||||
console.print(f"\n[bold]Repo:[/bold] {wt.name}")
|
||||
changes = git_status_porcelain(wt)
|
||||
if not changes:
|
||||
console.print(" [green]No changes.[/green]")
|
||||
continue
|
||||
|
||||
files_to_stage = choose_files_for_wip(wt, changes)
|
||||
if not files_to_stage:
|
||||
console.print(" [yellow]Nothing selected to stage.[/yellow]")
|
||||
continue
|
||||
|
||||
# Stage selected files
|
||||
for f in files_to_stage:
|
||||
code, _out, err = run_cmd(["git", "add", f], cwd=wt)
|
||||
if code != 0:
|
||||
console.print(
|
||||
f"[red]Failed to add {f} in {wt.name}:[/red]\n{err}"
|
||||
)
|
||||
|
||||
# Check if there is anything to commit
|
||||
code, out, _err = run_cmd(["git", "diff", "--cached", "--name-only"], cwd=wt)
|
||||
if code != 0 or not out.strip():
|
||||
console.print(" [yellow]No staged changes to commit.[/yellow]")
|
||||
continue
|
||||
|
||||
# Commit
|
||||
code, _out, err = run_cmd(["git", "commit", "-m", commit_message], cwd=wt)
|
||||
if code != 0:
|
||||
console.print(
|
||||
f"[red]Failed to commit in {wt.name}:[/red]\n{err}"
|
||||
)
|
||||
else:
|
||||
console.print(
|
||||
f" [green]Created WIP commit in {wt.name}:[/green] '{commit_message}'"
|
||||
)
|
||||
|
||||
|
||||
# ---------------- push ----------------
|
||||
|
||||
|
||||
@app.command("push")
|
||||
def push_workspace(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
help=(
|
||||
"Workspace directory name to push. "
|
||||
"If omitted, uses the workspace containing the current directory."
|
||||
),
|
||||
),
|
||||
remote: str = typer.Option(
|
||||
"origin",
|
||||
"--remote",
|
||||
"-r",
|
||||
help="Remote name to push to (default: origin).",
|
||||
),
|
||||
):
|
||||
"""
|
||||
For each repo in the workspace, run 'git push <remote> <current-branch>'.
|
||||
|
||||
Skips repos with no current branch or when push fails.
|
||||
"""
|
||||
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
||||
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
||||
if not ws_dir.exists():
|
||||
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
||||
raise typer.Exit(1)
|
||||
|
||||
title, _desc = read_workspace_readme(ws_dir)
|
||||
console.print(
|
||||
f"Pushing all repos in workspace [bold]{title or ws_dir.name}[/bold] "
|
||||
f"(dir: {ws_dir.name}) to remote '{remote}'"
|
||||
)
|
||||
|
||||
worktrees: List[Path] = []
|
||||
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
||||
if child.name.lower() == "readme.md":
|
||||
continue
|
||||
if not ensure_git_repo(child):
|
||||
continue
|
||||
worktrees.append(child)
|
||||
|
||||
if not worktrees:
|
||||
console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
|
||||
raise typer.Exit(0)
|
||||
|
||||
for wt in worktrees:
|
||||
branch = get_current_branch(wt)
|
||||
if not branch or branch == "HEAD":
|
||||
console.print(
|
||||
f"[yellow]Skipping {wt.name}: no current branch (detached HEAD?).[/yellow]"
|
||||
)
|
||||
continue
|
||||
|
||||
console.print(f"\n[bold]Repo:[/bold] {wt.name} [bold]Branch:[/bold] {branch}")
|
||||
code, _out, err = run_cmd(["git", "push", remote, branch], cwd=wt)
|
||||
if code != 0:
|
||||
console.print(
|
||||
f"[red]Failed to push {wt.name} ({branch}) to {remote}:[/red]\n{err}"
|
||||
)
|
||||
else:
|
||||
console.print(
|
||||
f"[green]Pushed {wt.name} ({branch}) to {remote}[/green]"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue