commit 6097317d256e02d60906f7204c6f074656207dff Author: Uyanide Date: Wed Mar 18 23:03:59 2026 +0100 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..505a3b1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +# Python-generated files +__pycache__/ +*.py[oc] +build/ +dist/ +wheels/ +*.egg-info + +# Virtual environments +.venv diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..24ee5b1 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.13 diff --git a/core.py b/core.py new file mode 100644 index 0000000..f219b96 --- /dev/null +++ b/core.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +import subprocess +import json +import time +import multiprocessing +import getpass +import shlex +from pathlib import Path + +class BaseTranscoder: + def __init__(self, args, video_defaults, audio_defaults): + self.input_path = Path(args.input).resolve() + if not self.input_path.exists(): + raise FileNotFoundError(f"输入文件不存在: {self.input_path}") + + self.vencoder = args.vencoder + self.aencoder = args.aencoder + self.run_metrics = args.metrics + self.notify_mode = args.notify + + self.v_params = ( + shlex.split(args.vargs) if args.vargs else video_defaults.get(self.vencoder, []) + ) + self.a_params = ( + shlex.split(args.aargs) if args.aargs else audio_defaults.get(self.aencoder, []) + ) + + filters = [] + if getattr(args, "res", None): + filters.append(f"scale={args.res}") + if getattr(args, "fps", None): + filters.append(f"fps={args.fps}") + self.vf_string = ",".join(filters) if filters else "" + + self.out_dir = ( + Path(args.outdir).resolve() if args.outdir else self.input_path.parent + ) + self.out_dir.mkdir(parents=True, exist_ok=True) + + self.basename = self.input_path.stem + self.output_path = self.out_dir / f"{self.basename}.{self.vencoder}.mp4" + self.log_path = self.out_dir / f"{self.basename}.{self.vencoder}.log" + self.vmaf_log_path = self.out_dir / f"{self.basename}.{self.vencoder}.vmaf.json" + + self.temp_files = [] + self.orig_info = None + + def get_media_info(self, file_path: Path): + cmd = [ + "ffprobe", + "-v", + "quiet", + "-print_format", + "json", + "-show_format", + "-show_streams", + str(file_path), + ] + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + return None + + data = json.loads(result.stdout) + video_stream = next( + (s for s in data.get("streams", []) if s["codec_type"] == "video"), None + ) + if not video_stream: + return None + + duration = float(data.get("format", {}).get("duration", 0)) + bitrate = int(data.get("format", {}).get("bit_rate", 0)) + fps_str = video_stream.get("r_frame_rate", "0/1") + + nb_frames = video_stream.get("nb_frames") + if nb_frames: + frame_count = int(nb_frames) + else: + num, den = map(int, fps_str.split("/")) + fps = num / den if den != 0 else 0 + frame_count = int(duration * fps) + + return { + "codec": video_stream.get("codec_name"), + "bitrate_kbps": bitrate / 1000, + "frame_count": frame_count, + "fps_str": fps_str, + "width": video_stream.get("width"), + "height": video_stream.get("height"), + } + + def _run_vmaf_psnr(self, new_info): + target_samples = 500 + n_subsample = max(1, (self.orig_info.get("frame_count", 0) if self.orig_info else 0) // target_samples) + threads = multiprocessing.cpu_count() + fps = self.orig_info.get("fps_str", "0/1") if self.orig_info else "0/1" + + print( + f"\n[Metrics] 启动 VMAF/PSNR 测试 (采样间隔: {n_subsample}, 线程数: {threads}, 强制帧率: {fps})..." + ) + + out_w = new_info.get("width") if new_info else None + out_h = new_info.get("height") if new_info else None + orig_w = self.orig_info.get("width") if self.orig_info else None + orig_h = self.orig_info.get("height") if self.orig_info else None + orig_fps = self.orig_info.get("fps_str", "0/1") if self.orig_info else "0/1" + out_fps = new_info.get("fps_str", "0/1") if new_info else "0/1" + + # 使用滤镜链对齐帧率、分辨率和首帧时间戳 + filters_0 = [] + if out_fps != orig_fps: + filters_0.append(f"fps={orig_fps}") + if out_w and out_h and orig_w and orig_h and (out_w != orig_w or out_h != orig_h): + filters_0.append(f"scale={orig_w}:{orig_h}:flags=bicubic") + filters_0.append("setpts=PTS-STARTPTS") + + filter_0_str = ",".join(filters_0) + + vmaf_filter = ( + f"[0:v]{filter_0_str}[dist];[1:v]setpts=PTS-STARTPTS[ref];" + f"[dist][ref]libvmaf=feature='name=psnr':log_path='{self.vmaf_log_path.as_posix()}':" + f"log_fmt=json:n_subsample={n_subsample}:n_threads={threads}" + ) + + cmd = [ + "ffmpeg", + "-y", + "-i", + str(self.output_path), + "-i", + str(self.input_path), + "-lavfi", + vmaf_filter, + "-f", + "null", + "-", + ] + subprocess.run(cmd, check=False) + + try: + with open(self.vmaf_log_path, "r", encoding="utf-8") as f: + data = json.load(f).get("pooled_metrics", {}) + vmaf = ( + round(data.get("vmaf", {}).get("mean", 0), 2) + if "vmaf" in data + else "N/A" + ) + psnr = ( + round(data.get("psnr_y", {}).get("mean", 0), 2) + if "psnr_y" in data + else "N/A" + ) + return vmaf, psnr + except Exception as e: + print(f"[Metrics Error] 无法解析 VMAF 日志: {e}") + return "N/A", "N/A" + + def _generate_log_content(self, encode_time, new_info, vmaf, psnr, extra_info=""): + size_ratio = ( + self.output_path.stat().st_size / self.input_path.stat().st_size + ) * 100 + content = ( + f"时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n" + f"输入: {self.input_path}\n" + f"输出: {self.output_path}\n" + f"{extra_info}" + f"视频编码器: {self.vencoder} ({' '.join(self.v_params)})\n" + f"音频编码器: {self.aencoder} ({' '.join(self.a_params)})\n" + f"耗时: {encode_time:.2f} s\n" + f"[元数据] {self.orig_info.get('codec','') if self.orig_info else ''} -> {new_info.get('codec','') if new_info else ''} | {self.orig_info.get('bitrate_kbps',0) if self.orig_info else 0:.0f} kbps -> {new_info.get('bitrate_kbps',0) if new_info else 0:.0f} kbps\n" + f"[体积比] {size_ratio:.1f}%\n" + ) + if self.run_metrics: + content += f"[质量] VMAF: {vmaf} | PSNR(Y): {psnr}\n" + content += "-" * 50 + "\n" + return content + + def _send_notification( + self, encode_time: float = 0.0, new_info=None, vmaf="N/A", full_text="", error=None, extra_title="" + ): + if self.notify_mode == "none": + return + + subject = f"{extra_title}任务{'失败' if error else '完成'}: {self.input_path.name}" + + if self.notify_mode == "mail": + body = error if error else full_text + try: + subprocess.run( + ["mail", "-s", subject, getpass.getuser()], + input=body.encode("utf-8"), + check=True, + ) + except Exception as e: + print(f"[Notify Error] 邮件发送失败: {e}") + + elif self.notify_mode == "notify": + if error: + body = error + else: + body = f"编码器: {self.vencoder}\n耗时: {encode_time:.1f}s" + if self.run_metrics: + body += f"\nVMAF: {vmaf}" + try: + subprocess.run(["notify-send", subject, body], check=True) + except Exception as e: + print(f"[Notify Error] DBus 通知发送失败: {e}") + + def _cleanup(self): + for temp_file in self.temp_files: + if temp_file.exists(): + temp_file.unlink() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..7a6e5f5 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,7 @@ +[project] +name = "transcode" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.13" +dependencies = [] diff --git a/tc-2pass.py b/tc-2pass.py new file mode 100755 index 0000000..13dd563 --- /dev/null +++ b/tc-2pass.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 +import argparse +import sys +import time +import subprocess +from pathlib import Path + +sys.path.append(str(Path(__file__).resolve().parent)) +from transcode_core import BaseTranscoder + +VIDEO_DEFAULTS = { + "libx265": ["-preset", "slow", "-pix_fmt", "yuv420p10le", "-x265-params", "aq-mode=3:aq-strength=0.8:psy-rd=1.0"], + "libaom-av1": ["-cpu-used", "4", "-row-mt", "1", "-pix_fmt", "yuv420p10le", "-aom-params", "tune=0"], + "libvvenc": ["-preset", "slow", "-qpa", "0", "-pix_fmt", "yuv420p10le"], +} + +AUDIO_DEFAULTS = { + "fdkaac": ["-m", "4"], + "opus": ["-c:a", "libopus", "-b:a", "128k", "-vbr", "on"], +} + +class MediaTranscoder2Pass(BaseTranscoder): + def __init__(self, args): + super().__init__(args, VIDEO_DEFAULTS, AUDIO_DEFAULTS) + self.target_bitrate = args.bitrate + self.passlog_prefix = self.out_dir / f"passlog_{self.basename}_{self.vencoder}_{int(time.time())}" + + def execute(self): + print(f"--- 初始化 2-Pass 转码任务 ---") + self.orig_info = self.get_media_info(self.input_path) + if not self.orig_info: + raise ValueError("无法读取输入文件元数据。") + + start_time = time.time() + try: + self._run_pass_1() + if self.aencoder == "fdkaac": + self._run_pass_2_external_audio() + else: + self._run_pass_2_standard() + except subprocess.CalledProcessError as e: + self._cleanup() + self._send_notification(error=f"转码失败,退出码: {e.returncode}", extra_title="2-Pass转码") + sys.exit(f"\n转码失败,退出码: {e.returncode}") + + self._cleanup() + encode_time = time.time() - start_time + + new_info = self.get_media_info(self.output_path) + vmaf_score, psnr_score = "N/A", "N/A" + + if self.run_metrics and new_info: + vmaf_score, psnr_score = self._run_vmaf_psnr(new_info) + + log_content = self._generate_log_content( + encode_time, new_info, vmaf_score, psnr_score, extra_info=f"设定码率: {self.target_bitrate}\n" + ) + + with open(self.log_path, "a", encoding="utf-8") as f: + f.write(log_content) + + self._send_notification(encode_time=encode_time, new_info=new_info, vmaf=vmaf_score, full_text=log_content, extra_title="2-Pass转码") + + print(f"\n转码完成!耗时: {encode_time:.2f} 秒") + print(f"输出文件: {self.output_path}") + + def _get_base_video_cmd(self, pass_num): + cmd = ["-c:v", self.vencoder] + if hasattr(self, 'vf_string') and self.vf_string: + cmd.extend(["-vf", self.vf_string]) + cmd.extend([ + *self.v_params, + "-b:v", self.target_bitrate, + "-pass", str(pass_num), + "-passlogfile", str(self.passlog_prefix), + ]) + return cmd + + def _run_pass_1(self): + print("\n[Pass 1] 正在进行第一阶段视频分析...") + cmd = ["ffmpeg", "-y", "-i", str(self.input_path), *self._get_base_video_cmd(1), "-an", "-f", "null", "-"] + subprocess.run(cmd, check=True) + + def _run_pass_2_external_audio(self): + print("\n[Pass 2 - Audio] 提取音频流并使用外部 fdkaac 编码...") + temp_wav = self.out_dir / f"temp_{self.input_path.stem}.wav" + temp_m4a = self.out_dir / f"temp_{self.input_path.stem}.m4a" + self.temp_files.extend([temp_wav, temp_m4a]) + + subprocess.run(["ffmpeg", "-y", "-i", str(self.input_path), "-vn", "-c:a", "pcm_s16le", str(temp_wav)], check=True) + subprocess.run(["fdkaac"] + self.a_params + [str(temp_wav), "-o", str(temp_m4a)], check=True) + + print("\n[Pass 2 - Video] 正在进行第二阶段最终编码并混流...") + cmd = [ + "ffmpeg", "-y", "-i", str(self.input_path), "-i", str(temp_m4a), + "-map", "0:v:0", "-map", "1:a:0", + *self._get_base_video_cmd(2), "-c:a", "copy", str(self.output_path), + ] + subprocess.run(cmd, check=True) + + def _run_pass_2_standard(self): + print("\n[Pass 2] 正在进行第二阶段最终编码 (包含音频)...") + cmd = ["ffmpeg", "-y", "-i", str(self.input_path), *self._get_base_video_cmd(2), *self.a_params, str(self.output_path)] + subprocess.run(cmd, check=True) + + def _cleanup(self): + super()._cleanup() + for log_file in self.out_dir.glob(f"{self.passlog_prefix.name}*"): + try: + log_file.unlink() + except Exception as e: + print(f"[Cleanup Warning] 无法删除临时日志 {log_file.name}: {e}") + +def main(): + parser = argparse.ArgumentParser(description="2-Pass 可配置视频转码脚本") + parser.add_argument("-i", "--input", required=True, help="输入视频文件") + parser.add_argument("-b", "--bitrate", required=True, help="目标视频码率 (例如: 8M, 3000K)") + parser.add_argument("-d", "--outdir", help="指定输出目录 (默认与输入文件同目录)") + parser.add_argument("-cv", "--vencoder", choices=VIDEO_DEFAULTS.keys(), default="libx265", help="视频编码器 (默认 libx265)") + parser.add_argument("-ca", "--aencoder", choices=AUDIO_DEFAULTS.keys(), default="opus", help="音频编码器 (默认 opus)") + parser.add_argument("--vargs", type=str, default="", help="覆盖默认视频编码参数") + parser.add_argument("--aargs", type=str, default="", help="覆盖默认音频编码参数") + parser.add_argument("--fps", type=str, default="", help="指定帧率 (例如 60, 30000/1001)") + parser.add_argument("--res", type=str, default="", help="指定分辨率,格式与scale滤镜相同 (例如 1920:1080)") + parser.add_argument("--metrics", action=argparse.BooleanOptionalAction, default=True, help="结束后运行 VMAF/PSNR") + parser.add_argument("--notify", choices=["none", "mail", "notify"], default="mail", help="完成后的通知方式 (默认 mail)") + + try: + transcoder = MediaTranscoder2Pass(parser.parse_args()) + transcoder.execute() + except Exception as e: + sys.exit(f"发生错误: {e}") + +if __name__ == "__main__": + main() diff --git a/tc.py b/tc.py new file mode 100755 index 0000000..a3d2dc6 --- /dev/null +++ b/tc.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +import argparse +import sys +import time +import subprocess +from pathlib import Path + +# 将当前目录加入 path 以引入 core +sys.path.append(str(Path(__file__).resolve().parent)) +from transcode_core import BaseTranscoder + +VIDEO_DEFAULTS = { + "hevc_nvenc": ["-preset", "p7", "-tune", "uhq", "-rc", "vbr_hq", "-cq", "24", "-spatial-aq", "1", "-multipass", "2", "-pix_fmt", "p010le"], + "av1_nvenc": ["-preset", "p7", "-tune", "uhq", "-rc", "vbr", "-cq", "32", "-multipass", "2", "-pix_fmt", "p010le"], + "libx265": ["-preset", "slow", "-crf", "24", "-pix_fmt", "yuv420p10le", "-x265-params", "aq-mode=3:aq-strength=0.8:psy-rd=1.0"], + "libsvtav1": ["-preset", "4", "-crf", "35", "-pix_fmt", "yuv420p10le", "-svtav1-params", "tune=0:enable-qm=1"], + "libvvenc": ["-preset", "medium", "-qpa", "0", "-qp", "24", "-pix_fmt", "yuv420p10le"], +} + +AUDIO_DEFAULTS = { + "fdkaac": ["-m", "4"], + "opus": ["-c:a", "libopus", "-b:a", "128k", "-vbr", "on"], +} + +class MediaTranscoder(BaseTranscoder): + def __init__(self, args): + super().__init__(args, VIDEO_DEFAULTS, AUDIO_DEFAULTS) + + def execute(self): + print(f"--- 初始化转码任务 ---") + self.orig_info = self.get_media_info(self.input_path) + if not self.orig_info: + raise ValueError("无法读取输入文件元数据。") + + start_time = time.time() + try: + if self.aencoder == "fdkaac": + self._encode_with_external_audio() + else: + self._encode_standard() + except subprocess.CalledProcessError as e: + self._cleanup() + self._send_notification(error=f"转码失败,退出码: {e.returncode}", extra_title="转码") + sys.exit(f"\n转码失败,退出码: {e.returncode}") + + self._cleanup() + encode_time = time.time() - start_time + + new_info = self.get_media_info(self.output_path) + vmaf_score, psnr_score = "N/A", "N/A" + + if self.run_metrics and new_info: + vmaf_score, psnr_score = self._run_vmaf_psnr(new_info) + + log_content = self._generate_log_content( + encode_time, new_info, vmaf_score, psnr_score + ) + + with open(self.log_path, "a", encoding="utf-8") as f: + f.write(log_content) + + self._send_notification(encode_time=encode_time, new_info=new_info, vmaf=vmaf_score, full_text=log_content, extra_title="转码") + + print(f"\n转码完成!耗时: {encode_time:.2f} 秒") + print(f"输出文件: {self.output_path}") + + def _encode_with_external_audio(self): + print("\n[Audio] 提取音频流并使用外部 fdkaac 编码...") + temp_wav = self.out_dir / f"temp_{self.input_path.stem}.wav" + temp_m4a = self.out_dir / f"temp_{self.input_path.stem}.m4a" + self.temp_files.extend([temp_wav, temp_m4a]) + + subprocess.run(["ffmpeg", "-y", "-i", str(self.input_path), "-vn", "-c:a", "pcm_s16le", str(temp_wav)], check=True) + subprocess.run(["fdkaac"] + self.a_params + [str(temp_wav), "-o", str(temp_m4a)], check=True) + + print("\n[Video] 编码视频并混流...") + cmd = [ + "ffmpeg", "-y", "-i", str(self.input_path), "-i", str(temp_m4a), + "-map", "0:v:0", "-map", "1:a:0", "-c:v", self.vencoder, + ] + if self.vf_string: + cmd.extend(["-vf", self.vf_string]) + cmd.extend(self.v_params) + cmd.extend(["-c:a", "copy", str(self.output_path)]) + subprocess.run(cmd, check=True) + + def _encode_standard(self): + print("\n[Transcode] 使用 ffmpeg 同时处理视音频...") + cmd = ["ffmpeg", "-y", "-i", str(self.input_path), "-c:v", self.vencoder] + if self.vf_string: + cmd.extend(["-vf", self.vf_string]) + cmd.extend(self.v_params) + cmd.extend(self.a_params) + cmd.extend([str(self.output_path)]) + subprocess.run(cmd, check=True) + +def main(): + parser = argparse.ArgumentParser(description="可配置视频转码脚本") + parser.add_argument("-i", "--input", required=True, help="输入视频文件") + parser.add_argument("-d", "--outdir", help="指定输出目录 (默认与输入文件同目录)") + parser.add_argument("-cv", "--vencoder", choices=VIDEO_DEFAULTS.keys(), default="libsvtav1", help="视频编码器 (默认 libsvtav1)") + parser.add_argument("-ca", "--aencoder", choices=AUDIO_DEFAULTS.keys(), default="opus", help="音频编码器 (默认 opus)") + parser.add_argument("--vargs", type=str, default="", help="覆盖默认视频编码参数") + parser.add_argument("--aargs", type=str, default="", help="覆盖默认音频编码参数") + parser.add_argument("--fps", type=str, default="", help="指定帧率 (例如 60, 30000/1001)") + parser.add_argument("--res", type=str, default="", help="指定分辨率,格式与scale滤镜相同 (例如 1920:1080)") + parser.add_argument("--metrics", action=argparse.BooleanOptionalAction, default=True, help="结束后运行 VMAF/PSNR") + parser.add_argument("--notify", choices=["none", "mail", "notify"], default="mail", help="完成后的通知方式 (默认 mail)") + + try: + transcoder = MediaTranscoder(parser.parse_args()) + transcoder.execute() + except Exception as e: + sys.exit(f"发生错误: {e}") + +if __name__ == "__main__": + main()