This commit is contained in:
2026-03-18 23:03:59 +01:00
commit 6097317d25
6 changed files with 481 additions and 0 deletions
Vendored
+10
View File
@@ -0,0 +1,10 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
+1
View File
@@ -0,0 +1 @@
3.13
+211
View File
@@ -0,0 +1,211 @@
#!/usr/bin/env python3
import subprocess
import json
import time
import multiprocessing
import getpass
import shlex
from pathlib import Path
class BaseTranscoder:
def __init__(self, args, video_defaults, audio_defaults):
self.input_path = Path(args.input).resolve()
if not self.input_path.exists():
raise FileNotFoundError(f"输入文件不存在: {self.input_path}")
self.vencoder = args.vencoder
self.aencoder = args.aencoder
self.run_metrics = args.metrics
self.notify_mode = args.notify
self.v_params = (
shlex.split(args.vargs) if args.vargs else video_defaults.get(self.vencoder, [])
)
self.a_params = (
shlex.split(args.aargs) if args.aargs else audio_defaults.get(self.aencoder, [])
)
filters = []
if getattr(args, "res", None):
filters.append(f"scale={args.res}")
if getattr(args, "fps", None):
filters.append(f"fps={args.fps}")
self.vf_string = ",".join(filters) if filters else ""
self.out_dir = (
Path(args.outdir).resolve() if args.outdir else self.input_path.parent
)
self.out_dir.mkdir(parents=True, exist_ok=True)
self.basename = self.input_path.stem
self.output_path = self.out_dir / f"{self.basename}.{self.vencoder}.mp4"
self.log_path = self.out_dir / f"{self.basename}.{self.vencoder}.log"
self.vmaf_log_path = self.out_dir / f"{self.basename}.{self.vencoder}.vmaf.json"
self.temp_files = []
self.orig_info = None
def get_media_info(self, file_path: Path):
cmd = [
"ffprobe",
"-v",
"quiet",
"-print_format",
"json",
"-show_format",
"-show_streams",
str(file_path),
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
return None
data = json.loads(result.stdout)
video_stream = next(
(s for s in data.get("streams", []) if s["codec_type"] == "video"), None
)
if not video_stream:
return None
duration = float(data.get("format", {}).get("duration", 0))
bitrate = int(data.get("format", {}).get("bit_rate", 0))
fps_str = video_stream.get("r_frame_rate", "0/1")
nb_frames = video_stream.get("nb_frames")
if nb_frames:
frame_count = int(nb_frames)
else:
num, den = map(int, fps_str.split("/"))
fps = num / den if den != 0 else 0
frame_count = int(duration * fps)
return {
"codec": video_stream.get("codec_name"),
"bitrate_kbps": bitrate / 1000,
"frame_count": frame_count,
"fps_str": fps_str,
"width": video_stream.get("width"),
"height": video_stream.get("height"),
}
def _run_vmaf_psnr(self, new_info):
target_samples = 500
n_subsample = max(1, (self.orig_info.get("frame_count", 0) if self.orig_info else 0) // target_samples)
threads = multiprocessing.cpu_count()
fps = self.orig_info.get("fps_str", "0/1") if self.orig_info else "0/1"
print(
f"\n[Metrics] 启动 VMAF/PSNR 测试 (采样间隔: {n_subsample}, 线程数: {threads}, 强制帧率: {fps})..."
)
out_w = new_info.get("width") if new_info else None
out_h = new_info.get("height") if new_info else None
orig_w = self.orig_info.get("width") if self.orig_info else None
orig_h = self.orig_info.get("height") if self.orig_info else None
orig_fps = self.orig_info.get("fps_str", "0/1") if self.orig_info else "0/1"
out_fps = new_info.get("fps_str", "0/1") if new_info else "0/1"
# 使用滤镜链对齐帧率、分辨率和首帧时间戳
filters_0 = []
if out_fps != orig_fps:
filters_0.append(f"fps={orig_fps}")
if out_w and out_h and orig_w and orig_h and (out_w != orig_w or out_h != orig_h):
filters_0.append(f"scale={orig_w}:{orig_h}:flags=bicubic")
filters_0.append("setpts=PTS-STARTPTS")
filter_0_str = ",".join(filters_0)
vmaf_filter = (
f"[0:v]{filter_0_str}[dist];[1:v]setpts=PTS-STARTPTS[ref];"
f"[dist][ref]libvmaf=feature='name=psnr':log_path='{self.vmaf_log_path.as_posix()}':"
f"log_fmt=json:n_subsample={n_subsample}:n_threads={threads}"
)
cmd = [
"ffmpeg",
"-y",
"-i",
str(self.output_path),
"-i",
str(self.input_path),
"-lavfi",
vmaf_filter,
"-f",
"null",
"-",
]
subprocess.run(cmd, check=False)
try:
with open(self.vmaf_log_path, "r", encoding="utf-8") as f:
data = json.load(f).get("pooled_metrics", {})
vmaf = (
round(data.get("vmaf", {}).get("mean", 0), 2)
if "vmaf" in data
else "N/A"
)
psnr = (
round(data.get("psnr_y", {}).get("mean", 0), 2)
if "psnr_y" in data
else "N/A"
)
return vmaf, psnr
except Exception as e:
print(f"[Metrics Error] 无法解析 VMAF 日志: {e}")
return "N/A", "N/A"
def _generate_log_content(self, encode_time, new_info, vmaf, psnr, extra_info=""):
size_ratio = (
self.output_path.stat().st_size / self.input_path.stat().st_size
) * 100
content = (
f"时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"输入: {self.input_path}\n"
f"输出: {self.output_path}\n"
f"{extra_info}"
f"视频编码器: {self.vencoder} ({' '.join(self.v_params)})\n"
f"音频编码器: {self.aencoder} ({' '.join(self.a_params)})\n"
f"耗时: {encode_time:.2f} s\n"
f"[元数据] {self.orig_info.get('codec','') if self.orig_info else ''} -> {new_info.get('codec','') if new_info else ''} | {self.orig_info.get('bitrate_kbps',0) if self.orig_info else 0:.0f} kbps -> {new_info.get('bitrate_kbps',0) if new_info else 0:.0f} kbps\n"
f"[体积比] {size_ratio:.1f}%\n"
)
if self.run_metrics:
content += f"[质量] VMAF: {vmaf} | PSNR(Y): {psnr}\n"
content += "-" * 50 + "\n"
return content
def _send_notification(
self, encode_time: float = 0.0, new_info=None, vmaf="N/A", full_text="", error=None, extra_title=""
):
if self.notify_mode == "none":
return
subject = f"{extra_title}任务{'失败' if error else '完成'}: {self.input_path.name}"
if self.notify_mode == "mail":
body = error if error else full_text
try:
subprocess.run(
["mail", "-s", subject, getpass.getuser()],
input=body.encode("utf-8"),
check=True,
)
except Exception as e:
print(f"[Notify Error] 邮件发送失败: {e}")
elif self.notify_mode == "notify":
if error:
body = error
else:
body = f"编码器: {self.vencoder}\n耗时: {encode_time:.1f}s"
if self.run_metrics:
body += f"\nVMAF: {vmaf}"
try:
subprocess.run(["notify-send", subject, body], check=True)
except Exception as e:
print(f"[Notify Error] DBus 通知发送失败: {e}")
def _cleanup(self):
for temp_file in self.temp_files:
if temp_file.exists():
temp_file.unlink()
+7
View File
@@ -0,0 +1,7 @@
[project]
name = "transcode"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = []
Executable
+135
View File
@@ -0,0 +1,135 @@
#!/usr/bin/env python3
import argparse
import sys
import time
import subprocess
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parent))
from transcode_core import BaseTranscoder
VIDEO_DEFAULTS = {
"libx265": ["-preset", "slow", "-pix_fmt", "yuv420p10le", "-x265-params", "aq-mode=3:aq-strength=0.8:psy-rd=1.0"],
"libaom-av1": ["-cpu-used", "4", "-row-mt", "1", "-pix_fmt", "yuv420p10le", "-aom-params", "tune=0"],
"libvvenc": ["-preset", "slow", "-qpa", "0", "-pix_fmt", "yuv420p10le"],
}
AUDIO_DEFAULTS = {
"fdkaac": ["-m", "4"],
"opus": ["-c:a", "libopus", "-b:a", "128k", "-vbr", "on"],
}
class MediaTranscoder2Pass(BaseTranscoder):
def __init__(self, args):
super().__init__(args, VIDEO_DEFAULTS, AUDIO_DEFAULTS)
self.target_bitrate = args.bitrate
self.passlog_prefix = self.out_dir / f"passlog_{self.basename}_{self.vencoder}_{int(time.time())}"
def execute(self):
print(f"--- 初始化 2-Pass 转码任务 ---")
self.orig_info = self.get_media_info(self.input_path)
if not self.orig_info:
raise ValueError("无法读取输入文件元数据。")
start_time = time.time()
try:
self._run_pass_1()
if self.aencoder == "fdkaac":
self._run_pass_2_external_audio()
else:
self._run_pass_2_standard()
except subprocess.CalledProcessError as e:
self._cleanup()
self._send_notification(error=f"转码失败,退出码: {e.returncode}", extra_title="2-Pass转码")
sys.exit(f"\n转码失败,退出码: {e.returncode}")
self._cleanup()
encode_time = time.time() - start_time
new_info = self.get_media_info(self.output_path)
vmaf_score, psnr_score = "N/A", "N/A"
if self.run_metrics and new_info:
vmaf_score, psnr_score = self._run_vmaf_psnr(new_info)
log_content = self._generate_log_content(
encode_time, new_info, vmaf_score, psnr_score, extra_info=f"设定码率: {self.target_bitrate}\n"
)
with open(self.log_path, "a", encoding="utf-8") as f:
f.write(log_content)
self._send_notification(encode_time=encode_time, new_info=new_info, vmaf=vmaf_score, full_text=log_content, extra_title="2-Pass转码")
print(f"\n转码完成!耗时: {encode_time:.2f}")
print(f"输出文件: {self.output_path}")
def _get_base_video_cmd(self, pass_num):
cmd = ["-c:v", self.vencoder]
if hasattr(self, 'vf_string') and self.vf_string:
cmd.extend(["-vf", self.vf_string])
cmd.extend([
*self.v_params,
"-b:v", self.target_bitrate,
"-pass", str(pass_num),
"-passlogfile", str(self.passlog_prefix),
])
return cmd
def _run_pass_1(self):
print("\n[Pass 1] 正在进行第一阶段视频分析...")
cmd = ["ffmpeg", "-y", "-i", str(self.input_path), *self._get_base_video_cmd(1), "-an", "-f", "null", "-"]
subprocess.run(cmd, check=True)
def _run_pass_2_external_audio(self):
print("\n[Pass 2 - Audio] 提取音频流并使用外部 fdkaac 编码...")
temp_wav = self.out_dir / f"temp_{self.input_path.stem}.wav"
temp_m4a = self.out_dir / f"temp_{self.input_path.stem}.m4a"
self.temp_files.extend([temp_wav, temp_m4a])
subprocess.run(["ffmpeg", "-y", "-i", str(self.input_path), "-vn", "-c:a", "pcm_s16le", str(temp_wav)], check=True)
subprocess.run(["fdkaac"] + self.a_params + [str(temp_wav), "-o", str(temp_m4a)], check=True)
print("\n[Pass 2 - Video] 正在进行第二阶段最终编码并混流...")
cmd = [
"ffmpeg", "-y", "-i", str(self.input_path), "-i", str(temp_m4a),
"-map", "0:v:0", "-map", "1:a:0",
*self._get_base_video_cmd(2), "-c:a", "copy", str(self.output_path),
]
subprocess.run(cmd, check=True)
def _run_pass_2_standard(self):
print("\n[Pass 2] 正在进行第二阶段最终编码 (包含音频)...")
cmd = ["ffmpeg", "-y", "-i", str(self.input_path), *self._get_base_video_cmd(2), *self.a_params, str(self.output_path)]
subprocess.run(cmd, check=True)
def _cleanup(self):
super()._cleanup()
for log_file in self.out_dir.glob(f"{self.passlog_prefix.name}*"):
try:
log_file.unlink()
except Exception as e:
print(f"[Cleanup Warning] 无法删除临时日志 {log_file.name}: {e}")
def main():
parser = argparse.ArgumentParser(description="2-Pass 可配置视频转码脚本")
parser.add_argument("-i", "--input", required=True, help="输入视频文件")
parser.add_argument("-b", "--bitrate", required=True, help="目标视频码率 (例如: 8M, 3000K)")
parser.add_argument("-d", "--outdir", help="指定输出目录 (默认与输入文件同目录)")
parser.add_argument("-cv", "--vencoder", choices=VIDEO_DEFAULTS.keys(), default="libx265", help="视频编码器 (默认 libx265)")
parser.add_argument("-ca", "--aencoder", choices=AUDIO_DEFAULTS.keys(), default="opus", help="音频编码器 (默认 opus)")
parser.add_argument("--vargs", type=str, default="", help="覆盖默认视频编码参数")
parser.add_argument("--aargs", type=str, default="", help="覆盖默认音频编码参数")
parser.add_argument("--fps", type=str, default="", help="指定帧率 (例如 60, 30000/1001)")
parser.add_argument("--res", type=str, default="", help="指定分辨率格式与scale滤镜相同 (例如 1920:1080)")
parser.add_argument("--metrics", action=argparse.BooleanOptionalAction, default=True, help="结束后运行 VMAF/PSNR")
parser.add_argument("--notify", choices=["none", "mail", "notify"], default="mail", help="完成后的通知方式 (默认 mail)")
try:
transcoder = MediaTranscoder2Pass(parser.parse_args())
transcoder.execute()
except Exception as e:
sys.exit(f"发生错误: {e}")
if __name__ == "__main__":
main()
Executable
+117
View File
@@ -0,0 +1,117 @@
#!/usr/bin/env python3
import argparse
import sys
import time
import subprocess
from pathlib import Path
# 将当前目录加入 path 以引入 core
sys.path.append(str(Path(__file__).resolve().parent))
from transcode_core import BaseTranscoder
VIDEO_DEFAULTS = {
"hevc_nvenc": ["-preset", "p7", "-tune", "uhq", "-rc", "vbr_hq", "-cq", "24", "-spatial-aq", "1", "-multipass", "2", "-pix_fmt", "p010le"],
"av1_nvenc": ["-preset", "p7", "-tune", "uhq", "-rc", "vbr", "-cq", "32", "-multipass", "2", "-pix_fmt", "p010le"],
"libx265": ["-preset", "slow", "-crf", "24", "-pix_fmt", "yuv420p10le", "-x265-params", "aq-mode=3:aq-strength=0.8:psy-rd=1.0"],
"libsvtav1": ["-preset", "4", "-crf", "35", "-pix_fmt", "yuv420p10le", "-svtav1-params", "tune=0:enable-qm=1"],
"libvvenc": ["-preset", "medium", "-qpa", "0", "-qp", "24", "-pix_fmt", "yuv420p10le"],
}
AUDIO_DEFAULTS = {
"fdkaac": ["-m", "4"],
"opus": ["-c:a", "libopus", "-b:a", "128k", "-vbr", "on"],
}
class MediaTranscoder(BaseTranscoder):
def __init__(self, args):
super().__init__(args, VIDEO_DEFAULTS, AUDIO_DEFAULTS)
def execute(self):
print(f"--- 初始化转码任务 ---")
self.orig_info = self.get_media_info(self.input_path)
if not self.orig_info:
raise ValueError("无法读取输入文件元数据。")
start_time = time.time()
try:
if self.aencoder == "fdkaac":
self._encode_with_external_audio()
else:
self._encode_standard()
except subprocess.CalledProcessError as e:
self._cleanup()
self._send_notification(error=f"转码失败,退出码: {e.returncode}", extra_title="转码")
sys.exit(f"\n转码失败,退出码: {e.returncode}")
self._cleanup()
encode_time = time.time() - start_time
new_info = self.get_media_info(self.output_path)
vmaf_score, psnr_score = "N/A", "N/A"
if self.run_metrics and new_info:
vmaf_score, psnr_score = self._run_vmaf_psnr(new_info)
log_content = self._generate_log_content(
encode_time, new_info, vmaf_score, psnr_score
)
with open(self.log_path, "a", encoding="utf-8") as f:
f.write(log_content)
self._send_notification(encode_time=encode_time, new_info=new_info, vmaf=vmaf_score, full_text=log_content, extra_title="转码")
print(f"\n转码完成!耗时: {encode_time:.2f}")
print(f"输出文件: {self.output_path}")
def _encode_with_external_audio(self):
print("\n[Audio] 提取音频流并使用外部 fdkaac 编码...")
temp_wav = self.out_dir / f"temp_{self.input_path.stem}.wav"
temp_m4a = self.out_dir / f"temp_{self.input_path.stem}.m4a"
self.temp_files.extend([temp_wav, temp_m4a])
subprocess.run(["ffmpeg", "-y", "-i", str(self.input_path), "-vn", "-c:a", "pcm_s16le", str(temp_wav)], check=True)
subprocess.run(["fdkaac"] + self.a_params + [str(temp_wav), "-o", str(temp_m4a)], check=True)
print("\n[Video] 编码视频并混流...")
cmd = [
"ffmpeg", "-y", "-i", str(self.input_path), "-i", str(temp_m4a),
"-map", "0:v:0", "-map", "1:a:0", "-c:v", self.vencoder,
]
if self.vf_string:
cmd.extend(["-vf", self.vf_string])
cmd.extend(self.v_params)
cmd.extend(["-c:a", "copy", str(self.output_path)])
subprocess.run(cmd, check=True)
def _encode_standard(self):
print("\n[Transcode] 使用 ffmpeg 同时处理视音频...")
cmd = ["ffmpeg", "-y", "-i", str(self.input_path), "-c:v", self.vencoder]
if self.vf_string:
cmd.extend(["-vf", self.vf_string])
cmd.extend(self.v_params)
cmd.extend(self.a_params)
cmd.extend([str(self.output_path)])
subprocess.run(cmd, check=True)
def main():
parser = argparse.ArgumentParser(description="可配置视频转码脚本")
parser.add_argument("-i", "--input", required=True, help="输入视频文件")
parser.add_argument("-d", "--outdir", help="指定输出目录 (默认与输入文件同目录)")
parser.add_argument("-cv", "--vencoder", choices=VIDEO_DEFAULTS.keys(), default="libsvtav1", help="视频编码器 (默认 libsvtav1)")
parser.add_argument("-ca", "--aencoder", choices=AUDIO_DEFAULTS.keys(), default="opus", help="音频编码器 (默认 opus)")
parser.add_argument("--vargs", type=str, default="", help="覆盖默认视频编码参数")
parser.add_argument("--aargs", type=str, default="", help="覆盖默认音频编码参数")
parser.add_argument("--fps", type=str, default="", help="指定帧率 (例如 60, 30000/1001)")
parser.add_argument("--res", type=str, default="", help="指定分辨率格式与scale滤镜相同 (例如 1920:1080)")
parser.add_argument("--metrics", action=argparse.BooleanOptionalAction, default=True, help="结束后运行 VMAF/PSNR")
parser.add_argument("--notify", choices=["none", "mail", "notify"], default="mail", help="完成后的通知方式 (默认 mail)")
try:
transcoder = MediaTranscoder(parser.parse_args())
transcoder.execute()
except Exception as e:
sys.exit(f"发生错误: {e}")
if __name__ == "__main__":
main()