"""Gradio app: fast image-to-video generation with Wan 2.2 I2V (14B).

At import time the script builds the diffusers pipeline, fuses several LoRA
adapters into the two transformers, quantizes the heavy modules and loads
ahead-of-time compiled transformer blocks.  It then exposes a small Gradio UI
whose single action, ``generate_video``, runs on a ZeroGPU worker.
"""

import spaces
import torch
from transformers import (
    AutoTokenizer,
    AutoImageProcessor,
    T5EncoderModel,
)
from diffusers import (
    WanImageToVideoPipeline,
    WanTransformer3DModel,
    AutoencoderKL,
    EulerDiscreteScheduler,
)
import gradio as gr
import tempfile
import numpy as np
from PIL import Image
import random
import gc
from diffusers.utils.export_utils import export_to_video
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file as safetensors_load
from torchao.quantization import quantize_
from torchao.quantization import Float8DynamicActivationFloat8WeightConfig
from torchao.quantization import Int8WeightOnlyConfig
import aoti
import subprocess
import ffmpeg
import os


MODEL_ID = "Wan-AI/Wan2.2-I2V-A14B-Diffusers"

# Output resolution limits (pixels); final sizes are snapped to MULTIPLE_OF.
MAX_DIM = 768
MIN_DIM = 448
SQUARE_DIM = 576
MULTIPLE_OF = 16

MAX_SEED = np.iinfo(np.int32).max

# Frame-count limits and the fixed output frame rate.
FIXED_FPS = 16
MIN_FRAMES_MODEL = 8
MAX_FRAMES_MODEL = 104

MIN_DURATION = round(MIN_FRAMES_MODEL/FIXED_FPS,1)
MAX_DURATION = round(MAX_FRAMES_MODEL/FIXED_FPS,1)


# --- Pipeline construction -------------------------------------------------
# Both transformers are taken from a bf16 repository and placed on CUDA
# directly; the remaining components come from MODEL_ID.
pipe = WanImageToVideoPipeline.from_pretrained(
    MODEL_ID,
    transformer=WanTransformer3DModel.from_pretrained(
        'cbensimon/Wan2.2-I2V-A14B-bf16-Diffusers',
        subfolder='transformer',
        torch_dtype=torch.bfloat16,
        device_map='cuda',
    ),
    transformer_2=WanTransformer3DModel.from_pretrained(
        'cbensimon/Wan2.2-I2V-A14B-bf16-Diffusers',
        subfolder='transformer_2',
        torch_dtype=torch.bfloat16,
        device_map='cuda',
    ),
    torch_dtype=torch.bfloat16,
).to('cuda')

# --- LoRA adapters for `transformer` ---------------------------------------
pipe.load_lora_weights(
    "Kijai/WanVideo_comfy",
    weight_name="Lightx2v/lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors",
    adapter_name="lightx2v",
)
# NOTE(review): `load_into_transformer` is forwarded unchanged from the
# original script; confirm the installed diffusers version recognizes it.
pipe.load_lora_weights(
    "GiorgioV/LoRA_for_WAN_22",
    weight_name="I2V_14B_HIGH.safetensors",
    adapter_name="lora_h",
    load_into_transformer=True,
)
pipe.load_lora_weights(
    "GiorgioV/LoRA_for_WAN_22",
    weight_name="Wan2.2 - I2V - GH - HIGH 14B.safetensors",
    adapter_name="lora_gh",
    load_into_transformer=True,
)

# --- LoRA adapters for `transformer_2` -------------------------------------
pipe.load_lora_weights(
    "Kijai/WanVideo_comfy",
    weight_name="Lightx2v/lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors",
    adapter_name="lightx2v_2",
    load_into_transformer_2=True,
)
pipe.load_lora_weights(
    "GiorgioV/LoRA_for_WAN_22",
    weight_name="I2V_14B_LOW.safetensors",
    adapter_name="lora_l",
    load_into_transformer_2=True,
)
pipe.load_lora_weights(
    "GiorgioV/LoRA_for_WAN_22",
    weight_name="Wan2.2 - I2V - GH - LOW 14B.safetensors",
    adapter_name="lora_gl",
    load_into_transformer_2=True,
)

pipe.set_adapters(
    ["lightx2v", "lora_h", "lora_gh", "lightx2v_2", "lora_l", "lora_gl"],
    adapter_weights=[1., 1., 1., 1., 1., 1.],
)

# Fuse each adapter into its transformer with its own scale, then drop the
# adapter modules; this happens before quantization and AoT block loading.
pipe.fuse_lora(adapter_names=["lightx2v"], lora_scale=3., components=["transformer"])
pipe.fuse_lora(adapter_names=["lora_h"], lora_scale=0.3, components=["transformer"])
pipe.fuse_lora(adapter_names=["lora_gh"], lora_scale=0.3, components=["transformer"])
pipe.fuse_lora(adapter_names=["lightx2v_2"], lora_scale=1., components=["transformer_2"])
pipe.fuse_lora(adapter_names=["lora_l"], lora_scale=1., components=["transformer_2"])
pipe.fuse_lora(adapter_names=["lora_gl"], lora_scale=0.8, components=["transformer_2"])
pipe.unload_lora_weights()

# --- Quantization and ahead-of-time compiled blocks ------------------------
quantize_(pipe.text_encoder, Int8WeightOnlyConfig())
quantize_(pipe.transformer, Float8DynamicActivationFloat8WeightConfig())
quantize_(pipe.transformer_2, Float8DynamicActivationFloat8WeightConfig())

aoti.aoti_blocks_load(pipe.transformer, 'zerogpu-aoti/Wan2', variant='fp8da')
aoti.aoti_blocks_load(pipe.transformer_2, 'zerogpu-aoti/Wan2', variant='fp8da')


default_prompt_i2v = "make this image come alive, cinematic motion, smooth animation"
default_negative_prompt = "色调艳丽, 过曝, 静态, 细节模糊不清, 字幕, 风格, 作品, 画作, 画面, 静止, 整体发灰, 最差质量, 低质量, JPEG压缩残留, 丑陋的, 残缺的, 多余的手指, 画得不好的手部, 画得不好的脸部, 畸形的, 毁容的, 形态畸形的肢体, 手指融合, 静止不动的画面, 杂乱的背景, 三条腿, 背景人很多, 倒着走"


def resize_image(image: Image.Image) -> Image.Image:
    """Resize an image to dimensions accepted by the pipeline.

    Square images become SQUARE_DIM x SQUARE_DIM.  Images whose aspect ratio
    is more extreme than MAX_DIM:MIN_DIM (either orientation) are
    center-cropped to that ratio first.  Everything else keeps its aspect
    ratio with the longer side set to MAX_DIM.  The final size is rounded to
    a multiple of MULTIPLE_OF and clamped to [MIN_DIM, MAX_DIM] per side.

    Args:
        image: Source image.

    Returns:
        A new, resized (and possibly center-cropped) image.
    """
    width, height = image.size

    # Square input maps to the dedicated square resolution.
    if width == height:
        return image.resize((SQUARE_DIM, SQUARE_DIM), Image.LANCZOS)

    aspect_ratio = width / height
    widest_ratio = MAX_DIM / MIN_DIM
    tallest_ratio = MIN_DIM / MAX_DIM

    image_to_resize = image

    if aspect_ratio > widest_ratio:
        # Too wide: center-crop the width down to the MAX_DIM x MIN_DIM ratio.
        target_w, target_h = MAX_DIM, MIN_DIM
        crop_width = int(round(height * widest_ratio))
        left = (width - crop_width) // 2
        image_to_resize = image.crop((left, 0, left + crop_width, height))
    elif aspect_ratio < tallest_ratio:
        # Too tall: center-crop the height down to the MIN_DIM x MAX_DIM ratio.
        target_w, target_h = MIN_DIM, MAX_DIM
        crop_height = int(round(width / tallest_ratio))
        top = (height - crop_height) // 2
        image_to_resize = image.crop((0, top, width, top + crop_height))
    elif width > height:
        # Landscape within the supported range: fix the width.
        target_w = MAX_DIM
        target_h = int(round(target_w / aspect_ratio))
    else:
        # Portrait within the supported range: fix the height.
        target_h = MAX_DIM
        target_w = int(round(target_h * aspect_ratio))

    final_w = round(target_w / MULTIPLE_OF) * MULTIPLE_OF
    final_h = round(target_h / MULTIPLE_OF) * MULTIPLE_OF

    final_w = max(MIN_DIM, min(MAX_DIM, final_w))
    final_h = max(MIN_DIM, min(MAX_DIM, final_h))

    return image_to_resize.resize((final_w, final_h), Image.LANCZOS)


def get_num_frames(duration_seconds: float) -> int:
    """Convert a duration in seconds to a frame count the model accepts.

    The count is ``duration_seconds * FIXED_FPS`` clamped to
    [MIN_FRAMES_MODEL, MAX_FRAMES_MODEL], then snapped so that
    ``(frames - 1)`` is divisible by 4.

    Args:
        duration_seconds: Requested clip length in seconds.

    Returns:
        A frame count of the form ``4 * k + 1`` that is not below
        MIN_FRAMES_MODEL and not above MAX_FRAMES_MODEL.
    """
    frames = int(round(duration_seconds * FIXED_FPS))
    frames = int(np.clip(frames, MIN_FRAMES_MODEL, MAX_FRAMES_MODEL))

    # WAN constraint: (frames - 1) must be divisible by 4; snap downwards.
    frames -= (frames - 1) % 4

    # Snapping down must not undercut the minimum (e.g. 8 -> 5); step up to
    # the next valid count instead (8 -> 9).
    if frames < MIN_FRAMES_MODEL:
        frames += 4

    return frames


def get_duration(
    input_image,
    prompt,
    steps,
    negative_prompt,
    duration_seconds,
    guidance_scale,
    guidance_scale_2,
    seed,
    randomize_seed,
    progress,
):
    """Estimate the GPU time (seconds) to reserve for one generation.

    Used as the ``duration`` callback of ``@spaces.GPU`` on
    ``generate_video`` and therefore takes the same arguments.  The estimate
    scales a per-step baseline by ``frames * width * height`` relative to a
    reference workload and adds a fixed 10 second overhead.

    Returns:
        The estimated duration in seconds.
    """
    BASE_FRAMES_HEIGHT_WIDTH = 81 * 832 * 624
    BASE_STEP_DURATION = 12
    BASE_OVERHEAD = 10

    # With no image there is nothing to size; return the fixed overhead so
    # generate_video can report the missing upload with a proper gr.Error
    # instead of this callback failing on `None`.
    if input_image is None:
        return BASE_OVERHEAD

    width, height = resize_image(input_image).size
    frames = get_num_frames(duration_seconds)
    factor = frames * width * height / BASE_FRAMES_HEIGHT_WIDTH
    step_duration = BASE_STEP_DURATION * factor
    return BASE_OVERHEAD + int(steps) * step_duration


def check_ffmpeg() -> bool:
    """Return True if an `ffmpeg` executable can be run successfully."""
    try:
        subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
        return True
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False


def _new_temp_mp4() -> str:
    """Create an empty, persistent temporary .mp4 file and return its path."""
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmpfile:
        return tmpfile.name


def _remove_quietly(path: str) -> None:
    """Delete `path`, ignoring filesystem errors (best-effort cleanup)."""
    try:
        os.unlink(path)
    except OSError:
        pass


@spaces.GPU(duration=get_duration)
def generate_video(
    input_image,
    prompt,
    steps = 4,
    negative_prompt=default_negative_prompt,
    duration_seconds = MAX_DURATION,
    guidance_scale = 1,
    guidance_scale_2 = 1,
    seed = 42,
    randomize_seed = False,
    progress=gr.Progress(track_tqdm=True),
):
    """
    Generate a video from an input image using the Wan 2.2 14B I2V model with Lightning LoRA.

    This function takes an input image and generates a video animation based on the provided
    prompt and parameters. It uses an FP8 quantized Wan 2.2 14B Image-to-Video model with
    Lightning LoRA for fast generation in 4-8 steps. When ffmpeg is available, a silent audio
    track is added to the video and a Gaussian-blurred copy is produced as well.

    Args:
        input_image (PIL.Image): The input image to animate. Will be resized to target dimensions.
        prompt (str): Text prompt describing the desired animation or motion.
        steps (int, optional): Number of inference steps. More steps = higher quality but slower.
            Defaults to 4. UI range: 1-30.
        negative_prompt (str, optional): Negative prompt to avoid unwanted elements.
            Defaults to default_negative_prompt (contains unwanted visual artifacts).
        duration_seconds (float, optional): Duration of the generated video in seconds.
            Defaults to MAX_DURATION. Clamped between MIN_FRAMES_MODEL/FIXED_FPS and
            MAX_FRAMES_MODEL/FIXED_FPS.
        guidance_scale (float, optional): Guidance scale for the high noise stage. Higher values =
            more adherence to the prompt. Defaults to 1.0. UI range: 0.0-10.0.
        guidance_scale_2 (float, optional): Guidance scale for the low noise stage. Higher values =
            more adherence to the prompt. Defaults to 1.0. UI range: 0.0-10.0.
        seed (int, optional): Random seed for reproducible results. Defaults to 42.
            Range: 0 to MAX_SEED (2147483647).
        randomize_seed (bool, optional): Whether to use a random seed instead of the provided seed.
            Defaults to False.
        progress (gr.Progress, optional): Gradio progress tracker. Defaults to
            gr.Progress(track_tqdm=True).

    Returns:
        tuple: A tuple containing:
            - video_path (str): Path to the generated video file (.mp4), with a silent audio
              track when ffmpeg succeeded
            - blurred_video_path (str): Path to the blurred copy of the video (.mp4); equal to
              video_path when ffmpeg is unavailable or fails
            - current_seed (int): The seed used for generation (useful when randomize_seed=True)

    Raises:
        gr.Error: If input_image is None (no image uploaded).

    Note:
        - Frame count is duration_seconds * FIXED_FPS (16), clamped to the model limits and
          snapped so that (frames - 1) is divisible by 4
        - Output dimensions are adjusted to be multiples of MULTIPLE_OF (16)
        - The function uses GPU acceleration via the @spaces.GPU decorator
        - Generation time varies based on steps and duration (see get_duration function)
    """
    if input_image is None:
        raise gr.Error("Please upload an input image.")

    num_frames = get_num_frames(duration_seconds)
    current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)
    resized_image = resize_image(input_image)

    output_frames_list = pipe(
        image=resized_image,
        prompt=prompt,
        negative_prompt=negative_prompt,
        height=resized_image.height,
        width=resized_image.width,
        num_frames=num_frames,
        guidance_scale=float(guidance_scale),
        guidance_scale_2=float(guidance_scale_2),
        num_inference_steps=int(steps),
        generator=torch.Generator(device="cuda").manual_seed(current_seed),
    ).frames[0]

    video_path = _new_temp_mp4()
    export_to_video(output_frames_list, video_path, fps=FIXED_FPS)

    if not check_ffmpeg():
        print("FFmpeg not available, returning video without audio")
        return video_path, video_path, current_seed

    # Track every extra temp file so a failed ffmpeg run does not leak them.
    created_paths = []
    try:
        # Mux a silent stereo track into the video without re-encoding it.
        video_with_audio_path = _new_temp_mp4()
        created_paths.append(video_with_audio_path)
        cmd = [
            'ffmpeg',
            '-f', 'lavfi',
            '-i', 'anullsrc=channel_layout=stereo:sample_rate=44100',
            '-i', video_path,
            '-c:v', 'copy',
            '-c:a', 'aac',
            '-shortest',
            '-y',
            video_with_audio_path
        ]
        subprocess.run(cmd, capture_output=True, check=True)

        # Produce a blurred copy from the video that already carries audio.
        blurred_video_path = _new_temp_mp4()
        created_paths.append(blurred_video_path)
        cmd_blur = [
            'ffmpeg',
            '-i', video_with_audio_path,
            '-vf', 'gblur=sigma=25',  # Gaussian blur, sigma=25
            '-c:a', 'copy',  # keep the audio stream untouched
            '-y',
            blurred_video_path
        ]
        subprocess.run(cmd_blur, capture_output=True, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"Error adding audio: {e}")
        for path in created_paths:
            _remove_quietly(path)
        # Fall back to the original video without audio.
        return video_path, video_path, current_seed

    # The silent original is no longer needed once both outputs exist.
    _remove_quietly(video_path)
    return video_with_audio_path, blurred_video_path, current_seed


with gr.Blocks() as demo:
    gr.Markdown("# Fast 4 steps Wan 2.2 I2V (14B) with Lightning LoRA")
    gr.Markdown("run Wan 2.2 in just 4-8 steps, with [Lightning LoRA](https://huggingface.co/Kijai/WanVideo_comfy/tree/main/Wan22-Lightning), fp8 quantization & AoT compilation - compatible with 🧨 diffusers and ZeroGPU⚡️")
    with gr.Row():
        with gr.Column():
            input_image_component = gr.Image(type="pil", label="Input Image")
            prompt_input = gr.Textbox(label="Prompt", value=default_prompt_i2v)
            duration_seconds_input = gr.Slider(minimum=MIN_DURATION, maximum=MAX_DURATION, step=0.1, value=3.5, label="Duration (seconds)", info=f"Clamped to model's {MIN_FRAMES_MODEL}-{MAX_FRAMES_MODEL} frames at {FIXED_FPS}fps.")

            with gr.Accordion("Advanced Settings", open=False):
                negative_prompt_input = gr.Textbox(label="Negative Prompt", value=default_negative_prompt, lines=3)
                seed_input = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=42, interactive=True)
                randomize_seed_checkbox = gr.Checkbox(label="Randomize seed", value=True, interactive=True)
                steps_slider = gr.Slider(minimum=1, maximum=30, step=1, value=6, label="Inference Steps")
                guidance_scale_input = gr.Slider(minimum=0.0, maximum=10.0, step=0.5, value=1, label="Guidance Scale - high noise stage")
                guidance_scale_2_input = gr.Slider(minimum=0.0, maximum=10.0, step=0.5, value=1, label="Guidance Scale 2 - low noise stage")

            generate_button = gr.Button("Generate Video", variant="primary")
        with gr.Column():
            video_output_1 = gr.Video(label="Generated Video", autoplay=True, interactive=False)
            video_output_2 = gr.Video(label="Generated Video", autoplay=True, interactive=False)

    # Order must match the positional parameters of generate_video.
    ui_inputs = [
        input_image_component, prompt_input, steps_slider,
        negative_prompt_input, duration_seconds_input,
        guidance_scale_input, guidance_scale_2_input, seed_input, randomize_seed_checkbox
    ]
    generate_button.click(fn=generate_video, inputs=ui_inputs, outputs=[video_output_1, video_output_2, seed_input])


if __name__ == "__main__":
    demo.queue().launch(mcp_server=True)