#!/usr/bin/env python3
import subprocess
import json
import time
import multiprocessing
import getpass
import shlex
from pathlib import Path


class BaseTranscoder:
    """Shared state and helper routines for an ffmpeg-based transcode job.

    The constructor resolves the input/output paths and encoder parameters.
    The remaining methods probe media files with ``ffprobe``, compute
    VMAF/PSNR with ``ffmpeg``, format a log entry, send a completion
    notification and remove temporary files. No method in this class runs
    the encode itself.
    """

    def __init__(self, args, video_defaults, audio_defaults):
        """Resolve paths and encoder parameters from parsed arguments.

        Args:
            args: Namespace-like object providing ``input``, ``vencoder``,
                ``aencoder``, ``metrics``, ``notify``, ``vargs``, ``aargs``
                and ``outdir``; ``res`` and ``fps`` are optional.
            video_defaults: Mapping of video encoder name to its default
                argument list, used when ``args.vargs`` is empty.
            audio_defaults: Mapping of audio encoder name to its default
                argument list, used when ``args.aargs`` is empty.

        Raises:
            FileNotFoundError: If the input file does not exist.
        """
        self.input_path = Path(args.input).resolve()
        if not self.input_path.exists():
            raise FileNotFoundError(f"Input file not found: {self.input_path}")

        self.vencoder = args.vencoder
        self.aencoder = args.aencoder
        self.run_metrics = args.metrics
        self.notify_mode = args.notify

        # Explicit argument strings win over the per-encoder defaults.
        self.v_params = (
            shlex.split(args.vargs)
            if args.vargs
            else video_defaults.get(self.vencoder, [])
        )
        self.a_params = (
            shlex.split(args.aargs)
            if args.aargs
            else audio_defaults.get(self.aencoder, [])
        )

        # Optional video filters, joined into a single -vf style string.
        filters = []
        if getattr(args, "res", None):
            filters.append(f"scale={args.res}")
        if getattr(args, "fps", None):
            filters.append(f"fps={args.fps}")
        self.vf_string = ",".join(filters) if filters else ""

        # Outputs default to the directory that holds the input file.
        self.out_dir = (
            Path(args.outdir).resolve() if args.outdir else self.input_path.parent
        )
        self.out_dir.mkdir(parents=True, exist_ok=True)

        self.basename = self.input_path.stem
        self.output_path = self.out_dir / f"{self.basename}.{self.vencoder}.mp4"
        self.log_path = self.out_dir / f"{self.basename}.{self.vencoder}.log"
        self.vmaf_log_path = (
            self.out_dir / f"{self.basename}.{self.vencoder}.vmaf.json"
        )

        self.temp_files = []
        self.orig_info = None

    def get_media_info(self, file_path: Path):
        """Probe *file_path* with ffprobe and summarise its first video stream.

        Returns:
            A dict with ``codec``, ``bitrate_kbps``, ``frame_count``,
            ``fps_str``, ``width`` and ``height``, or ``None`` when ffprobe
            cannot be run, exits non-zero, prints undecodable JSON, or
            reports no video stream.
        """
        cmd = [
            "ffprobe",
            "-v",
            "quiet",
            "-print_format",
            "json",
            "-show_format",
            "-show_streams",
            str(file_path),
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except OSError as e:
            # A missing or non-executable ffprobe is reported the same way as
            # any other probe failure instead of escaping as an exception.
            print(f"[Probe Error] Failed to run ffprobe: {e}")
            return None
        if result.returncode != 0:
            return None

        try:
            data = json.loads(result.stdout)
        except ValueError:
            return None

        # .get() so that a stream entry lacking "codec_type" is skipped
        # rather than raising KeyError.
        video_stream = next(
            (s for s in data.get("streams", []) if s.get("codec_type") == "video"),
            None,
        )
        if not video_stream:
            return None

        container = data.get("format", {})
        duration = float(container.get("duration", 0))
        bitrate = int(container.get("bit_rate", 0))
        fps_str = video_stream.get("r_frame_rate", "0/1")

        nb_frames = video_stream.get("nb_frames")
        if nb_frames:
            frame_count = int(nb_frames)
        else:
            # No frame count reported: estimate it from duration * frame rate.
            num, den = map(int, fps_str.split("/"))
            fps = num / den if den != 0 else 0
            frame_count = int(duration * fps)

        return {
            "codec": video_stream.get("codec_name"),
            "bitrate_kbps": bitrate / 1000,
            "frame_count": frame_count,
            "fps_str": fps_str,
            "width": video_stream.get("width"),
            "height": video_stream.get("height"),
        }

    def _build_vmaf_filter(self, new_info, n_subsample, threads):
        """Build the ``-lavfi`` graph comparing the output against the input.

        Input 0 (the encoded output) is labelled ``dist`` and input 1 (the
        original file) is labelled ``ref``. The reference is converted to the
        output's frame rate and resolution when they differ, so both streams
        reach libvmaf with matching geometry and timing.
        """
        out = new_info or {}
        ref = self.orig_info or {}

        out_fps = out.get("fps_str")
        # The reference frame rate must come from the ORIGINAL file's info;
        # reading it from new_info makes the comparison below always false.
        orig_fps = ref.get("fps_str")
        out_w, out_h = out.get("width"), out.get("height")
        orig_w, orig_h = ref.get("width"), ref.get("height")

        # Apply identical scaling and framerate conversion to the reference video
        ref_filters = []
        if out_fps and orig_fps and out_fps != orig_fps:
            ref_filters.append(f"fps={out_fps}")
        if (
            out_w
            and out_h
            and orig_w
            and orig_h
            and (out_w != orig_w or out_h != orig_h)
        ):
            ref_filters.append(f"scale={out_w}:{out_h}:flags=bicubic")
        ref_filters.append("setpts=PTS-STARTPTS")
        ref_chain = ",".join(ref_filters)

        return (
            f"[0:v]setpts=PTS-STARTPTS[dist];[1:v]{ref_chain}[ref];"
            f"[dist][ref]libvmaf=feature='name=psnr':log_path='{self.vmaf_log_path.as_posix()}':"
            f"log_fmt=json:n_subsample={n_subsample}:n_threads={threads}"
        )

    def _run_vmaf_psnr(self, new_info):
        """Run ffmpeg's libvmaf filter and return ``(vmaf, psnr_y)``.

        Args:
            new_info: Media info dict of the encoded output as returned by
                ``get_media_info``, or ``None``.

        Returns:
            A tuple of the pooled mean VMAF and PSNR(Y), each rounded to two
            decimals. A value is ``"N/A"`` when that metric is absent from the
            log; both are ``"N/A"`` when ffmpeg cannot be run, exits non-zero,
            or the JSON log cannot be read.
        """
        info = new_info or {}
        target_samples = 500
        # Subsample interval chosen so roughly target_samples frames are used.
        n_subsample = max(1, info.get("frame_count", 0) // target_samples)
        threads = multiprocessing.cpu_count()
        fps = info.get("fps_str", "0/1")
        print(
            f"\n[Metrics] Starting VMAF/PSNR test (Subsample interval: {n_subsample}, Threads: {threads}, Forced fps: {fps})..."
        )

        vmaf_filter = self._build_vmaf_filter(new_info, n_subsample, threads)
        cmd = [
            "ffmpeg",
            "-y",
            "-i",
            str(self.output_path),
            "-i",
            str(self.input_path),
            "-lavfi",
            vmaf_filter,
            "-f",
            "null",
            "-",
        ]
        try:
            result = subprocess.run(cmd, check=False)
        except OSError as e:
            print(f"[Metrics Error] Failed to run ffmpeg: {e}")
            return "N/A", "N/A"
        if result.returncode != 0:
            # Do not fall through to the log: a file left by an earlier run
            # would otherwise be reported as this run's result.
            print(f"[Metrics Error] ffmpeg exited with code {result.returncode}")
            return "N/A", "N/A"

        try:
            with open(self.vmaf_log_path, "r", encoding="utf-8") as f:
                data = json.load(f).get("pooled_metrics", {})
            vmaf = (
                round(data.get("vmaf", {}).get("mean", 0), 2)
                if "vmaf" in data
                else "N/A"
            )
            psnr = (
                round(data.get("psnr_y", {}).get("mean", 0), 2)
                if "psnr_y" in data
                else "N/A"
            )
            return vmaf, psnr
        except (OSError, ValueError, AttributeError, TypeError) as e:
            print(f"[Metrics Error] Failed to parse VMAF log: {e}")
            return "N/A", "N/A"

    def _generate_log_content(self, encode_time, new_info, vmaf, psnr, extra_info=""):
        """Format the text log entry for a finished encode.

        Args:
            encode_time: Elapsed encode time in seconds.
            new_info: Media info dict of the output, or ``None``.
            vmaf: VMAF value to report (only used when metrics are enabled).
            psnr: PSNR(Y) value to report (only used when metrics are enabled).
            extra_info: Preformatted text inserted after the output path line.

        Returns:
            The log entry as a string ending with a dashed separator line.
        """
        in_size = self.input_path.stat().st_size
        out_size = self.output_path.stat().st_size
        # A 0-byte input has no meaningful ratio; avoid ZeroDivisionError.
        ratio_text = f"{out_size / in_size * 100:.1f}%" if in_size else "N/A"

        orig = self.orig_info or {}
        new = new_info or {}
        orig_codec = orig.get("codec", "")
        new_codec = new.get("codec", "")
        orig_kbps = orig.get("bitrate_kbps", 0)
        new_kbps = new.get("bitrate_kbps", 0)

        content = (
            f"Time: {time.strftime('%Y-%m-%d %H:%M:%S')}\n"
            f"Input: {self.input_path}\n"
            f"Output: {self.output_path}\n"
            f"{extra_info}"
            f"Video encoder: {self.vencoder} ({' '.join(self.v_params)})\n"
            f"Audio encoder: {self.aencoder} ({' '.join(self.a_params)})\n"
            f"Time elapsed: {encode_time:.2f} s\n"
            f"[Metadata] {orig_codec} -> {new_codec} | {orig_kbps:.0f} kbps -> {new_kbps:.0f} kbps\n"
            f"[Size ratio] {ratio_text}\n"
        )
        if self.run_metrics:
            content += f"[Quality] VMAF: {vmaf} | PSNR(Y): {psnr}\n"
        content += "-" * 50 + "\n"
        return content

    def _send_notification(
        self,
        encode_time: float = 0.0,
        new_info=None,
        vmaf="N/A",
        full_text="",
        error=None,
        extra_title="",
    ):
        """Report completion or failure according to ``self.notify_mode``.

        ``"none"`` does nothing, ``"mail"`` pipes the message to the ``mail``
        command addressed to the current user, and ``"notify"`` calls
        ``notify-send``. Delivery failures are printed, never raised.

        Args:
            encode_time: Elapsed encode time in seconds (``notify`` mode).
            new_info: Accepted for interface compatibility; not used here.
            vmaf: VMAF value appended in ``notify`` mode when metrics ran.
            full_text: Message body used in ``mail`` mode on success.
            error: Error text or exception; when truthy the task is reported
                as failed and this becomes the message body.
            extra_title: Prefix for the subject line.
        """
        if self.notify_mode == "none":
            return

        subject = f"{extra_title}Task {'failed' if error else 'completed'}: {self.input_path.name}"

        if self.notify_mode == "mail":
            # str() so an exception object can be passed as `error` as well.
            body = str(error) if error else full_text
            try:
                subprocess.run(
                    ["mail", "-s", subject, getpass.getuser()],
                    input=body.encode("utf-8"),
                    check=True,
                )
            except Exception as e:
                # Best-effort: a failed notification must not fail the job.
                print(f"[Notify Error] Failed to send email: {e}")

        elif self.notify_mode == "notify":
            if error:
                body = str(error)
            else:
                body = f"Encoder: {self.vencoder}\nTime elapsed: {encode_time:.1f}s"
                if self.run_metrics:
                    body += f"\nVMAF: {vmaf}"
            try:
                subprocess.run(["notify-send", subject, body], check=True)
            except Exception as e:
                # Best-effort: a failed notification must not fail the job.
                print(f"[Notify Error] DBus Failed to send notification: {e}")

    def _cleanup(self):
        """Delete every path listed in ``self.temp_files`` that still exists."""
        for temp_file in self.temp_files:
            # EAFP: no window between an exists() check and the unlink.
            try:
                temp_file.unlink()
            except FileNotFoundError:
                continue