#!/usr/bin/env python3 import subprocess import json import time import multiprocessing import getpass import shlex from pathlib import Path class BaseTranscoder: def __init__(self, args, video_defaults, audio_defaults): self.input_path = Path(args.input).resolve() if not self.input_path.exists(): raise FileNotFoundError(f"输入文件不存在: {self.input_path}") self.vencoder = args.vencoder self.aencoder = args.aencoder self.run_metrics = args.metrics self.notify_mode = args.notify self.v_params = ( shlex.split(args.vargs) if args.vargs else video_defaults.get(self.vencoder, []) ) self.a_params = ( shlex.split(args.aargs) if args.aargs else audio_defaults.get(self.aencoder, []) ) filters = [] if getattr(args, "res", None): filters.append(f"scale={args.res}") if getattr(args, "fps", None): filters.append(f"fps={args.fps}") self.vf_string = ",".join(filters) if filters else "" self.out_dir = ( Path(args.outdir).resolve() if args.outdir else self.input_path.parent ) self.out_dir.mkdir(parents=True, exist_ok=True) self.basename = self.input_path.stem self.output_path = self.out_dir / f"{self.basename}.{self.vencoder}.mp4" self.log_path = self.out_dir / f"{self.basename}.{self.vencoder}.log" self.vmaf_log_path = self.out_dir / f"{self.basename}.{self.vencoder}.vmaf.json" self.temp_files = [] self.orig_info = None def get_media_info(self, file_path: Path): cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", str(file_path), ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: return None data = json.loads(result.stdout) video_stream = next( (s for s in data.get("streams", []) if s["codec_type"] == "video"), None ) if not video_stream: return None duration = float(data.get("format", {}).get("duration", 0)) bitrate = int(data.get("format", {}).get("bit_rate", 0)) fps_str = video_stream.get("r_frame_rate", "0/1") nb_frames = video_stream.get("nb_frames") if nb_frames: frame_count = int(nb_frames) else: num, den = map(int, fps_str.split("/")) fps = num / den if den != 0 else 0 frame_count = int(duration * fps) return { "codec": video_stream.get("codec_name"), "bitrate_kbps": bitrate / 1000, "frame_count": frame_count, "fps_str": fps_str, "width": video_stream.get("width"), "height": video_stream.get("height"), } def _run_vmaf_psnr(self, new_info): target_samples = 500 n_subsample = max(1, (self.orig_info.get("frame_count", 0) if self.orig_info else 0) // target_samples) threads = multiprocessing.cpu_count() fps = self.orig_info.get("fps_str", "0/1") if self.orig_info else "0/1" print( f"\n[Metrics] 启动 VMAF/PSNR 测试 (采样间隔: {n_subsample}, 线程数: {threads}, 强制帧率: {fps})..." ) out_w = new_info.get("width") if new_info else None out_h = new_info.get("height") if new_info else None orig_w = self.orig_info.get("width") if self.orig_info else None orig_h = self.orig_info.get("height") if self.orig_info else None orig_fps = self.orig_info.get("fps_str", "0/1") if self.orig_info else "0/1" out_fps = new_info.get("fps_str", "0/1") if new_info else "0/1" # 使用滤镜链对齐帧率、分辨率和首帧时间戳 filters_0 = [] if out_fps != orig_fps: filters_0.append(f"fps={orig_fps}") if out_w and out_h and orig_w and orig_h and (out_w != orig_w or out_h != orig_h): filters_0.append(f"scale={orig_w}:{orig_h}:flags=bicubic") filters_0.append("setpts=PTS-STARTPTS") filter_0_str = ",".join(filters_0) vmaf_filter = ( f"[0:v]{filter_0_str}[dist];[1:v]setpts=PTS-STARTPTS[ref];" f"[dist][ref]libvmaf=feature='name=psnr':log_path='{self.vmaf_log_path.as_posix()}':" f"log_fmt=json:n_subsample={n_subsample}:n_threads={threads}" ) cmd = [ "ffmpeg", "-y", "-i", str(self.output_path), "-i", str(self.input_path), "-lavfi", vmaf_filter, "-f", "null", "-", ] subprocess.run(cmd, check=False) try: with open(self.vmaf_log_path, "r", encoding="utf-8") as f: data = json.load(f).get("pooled_metrics", {}) vmaf = ( round(data.get("vmaf", {}).get("mean", 0), 2) if "vmaf" in data else "N/A" ) psnr = ( round(data.get("psnr_y", {}).get("mean", 0), 2) if "psnr_y" in data else "N/A" ) return vmaf, psnr except Exception as e: print(f"[Metrics Error] 无法解析 VMAF 日志: {e}") return "N/A", "N/A" def _generate_log_content(self, encode_time, new_info, vmaf, psnr, extra_info=""): size_ratio = ( self.output_path.stat().st_size / self.input_path.stat().st_size ) * 100 content = ( f"时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n" f"输入: {self.input_path}\n" f"输出: {self.output_path}\n" f"{extra_info}" f"视频编码器: {self.vencoder} ({' '.join(self.v_params)})\n" f"音频编码器: {self.aencoder} ({' '.join(self.a_params)})\n" f"耗时: {encode_time:.2f} s\n" f"[元数据] {self.orig_info.get('codec','') if self.orig_info else ''} -> {new_info.get('codec','') if new_info else ''} | {self.orig_info.get('bitrate_kbps',0) if self.orig_info else 0:.0f} kbps -> {new_info.get('bitrate_kbps',0) if new_info else 0:.0f} kbps\n" f"[体积比] {size_ratio:.1f}%\n" ) if self.run_metrics: content += f"[质量] VMAF: {vmaf} | PSNR(Y): {psnr}\n" content += "-" * 50 + "\n" return content def _send_notification( self, encode_time: float = 0.0, new_info=None, vmaf="N/A", full_text="", error=None, extra_title="" ): if self.notify_mode == "none": return subject = f"{extra_title}任务{'失败' if error else '完成'}: {self.input_path.name}" if self.notify_mode == "mail": body = error if error else full_text try: subprocess.run( ["mail", "-s", subject, getpass.getuser()], input=body.encode("utf-8"), check=True, ) except Exception as e: print(f"[Notify Error] 邮件发送失败: {e}") elif self.notify_mode == "notify": if error: body = error else: body = f"编码器: {self.vencoder}\n耗时: {encode_time:.1f}s" if self.run_metrics: body += f"\nVMAF: {vmaf}" try: subprocess.run(["notify-send", subject, body], check=True) except Exception as e: print(f"[Notify Error] DBus 通知发送失败: {e}") def _cleanup(self): for temp_file in self.temp_files: if temp_file.exists(): temp_file.unlink()