Compare commits

...

4 commits

View file

@ -10,6 +10,7 @@
# ]
# ///
from __future__ import annotations
import os
import re
import shutil
@ -18,13 +19,17 @@ from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple
import typer
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from rich.console import Console
from rich.table import Table
from rich.prompt import Prompt, Confirm
from rich.panel import Panel
from rich.syntax import Syntax
from rich.panel import Panel
from iterfzf import iterfzf
app = typer.Typer(
@ -233,7 +238,7 @@ def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]:
return ws_dir.name, ""
# First non-empty line must be '# ...' per spec
first_non_empty_idx = next((i for i, l in enumerate(lines) if l.strip()), None)
first_non_empty_idx = next((i for i, line in enumerate(lines) if line.strip()), None)
if first_non_empty_idx is None:
return ws_dir.name, ""
@ -865,7 +870,6 @@ def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> Lis
return selected_files
@app.command("wip")
def wip_workspace(
ctx: typer.Context,
@ -892,9 +896,48 @@ def wip_workspace(
if not ws_dir.exists():
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
commit_message = f"wip: {title or ws_dir.name}"
commit_workspace(ctx, workspace, message=commit_message)
@app.command("commit")
def commit_workspace(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to WIP-commit. "
"If omitted, uses the workspace containing the current directory."
),
),
message: Optional[str] = typer.Option(
None,
"--message",
"-m",
help="Commit message to use.",
),
):
"""
For each repo in the workspace:
- Show list of changed files.
- Ask whether to stage all, none, or pick some files.
- Stage chosen files.
"""
if not message:
console.print(
"[red]No commit message provided. Exiting.[/red]",
file=sys.stderr,
)
raise typer.Exit(1)
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
if not ws_dir.exists():
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
worktrees: List[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
@ -935,14 +978,14 @@ def wip_workspace(
continue
# Commit
code, _out, err = run_cmd(["git", "commit", "-m", commit_message], cwd=wt)
code, _out, err = run_cmd(["git", "commit", "-m", message], cwd=wt)
if code != 0:
console.print(
f"[red]Failed to commit in {wt.name}:[/red]\n{err}"
)
else:
console.print(
f" [green]Created WIP commit in {wt.name}:[/green] '{commit_message}'"
f" [green]Created commit in {wt.name}:[/green] '{message}'"
)
@ -1050,8 +1093,12 @@ def status_workspace(
title, desc = read_workspace_readme(ws_dir)
# if desc:
# console.print(Panel(desc, title=title))
table = Table(
title=f"Status for workspace '{title}' (dir: {ws_dir.name})",
title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}",
title_justify="left",
show_lines=False,
)
table.add_column("Repo (dir)", style="bold")
@ -1059,6 +1106,11 @@ def status_workspace(
table.add_column("Ahead/Behind/Dirty")
table.add_column("Changed Files", justify="right")
files_table = Table(show_lines=False)
files_table.add_column("Repo (dir)", style="bold")
files_table.add_column("File", justify="left")
files_table.add_column("Status", justify="right")
worktrees: List[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
@ -1082,10 +1134,304 @@ def status_workspace(
str(len(changes)),
)
if changes:
for f, s in changes:
files_table.add_row(wt.name, f, s)
console.print(table)
if desc:
console.print(Panel(desc, title="Workspace description"))
console.print("\nFiles with changes:")
if files_table.rows:
console.print(files_table)
@app.command("diff")
def diff_workspace(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to diff. "
"If omitted, uses the workspace containing the current directory."
),
),
staged: bool = typer.Option(
False,
"--staged",
"-s",
help="Show only staged changes (git diff --cached).",
),
):
"""
Show git diff for all repos in the workspace.
Uses rich Syntax highlighter for diffs.
"""
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
if not ws_dir.exists():
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
raise typer.Exit(1)
title, _desc = read_workspace_readme(ws_dir)
console.print(
f"Showing diffs for workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})"
)
# Collect repos
worktrees: List[Path] = []
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
if child.name.lower() == "readme.md":
continue
if not ensure_git_repo(child):
continue
worktrees.append(child)
if not worktrees:
console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
raise typer.Exit(0)
any_diffs = False
for wt in worktrees:
# Skip clean repos
changes = git_status_porcelain(wt)
if not changes:
continue
any_diffs = True
console.rule(f"[bold]{wt.name}[/bold]")
cmd = ["git", "diff"]
if staged:
cmd.append("--cached")
code, out, err = run_cmd(cmd, cwd=wt)
if code != 0:
console.print(
f"[red]Failed to get diff for {wt.name}:[/red]\n{err}"
)
continue
if not out.strip():
console.print("[dim]No diff output.[/dim]")
continue
syntax = Syntax(
out,
"diff",
theme="nord",
line_numbers=False,
word_wrap=False,
)
# Wrap each repo diff in a Panel for clarity
console.print(Panel(syntax, title=wt.name, border_style="cyan"))
if not any_diffs:
console.print("[green]No changes to diff in any repo.[/green]")
def not_in_tmux() -> bool:
"""Return True if not inside tmux or zellij."""
return not os.environ.get("TMUX") and not os.environ.get("ZELLIJ")
def run_tmux(args: list[str], *, clear_tmux_env: bool = False) -> subprocess.CompletedProcess:
"""Run a tmux command."""
env = os.environ.copy()
if clear_tmux_env:
env["TMUX"] = ""
return subprocess.run(
["tmux", *args],
env=env,
check=False,
)
def session_exists(session_name: str) -> bool:
"""Check if the tmux session exists."""
result = run_tmux(["has-session", "-t", f"={session_name}"])
return result.returncode == 0
def pick_project(base_dir: Path) -> Optional[str]:
"""Use fzf to pick a subdirectory (project) from base_dir."""
if not base_dir.is_dir():
typer.echo(f"[ta] {base_dir} is not a directory.", err=True)
return None
subdirs = sorted(
[p.name for p in base_dir.iterdir() if p.is_dir()]
)
if not subdirs:
typer.echo(f"[ta] No subdirectories found in {base_dir}.", err=True)
return None
if not shutil.which("fzf"):
typer.echo("[ta] fzf not found in PATH.", err=True)
return None
proc = subprocess.run(
["fzf", "--reverse", f"--header=Select project from {base_dir.name} >"],
input="\n".join(subdirs),
text=True,
capture_output=True,
)
if proc.returncode != 0:
# Cancelled or failed
return None
choice = proc.stdout.strip()
return choice or None
def create_detached_session(
session_name: str,
path_name: Path,
start_mode: bool,
) -> bool:
"""
Create a detached session.
- If start_mode: just a single window.
- Else: split layout with nvim on top.
"""
# Run tmux as if not in tmux (clear TMUX env)
if start_mode:
r = run_tmux(
["new-session", "-Ad", "-s", session_name, "-c", str(path_name)],
clear_tmux_env=True,
)
return r.returncode == 0
else:
r = run_tmux(
["new-session", "-Ad", "-s", session_name, "-c", str(path_name)],
clear_tmux_env=True,
)
if r.returncode != 0:
return False
r = run_tmux(
[
"split-window",
"-vb",
"-t",
session_name,
"-c",
str(path_name),
"-p",
"70",
],
clear_tmux_env=True,
)
if r.returncode != 0:
return False
r = run_tmux(
[
"send-keys",
"-t",
session_name,
"nvim '+Telescope find_files'",
"Enter",
],
clear_tmux_env=True,
)
return r.returncode == 0
def create_if_needed_and_attach(
session_name: str,
path_name: Path,
start_mode: bool,
) -> bool:
"""
Mimic the bash logic:
- If not in tmux: `tmux new-session -As ... -c path_name`
- Else:
- If session doesn't exist: create_detached_session()
- Then `tmux switch-client -t session_name`
"""
if not_in_tmux():
r = run_tmux(
["new-session", "-As", session_name, "-c", str(path_name)],
)
return r.returncode == 0
# Inside tmux
if not session_exists(session_name):
if not create_detached_session(session_name, path_name, start_mode):
return False
r = run_tmux(["switch-client", "-t", session_name])
return r.returncode == 0
def attach_to_first_session() -> None:
"""Fallback: attach to first tmux session and open choose-tree."""
# Attach to the first listed session
list_proc = subprocess.run(
["tmux", "list-sessions", "-F", "#{session_name}"],
text=True,
capture_output=True,
)
if list_proc.returncode != 0 or not list_proc.stdout.strip():
typer.echo("[ta] No tmux sessions found to attach to.", err=True)
raise typer.Exit(1)
first_session = list_proc.stdout.strip().splitlines()[0]
attach_proc = run_tmux(["attach-session", "-t", first_session])
if attach_proc.returncode != 0:
raise typer.Exit(attach_proc.returncode)
# After attach, show choose-tree (this will run in the attached session)
run_tmux(["choose-tree", "-Za"])
tmux_app = typer.Typer(
name="tmux",
help="tmux commands",
add_completion=False,
)
@tmux_app.command('attach')
def attach(
ctx: typer.Context,
workspace: Optional[str] = typer.Option(
None,
"--workspace",
"-w",
help=(
"Workspace directory name to show status for. "
"If omitted, uses the workspace containing the current directory."
),
),
):
"""
Attach or create a session for a repo in a workspace.
"""
# ws_dir = get_workspace_dir(workspace)
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
ws_dir = find_workspace_dir(workspaces_dir, workspace)
if not ws_dir:
console.print(f"[red]Workspace '{workspace}' does not exist at {ws_dir}[/red]")
raise typer.Exit(1)
# pick repo in workspace
repo = pick_repo_with_iterfzf([dir for dir in ws_dir.iterdir() if dir.is_dir()])
if not repo:
console.print("[red]No repo selected. Exiting.[/red]")
raise typer.Exit(1)
print(ws_dir, repo)
session_name = f'{ws_dir.name}|{repo.name}'
print(f'{ws_dir.name}|{repo.name}')
create_if_needed_and_attach(session_name, repo, False)
app.add_typer(tmux_app)
if __name__ == "__main__":
app()