diff --git a/workspaces.py b/workspaces.py index f027dd7..d8ae44a 100755 --- a/workspaces.py +++ b/workspaces.py @@ -10,7 +10,6 @@ # ] # /// -from __future__ import annotations import os import re import shutil @@ -19,17 +18,13 @@ from dataclasses import dataclass from pathlib import Path from typing import List, Optional, Tuple - import typer - from pydantic import Field from pydantic_settings import BaseSettings, SettingsConfigDict from rich.console import Console from rich.table import Table from rich.prompt import Prompt, Confirm from rich.panel import Panel -from rich.syntax import Syntax -from rich.panel import Panel from iterfzf import iterfzf app = typer.Typer( @@ -238,7 +233,7 @@ def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]: return ws_dir.name, "" # First non-empty line must be '# ...' per spec - first_non_empty_idx = next((i for i, line in enumerate(lines) if line.strip()), None) + first_non_empty_idx = next((i for i, l in enumerate(lines) if l.strip()), None) if first_non_empty_idx is None: return ws_dir.name, "" @@ -870,6 +865,7 @@ def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> Lis return selected_files + @app.command("wip") def wip_workspace( ctx: typer.Context, @@ -896,48 +892,9 @@ def wip_workspace( if not ws_dir.exists(): console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") raise typer.Exit(1) + title, _desc = read_workspace_readme(ws_dir) commit_message = f"wip: {title or ws_dir.name}" - commit_workspace(ctx, workspace, message=commit_message) - -@app.command("commit") -def commit_workspace( - ctx: typer.Context, - workspace: Optional[str] = typer.Option( - None, - "--workspace", - "-w", - help=( - "Workspace directory name to WIP-commit. " - "If omitted, uses the workspace containing the current directory." - ), - ), - message: Optional[str] = typer.Option( - None, - "--message", - "-m", - help="Commit message to use.", - ), -): - """ - For each repo in the workspace: - - - Show list of changed files. - - Ask whether to stage all, none, or pick some files. - - Stage chosen files. - """ - if not message: - console.print( - "[red]No commit message provided. Exiting.[/red]", - file=sys.stderr, - ) - raise typer.Exit(1) - _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) - ws_dir = find_workspace_dir(workspaces_dir, workspace) - if not ws_dir.exists(): - console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") - raise typer.Exit(1) - title, _desc = read_workspace_readme(ws_dir) worktrees: List[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): @@ -978,14 +935,14 @@ def commit_workspace( continue # Commit - code, _out, err = run_cmd(["git", "commit", "-m", message], cwd=wt) + code, _out, err = run_cmd(["git", "commit", "-m", commit_message], cwd=wt) if code != 0: console.print( f"[red]Failed to commit in {wt.name}:[/red]\n{err}" ) else: console.print( - f" [green]Created commit in {wt.name}:[/green] '{message}'" + f" [green]Created WIP commit in {wt.name}:[/green] '{commit_message}'" ) @@ -1093,12 +1050,8 @@ def status_workspace( title, desc = read_workspace_readme(ws_dir) - # if desc: - # console.print(Panel(desc, title=title)) - table = Table( - title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}", - title_justify="left", + title=f"Status for workspace '{title}' (dir: {ws_dir.name})", show_lines=False, ) table.add_column("Repo (dir)", style="bold") @@ -1106,11 +1059,6 @@ def status_workspace( table.add_column("Ahead/Behind/Dirty") table.add_column("Changed Files", justify="right") - files_table = Table(show_lines=False) - files_table.add_column("Repo (dir)", style="bold") - files_table.add_column("File", justify="left") - files_table.add_column("Status", justify="right") - worktrees: List[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": @@ -1134,304 +1082,10 @@ def status_workspace( str(len(changes)), ) - if changes: - for f, s in changes: - files_table.add_row(wt.name, f, s) - console.print(table) - console.print("\nFiles with changes:") - if files_table.rows: - console.print(files_table) - -@app.command("diff") -def diff_workspace( - ctx: typer.Context, - workspace: Optional[str] = typer.Option( - None, - "--workspace", - "-w", - help=( - "Workspace directory name to diff. " - "If omitted, uses the workspace containing the current directory." - ), - ), - staged: bool = typer.Option( - False, - "--staged", - "-s", - help="Show only staged changes (git diff --cached).", - ), -): - """ - Show git diff for all repos in the workspace. - Uses rich Syntax highlighter for diffs. - """ - _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) - ws_dir = find_workspace_dir(workspaces_dir, workspace) - - if not ws_dir.exists(): - console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") - raise typer.Exit(1) - - title, _desc = read_workspace_readme(ws_dir) - console.print( - f"Showing diffs for workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})" - ) - - # Collect repos - worktrees: List[Path] = [] - for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): - if child.name.lower() == "readme.md": - continue - if not ensure_git_repo(child): - continue - worktrees.append(child) - - if not worktrees: - console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]") - raise typer.Exit(0) - - any_diffs = False - - for wt in worktrees: - # Skip clean repos - changes = git_status_porcelain(wt) - if not changes: - continue - - any_diffs = True - - console.rule(f"[bold]{wt.name}[/bold]") - - cmd = ["git", "diff"] - if staged: - cmd.append("--cached") - - code, out, err = run_cmd(cmd, cwd=wt) - if code != 0: - console.print( - f"[red]Failed to get diff for {wt.name}:[/red]\n{err}" - ) - continue - - if not out.strip(): - console.print("[dim]No diff output.[/dim]") - continue - - syntax = Syntax( - out, - "diff", - theme="nord", - line_numbers=False, - word_wrap=False, - ) - - # Wrap each repo diff in a Panel for clarity - console.print(Panel(syntax, title=wt.name, border_style="cyan")) - - if not any_diffs: - console.print("[green]No changes to diff in any repo.[/green]") - -def not_in_tmux() -> bool: - """Return True if not inside tmux or zellij.""" - return not os.environ.get("TMUX") and not os.environ.get("ZELLIJ") - - -def run_tmux(args: list[str], *, clear_tmux_env: bool = False) -> subprocess.CompletedProcess: - """Run a tmux command.""" - env = os.environ.copy() - if clear_tmux_env: - env["TMUX"] = "" - return subprocess.run( - ["tmux", *args], - env=env, - check=False, - ) - - -def session_exists(session_name: str) -> bool: - """Check if the tmux session exists.""" - result = run_tmux(["has-session", "-t", f"={session_name}"]) - return result.returncode == 0 - - -def pick_project(base_dir: Path) -> Optional[str]: - """Use fzf to pick a subdirectory (project) from base_dir.""" - if not base_dir.is_dir(): - typer.echo(f"[ta] {base_dir} is not a directory.", err=True) - return None - - subdirs = sorted( - [p.name for p in base_dir.iterdir() if p.is_dir()] - ) - if not subdirs: - typer.echo(f"[ta] No subdirectories found in {base_dir}.", err=True) - return None - - if not shutil.which("fzf"): - typer.echo("[ta] fzf not found in PATH.", err=True) - return None - - proc = subprocess.run( - ["fzf", "--reverse", f"--header=Select project from {base_dir.name} >"], - input="\n".join(subdirs), - text=True, - capture_output=True, - ) - - if proc.returncode != 0: - # Cancelled or failed - return None - - choice = proc.stdout.strip() - return choice or None - - -def create_detached_session( - session_name: str, - path_name: Path, - start_mode: bool, -) -> bool: - """ - Create a detached session. - - - If start_mode: just a single window. - - Else: split layout with nvim on top. - """ - # Run tmux as if not in tmux (clear TMUX env) - if start_mode: - r = run_tmux( - ["new-session", "-Ad", "-s", session_name, "-c", str(path_name)], - clear_tmux_env=True, - ) - return r.returncode == 0 - else: - r = run_tmux( - ["new-session", "-Ad", "-s", session_name, "-c", str(path_name)], - clear_tmux_env=True, - ) - if r.returncode != 0: - return False - - r = run_tmux( - [ - "split-window", - "-vb", - "-t", - session_name, - "-c", - str(path_name), - "-p", - "70", - ], - clear_tmux_env=True, - ) - if r.returncode != 0: - return False - - r = run_tmux( - [ - "send-keys", - "-t", - session_name, - "nvim '+Telescope find_files'", - "Enter", - ], - clear_tmux_env=True, - ) - return r.returncode == 0 - - -def create_if_needed_and_attach( - session_name: str, - path_name: Path, - start_mode: bool, -) -> bool: - """ - Mimic the bash logic: - - - If not in tmux: `tmux new-session -As ... -c path_name` - - Else: - - If session doesn't exist: create_detached_session() - - Then `tmux switch-client -t session_name` - """ - if not_in_tmux(): - r = run_tmux( - ["new-session", "-As", session_name, "-c", str(path_name)], - ) - return r.returncode == 0 - - # Inside tmux - if not session_exists(session_name): - if not create_detached_session(session_name, path_name, start_mode): - return False - - r = run_tmux(["switch-client", "-t", session_name]) - return r.returncode == 0 - - -def attach_to_first_session() -> None: - """Fallback: attach to first tmux session and open choose-tree.""" - # Attach to the first listed session - list_proc = subprocess.run( - ["tmux", "list-sessions", "-F", "#{session_name}"], - text=True, - capture_output=True, - ) - if list_proc.returncode != 0 or not list_proc.stdout.strip(): - typer.echo("[ta] No tmux sessions found to attach to.", err=True) - raise typer.Exit(1) - - first_session = list_proc.stdout.strip().splitlines()[0] - - attach_proc = run_tmux(["attach-session", "-t", first_session]) - if attach_proc.returncode != 0: - raise typer.Exit(attach_proc.returncode) - - # After attach, show choose-tree (this will run in the attached session) - run_tmux(["choose-tree", "-Za"]) - -tmux_app = typer.Typer( - name="tmux", - help="tmux commands", - add_completion=False, -) - -@tmux_app.command('attach') -def attach( - ctx: typer.Context, - workspace: Optional[str] = typer.Option( - None, - "--workspace", - "-w", - help=( - "Workspace directory name to show status for. " - "If omitted, uses the workspace containing the current directory." - ), - ), -): - """ - Attach or create a session for a repo in a workspace. - """ - # ws_dir = get_workspace_dir(workspace) - _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) - ws_dir = find_workspace_dir(workspaces_dir, workspace) - if not ws_dir: - console.print(f"[red]Workspace '{workspace}' does not exist at {ws_dir}[/red]") - raise typer.Exit(1) - # pick repo in workspace - repo = pick_repo_with_iterfzf([dir for dir in ws_dir.iterdir() if dir.is_dir()]) - if not repo: - console.print("[red]No repo selected. Exiting.[/red]") - raise typer.Exit(1) - print(ws_dir, repo) - session_name = f'{ws_dir.name}|{repo.name}' - print(f'{ws_dir.name}|{repo.name}') - create_if_needed_and_attach(session_name, repo, False) - - -app.add_typer(tmux_app) + if desc: + console.print(Panel(desc, title="Workspace description")) if __name__ == "__main__": app()