Compare commits
4 commits
575e51eaa9
...
42d034eef2
| Author | SHA1 | Date | |
|---|---|---|---|
| 42d034eef2 | |||
| 0ee5711dcd | |||
| 4698450200 | |||
| d5d081f743 |
1 changed files with 354 additions and 8 deletions
362
workspaces.py
362
workspaces.py
|
|
@ -10,6 +10,7 @@
|
|||
# ]
|
||||
# ///
|
||||
|
||||
from __future__ import annotations
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
|
|
@ -18,13 +19,17 @@ from dataclasses import dataclass
|
|||
from pathlib import Path
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
|
||||
import typer
|
||||
|
||||
from pydantic import Field
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
from rich.prompt import Prompt, Confirm
|
||||
from rich.panel import Panel
|
||||
from rich.syntax import Syntax
|
||||
from rich.panel import Panel
|
||||
from iterfzf import iterfzf
|
||||
|
||||
app = typer.Typer(
|
||||
|
|
@ -233,7 +238,7 @@ def read_workspace_readme(ws_dir: Path) -> Tuple[str, str]:
|
|||
return ws_dir.name, ""
|
||||
|
||||
# First non-empty line must be '# ...' per spec
|
||||
first_non_empty_idx = next((i for i, l in enumerate(lines) if l.strip()), None)
|
||||
first_non_empty_idx = next((i for i, line in enumerate(lines) if line.strip()), None)
|
||||
if first_non_empty_idx is None:
|
||||
return ws_dir.name, ""
|
||||
|
||||
|
|
@ -865,7 +870,6 @@ def choose_files_for_wip(repo_path: Path, changes: List[Tuple[str, str]]) -> Lis
|
|||
|
||||
return selected_files
|
||||
|
||||
|
||||
@app.command("wip")
|
||||
def wip_workspace(
|
||||
ctx: typer.Context,
|
||||
|
|
@ -892,9 +896,48 @@ def wip_workspace(
|
|||
if not ws_dir.exists():
|
||||
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
||||
raise typer.Exit(1)
|
||||
|
||||
title, _desc = read_workspace_readme(ws_dir)
|
||||
commit_message = f"wip: {title or ws_dir.name}"
|
||||
commit_workspace(ctx, workspace, message=commit_message)
|
||||
|
||||
@app.command("commit")
|
||||
def commit_workspace(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
help=(
|
||||
"Workspace directory name to WIP-commit. "
|
||||
"If omitted, uses the workspace containing the current directory."
|
||||
),
|
||||
),
|
||||
message: Optional[str] = typer.Option(
|
||||
None,
|
||||
"--message",
|
||||
"-m",
|
||||
help="Commit message to use.",
|
||||
),
|
||||
):
|
||||
"""
|
||||
For each repo in the workspace:
|
||||
|
||||
- Show list of changed files.
|
||||
- Ask whether to stage all, none, or pick some files.
|
||||
- Stage chosen files.
|
||||
"""
|
||||
if not message:
|
||||
console.print(
|
||||
"[red]No commit message provided. Exiting.[/red]",
|
||||
file=sys.stderr,
|
||||
)
|
||||
raise typer.Exit(1)
|
||||
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
||||
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
||||
if not ws_dir.exists():
|
||||
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
||||
raise typer.Exit(1)
|
||||
title, _desc = read_workspace_readme(ws_dir)
|
||||
|
||||
worktrees: List[Path] = []
|
||||
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
||||
|
|
@ -935,14 +978,14 @@ def wip_workspace(
|
|||
continue
|
||||
|
||||
# Commit
|
||||
code, _out, err = run_cmd(["git", "commit", "-m", commit_message], cwd=wt)
|
||||
code, _out, err = run_cmd(["git", "commit", "-m", message], cwd=wt)
|
||||
if code != 0:
|
||||
console.print(
|
||||
f"[red]Failed to commit in {wt.name}:[/red]\n{err}"
|
||||
)
|
||||
else:
|
||||
console.print(
|
||||
f" [green]Created WIP commit in {wt.name}:[/green] '{commit_message}'"
|
||||
f" [green]Created commit in {wt.name}:[/green] '{message}'"
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -1050,8 +1093,12 @@ def status_workspace(
|
|||
|
||||
title, desc = read_workspace_readme(ws_dir)
|
||||
|
||||
# if desc:
|
||||
# console.print(Panel(desc, title=title))
|
||||
|
||||
table = Table(
|
||||
title=f"Status for workspace '{title}' (dir: {ws_dir.name})",
|
||||
title=f"Status for workspace:'{title}'\n(dir: {ws_dir.name})\n\n{desc}",
|
||||
title_justify="left",
|
||||
show_lines=False,
|
||||
)
|
||||
table.add_column("Repo (dir)", style="bold")
|
||||
|
|
@ -1059,6 +1106,11 @@ def status_workspace(
|
|||
table.add_column("Ahead/Behind/Dirty")
|
||||
table.add_column("Changed Files", justify="right")
|
||||
|
||||
files_table = Table(show_lines=False)
|
||||
files_table.add_column("Repo (dir)", style="bold")
|
||||
files_table.add_column("File", justify="left")
|
||||
files_table.add_column("Status", justify="right")
|
||||
|
||||
worktrees: List[Path] = []
|
||||
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
||||
if child.name.lower() == "readme.md":
|
||||
|
|
@ -1082,10 +1134,304 @@ def status_workspace(
|
|||
str(len(changes)),
|
||||
)
|
||||
|
||||
if changes:
|
||||
for f, s in changes:
|
||||
files_table.add_row(wt.name, f, s)
|
||||
|
||||
console.print(table)
|
||||
|
||||
if desc:
|
||||
console.print(Panel(desc, title="Workspace description"))
|
||||
console.print("\nFiles with changes:")
|
||||
if files_table.rows:
|
||||
console.print(files_table)
|
||||
|
||||
@app.command("diff")
|
||||
def diff_workspace(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
help=(
|
||||
"Workspace directory name to diff. "
|
||||
"If omitted, uses the workspace containing the current directory."
|
||||
),
|
||||
),
|
||||
staged: bool = typer.Option(
|
||||
False,
|
||||
"--staged",
|
||||
"-s",
|
||||
help="Show only staged changes (git diff --cached).",
|
||||
),
|
||||
):
|
||||
"""
|
||||
Show git diff for all repos in the workspace.
|
||||
Uses rich Syntax highlighter for diffs.
|
||||
"""
|
||||
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
||||
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
||||
|
||||
if not ws_dir.exists():
|
||||
console.print(f"[red]Workspace '{ws_dir.name}' does not exist at {ws_dir}[/red]")
|
||||
raise typer.Exit(1)
|
||||
|
||||
title, _desc = read_workspace_readme(ws_dir)
|
||||
console.print(
|
||||
f"Showing diffs for workspace [bold]{title or ws_dir.name}[/bold] (dir: {ws_dir.name})"
|
||||
)
|
||||
|
||||
# Collect repos
|
||||
worktrees: List[Path] = []
|
||||
for child in sorted(p for p in ws_dir.iterdir() if p.is_dir()):
|
||||
if child.name.lower() == "readme.md":
|
||||
continue
|
||||
if not ensure_git_repo(child):
|
||||
continue
|
||||
worktrees.append(child)
|
||||
|
||||
if not worktrees:
|
||||
console.print(f"[yellow]No repos in workspace {ws_dir.name}[/yellow]")
|
||||
raise typer.Exit(0)
|
||||
|
||||
any_diffs = False
|
||||
|
||||
for wt in worktrees:
|
||||
# Skip clean repos
|
||||
changes = git_status_porcelain(wt)
|
||||
if not changes:
|
||||
continue
|
||||
|
||||
any_diffs = True
|
||||
|
||||
console.rule(f"[bold]{wt.name}[/bold]")
|
||||
|
||||
cmd = ["git", "diff"]
|
||||
if staged:
|
||||
cmd.append("--cached")
|
||||
|
||||
code, out, err = run_cmd(cmd, cwd=wt)
|
||||
if code != 0:
|
||||
console.print(
|
||||
f"[red]Failed to get diff for {wt.name}:[/red]\n{err}"
|
||||
)
|
||||
continue
|
||||
|
||||
if not out.strip():
|
||||
console.print("[dim]No diff output.[/dim]")
|
||||
continue
|
||||
|
||||
syntax = Syntax(
|
||||
out,
|
||||
"diff",
|
||||
theme="nord",
|
||||
line_numbers=False,
|
||||
word_wrap=False,
|
||||
)
|
||||
|
||||
# Wrap each repo diff in a Panel for clarity
|
||||
console.print(Panel(syntax, title=wt.name, border_style="cyan"))
|
||||
|
||||
if not any_diffs:
|
||||
console.print("[green]No changes to diff in any repo.[/green]")
|
||||
|
||||
def not_in_tmux() -> bool:
|
||||
"""Return True if not inside tmux or zellij."""
|
||||
return not os.environ.get("TMUX") and not os.environ.get("ZELLIJ")
|
||||
|
||||
|
||||
def run_tmux(args: list[str], *, clear_tmux_env: bool = False) -> subprocess.CompletedProcess:
|
||||
"""Run a tmux command."""
|
||||
env = os.environ.copy()
|
||||
if clear_tmux_env:
|
||||
env["TMUX"] = ""
|
||||
return subprocess.run(
|
||||
["tmux", *args],
|
||||
env=env,
|
||||
check=False,
|
||||
)
|
||||
|
||||
|
||||
def session_exists(session_name: str) -> bool:
|
||||
"""Check if the tmux session exists."""
|
||||
result = run_tmux(["has-session", "-t", f"={session_name}"])
|
||||
return result.returncode == 0
|
||||
|
||||
|
||||
def pick_project(base_dir: Path) -> Optional[str]:
|
||||
"""Use fzf to pick a subdirectory (project) from base_dir."""
|
||||
if not base_dir.is_dir():
|
||||
typer.echo(f"[ta] {base_dir} is not a directory.", err=True)
|
||||
return None
|
||||
|
||||
subdirs = sorted(
|
||||
[p.name for p in base_dir.iterdir() if p.is_dir()]
|
||||
)
|
||||
if not subdirs:
|
||||
typer.echo(f"[ta] No subdirectories found in {base_dir}.", err=True)
|
||||
return None
|
||||
|
||||
if not shutil.which("fzf"):
|
||||
typer.echo("[ta] fzf not found in PATH.", err=True)
|
||||
return None
|
||||
|
||||
proc = subprocess.run(
|
||||
["fzf", "--reverse", f"--header=Select project from {base_dir.name} >"],
|
||||
input="\n".join(subdirs),
|
||||
text=True,
|
||||
capture_output=True,
|
||||
)
|
||||
|
||||
if proc.returncode != 0:
|
||||
# Cancelled or failed
|
||||
return None
|
||||
|
||||
choice = proc.stdout.strip()
|
||||
return choice or None
|
||||
|
||||
|
||||
def create_detached_session(
|
||||
session_name: str,
|
||||
path_name: Path,
|
||||
start_mode: bool,
|
||||
) -> bool:
|
||||
"""
|
||||
Create a detached session.
|
||||
|
||||
- If start_mode: just a single window.
|
||||
- Else: split layout with nvim on top.
|
||||
"""
|
||||
# Run tmux as if not in tmux (clear TMUX env)
|
||||
if start_mode:
|
||||
r = run_tmux(
|
||||
["new-session", "-Ad", "-s", session_name, "-c", str(path_name)],
|
||||
clear_tmux_env=True,
|
||||
)
|
||||
return r.returncode == 0
|
||||
else:
|
||||
r = run_tmux(
|
||||
["new-session", "-Ad", "-s", session_name, "-c", str(path_name)],
|
||||
clear_tmux_env=True,
|
||||
)
|
||||
if r.returncode != 0:
|
||||
return False
|
||||
|
||||
r = run_tmux(
|
||||
[
|
||||
"split-window",
|
||||
"-vb",
|
||||
"-t",
|
||||
session_name,
|
||||
"-c",
|
||||
str(path_name),
|
||||
"-p",
|
||||
"70",
|
||||
],
|
||||
clear_tmux_env=True,
|
||||
)
|
||||
if r.returncode != 0:
|
||||
return False
|
||||
|
||||
r = run_tmux(
|
||||
[
|
||||
"send-keys",
|
||||
"-t",
|
||||
session_name,
|
||||
"nvim '+Telescope find_files'",
|
||||
"Enter",
|
||||
],
|
||||
clear_tmux_env=True,
|
||||
)
|
||||
return r.returncode == 0
|
||||
|
||||
|
||||
def create_if_needed_and_attach(
|
||||
session_name: str,
|
||||
path_name: Path,
|
||||
start_mode: bool,
|
||||
) -> bool:
|
||||
"""
|
||||
Mimic the bash logic:
|
||||
|
||||
- If not in tmux: `tmux new-session -As ... -c path_name`
|
||||
- Else:
|
||||
- If session doesn't exist: create_detached_session()
|
||||
- Then `tmux switch-client -t session_name`
|
||||
"""
|
||||
if not_in_tmux():
|
||||
r = run_tmux(
|
||||
["new-session", "-As", session_name, "-c", str(path_name)],
|
||||
)
|
||||
return r.returncode == 0
|
||||
|
||||
# Inside tmux
|
||||
if not session_exists(session_name):
|
||||
if not create_detached_session(session_name, path_name, start_mode):
|
||||
return False
|
||||
|
||||
r = run_tmux(["switch-client", "-t", session_name])
|
||||
return r.returncode == 0
|
||||
|
||||
|
||||
def attach_to_first_session() -> None:
|
||||
"""Fallback: attach to first tmux session and open choose-tree."""
|
||||
# Attach to the first listed session
|
||||
list_proc = subprocess.run(
|
||||
["tmux", "list-sessions", "-F", "#{session_name}"],
|
||||
text=True,
|
||||
capture_output=True,
|
||||
)
|
||||
if list_proc.returncode != 0 or not list_proc.stdout.strip():
|
||||
typer.echo("[ta] No tmux sessions found to attach to.", err=True)
|
||||
raise typer.Exit(1)
|
||||
|
||||
first_session = list_proc.stdout.strip().splitlines()[0]
|
||||
|
||||
attach_proc = run_tmux(["attach-session", "-t", first_session])
|
||||
if attach_proc.returncode != 0:
|
||||
raise typer.Exit(attach_proc.returncode)
|
||||
|
||||
# After attach, show choose-tree (this will run in the attached session)
|
||||
run_tmux(["choose-tree", "-Za"])
|
||||
|
||||
tmux_app = typer.Typer(
|
||||
name="tmux",
|
||||
help="tmux commands",
|
||||
add_completion=False,
|
||||
)
|
||||
|
||||
@tmux_app.command('attach')
|
||||
def attach(
|
||||
ctx: typer.Context,
|
||||
workspace: Optional[str] = typer.Option(
|
||||
None,
|
||||
"--workspace",
|
||||
"-w",
|
||||
help=(
|
||||
"Workspace directory name to show status for. "
|
||||
"If omitted, uses the workspace containing the current directory."
|
||||
),
|
||||
),
|
||||
):
|
||||
"""
|
||||
Attach or create a session for a repo in a workspace.
|
||||
"""
|
||||
# ws_dir = get_workspace_dir(workspace)
|
||||
_settings, _repos_dir, workspaces_dir = get_ctx_paths(ctx)
|
||||
ws_dir = find_workspace_dir(workspaces_dir, workspace)
|
||||
if not ws_dir:
|
||||
console.print(f"[red]Workspace '{workspace}' does not exist at {ws_dir}[/red]")
|
||||
raise typer.Exit(1)
|
||||
# pick repo in workspace
|
||||
repo = pick_repo_with_iterfzf([dir for dir in ws_dir.iterdir() if dir.is_dir()])
|
||||
if not repo:
|
||||
console.print("[red]No repo selected. Exiting.[/red]")
|
||||
raise typer.Exit(1)
|
||||
print(ws_dir, repo)
|
||||
session_name = f'{ws_dir.name}|{repo.name}'
|
||||
print(f'{ws_dir.name}|{repo.name}')
|
||||
create_if_needed_and_attach(session_name, repo, False)
|
||||
|
||||
|
||||
app.add_typer(tmux_app)
|
||||
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue