"""Gradio app that dubs a short video into another language.

Pipeline (as wired up in ``process_video``): resize the upload with ffmpeg,
extract and band-pass the audio, transcribe it with ``insanely-fast-whisper``,
translate the text with ``googletrans``, synthesize the translation with a
Qwen3-TTS voice clone, then either lip-sync with Wav2Lip or simply replace the
audio track.
"""
import asyncio
import inspect
import json
import os
import shutil
import stat
import subprocess
import sys
import tempfile
import uuid
from zipfile import ZipFile


def _setup_wav2lip():
    """Clone Wav2Lip, install its extra packages and fetch model checkpoints.

    The clone is skipped when a ``Wav2Lip`` directory already exists, and each
    checkpoint is only downloaded when its target file is missing. The pip
    install runs on every call.

    Raises:
        subprocess.CalledProcessError: if ``git clone`` or ``pip install``
            exits with a non-zero status.
    """
    if not os.path.exists("Wav2Lip"):
        subprocess.run(
            ["git", "clone", "--depth", "1", "https://github.com/Rudrabha/Wav2Lip.git"],
            check=True,
        )
    # Invoke pip through the running interpreter so the packages land in the
    # same environment that later launches Wav2Lip/inference.py.
    subprocess.run(
        [
            sys.executable, "-m", "pip", "install", "-q", "--no-deps",
            "basicsr", "facexlib", "gfpgan", "batch-face",
        ],
        check=True,
    )

    from huggingface_hub import hf_hub_download

    ckpt_dir = "Wav2Lip/checkpoints"
    os.makedirs(ckpt_dir, exist_ok=True)
    ckpt_gan = f"{ckpt_dir}/wav2lip_gan.pth"
    if not os.path.exists(ckpt_gan):
        shutil.copy(
            hf_hub_download(repo_id="numz/wav2lip_studio", filename="Wav2lip/wav2lip_gan.pth"),
            ckpt_gan,
        )

    ckpt_s3fd = "Wav2Lip/face_detection/detection/sfd/s3fd.pth"
    os.makedirs(os.path.dirname(ckpt_s3fd), exist_ok=True)
    if not os.path.exists(ckpt_s3fd):
        shutil.copy(
            hf_hub_download(repo_id="camenduru/Wav2Lip", filename="checkpoints/s3fd-619a316812.pth"),
            ckpt_s3fd,
        )


# Runs at import time, ahead of the third-party imports below (original order
# preserved).
_setup_wav2lip()

import gradio as gr
import ffmpeg
import torch
import soundfile as sf
from googletrans import Translator
from huggingface_hub import HfApi
from qwen_tts import Qwen3TTSModel
import spaces

try:
    from moviepy import VideoFileClip
except ImportError:
    from moviepy.editor import VideoFileClip

HF_TOKEN = os.environ.get("HF_TOKEN")
MAX_VIDEO_DURATION = 60  # seconds; compared against the probed video duration

api = HfApi(token=HF_TOKEN)

# Unpack the bundled ffmpeg binary into the working directory and mark it
# executable. The archive is closed deterministically via the context manager.
# NOTE(review): the subprocess calls below invoke plain "ffmpeg", which is
# resolved through PATH -- confirm the working directory is on PATH if the
# bundled binary is the one meant to be used.
with ZipFile("ffmpeg.zip") as ffmpeg_zip:
    ffmpeg_zip.extractall()
os.chmod("ffmpeg", os.stat("ffmpeg").st_mode | stat.S_IEXEC)

# UI label -> (language code passed to the translator, language name passed to
# the TTS model).
language_mapping = {
    "English": ("en", "English"),
    "Spanish": ("es", "Spanish"),
    "French": ("fr", "French"),
    "German": ("de", "German"),
    "Italian": ("it", "Italian"),
    "Portuguese": ("pt", "Portuguese"),
    "Russian": ("ru", "Russian"),
    "Chinese (Simplified)": ("zh-CN", "Chinese"),
    "Japanese": ("ja", "Japanese"),
    "Korean": ("ko", "Korean"),
}

TTS_MODEL_ID = "Qwen/Qwen3-TTS-12Hz-1.7B-Base"
VIDEO_EXTENSIONS = (".mp4", ".avi", ".mov", ".flv", ".mkv")

tts_model = None


def get_tts_model():
    """Return the shared TTS model, loading it on first use."""
    global tts_model
    if tts_model is None:
        tts_model = Qwen3TTSModel.from_pretrained(
            TTS_MODEL_ID,
            device_map="cuda",
            dtype=torch.bfloat16,
        )
    return tts_model


def uid(ext=""):
    """Return a fresh random path in the system temp directory ending in *ext*."""
    return os.path.join(tempfile.gettempdir(), f"{uuid.uuid4().hex}{ext}")


def _run_path(run_uuid, suffix):
    """Return the temp-directory path ``<run_uuid>_<suffix>`` for one run."""
    return os.path.join(tempfile.gettempdir(), f"{run_uuid}_{suffix}")


def cleanup(*paths):
    """Best-effort removal of files; falsy or missing paths are skipped."""
    for p in paths:
        if p and os.path.exists(p):
            try:
                os.remove(p)
            except OSError:
                # Deliberately ignored: cleanup must never mask the real result.
                pass


def extract_audio_segment(video_path, duration=4.0):
    """Write the first *duration* seconds of audio to a 16 kHz mono WAV.

    Returns:
        Path of the newly created WAV file (the caller is expected to delete it).

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    out = uid(".wav")
    subprocess.run(
        [
            "ffmpeg", "-y", "-i", video_path, "-t", str(duration),
            "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", out,
        ],
        check=True,
        capture_output=True,
    )
    return out


def _mux_audio(video_path, audio_path, out_path):
    """Copy the video stream of *video_path* and add *audio_path* as AAC audio.

    Arguments are passed as a list (no shell), so paths containing spaces or
    shell metacharacters are handled safely.
    """
    subprocess.run(
        [
            "ffmpeg", "-y", "-i", video_path, "-i", audio_path,
            "-c:v", "copy", "-c:a", "aac",
            "-map", "0:v:0", "-map", "1:a:0", out_path,
        ],
        check=True,
        capture_output=True,
    )


def _video_duration(path):
    """Return the duration in seconds reported by ffprobe for *path*.

    Prefers the first video stream's ``duration`` and falls back to the
    container-level ``format.duration`` when the stream does not carry one.

    Raises:
        ValueError: if neither location provides a duration.
    """
    info = ffmpeg.probe(path)
    video_stream = next(
        (s for s in info.get("streams", []) if s.get("codec_type") == "video"),
        None,
    )
    raw = (video_stream or {}).get("duration") or info.get("format", {}).get("duration")
    if raw is None:
        raise ValueError("Could not determine video duration.")
    return float(raw)


def _translate_text(text, dest):
    """Translate *text* into language code *dest* and return the result text.

    Handles both flavours of ``Translator.translate``: a plain result object,
    or a coroutine that must be driven to completion first.
    """
    outcome = Translator().translate(text, dest=dest)
    if inspect.iscoroutine(outcome):
        # assumes no event loop is already running in the calling thread
        # -- TODO confirm for the deployment's Gradio version
        outcome = asyncio.run(outcome)
    return outcome.text


@spaces.GPU(duration=120)
def transcribe_audio(file_path):
    """Transcribe an audio or video file with ``insanely-fast-whisper``.

    For the video extensions in ``VIDEO_EXTENSIONS`` the audio track is first
    written to a temporary WAV. All temporary files are removed before
    returning, including on failure.

    Returns:
        The stripped transcript: the JSON ``text`` field, or the chunk texts
        joined with spaces when ``text`` is empty or missing.

    Raises:
        ValueError: if a video input has no audio track.
        subprocess.CalledProcessError: if the whisper CLI fails.
    """
    temp_audio = None
    output_file = uid(".json")
    try:
        if file_path.lower().endswith(VIDEO_EXTENSIONS):
            temp_audio = uid(".wav")
            video = VideoFileClip(file_path)
            try:
                if video.audio is None:
                    raise ValueError("Video has no audio track to transcribe.")
                video.audio.write_audiofile(temp_audio, logger=None)
            finally:
                # Always release the clip, even when extraction fails.
                video.close()
            file_path = temp_audio

        subprocess.run(
            [
                "insanely-fast-whisper",
                "--file-name", file_path,
                "--device-id", "0",
                "--model-name", "openai/whisper-large-v3",
                "--task", "transcribe",
                "--timestamp", "chunk",
                "--transcript-path", output_file,
                "--batch-size", "24",
            ],
            check=True,
            capture_output=True,
            text=True,
        )
        with open(output_file, encoding="utf-8") as f:
            data = json.load(f)
    finally:
        cleanup(output_file, temp_audio)

    result = data.get("text") or " ".join(c["text"] for c in data.get("chunks", []))
    return result.strip()


@spaces.GPU(duration=120)
def synthesize_speech(translated_text, ref_audio_path, ref_text, target_language_qwen):
    """Speak *translated_text* in a voice cloned from the reference audio.

    Returns:
        Path of a newly written WAV file holding the first generated waveform.
    """
    model = get_tts_model()
    prompt = model.create_voice_clone_prompt(
        ref_audio=ref_audio_path,
        ref_text=ref_text,
    )
    wavs, sr = model.generate_voice_clone(
        text=translated_text,
        language=target_language_qwen,
        voice_clone_prompt=prompt,
    )
    out_path = uid(".wav")
    sf.write(out_path, wavs[0], sr)
    return out_path


@spaces.GPU(duration=120)
def run_wav2lip(video_path, audio_path, run_uuid):
    """Lip-sync *video_path* to *audio_path* with Wav2Lip.

    If the Wav2Lip subprocess fails, a warning is shown and the audio track is
    simply replaced instead.

    Returns:
        Path of the output video inside the system temp directory.

    Raises:
        subprocess.CalledProcessError: if the fallback ffmpeg call also fails.
    """
    out_path = _run_path(run_uuid, "output_video.mp4")
    try:
        subprocess.run(
            [
                sys.executable, "Wav2Lip/inference.py",
                "--checkpoint_path", "Wav2Lip/checkpoints/wav2lip_gan.pth",
                "--face", video_path,
                "--audio", audio_path,
                "--pads", "0", "15", "0", "0",
                "--resize_factor", "1",
                "--nosmooth",
                "--outfile", out_path,
            ],
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as exc:
        # Keep the tail of the failure output in the logs so the fallback is
        # not silent about why Wav2Lip failed.
        print((exc.stderr or "")[-2000:], file=sys.stderr)
        gr.Warning("Wav2Lip failed, falling back to simple audio replace.")
        _mux_audio(video_path, audio_path, out_path)
    return out_path


def process_video(video, target_language, use_wav2lip):
    """Dub *video* into *target_language* and return ``(output_path, status)``.

    Args:
        video: Path of the uploaded video; falsy when nothing was uploaded.
        target_language: A key of ``language_mapping``.
        use_wav2lip: When true, lip-sync with Wav2Lip; otherwise only the
            audio track is replaced.

    Returns:
        ``(path, "✅ Done!")`` on success, or ``(None, message)`` describing
        the problem. Exceptions are converted into the message rather than
        propagated, and intermediate files are always removed.
    """
    if not video:
        return None, "Please upload a video."
    if target_language is None:
        return None, "Please select a target language."

    run_uuid = uuid.uuid4().hex[:8]
    resized = _run_path(run_uuid, "resized.mp4")
    audio_raw = _run_path(run_uuid, "audio_raw.wav")
    audio_clean = _run_path(run_uuid, "audio_clean.wav")
    ref_clip = None
    synth_audio = None

    try:
        ffmpeg.input(video).output(resized, vf="scale=-2:720").run(
            quiet=True, overwrite_output=True
        )

        if _video_duration(resized) > MAX_VIDEO_DURATION:
            return None, f"Video exceeds {MAX_VIDEO_DURATION}s limit."

        ffmpeg.input(resized).output(audio_raw, acodec="pcm_s24le", ar=48000, map="a").run(
            quiet=True, overwrite_output=True
        )
        # Band-limit the audio to 100-3000 Hz before transcription.
        subprocess.run(
            ["ffmpeg", "-y", "-i", audio_raw, "-af", "lowpass=3000,highpass=100", audio_clean],
            check=True,
            capture_output=True,
        )

        transcription = transcribe_audio(audio_clean)
        if not transcription:
            return None, "Transcription failed or returned empty."

        lang_code, lang_qwen = language_mapping[target_language]
        translated = _translate_text(transcription, lang_code)

        # NOTE(review): the reference clip is the first 4 s of audio while the
        # reference text is the first 200 characters of the transcript; these
        # may not line up exactly -- confirm this is acceptable for cloning.
        ref_clip = extract_audio_segment(resized, duration=4.0)
        synth_audio = synthesize_speech(translated, ref_clip, transcription[:200], lang_qwen)

        if use_wav2lip:
            output_video = run_wav2lip(resized, synth_audio, run_uuid)
        else:
            output_video = _run_path(run_uuid, "output_video.mp4")
            _mux_audio(resized, synth_audio, output_video)

        if not os.path.exists(output_video):
            return None, "Output video was not generated."
        return output_video, "✅ Done!"
    except Exception as e:
        # Top-level UI boundary: report the failure in the status box.
        return None, f"Error: {e}"
    finally:
        # cleanup() skips None, so the not-yet-created paths are safe to pass.
        cleanup(resized, audio_raw, audio_clean, ref_clip, synth_audio)


with gr.Blocks() as demo:
    gr.Markdown("# 🎬 AI Video Dubbing")
    gr.Markdown(
        "Upload a video, pick a target language, and get a dubbed version with the "
        "**original speaker's cloned voice** — Whisper + Qwen3-TTS + Wav2Lip."
    )
    with gr.Row():
        with gr.Column(scale=2):
            video_input = gr.Video(label="Upload Video (max 60s)")
            target_language = gr.Dropdown(
                choices=list(language_mapping.keys()),
                label="Target Language",
                value="English",
            )
            use_wav2lip = gr.Checkbox(
                label="Lip Sync with Wav2Lip",
                value=False,
                info="Recommended for close-up face videos. Adds ~30s processing time.",
            )
            submit_button = gr.Button("🚀 Dub Video", variant="primary")
        with gr.Column(scale=2):
            output_video = gr.Video(label="Dubbed Video")
            status = gr.Textbox(label="Status")

    submit_button.click(
        process_video,
        inputs=[video_input, target_language, use_wav2lip],
        outputs=[output_video, status],
    )
    gr.Markdown("""
---
**Pipeline:** Whisper large-v3 → Google Translate → Qwen3-TTS voice clone → Wav2Lip (optional)

By [@artificialguybr](https://twitter.com/artificialguybr)
""")

demo.queue()
demo.launch(theme=gr.themes.Soft())