diff --git a/workspaces.py b/workspaces.py index d8ae44a..f027dd7 100755 --- a/workspaces.py +++ b/workspaces.py @@ -10,6 +10,7 @@ # ] # /// +from __future__ import annotations import os import re import shutil @@ -18,13 +19,17 @@ from dataclasses import dataclass from pathlib import Path from typing import List, Optional, Tuple + import typer + from pydantic import Field from pydantic_settings import BaseSettings, SettingsConfigDict from rich.console import Console from rich.table import Table from rich.prompt import Prompt, Confirm from rich.panel import Panel +from rich.syntax import Syntax +from rich.panel import Panel from iterfzf import iterfzf app = typer.Typer( @@ -233,7 +238,7 @@ def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]: return ws_dir.name, "" # First non-empty line must be '# ...' per spec - first_non_empty_idx = next((i for i, l in enumerate(lines) if l.strip()), None) + first_non_empty_idx = next((i for i, line in enumerate(lines) if line.strip()), None) if first_non_empty_idx is None: return ws_dir.name, "" @@ -865,7 +870,6 @@ def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> Lis return selected_files - @app.command("wip") def wip_workspace( ctx: typer.Context, @@ -892,9 +896,48 @@ def wip_workspace( if not ws_dir.exists(): console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") raise typer.Exit(1) - title, _desc = read_workspace_readme(ws_dir) commit_message = f"wip: {title or ws_dir.name}" + commit_workspace(ctx, workspace, message=commit_message) + +@app.command("commit") +def commit_workspace( + ctx: typer.Context, + workspace: Optional[str] = typer.Option( + None, + "--workspace", + "-w", + help=( + "Workspace directory name to WIP-commit. " + "If omitted, uses the workspace containing the current directory." + ), + ), + message: Optional[str] = typer.Option( + None, + "--message", + "-m", + help="Commit message to use.", + ), +): + """ + For each repo in the workspace: + + - Show list of changed files. + - Ask whether to stage all, none, or pick some files. + - Stage chosen files. + """ + if not message: + console.print( + "[red]No commit message provided. Exiting.[/red]", + file=sys.stderr, + ) + raise typer.Exit(1) + _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) + ws_dir = find_workspace_dir(workspaces_dir, workspace) + if not ws_dir.exists(): + console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") + raise typer.Exit(1) + title, _desc = read_workspace_readme(ws_dir) worktrees: List[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): @@ -935,14 +978,14 @@ def wip_workspace( continue # Commit - code, _out, err = run_cmd(["git", "commit", "-m", commit_message], cwd=wt) + code, _out, err = run_cmd(["git", "commit", "-m", message], cwd=wt) if code != 0: console.print( f"[red]Failed to commit in {wt.name}:[/red]\n{err}" ) else: console.print( - f" [green]Created WIP commit in {wt.name}:[/green] '{commit_message}'" + f" [green]Created commit in {wt.name}:[/green] '{message}'" ) @@ -1050,8 +1093,12 @@ def status_workspace( title, desc = read_workspace_readme(ws_dir) + # if desc: + # console.print(Panel(desc, title=title)) + table = Table( - title=f"Status for workspace '{title}' (dir: {ws_dir.name})", + title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}", + title_justify="left", show_lines=False, ) table.add_column("Repo (dir)", style="bold") @@ -1059,6 +1106,11 @@ def status_workspace( table.add_column("Ahead/Behind/Dirty") table.add_column("Changed Files", justify="right") + files_table = Table(show_lines=False) + files_table.add_column("Repo (dir)", style="bold") + files_table.add_column("File", justify="left") + files_table.add_column("Status", justify="right") + worktrees: List[Path] = [] for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): if child.name.lower() == "readme.md": @@ -1082,10 +1134,304 @@ def status_workspace( str(len(changes)), ) + if changes: + for f, s in changes: + files_table.add_row(wt.name, f, s) + console.print(table) - if desc: - console.print(Panel(desc, title="Workspace description")) + console.print("\nFiles with changes:") + if files_table.rows: + console.print(files_table) + +@app.command("diff") +def diff_workspace( + ctx: typer.Context, + workspace: Optional[str] = typer.Option( + None, + "--workspace", + "-w", + help=( + "Workspace directory name to diff. " + "If omitted, uses the workspace containing the current directory." + ), + ), + staged: bool = typer.Option( + False, + "--staged", + "-s", + help="Show only staged changes (git diff --cached).", + ), +): + """ + Show git diff for all repos in the workspace. + Uses rich Syntax highlighter for diffs. + """ + _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) + ws_dir = find_workspace_dir(workspaces_dir, workspace) + + if not ws_dir.exists(): + console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]") + raise typer.Exit(1) + + title, _desc = read_workspace_readme(ws_dir) + console.print( + f"Showing diffs for workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})" + ) + + # Collect repos + worktrees: List[Path] = [] + for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()): + if child.name.lower() == "readme.md": + continue + if not ensure_git_repo(child): + continue + worktrees.append(child) + + if not worktrees: + console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]") + raise typer.Exit(0) + + any_diffs = False + + for wt in worktrees: + # Skip clean repos + changes = git_status_porcelain(wt) + if not changes: + continue + + any_diffs = True + + console.rule(f"[bold]{wt.name}[/bold]") + + cmd = ["git", "diff"] + if staged: + cmd.append("--cached") + + code, out, err = run_cmd(cmd, cwd=wt) + if code != 0: + console.print( + f"[red]Failed to get diff for {wt.name}:[/red]\n{err}" + ) + continue + + if not out.strip(): + console.print("[dim]No diff output.[/dim]") + continue + + syntax = Syntax( + out, + "diff", + theme="nord", + line_numbers=False, + word_wrap=False, + ) + + # Wrap each repo diff in a Panel for clarity + console.print(Panel(syntax, title=wt.name, border_style="cyan")) + + if not any_diffs: + console.print("[green]No changes to diff in any repo.[/green]") + +def not_in_tmux() -> bool: + """Return True if not inside tmux or zellij.""" + return not os.environ.get("TMUX") and not os.environ.get("ZELLIJ") + + +def run_tmux(args: list[str], *, clear_tmux_env: bool = False) -> subprocess.CompletedProcess: + """Run a tmux command.""" + env = os.environ.copy() + if clear_tmux_env: + env["TMUX"] = "" + return subprocess.run( + ["tmux", *args], + env=env, + check=False, + ) + + +def session_exists(session_name: str) -> bool: + """Check if the tmux session exists.""" + result = run_tmux(["has-session", "-t", f"={session_name}"]) + return result.returncode == 0 + + +def pick_project(base_dir: Path) -> Optional[str]: + """Use fzf to pick a subdirectory (project) from base_dir.""" + if not base_dir.is_dir(): + typer.echo(f"[ta] {base_dir} is not a directory.", err=True) + return None + + subdirs = sorted( + [p.name for p in base_dir.iterdir() if p.is_dir()] + ) + if not subdirs: + typer.echo(f"[ta] No subdirectories found in {base_dir}.", err=True) + return None + + if not shutil.which("fzf"): + typer.echo("[ta] fzf not found in PATH.", err=True) + return None + + proc = subprocess.run( + ["fzf", "--reverse", f"--header=Select project from {base_dir.name} >"], + input="\n".join(subdirs), + text=True, + capture_output=True, + ) + + if proc.returncode != 0: + # Cancelled or failed + return None + + choice = proc.stdout.strip() + return choice or None + + +def create_detached_session( + session_name: str, + path_name: Path, + start_mode: bool, +) -> bool: + """ + Create a detached session. + + - If start_mode: just a single window. + - Else: split layout with nvim on top. + """ + # Run tmux as if not in tmux (clear TMUX env) + if start_mode: + r = run_tmux( + ["new-session", "-Ad", "-s", session_name, "-c", str(path_name)], + clear_tmux_env=True, + ) + return r.returncode == 0 + else: + r = run_tmux( + ["new-session", "-Ad", "-s", session_name, "-c", str(path_name)], + clear_tmux_env=True, + ) + if r.returncode != 0: + return False + + r = run_tmux( + [ + "split-window", + "-vb", + "-t", + session_name, + "-c", + str(path_name), + "-p", + "70", + ], + clear_tmux_env=True, + ) + if r.returncode != 0: + return False + + r = run_tmux( + [ + "send-keys", + "-t", + session_name, + "nvim '+Telescope find_files'", + "Enter", + ], + clear_tmux_env=True, + ) + return r.returncode == 0 + + +def create_if_needed_and_attach( + session_name: str, + path_name: Path, + start_mode: bool, +) -> bool: + """ + Mimic the bash logic: + + - If not in tmux: `tmux new-session -As ... -c path_name` + - Else: + - If session doesn't exist: create_detached_session() + - Then `tmux switch-client -t session_name` + """ + if not_in_tmux(): + r = run_tmux( + ["new-session", "-As", session_name, "-c", str(path_name)], + ) + return r.returncode == 0 + + # Inside tmux + if not session_exists(session_name): + if not create_detached_session(session_name, path_name, start_mode): + return False + + r = run_tmux(["switch-client", "-t", session_name]) + return r.returncode == 0 + + +def attach_to_first_session() -> None: + """Fallback: attach to first tmux session and open choose-tree.""" + # Attach to the first listed session + list_proc = subprocess.run( + ["tmux", "list-sessions", "-F", "#{session_name}"], + text=True, + capture_output=True, + ) + if list_proc.returncode != 0 or not list_proc.stdout.strip(): + typer.echo("[ta] No tmux sessions found to attach to.", err=True) + raise typer.Exit(1) + + first_session = list_proc.stdout.strip().splitlines()[0] + + attach_proc = run_tmux(["attach-session", "-t", first_session]) + if attach_proc.returncode != 0: + raise typer.Exit(attach_proc.returncode) + + # After attach, show choose-tree (this will run in the attached session) + run_tmux(["choose-tree", "-Za"]) + +tmux_app = typer.Typer( + name="tmux", + help="tmux commands", + add_completion=False, +) + +@tmux_app.command('attach') +def attach( + ctx: typer.Context, + workspace: Optional[str] = typer.Option( + None, + "--workspace", + "-w", + help=( + "Workspace directory name to show status for. " + "If omitted, uses the workspace containing the current directory." + ), + ), +): + """ + Attach or create a session for a repo in a workspace. + """ + # ws_dir = get_workspace_dir(workspace) + _settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx) + ws_dir = find_workspace_dir(workspaces_dir, workspace) + if not ws_dir: + console.print(f"[red]Workspace '{workspace}' does not exist at {ws_dir}[/red]") + raise typer.Exit(1) + # pick repo in workspace + repo = pick_repo_with_iterfzf([dir for dir in ws_dir.iterdir() if dir.is_dir()]) + if not repo: + console.print("[red]No repo selected. Exiting.[/red]") + raise typer.Exit(1) + print(ws_dir, repo) + session_name = f'{ws_dir.name}|{repo.name}' + print(f'{ws_dir.name}|{repo.name}') + create_if_needed_and_attach(session_name, repo, False) + + +app.add_typer(tmux_app) if __name__ == "__main__": app()