Compare commits
4 commits
575e51eaa9
...
42d034eef2
| Author | SHA1 | Date | |
|---|---|---|---|
| 42d034eef2 | |||
| 0ee5711dcd | |||
| 4698450200 | |||
| d5d081f743 |
1 changed files with 354 additions and 8 deletions
362
workspaces.py
362
workspaces.py
|
|
@ -10,6 +10,7 @@
|
||||||
# ]
|
# ]
|
||||||
# ///
|
# ///
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
|
|
@ -18,13 +19,17 @@ from dataclasses import dataclass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import List, Optional, Tuple
|
from typing import List, Optional, Tuple
|
||||||
|
|
||||||
|
|
||||||
import typer
|
import typer
|
||||||
|
|
||||||
from pydantic import Field
|
from pydantic import Field
|
||||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||||
from rich.console import Console
|
from rich.console import Console
|
||||||
from rich.table import Table
|
from rich.table import Table
|
||||||
from rich.prompt import Prompt, Confirm
|
from rich.prompt import Prompt, Confirm
|
||||||
from rich.panel import Panel
|
from rich.panel import Panel
|
||||||
|
from rich.syntax import Syntax
|
||||||
|
from rich.panel import Panel
|
||||||
from iterfzf import iterfzf
|
from iterfzf import iterfzf
|
||||||
|
|
||||||
app = typer.Typer(
|
app = typer.Typer(
|
||||||
|
|
@ -233,7 +238,7 @@ def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]:
|
||||||
return ws_dir.name, ""
|
return ws_dir.name, ""
|
||||||
|
|
||||||
# First non-empty line must be '# ...' per spec
|
# First non-empty line must be '# ...' per spec
|
||||||
first_non_empty_idx = next((i for i, l in enumerate(lines) if l.strip()), None)
|
first_non_empty_idx = next((i for i, line in enumerate(lines) if line.strip()), None)
|
||||||
if first_non_empty_idx is None:
|
if first_non_empty_idx is None:
|
||||||
return ws_dir.name, ""
|
return ws_dir.name, ""
|
||||||
|
|
||||||
|
|
@ -865,7 +870,6 @@ def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> Lis
|
||||||
|
|
||||||
return selected_files
|
return selected_files
|
||||||
|
|
||||||
|
|
||||||
@app.command("wip")
|
@app.command("wip")
|
||||||
def wip_workspace(
|
def wip_workspace(
|
||||||
ctx: typer.Context,
|
ctx: typer.Context,
|
||||||
|
|
@ -892,9 +896,48 @@ def wip_workspace(
|
||||||
if not ws_dir.exists():
|
if not ws_dir.exists():
|
||||||
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
||||||
raise typer.Exit(1)
|
raise typer.Exit(1)
|
||||||
|
|
||||||
title, _desc = read_workspace_readme(ws_dir)
|
title, _desc = read_workspace_readme(ws_dir)
|
||||||
commit_message = f"wip: {title or ws_dir.name}"
|
commit_message = f"wip: {title or ws_dir.name}"
|
||||||
|
commit_workspace(ctx, workspace, message=commit_message)
|
||||||
|
|
||||||
|
@app.command("commit")
|
||||||
|
def commit_workspace(
|
||||||
|
ctx: typer.Context,
|
||||||
|
workspace: Optional[str] = typer.Option(
|
||||||
|
None,
|
||||||
|
"--workspace",
|
||||||
|
"-w",
|
||||||
|
help=(
|
||||||
|
"Workspace directory name to WIP-commit. "
|
||||||
|
"If omitted, uses the workspace containing the current directory."
|
||||||
|
),
|
||||||
|
),
|
||||||
|
message: Optional[str] = typer.Option(
|
||||||
|
None,
|
||||||
|
"--message",
|
||||||
|
"-m",
|
||||||
|
help="Commit message to use.",
|
||||||
|
),
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
For each repo in the workspace:
|
||||||
|
|
||||||
|
- Show list of changed files.
|
||||||
|
- Ask whether to stage all, none, or pick some files.
|
||||||
|
- Stage chosen files.
|
||||||
|
"""
|
||||||
|
if not message:
|
||||||
|
console.print(
|
||||||
|
"[red]No commit message provided. Exiting.[/red]",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
||||||
|
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
||||||
|
if not ws_dir.exists():
|
||||||
|
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
||||||
|
raise typer.Exit(1)
|
||||||
|
title, _desc = read_workspace_readme(ws_dir)
|
||||||
|
|
||||||
worktrees: List[Path] = []
|
worktrees: List[Path] = []
|
||||||
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
||||||
|
|
@ -935,14 +978,14 @@ def wip_workspace(
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Commit
|
# Commit
|
||||||
code, _out, err = run_cmd(["git", "commit", "-m", commit_message], cwd=wt)
|
code, _out, err = run_cmd(["git", "commit", "-m", message], cwd=wt)
|
||||||
if code != 0:
|
if code != 0:
|
||||||
console.print(
|
console.print(
|
||||||
f"[red]Failed to commit in {wt.name}:[/red]\n{err}"
|
f"[red]Failed to commit in {wt.name}:[/red]\n{err}"
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
console.print(
|
console.print(
|
||||||
f" [green]Created WIP commit in {wt.name}:[/green] '{commit_message}'"
|
f" [green]Created commit in {wt.name}:[/green] '{message}'"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -1050,8 +1093,12 @@ def status_workspace(
|
||||||
|
|
||||||
title, desc = read_workspace_readme(ws_dir)
|
title, desc = read_workspace_readme(ws_dir)
|
||||||
|
|
||||||
|
# if desc:
|
||||||
|
# console.print(Panel(desc, title=title))
|
||||||
|
|
||||||
table = Table(
|
table = Table(
|
||||||
title=f"Status for workspace '{title}' (dir: {ws_dir.name})",
|
title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}",
|
||||||
|
title_justify="left",
|
||||||
show_lines=False,
|
show_lines=False,
|
||||||
)
|
)
|
||||||
table.add_column("Repo (dir)", style="bold")
|
table.add_column("Repo (dir)", style="bold")
|
||||||
|
|
@ -1059,6 +1106,11 @@ def status_workspace(
|
||||||
table.add_column("Ahead/Behind/Dirty")
|
table.add_column("Ahead/Behind/Dirty")
|
||||||
table.add_column("Changed Files", justify="right")
|
table.add_column("Changed Files", justify="right")
|
||||||
|
|
||||||
|
files_table = Table(show_lines=False)
|
||||||
|
files_table.add_column("Repo (dir)", style="bold")
|
||||||
|
files_table.add_column("File", justify="left")
|
||||||
|
files_table.add_column("Status", justify="right")
|
||||||
|
|
||||||
worktrees: List[Path] = []
|
worktrees: List[Path] = []
|
||||||
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
||||||
if child.name.lower() == "readme.md":
|
if child.name.lower() == "readme.md":
|
||||||
|
|
@ -1082,10 +1134,304 @@ def status_workspace(
|
||||||
str(len(changes)),
|
str(len(changes)),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if changes:
|
||||||
|
for f, s in changes:
|
||||||
|
files_table.add_row(wt.name, f, s)
|
||||||
|
|
||||||
console.print(table)
|
console.print(table)
|
||||||
|
|
||||||
if desc:
|
console.print("\nFiles with changes:")
|
||||||
console.print(Panel(desc, title="Workspace description"))
|
if files_table.rows:
|
||||||
|
console.print(files_table)
|
||||||
|
|
||||||
|
@app.command("diff")
|
||||||
|
def diff_workspace(
|
||||||
|
ctx: typer.Context,
|
||||||
|
workspace: Optional[str] = typer.Option(
|
||||||
|
None,
|
||||||
|
"--workspace",
|
||||||
|
"-w",
|
||||||
|
help=(
|
||||||
|
"Workspace directory name to diff. "
|
||||||
|
"If omitted, uses the workspace containing the current directory."
|
||||||
|
),
|
||||||
|
),
|
||||||
|
staged: bool = typer.Option(
|
||||||
|
False,
|
||||||
|
"--staged",
|
||||||
|
"-s",
|
||||||
|
help="Show only staged changes (git diff --cached).",
|
||||||
|
),
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Show git diff for all repos in the workspace.
|
||||||
|
Uses rich Syntax highlighter for diffs.
|
||||||
|
"""
|
||||||
|
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
||||||
|
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
||||||
|
|
||||||
|
if not ws_dir.exists():
|
||||||
|
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
||||||
|
raise typer.Exit(1)
|
||||||
|
|
||||||
|
title, _desc = read_workspace_readme(ws_dir)
|
||||||
|
console.print(
|
||||||
|
f"Showing diffs for workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Collect repos
|
||||||
|
worktrees: List[Path] = []
|
||||||
|
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
||||||
|
if child.name.lower() == "readme.md":
|
||||||
|
continue
|
||||||
|
if not ensure_git_repo(child):
|
||||||
|
continue
|
||||||
|
worktrees.append(child)
|
||||||
|
|
||||||
|
if not worktrees:
|
||||||
|
console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
|
||||||
|
raise typer.Exit(0)
|
||||||
|
|
||||||
|
any_diffs = False
|
||||||
|
|
||||||
|
for wt in worktrees:
|
||||||
|
# Skip clean repos
|
||||||
|
changes = git_status_porcelain(wt)
|
||||||
|
if not changes:
|
||||||
|
continue
|
||||||
|
|
||||||
|
any_diffs = True
|
||||||
|
|
||||||
|
console.rule(f"[bold]{wt.name}[/bold]")
|
||||||
|
|
||||||
|
cmd = ["git", "diff"]
|
||||||
|
if staged:
|
||||||
|
cmd.append("--cached")
|
||||||
|
|
||||||
|
code, out, err = run_cmd(cmd, cwd=wt)
|
||||||
|
if code != 0:
|
||||||
|
console.print(
|
||||||
|
f"[red]Failed to get diff for {wt.name}:[/red]\n{err}"
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not out.strip():
|
||||||
|
console.print("[dim]No diff output.[/dim]")
|
||||||
|
continue
|
||||||
|
|
||||||
|
syntax = Syntax(
|
||||||
|
out,
|
||||||
|
"diff",
|
||||||
|
theme="nord",
|
||||||
|
line_numbers=False,
|
||||||
|
word_wrap=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Wrap each repo diff in a Panel for clarity
|
||||||
|
console.print(Panel(syntax, title=wt.name, border_style="cyan"))
|
||||||
|
|
||||||
|
if not any_diffs:
|
||||||
|
console.print("[green]No changes to diff in any repo.[/green]")
|
||||||
|
|
||||||
|
def not_in_tmux() -> bool:
|
||||||
|
"""Return True if not inside tmux or zellij."""
|
||||||
|
return not os.environ.get("TMUX") and not os.environ.get("ZELLIJ")
|
||||||
|
|
||||||
|
|
||||||
|
def run_tmux(args: list[str], *, clear_tmux_env: bool = False) -> subprocess.CompletedProcess:
|
||||||
|
"""Run a tmux command."""
|
||||||
|
env = os.environ.copy()
|
||||||
|
if clear_tmux_env:
|
||||||
|
env["TMUX"] = ""
|
||||||
|
return subprocess.run(
|
||||||
|
["tmux", *args],
|
||||||
|
env=env,
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def session_exists(session_name: str) -> bool:
|
||||||
|
"""Check if the tmux session exists."""
|
||||||
|
result = run_tmux(["has-session", "-t", f"={session_name}"])
|
||||||
|
return result.returncode == 0
|
||||||
|
|
||||||
|
|
||||||
|
def pick_project(base_dir: Path) -> Optional[str]:
|
||||||
|
"""Use fzf to pick a subdirectory (project) from base_dir."""
|
||||||
|
if not base_dir.is_dir():
|
||||||
|
typer.echo(f"[ta] {base_dir} is not a directory.", err=True)
|
||||||
|
return None
|
||||||
|
|
||||||
|
subdirs = sorted(
|
||||||
|
[p.name for p in base_dir.iterdir() if p.is_dir()]
|
||||||
|
)
|
||||||
|
if not subdirs:
|
||||||
|
typer.echo(f"[ta] No subdirectories found in {base_dir}.", err=True)
|
||||||
|
return None
|
||||||
|
|
||||||
|
if not shutil.which("fzf"):
|
||||||
|
typer.echo("[ta] fzf not found in PATH.", err=True)
|
||||||
|
return None
|
||||||
|
|
||||||
|
proc = subprocess.run(
|
||||||
|
["fzf", "--reverse", f"--header=Select project from {base_dir.name} >"],
|
||||||
|
input="\n".join(subdirs),
|
||||||
|
text=True,
|
||||||
|
capture_output=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
if proc.returncode != 0:
|
||||||
|
# Cancelled or failed
|
||||||
|
return None
|
||||||
|
|
||||||
|
choice = proc.stdout.strip()
|
||||||
|
return choice or None
|
||||||
|
|
||||||
|
|
||||||
|
def create_detached_session(
|
||||||
|
session_name: str,
|
||||||
|
path_name: Path,
|
||||||
|
start_mode: bool,
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Create a detached session.
|
||||||
|
|
||||||
|
- If start_mode: just a single window.
|
||||||
|
- Else: split layout with nvim on top.
|
||||||
|
"""
|
||||||
|
# Run tmux as if not in tmux (clear TMUX env)
|
||||||
|
if start_mode:
|
||||||
|
r = run_tmux(
|
||||||
|
["new-session", "-Ad", "-s", session_name, "-c", str(path_name)],
|
||||||
|
clear_tmux_env=True,
|
||||||
|
)
|
||||||
|
return r.returncode == 0
|
||||||
|
else:
|
||||||
|
r = run_tmux(
|
||||||
|
["new-session", "-Ad", "-s", session_name, "-c", str(path_name)],
|
||||||
|
clear_tmux_env=True,
|
||||||
|
)
|
||||||
|
if r.returncode != 0:
|
||||||
|
return False
|
||||||
|
|
||||||
|
r = run_tmux(
|
||||||
|
[
|
||||||
|
"split-window",
|
||||||
|
"-vb",
|
||||||
|
"-t",
|
||||||
|
session_name,
|
||||||
|
"-c",
|
||||||
|
str(path_name),
|
||||||
|
"-p",
|
||||||
|
"70",
|
||||||
|
],
|
||||||
|
clear_tmux_env=True,
|
||||||
|
)
|
||||||
|
if r.returncode != 0:
|
||||||
|
return False
|
||||||
|
|
||||||
|
r = run_tmux(
|
||||||
|
[
|
||||||
|
"send-keys",
|
||||||
|
"-t",
|
||||||
|
session_name,
|
||||||
|
"nvim '+Telescope find_files'",
|
||||||
|
"Enter",
|
||||||
|
],
|
||||||
|
clear_tmux_env=True,
|
||||||
|
)
|
||||||
|
return r.returncode == 0
|
||||||
|
|
||||||
|
|
||||||
|
def create_if_needed_and_attach(
|
||||||
|
session_name: str,
|
||||||
|
path_name: Path,
|
||||||
|
start_mode: bool,
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Mimic the bash logic:
|
||||||
|
|
||||||
|
- If not in tmux: `tmux new-session -As ... -c path_name`
|
||||||
|
- Else:
|
||||||
|
- If session doesn't exist: create_detached_session()
|
||||||
|
- Then `tmux switch-client -t session_name`
|
||||||
|
"""
|
||||||
|
if not_in_tmux():
|
||||||
|
r = run_tmux(
|
||||||
|
["new-session", "-As", session_name, "-c", str(path_name)],
|
||||||
|
)
|
||||||
|
return r.returncode == 0
|
||||||
|
|
||||||
|
# Inside tmux
|
||||||
|
if not session_exists(session_name):
|
||||||
|
if not create_detached_session(session_name, path_name, start_mode):
|
||||||
|
return False
|
||||||
|
|
||||||
|
r = run_tmux(["switch-client", "-t", session_name])
|
||||||
|
return r.returncode == 0
|
||||||
|
|
||||||
|
|
||||||
|
def attach_to_first_session() -> None:
|
||||||
|
"""Fallback: attach to first tmux session and open choose-tree."""
|
||||||
|
# Attach to the first listed session
|
||||||
|
list_proc = subprocess.run(
|
||||||
|
["tmux", "list-sessions", "-F", "#{session_name}"],
|
||||||
|
text=True,
|
||||||
|
capture_output=True,
|
||||||
|
)
|
||||||
|
if list_proc.returncode != 0 or not list_proc.stdout.strip():
|
||||||
|
typer.echo("[ta] No tmux sessions found to attach to.", err=True)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
|
||||||
|
first_session = list_proc.stdout.strip().splitlines()[0]
|
||||||
|
|
||||||
|
attach_proc = run_tmux(["attach-session", "-t", first_session])
|
||||||
|
if attach_proc.returncode != 0:
|
||||||
|
raise typer.Exit(attach_proc.returncode)
|
||||||
|
|
||||||
|
# After attach, show choose-tree (this will run in the attached session)
|
||||||
|
run_tmux(["choose-tree", "-Za"])
|
||||||
|
|
||||||
|
tmux_app = typer.Typer(
|
||||||
|
name="tmux",
|
||||||
|
help="tmux commands",
|
||||||
|
add_completion=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
@tmux_app.command('attach')
|
||||||
|
def attach(
|
||||||
|
ctx: typer.Context,
|
||||||
|
workspace: Optional[str] = typer.Option(
|
||||||
|
None,
|
||||||
|
"--workspace",
|
||||||
|
"-w",
|
||||||
|
help=(
|
||||||
|
"Workspace directory name to show status for. "
|
||||||
|
"If omitted, uses the workspace containing the current directory."
|
||||||
|
),
|
||||||
|
),
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Attach or create a session for a repo in a workspace.
|
||||||
|
"""
|
||||||
|
# ws_dir = get_workspace_dir(workspace)
|
||||||
|
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
||||||
|
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
||||||
|
if not ws_dir:
|
||||||
|
console.print(f"[red]Workspace '{workspace}' does not exist at {ws_dir}[/red]")
|
||||||
|
raise typer.Exit(1)
|
||||||
|
# pick repo in workspace
|
||||||
|
repo = pick_repo_with_iterfzf([dir for dir in ws_dir.iterdir() if dir.is_dir()])
|
||||||
|
if not repo:
|
||||||
|
console.print("[red]No repo selected. Exiting.[/red]")
|
||||||
|
raise typer.Exit(1)
|
||||||
|
print(ws_dir, repo)
|
||||||
|
session_name = f'{ws_dir.name}|{repo.name}'
|
||||||
|
print(f'{ws_dir.name}|{repo.name}')
|
||||||
|
create_if_needed_and_attach(session_name, repo, False)
|
||||||
|
|
||||||
|
|
||||||
|
app.add_typer(tmux_app)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
app()
|
app()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue