media-manager/main.py
2025-10-11 19:13:21 -05:00

151 lines
4.8 KiB
Python
Executable file

#!/usr/bin/env -S uv run --quiet --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "typer",
# "sqlite-utils",
# "rich",
# ]
# ///
import json
import os
import subprocess
from pathlib import Path

import sqlite_utils
import typer
from rich import print
from rich.markup import escape
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn, TimeRemainingColumn
from rich.table import Table
# Typer application object; the functions decorated with @app.command()
# further down are registered on it as sub-commands.
app = typer.Typer(no_args_is_help=True)
# SQLite file holding the inventory. The path is relative, so it is resolved
# against the directory the script is run from.
DB_PATH = Path("media_inventory.db")
# Root of the directory tree that is walked for video files.
MEDIA_ROOT = Path("/mnt/media")
# Suffixes treated as video files; matched against the lower-cased suffix.
VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm"}
def get_media_info(file_path: Path) -> dict | None:
try:
result = subprocess.run(
["ffprobe", "-v", "error", "-print_format", "json", "-show_format", "-show_streams", str(file_path)],
capture_output=True,
text=True,
check=True,
)
return json.loads(result.stdout)
except subprocess.CalledProcessError:
print(f"[red]Error reading file:[/red] {file_path}")
return None
def needs_transcoding(info: dict) -> bool:
    """Decide from an ffprobe report whether a file has to be transcoded.

    A file needs transcoding when any of the following holds:

    * it has no video stream or no audio stream,
    * its first video stream is not encoded as ``h264`` or ``hevc``,
    * its container format name mentions ``matroska``.

    Args:
        info: Parsed ffprobe output with optional ``"streams"`` and
            ``"format"`` entries.

    Returns:
        ``True`` if the file should be transcoded, ``False`` otherwise.
    """
    streams = info.get("streams", [])
    first_video = next((s for s in streams if s.get("codec_type") == "video"), None)
    has_audio = any(s.get("codec_type") == "audio" for s in streams)

    if first_video is None or not has_audio:
        return True
    if first_video.get("codec_name", "") not in {"h264", "hevc"}:
        return True

    container = info.get("format", {}).get("format_name", "")
    return "matroska" in container
def _first_codec(info: dict, codec_type: str) -> str:
    """Return the codec name of the first ``codec_type`` stream, or ``""``."""
    for stream in info.get("streams", []):
        if stream.get("codec_type") == codec_type:
            return stream.get("codec_name", "")
    return ""


# Walks MEDIA_ROOT for files whose suffix is in VIDEO_EXTENSIONS, probes each
# one with get_media_info() and upserts one row per file into the
# "media_files" table of DB_PATH. Files already present in the table are
# skipped unless --refresh is given; files that cannot be probed are skipped.
# Exits with status 1 when MEDIA_ROOT is not a directory.
#
# NOTE: the docstring is kept short and unchanged on purpose - Typer uses it
# as the command's help text.
@app.command()
def scan(refresh: bool = typer.Option(False, "--refresh", help="Re-scan all files from scratch.")):
    """
    Scan for video files and store their metadata in a database.
    """
    if not MEDIA_ROOT.is_dir():
        # os.walk() yields nothing for a missing directory, which would
        # otherwise be reported as a successful (empty) scan.
        print(f"[red]Media root not found:[/red] {MEDIA_ROOT}")
        raise typer.Exit(code=1)

    db = sqlite_utils.Database(DB_PATH)
    table = db["media_files"]
    table.create({
        "path": str,
        "needs_transcode": bool,
        "format": str,
        "video_codec": str,
        "audio_codec": str,
        "duration": float,
        "size": int,
    }, pk="path", if_not_exists=True)

    print("[bold cyan]Scanning for video files...[/bold cyan]")
    all_video_files = [
        Path(root) / name
        for root, _, files in os.walk(MEDIA_ROOT)
        for name in files
        if Path(name).suffix.lower() in VIDEO_EXTENSIONS
    ]

    # Fetch the already-indexed paths once instead of running one SELECT per
    # file. With --refresh nothing counts as known, so every file is re-probed.
    known_paths = set() if refresh else {row["path"] for row in table.rows_where(select="path")}

    with Progress(
        SpinnerColumn(),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
        TextColumn("{task.description}"),
        transient=True,
    ) as progress:
        task = progress.add_task("[green]Processing media files...", total=len(all_video_files))
        for path in all_video_files:
            if str(path) not in known_paths:
                info = get_media_info(path)
                if info:
                    container = info.get("format", {})
                    table.upsert({
                        "path": str(path),
                        "needs_transcode": needs_transcoding(info),
                        "format": container.get("format_name", ""),
                        "video_codec": _first_codec(info, "video"),
                        "audio_codec": _first_codec(info, "audio"),
                        "duration": float(container.get("duration", 0)),
                        "size": int(container.get("size", 0)),
                    }, pk="path")
            # Every file advances the bar, whether stored, skipped or failed.
            progress.update(task, advance=1)

    print("[bold green]Scan complete.[/bold green]")
# Prints a table (path, video codec, audio codec, container format) of every
# row in the "media_files" table whose needs_transcode flag is set.
# Exits with status 1 when no scan has populated the database yet.
#
# NOTE: the docstring is kept short and unchanged on purpose - Typer uses it
# as the command's help text.
@app.command()
def analyze():
    """
    Analyze the database for files that need transcoding.
    """
    # Check for the file first: connecting to a missing path would create an
    # empty database, and querying a missing table raises an SQLite error.
    table = sqlite_utils.Database(DB_PATH)["media_files"] if DB_PATH.exists() else None
    if table is None or not table.exists():
        print("[yellow]No scan data found. Run the 'scan' command first.[/yellow]")
        raise typer.Exit(code=1)

    results = list(table.rows_where("needs_transcode = 1"))

    table_display = Table(title="Files Needing Transcode")
    table_display.add_column("Path", style="cyan")
    table_display.add_column("Video Codec", style="green")
    table_display.add_column("Audio Codec", style="green")
    table_display.add_column("Format", style="yellow")
    for row in results:
        table_display.add_row(
            # File names often contain bracketed tags such as "[x265]"; escape
            # them so rich shows them literally instead of parsing them as
            # markup.
            escape(row["path"]),
            row["video_codec"],
            row["audio_codec"],
            row["format"],
        )
    print(table_display)
# Script entry point: run the Typer application when executed directly.
if __name__ == "__main__":
    app()