Files
transcode/tc-2pass.py
2026-03-18 23:13:09 +01:00

139 lines
6.1 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import sys
import time
import subprocess
from pathlib import Path
from core import BaseTranscoder
# Default extra ffmpeg arguments for each selectable video encoder.
# The keys double as the allowed values of the -cv/--vencoder option.
VIDEO_DEFAULTS = {
    "libx265": [
        "-preset", "slow",
        "-pix_fmt", "yuv420p10le",
        "-x265-params", "aq-mode=3:aq-strength=0.8:psy-rd=1.0",
    ],
    "libaom-av1": [
        "-cpu-used", "4",
        "-row-mt", "1",
        "-pix_fmt", "yuv420p10le",
        "-aom-params", "tune=0",
    ],
    "libvvenc": [
        "-preset", "slow",
        "-qpa", "0",
        "-pix_fmt", "yuv420p10le",
    ],
}

# Default audio arguments for each selectable audio encoder.
# The keys double as the allowed values of the -ca/--aencoder option.
AUDIO_DEFAULTS = {
    # Used when audio is encoded by the external fdkaac command.
    "fdkaac": ["-m", "4"],
    # Spliced into the ffmpeg command line, so it carries its own "-c:a".
    "opus": [
        "-c:a", "libopus",
        "-b:a", "128k",
        "-vbr", "on",
    ],
}
class MediaTranscoder2Pass(BaseTranscoder):
    """Two-pass, target-bitrate transcoder built on ffmpeg.

    Pass 1 encodes the video only and discards the output (``-an -f null -``)
    to produce the encoder's statistics file; pass 2 writes the final output.
    When the audio encoder is ``fdkaac`` the audio is decoded to a temporary
    WAV, encoded with the external ``fdkaac`` command and muxed in with
    ``-c:a copy``; otherwise ffmpeg encodes the audio during pass 2.

    Attributes such as ``out_dir``, ``basename``, ``vencoder``, ``aencoder``,
    ``v_params``, ``a_params``, ``temp_files``, ``input_path``,
    ``output_path``, ``log_path`` and ``run_metrics`` are read here but not
    set here; presumably they come from ``BaseTranscoder.__init__``
    (defined in ``core``, not shown) -- confirm there.
    """

    def __init__(self, args):
        """Initialise the transcoder from parsed command-line arguments.

        Args:
            args: Parsed arguments; ``args.bitrate`` is read here, everything
                else is handed to ``BaseTranscoder.__init__`` together with
                the default encoder argument tables.
        """
        super().__init__(args, VIDEO_DEFAULTS, AUDIO_DEFAULTS)
        self.target_bitrate = args.bitrate
        # The timestamp keeps the statistics files of separate runs apart.
        self.passlog_prefix = (
            self.out_dir / f"passlog_{self.basename}_{self.vencoder}_{int(time.time())}"
        )

    def execute(self):
        """Run both passes, then log, optionally measure quality, and notify.

        Raises:
            ValueError: If the input file's metadata cannot be read.
            SystemExit: If one of the external commands exits non-zero
                (a failure notification is sent first).
        """
        print("--- Initializing 2-Pass Transcode Task ---")
        self.orig_info = self.get_media_info(self.input_path)
        if not self.orig_info:
            raise ValueError("Failed to read input file metadata.")

        start_time = time.time()
        try:
            try:
                self._run_pass_1()
                if self.aencoder == "fdkaac":
                    self._run_pass_2_external_audio()
                else:
                    self._run_pass_2_standard()
            finally:
                # Always remove temp audio and pass-log files, including when
                # a binary is missing (FileNotFoundError) or the user
                # interrupts the run -- not only on CalledProcessError.
                self._cleanup()
        except subprocess.CalledProcessError as e:
            self._send_notification(
                error=f"Transcode failed, exit code: {e.returncode}",
                extra_title="2-Pass Transcode ",
            )
            sys.exit(f"\nTranscode failed, exit code: {e.returncode}")

        encode_time = time.time() - start_time
        new_info = self.get_media_info(self.output_path)

        vmaf_score, psnr_score = "N/A", "N/A"
        if self.run_metrics and new_info:
            vmaf_score, psnr_score = self._run_vmaf_psnr(new_info)

        log_content = self._generate_log_content(
            encode_time,
            new_info,
            vmaf_score,
            psnr_score,
            extra_info=f"Target bitrate: {self.target_bitrate}\n",
        )
        with open(self.log_path, "a", encoding="utf-8") as f:
            f.write(log_content)

        self._send_notification(
            encode_time=encode_time,
            new_info=new_info,
            vmaf=vmaf_score,
            full_text=log_content,
            extra_title="2-Pass Transcode ",
        )
        print(f"\nTranscode completed! Time elapsed: {encode_time:.2f} s")
        print(f"Output file: {self.output_path}")

    def _get_base_video_cmd(self, pass_num):
        """Return the video-related ffmpeg arguments shared by both passes.

        Args:
            pass_num: Pass number (1 or 2) given to ffmpeg's ``-pass`` option.

        Returns:
            A list of ffmpeg arguments: codec, optional ``-vf`` filter chain,
            encoder parameters, target bitrate and pass/stat-file options.
        """
        # NOTE(review): confirm every supported encoder wrapper honours the
        # generic -pass/-passlogfile options; the FFmpeg H.265 guide drives
        # libx265 two-pass through "-x265-params pass=N" instead.
        cmd = ["-c:v", self.vencoder]
        # vf_string may not exist on the instance at all, hence getattr.
        vf_string = getattr(self, "vf_string", None)
        if vf_string:
            cmd.extend(["-vf", vf_string])
        cmd.extend([
            *self.v_params,
            "-b:v", self.target_bitrate,
            "-pass", str(pass_num),
            "-passlogfile", str(self.passlog_prefix),
        ])
        return cmd

    def _run_pass_1(self):
        """Run the analysis pass: video only, output discarded.

        Raises:
            subprocess.CalledProcessError: If ffmpeg exits non-zero.
        """
        print("\n[Pass 1] Running first pass video analysis...")
        cmd = [
            "ffmpeg", "-y", "-i", str(self.input_path),
            *self._get_base_video_cmd(1),
            "-an", "-f", "null", "-",
        ]
        subprocess.run(cmd, check=True)

    def _run_pass_2_external_audio(self):
        """Encode audio with the external fdkaac tool, then run pass 2 and mux.

        The intermediate WAV and M4A files are registered in
        ``self.temp_files`` before they are created.

        Raises:
            subprocess.CalledProcessError: If ffmpeg or fdkaac exits non-zero.
        """
        print("\n[Pass 2 - Audio] Extracting audio stream and encoding with external fdkaac...")
        temp_wav = self.out_dir / f"temp_{self.input_path.stem}.wav"
        temp_m4a = self.out_dir / f"temp_{self.input_path.stem}.m4a"
        self.temp_files.extend([temp_wav, temp_m4a])

        subprocess.run(
            ["ffmpeg", "-y", "-i", str(self.input_path), "-vn", "-c:a", "pcm_s16le", str(temp_wav)],
            check=True,
        )
        subprocess.run(
            ["fdkaac"] + self.a_params + [str(temp_wav), "-o", str(temp_m4a)],
            check=True,
        )

        print("\n[Pass 2 - Video] Running second pass final encode and muxing...")
        cmd = [
            "ffmpeg", "-y", "-i", str(self.input_path), "-i", str(temp_m4a),
            # First video stream of the source, first audio stream of the M4A.
            "-map", "0:v:0", "-map", "1:a:0",
            *self._get_base_video_cmd(2), "-c:a", "copy", str(self.output_path),
        ]
        subprocess.run(cmd, check=True)

    def _run_pass_2_standard(self):
        """Run the final pass with ffmpeg encoding video and audio together.

        Raises:
            subprocess.CalledProcessError: If ffmpeg exits non-zero.
        """
        print("\n[Pass 2] Running second pass final encode (including audio)...")
        cmd = [
            "ffmpeg", "-y", "-i", str(self.input_path),
            *self._get_base_video_cmd(2),
            *self.a_params,
            str(self.output_path),
        ]
        subprocess.run(cmd, check=True)

    def _cleanup(self):
        """Run the base-class cleanup, then delete this run's pass-log files.

        Deletion is best-effort: a file that cannot be removed only produces
        a warning on stdout.
        """
        from glob import escape as glob_escape

        super()._cleanup()
        # The prefix embeds the source basename, which may contain glob
        # metacharacters such as "[" or "*"; escape it so it matches literally.
        pattern = f"{glob_escape(self.passlog_prefix.name)}*"
        for log_file in self.out_dir.glob(pattern):
            try:
                log_file.unlink()
            except Exception as e:
                print(f"[Cleanup Warning] Failed to delete temporary log {log_file.name}: {e}")
def _build_parser():
    """Create the argument parser for the 2-pass transcoding script."""
    cli = argparse.ArgumentParser(description="2-Pass Configurable video transcoding script")
    cli.add_argument("-i", "--input", required=True, help="Input video file")
    cli.add_argument("-b", "--bitrate", required=True, help="Target video bitrate (e.g., 8M, 3000K)")
    cli.add_argument("-d", "--outdir", help="Output directory (default: same as input)")
    cli.add_argument(
        "-cv", "--vencoder",
        choices=VIDEO_DEFAULTS.keys(),
        default="libx265",
        help="Video encoder (default: libx265)",
    )
    cli.add_argument(
        "-ca", "--aencoder",
        choices=AUDIO_DEFAULTS.keys(),
        default="opus",
        help="Audio encoder (default: opus)",
    )

    # Free-form string options that all default to the empty string.
    text_options = (
        ("--vargs", "Override default video encoding parameters"),
        ("--aargs", "Override default audio encoding parameters"),
        ("--fps", "Specify framerate (e.g., 60, 30000/1001)"),
        ("--res", "Specify resolution, same format as scale filter (e.g., 1920:1080)"),
    )
    for flag, description in text_options:
        cli.add_argument(flag, type=str, default="", help=description)

    cli.add_argument(
        "--metrics",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Run VMAF/PSNR after completion",
    )
    cli.add_argument(
        "--notify",
        choices=["none", "mail", "notify"],
        default="mail",
        help="Notification method after completion (default: mail)",
    )
    return cli


def main():
    """Parse the command line and run a two-pass transcode.

    Any ``Exception`` raised while parsing, constructing the transcoder or
    executing it ends the process with an "Error occurred" message.
    """
    cli = _build_parser()
    try:
        options = cli.parse_args()
        job = MediaTranscoder2Pass(options)
        job.execute()
    except Exception as exc:
        sys.exit(f"Error occurred: {exc}")


if __name__ == "__main__":
    main()