From b1a74f24ae64c2f7fdf1b0c1f61bf7eea298bd6b Mon Sep 17 00:00:00 2001 From: "Waylon S. Walker" Date: Sat, 11 Oct 2025 19:13:21 -0500 Subject: [PATCH] init --- latest_yt_poster.py | 70 ++++++++++++++++++++ main.py | 151 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 221 insertions(+) create mode 100755 latest_yt_poster.py create mode 100755 main.py diff --git a/latest_yt_poster.py b/latest_yt_poster.py new file mode 100755 index 0000000..ab20e98 --- /dev/null +++ b/latest_yt_poster.py @@ -0,0 +1,70 @@ +#!/usr/bin/env -S uv run --quiet --script +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "rich", +# "Pillow", +# ] +# /// +import os, glob, shutil +from rich import print + + +base_dir = "/mnt/media/youtube" + +for channel_dir in os.listdir(base_dir): + full_path = os.path.join(base_dir, channel_dir) + if not os.path.isdir(full_path): + continue + + # find all video thumbnails (jpg/png) + thumbs = sorted( + glob.glob(os.path.join(full_path, "**/*.jpg")) + + glob.glob(os.path.join(full_path, "**/*.png")), + key=os.path.getmtime, + reverse=True + ) + + if not thumbs: + print(f"No thumbnails found for {channel_dir}") + continue + + latest_thumb = thumbs[0] + poster_path = os.path.join(full_path, "poster.jpg") + + if not os.path.exists(poster_path) or os.path.getmtime(latest_thumb) > os.path.getmtime(poster_path): + shutil.copy2(latest_thumb, poster_path) + print(f"Set poster for {channel_dir} → {os.path.basename(latest_thumb)}") + +## Grid collage +from PIL import Image +import glob, os, random + +# base_dir = "/mnt/main/media/YouTube" +poster_path = os.path.join(base_dir, "poster.jpg") + +# Collect recent thumbnails from subfolders +thumbs = [] +# for root, dirs, files in os.walk(base_dir): +# for f in files: +# if f.lower().endswith((".jpg", ".png")) and "poster" not in f: +# thumbs.append(os.path.join(root, f)) +# thumbs = sorted(thumbs, key=os.path.getmtime, reverse=True)[:50] +thumbs = sorted(glob.glob(os.path.join(base_dir, "**/*.jpg")) + glob.glob(os.path.join(base_dir, "**/*.png")), key=os.path.getmtime, reverse=True)[:50] +random.shuffle(thumbs) + +if not thumbs: + raise SystemExit("No thumbnails found") + +# Make a 5x5 grid collage (adjust as desired) +rows, cols = 2, 2 +tile_size = 256 +grid = Image.new("RGB", (cols*tile_size, rows*tile_size)) + +for i, thumb in enumerate(thumbs[:rows*cols]): + img = Image.open(thumb).convert("RGB").resize((tile_size, tile_size)) + r, c = divmod(i, cols) + grid.paste(img, (c*tile_size, r*tile_size)) + +grid.save(poster_path, quality=85) +print(f"Created {poster_path}") diff --git a/main.py b/main.py new file mode 100755 index 0000000..dcf426c --- /dev/null +++ b/main.py @@ -0,0 +1,151 @@ +#!/usr/bin/env -S uv run --quiet --script +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "typer", +# "sqlite-utils", +# "rich", +# ] +# /// + +import os +import json +import subprocess +from pathlib import Path +import typer +import sqlite_utils +from rich import print +from rich.table import Table +from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn, TimeRemainingColumn + +app = typer.Typer(no_args_is_help=True) +DB_PATH = Path("media_inventory.db") +MEDIA_ROOT = Path("/mnt/media") +VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm"} + +def get_media_info(file_path: Path) -> dict | None: + try: + result = subprocess.run( + ["ffprobe", "-v", "error", "-print_format", "json", "-show_format", "-show_streams", str(file_path)], + capture_output=True, + text=True, + check=True, + ) + return json.loads(result.stdout) + except subprocess.CalledProcessError: + print(f"[red]Error reading file:[/red] {file_path}") + return None + +def needs_transcoding(info: dict) -> bool: + video_streams = [s for s in info.get("streams", []) if s.get("codec_type") == "video"] + audio_streams = [s for s in info.get("streams", []) if s.get("codec_type") == "audio"] + if not video_streams or not audio_streams: + return True + + video_codec = video_streams[0].get("codec_name", "") + if video_codec not in {"h264", "hevc"}: + return True + + format_name = info.get("format", {}).get("format_name", "") + if "matroska" in format_name: + return True + + return False + +@app.command() +def scan(refresh: bool = typer.Option(False, "--refresh", help="Re-scan all files from scratch.")): + """ + Scan for video files and store their metadata in a database. + """ + db = sqlite_utils.Database(DB_PATH) + table = db["media_files"] + table.create({ + "path": str, + "needs_transcode": bool, + "format": str, + "video_codec": str, + "audio_codec": str, + "duration": float, + "size": int, + }, pk="path", if_not_exists=True) + + print("[bold cyan]Scanning for video files...[/bold cyan]") + all_video_files = [ + Path(root) / name + for root, _, files in os.walk(MEDIA_ROOT) + for name in files + if Path(name).suffix.lower() in VIDEO_EXTENSIONS + ] + + with Progress( + SpinnerColumn(), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TimeElapsedColumn(), + TimeRemainingColumn(), + TextColumn("{task.description}"), + transient=True, + ) as progress: + task = progress.add_task("[green]Processing media files...", total=len(all_video_files)) + + for path in all_video_files: + if not refresh: + existing = list(table.rows_where("path = ?", [str(path)])) + if existing: + progress.update(task, advance=1) + continue + + info = get_media_info(path) + if not info: + progress.update(task, advance=1) + continue + + video_streams = [s for s in info.get("streams", []) if s.get("codec_type") == "video"] + audio_streams = [s for s in info.get("streams", []) if s.get("codec_type") == "audio"] + format_name = info.get("format", {}).get("format_name", "") + video_codec = video_streams[0].get("codec_name", "") if video_streams else "" + audio_codec = audio_streams[0].get("codec_name", "") if audio_streams else "" + duration = float(info.get("format", {}).get("duration", 0)) + size = int(info.get("format", {}).get("size", 0)) + + table.upsert({ + "path": str(path), + "needs_transcode": needs_transcoding(info), + "format": format_name, + "video_codec": video_codec, + "audio_codec": audio_codec, + "duration": duration, + "size": size, + }, pk="path") + + progress.update(task, advance=1) + + print("[bold green]Scan complete.[/bold green]") + +@app.command() +def analyze(): + """ + Analyze the database for files that need transcoding. + """ + db = sqlite_utils.Database(DB_PATH) + table = db["media_files"] + results = list(table.rows_where("needs_transcode = 1")) + + table_display = Table(title="Files Needing Transcode") + table_display.add_column("Path", style="cyan") + table_display.add_column("Video Codec", style="green") + table_display.add_column("Audio Codec", style="green") + table_display.add_column("Format", style="yellow") + + for row in results: + table_display.add_row( + row["path"], + row["video_codec"], + row["audio_codec"], + row["format"] + ) + + print(table_display) + +if __name__ == "__main__": + app()