#!/usr/bin/env -S uv run --quiet --script # /// script # requires-python = ">=3.12" # dependencies = [ # "typer", # "sqlite-utils", # "rich", # ] # /// import os import json import subprocess from pathlib import Path import typer import sqlite_utils from rich import print from rich.table import Table from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn, TimeRemainingColumn app = typer.Typer(no_args_is_help=True) DB_PATH = Path("media_inventory.db") MEDIA_ROOT = Path("/mnt/media") VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm"} def get_media_info(file_path: Path) -> dict | None: try: result = subprocess.run( ["ffprobe", "-v", "error", "-print_format", "json", "-show_format", "-show_streams", str(file_path)], capture_output=True, text=True, check=True, ) return json.loads(result.stdout) except subprocess.CalledProcessError: print(f"[red]Error reading file:[/red] {file_path}") return None def needs_transcoding(info: dict) -> bool: video_streams = [s for s in info.get("streams", []) if s.get("codec_type") == "video"] audio_streams = [s for s in info.get("streams", []) if s.get("codec_type") == "audio"] if not video_streams or not audio_streams: return True video_codec = video_streams[0].get("codec_name", "") if video_codec not in {"h264", "hevc"}: return True format_name = info.get("format", {}).get("format_name", "") if "matroska" in format_name: return True return False @app.command() def scan(refresh: bool = typer.Option(False, "--refresh", help="Re-scan all files from scratch.")): """ Scan for video files and store their metadata in a database. """ db = sqlite_utils.Database(DB_PATH) table = db["media_files"] table.create({ "path": str, "needs_transcode": bool, "format": str, "video_codec": str, "audio_codec": str, "duration": float, "size": int, }, pk="path", if_not_exists=True) print("[bold cyan]Scanning for video files...[/bold cyan]") all_video_files = [ Path(root) / name for root, _, files in os.walk(MEDIA_ROOT) for name in files if Path(name).suffix.lower() in VIDEO_EXTENSIONS ] with Progress( SpinnerColumn(), BarColumn(), TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), TimeElapsedColumn(), TimeRemainingColumn(), TextColumn("{task.description}"), transient=True, ) as progress: task = progress.add_task("[green]Processing media files...", total=len(all_video_files)) for path in all_video_files: if not refresh: existing = list(table.rows_where("path = ?", [str(path)])) if existing: progress.update(task, advance=1) continue info = get_media_info(path) if not info: progress.update(task, advance=1) continue video_streams = [s for s in info.get("streams", []) if s.get("codec_type") == "video"] audio_streams = [s for s in info.get("streams", []) if s.get("codec_type") == "audio"] format_name = info.get("format", {}).get("format_name", "") video_codec = video_streams[0].get("codec_name", "") if video_streams else "" audio_codec = audio_streams[0].get("codec_name", "") if audio_streams else "" duration = float(info.get("format", {}).get("duration", 0)) size = int(info.get("format", {}).get("size", 0)) table.upsert({ "path": str(path), "needs_transcode": needs_transcoding(info), "format": format_name, "video_codec": video_codec, "audio_codec": audio_codec, "duration": duration, "size": size, }, pk="path") progress.update(task, advance=1) print("[bold green]Scan complete.[/bold green]") @app.command() def analyze(): """ Analyze the database for files that need transcoding. """ db = sqlite_utils.Database(DB_PATH) table = db["media_files"] results = list(table.rows_where("needs_transcode = 1")) table_display = Table(title="Files Needing Transcode") table_display.add_column("Path", style="cyan") table_display.add_column("Video Codec", style="green") table_display.add_column("Audio Codec", style="green") table_display.add_column("Format", style="yellow") for row in results: table_display.add_row( row["path"], row["video_codec"], row["audio_codec"], row["format"] ) print(table_display) if __name__ == "__main__": app()