from __future__ import annotations import asyncio import json import os import pathlib import random import re import shutil import subprocess import sys import tempfile import time import uuid from typing import Any from PIL import Image as _PILImage _TARGET_MP = 848 * 480 ASPECT_PRESETS: dict[str, tuple[int, int]] = { "16:9": (848, 480), "9:16": (480, 848), "1:1": (640, 640), "4:3": (736, 560), "3:4": (560, 736), } def _compute_dims(aspect_ratio: str, first_image_path: str | None, use_ref: bool) -> tuple[int, int]: if use_ref and first_image_path and os.path.exists(first_image_path): try: with _PILImage.open(first_image_path) as im: iw, ih = im.size ratio = iw / ih h = round((_TARGET_MP / ratio) ** 0.5 / 16) * 16 w = round((_TARGET_MP / h / 16)) * 16 h = max(16, h) w = max(16, w) return w, h except Exception: pass return ASPECT_PRESETS.get(aspect_ratio, (848, 480)) def _setup_cuda_lib_path() -> None: candidates = [ "/cuda-image/usr/local/cuda-13.0/targets/x86_64-linux/lib", "/cuda-image/usr/local/cuda-13.0/lib64", "/usr/local/cuda-13.0/targets/x86_64-linux/lib", "/usr/local/cuda-13.0/lib64", "/usr/local/cuda/targets/x86_64-linux/lib", "/usr/local/cuda/lib64", ] for base in ("/cuda-image/usr/local", "/usr/local"): bp = pathlib.Path(base) if bp.exists(): for found in bp.rglob("libcudart.so.13*"): candidates.insert(0, str(found.parent)) break for p in candidates: if pathlib.Path(p).is_dir(): cur = os.environ.get("LD_LIBRARY_PATH", "") os.environ["LD_LIBRARY_PATH"] = f"{p}:{cur}" print(f"[cuda] LD_LIBRARY_PATH set to include {p}", flush=True) return print("[cuda] WARNING: could not find libcudart.so.13 directory", flush=True) _setup_cuda_lib_path() import gradio as gr import spaces import torch ROOT = pathlib.Path(__file__).resolve().parent COMFY = ROOT / "ComfyUI" MODELS = COMFY / "models" INPUT = COMFY / "input" OUTPUT = COMFY / "output" WORKFLOW_FILE = "bernini_r2v_base.json" COMFY_COMMIT = "4e1f7cb1db1c26bb9ee61cf1875776517e2abae8" CUSTOM_NODES = [ ("bernini_chunk", None), ("ComfyUI-WanVideoWrapper", "https://github.com/kijai/ComfyUI-WanVideoWrapper.git"), ("ComfyUI-KJNodes", "https://github.com/kijai/ComfyUI-KJNodes.git"), ("ComfyUI-RH-Bernini", "https://github.com/RH-RunningHub/ComfyUI-RH-Bernini.git"), ("ComfyUI-VideoHelperSuite", "https://github.com/Kosinkadink/ComfyUI-VideoHelperSuite.git"), ("ComfyUI-Frame-Interpolation", "https://github.com/Fannovel16/ComfyUI-Frame-Interpolation.git"), ] DOWNLOADS = [ { "repo": "Comfy-Org/Bernini-R", "file": "diffusion_models/wan2.2_bernini_r_high_noise_fp8_scaled.safetensors", "dest": MODELS / "diffusion_models" / "Wan22_Bernini_HIGH_fp8_e4m3fn_scaled.safetensors", "label": "bernini HIGH fp8", }, { "repo": "Comfy-Org/Bernini-R", "file": "diffusion_models/wan2.2_bernini_r_low_noise_fp8_scaled.safetensors", "dest": MODELS / "diffusion_models" / "Wan22_Bernini_LOW_fp8_e4m3fn_scaled.safetensors", "label": "bernini LOW fp8", }, { "repo": "Osrivers/nsfw_wan_umt5-xxl_fp8_scaled.safetensors", "file": "nsfw_wan_umt5-xxl_fp8_scaled.safetensors", "dest": MODELS / "text_encoders" / "umt5_xxl_fp8_e4m3fn_scaled.safetensors", "label": "t5 text encoder fp8", }, { "repo": "Comfy-Org/Wan_2.1_ComfyUI_repackaged", "file": "split_files/vae/wan_2.1_vae.safetensors", "dest": MODELS / "vae" / "wan_2.1_vae.safetensors", "label": "wan vae", }, { "repo": "Kijai/WanVideo_comfy", "file": "Lightx2v/lightx2v_T2V_14B_cfg_step_distill_v2_lora_rank64_bf16.safetensors", "dest": MODELS / "loras" / "lightx2v_T2V_14B_cfg_step_distill_v2_lora_rank64_bf16.safetensors", "label": "lightx2v lora", }, { "repo": "Comfy-Org/Wan_2.1_ComfyUI_repackaged", "file": "split_files/clip_vision/clip_vision_h.safetensors", "dest": MODELS / "clip_vision" / "clip_vision_h.safetensors", "label": "clip vision h", }, { "repo": "signsur4739379373/archive", "file": "wan22/wamu_v3_lightning_lora_high_noise_r128.safetensors", "dest": MODELS / "loras" / "wamu_v3_lora_high_noise_r128.safetensors", "label": "wamu high lora", }, { "repo": "signsur4739379373/archive", "file": "wan22/wamu_v3_lightning_lora_low_noise_r128.safetensors", "dest": MODELS / "loras" / "wamu_v3_lora_low_noise_r128.safetensors", "label": "wamu low lora", }, { "repo": "Kijai/WanVideo_comfy", "file": "Lightx2v/lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors", "dest": MODELS / "loras" / "lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors", "label": "lightx2v I2V lora", }, { "repo": "yeqiu168182/DR34ML4Y_I2V_14B_HIGH", "file": "DR34ML4Y_I2V_14B_HIGH.safetensors", "dest": MODELS / "loras" / "DR34ML4Y_I2V_14B_HIGH.safetensors", "label": "dreamly high lora", }, { "repo": "yeqiu168182/DR34ML4Y_I2V_14B_LOW", "file": "DR34ML4Y_I2V_14B_LOW.safetensors", "dest": MODELS / "loras" / "DR34ML4Y_I2V_14B_LOW.safetensors", "label": "dreamly low lora", }, { "repo": "signsur4739379373/ipnc_antiloras_archive", "file": "Wan22_BerniniR_DR34ML4Y_HIGH_Rank1_IPNC.safetensors", "dest": MODELS / "loras" / "Wan22_BerniniR_DR34ML4Y_HIGH_Rank1_IPNC.safetensors", "label": "dreamly high IPNC", }, { "repo": "signsur4739379373/ipnc_antiloras_archive", "file": "Wan22_BerniniR_DR34ML4Y_LOW_Rank1_IPNC.safetensors", "dest": MODELS / "loras" / "Wan22_BerniniR_DR34ML4Y_LOW_Rank1_IPNC.safetensors", "label": "dreamly low IPNC", }, { "repo": "signsur4739379373/ipnc_antiloras_archive", "file": "Wan22_BerniniR_wamu_v3_HIGH_Rank1_IPNC.safetensors", "dest": MODELS / "loras" / "Wan22_BerniniR_wamu_v3_HIGH_Rank1_IPNC.safetensors", "label": "wamu high IPNC", }, { "repo": "signsur4739379373/ipnc_antiloras_archive", "file": "Wan22_BerniniR_wamu_v3_LOW_Rank1_IPNC.safetensors", "dest": MODELS / "loras" / "Wan22_BerniniR_wamu_v3_LOW_Rank1_IPNC.safetensors", "label": "wamu low IPNC", }, { "repo": "signsur4739379373/archive", "file": "wan22/nsfwsvicamera_lora_high_r128.safetensors", "dest": MODELS / "loras" / "nsfwsvicamera_lora_high_r128.safetensors", "label": "svicamera high lora", }, { "repo": "signsur4739379373/archive", "file": "wan22/nsfwsvicamera_lora_low_r128.safetensors", "dest": MODELS / "loras" / "nsfwsvicamera_lora_low_r128.safetensors", "label": "svicamera low lora", }, { "repo": "signsur4739379373/ipnc_antiloras_archive", "file": "Wan22_BerniniR_nsfwsvicamera_HIGH_Rank1_IPNC.safetensors", "dest": MODELS / "loras" / "Wan22_BerniniR_nsfwsvicamera_HIGH_Rank1_IPNC.safetensors", "label": "svicamera high IPNC", }, { "repo": "signsur4739379373/ipnc_antiloras_archive", "file": "Wan22_BerniniR_nsfwsvicamera_LOW_Rank1_IPNC.safetensors", "dest": MODELS / "loras" / "Wan22_BerniniR_nsfwsvicamera_LOW_Rank1_IPNC.safetensors", "label": "svicamera low IPNC", }, { "repo": "VMTamashii/rife49", "file": "rife49.pth", "dest": COMFY / "custom_nodes" / "ComfyUI-Frame-Interpolation" / "ckpts" / "rife" / "rife49.pth", "label": "rife49 model", }, ] SAVE_BASE = tempfile.mkdtemp(prefix="bernini_comfy_") os.makedirs(SAVE_BASE, exist_ok=True) MAX_SEED = 2_147_483_647 _comfy_ready = False _nodes_ready = False _models_ready = False def _run(cmd: list[str], cwd: pathlib.Path | None = None, check: bool = True) -> subprocess.CompletedProcess: print("[setup]", " ".join(str(x) for x in cmd), flush=True) return subprocess.run(cmd, cwd=str(cwd) if cwd else None, check=check) def _pip_install(args: list[str], check: bool = True) -> None: _run([sys.executable, "-m", "pip", "install", "--no-cache-dir", *args], check=check) def _install_filtered_requirements(req_path: pathlib.Path) -> None: if not req_path.exists(): return blocked = {"torch", "torchvision", "torchaudio", "transformers", "huggingface-hub", "accelerate"} safe: list[str] = [] for line in req_path.read_text(encoding="utf-8", errors="ignore").splitlines(): item = line.strip() if not item or item.startswith("#"): continue low = item.lower().replace("_", "-") package = re.split(r"[<>=!~;\[\s]", low, maxsplit=1)[0] if package in blocked: continue safe.append(item) if safe: _pip_install(safe, check=False) def _ensure_repo(path: pathlib.Path, url: str, commit: str | None = None) -> None: if not path.exists(): _run(["git", "clone", "--depth", "1", url, str(path)]) if commit: _run(["git", "fetch", "--depth", "1", "origin", commit], cwd=path, check=False) _run(["git", "checkout", commit], cwd=path, check=False) def _apply_comfy_utils_namespace_fix() -> None: utils_path = COMFY / "utils" utilities_path = COMFY / "utilities" if utils_path.exists() and not utilities_path.exists(): utils_path.rename(utilities_path) replacements = [ (re.compile(r"(^|\n)(\s*)from utils(\s|\.)"), r"\1\2from utilities\3"), (re.compile(r"(^|\n)(\s*)import utils(\s|\.|$)"), r"\1\2import utilities\3"), ] for path in COMFY.rglob("*.py"): if "__pycache__" in path.parts: continue try: text = path.read_text(encoding="utf-8") except UnicodeDecodeError: continue updated = text for pattern, repl in replacements: updated = pattern.sub(repl, updated) updated = updated.replace("from utils import", "from utilities import") if updated != text: path.write_text(updated, encoding="utf-8") def _download_to_dest(repo: str, file: str, dest: pathlib.Path, token: str | None = None) -> None: from huggingface_hub import hf_hub_download dest.parent.mkdir(parents=True, exist_ok=True) downloaded = hf_hub_download(repo_id=repo, filename=file, token=token) if not dest.exists(): shutil.copy(downloaded, dest) def _ensure_comfy() -> None: global _comfy_ready if _comfy_ready: return _ensure_repo(COMFY, "https://github.com/Comfy-Org/ComfyUI.git", commit=COMFY_COMMIT) _install_filtered_requirements(COMFY / "requirements.txt") custom_root = COMFY / "custom_nodes" custom_root.mkdir(parents=True, exist_ok=True) for name, url in CUSTOM_NODES: node_path = custom_root / name if url is None: # local node — copy from space repo src = ROOT / "custom_nodes" / name if src.exists() and not node_path.exists(): shutil.copytree(str(src), str(node_path)) else: _ensure_repo(node_path, url) _install_filtered_requirements(node_path / "requirements.txt") _apply_comfy_utils_namespace_fix() _comfy_ready = True print("[startup] ComfyUI + custom nodes ready.", flush=True) def _ensure_models() -> None: global _models_ready if _models_ready: return token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_HUB_TOKEN") for item in DOWNLOADS: dest = pathlib.Path(item["dest"]) if dest.exists(): continue print(f"[startup] downloading {item['label']}...", flush=True) _download_to_dest(item["repo"], item["file"], dest, token) _models_ready = True print("[startup] all models ready.", flush=True) def _init_comfy_nodes() -> None: global _nodes_ready if _nodes_ready: return comfy_path = str(COMFY) sys.path = [p for p in sys.path if p != comfy_path] sys.path.insert(0, comfy_path) for module_name in list(sys.modules): if module_name == "utils" or module_name.startswith("utils."): del sys.modules[module_name] os.chdir(COMFY) import types as _types, importlib.util as _ilu _ta = _types.ModuleType("torchaudio") _ta.__spec__ = _ilu.spec_from_loader("torchaudio", loader=None) _ta.__version__ = "0.0.0" sys.modules.setdefault("torchaudio", _ta) for _sub in ["torchaudio.functional", "torchaudio.transforms", "torchaudio._extension"]: _m = _types.ModuleType(_sub) _m.__spec__ = _ilu.spec_from_loader(_sub, loader=None) sys.modules.setdefault(_sub, _m) import execution import nodes import server loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) server_instance = server.PromptServer(loop) execution.PromptQueue(server_instance) loop.run_until_complete(nodes.init_extra_nodes()) _nodes_ready = True print("[startup] ComfyUI nodes initialized.", flush=True) import numpy as _np def _compute_split(total_steps, flow_shift=3.0, boundary_ratio=0.9, num_train_timesteps=1000): sigmas = _np.linspace(1, 0, total_steps + 1)[:-1] sigma_shifted = flow_shift * sigmas / (1 + (flow_shift - 1) * sigmas) timesteps = sigma_shifted * (num_train_timesteps - 1) n_above = int(_np.sum(timesteps >= boundary_ratio * num_train_timesteps)) return total_steps - n_above DEFAULT_NEGATIVE = ( "色调艳丽,过曝,静态,场景切换,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰," "最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部," "畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走" ) R2V_TEMPLATE = """You are an expert at writing subject-driven video generation prompts. I'm providing you with: 1. {image_num} reference image(s) of the subject(s) that will appear in the video (referred to as image0, image1, image2, ... in order). 2. An original video description text. Your task is to rewrite the original description into a new format with TWO parts concatenated together: **Part 1 - Short instruction**: A concise sentence describing who the subject(s) from the reference image(s) are, what they look like briefly, where they are, and what key action/motion they perform. Reference the subject(s) using "image0", "image1", etc. to link them to the provided reference images. **Part 2 - Long instruction**: A detailed "Generate a video where..." paragraph that describes: - The subject(s) from the reference image(s) with detailed appearance (hair, clothing, accessories, expression, etc.), referencing them as "the person/man/woman from image0" etc. - The scene/environment in detail (background, lighting, objects, atmosphere). - The motion and actions in a step-by-step temporal sequence (at the start..., then..., after that...). - The motion should remain natural and realistic. Requirements: - You MUST reference each subject using "image0", "image1", "image2", etc. to correspond to the provided reference images in order. - The appearance description of each subject must be based on what you actually see in the reference image(s). Do NOT hallucinate details not visible in the images. - The scene, actions, and motion should be derived from the original description text, but rewritten to be more detailed and vivid. - The output must be entirely in English. - Return ONLY a JSON object with one key: "rewritten_text". The value should be the full rewritten text (short instruction + long instruction concatenated as one string). No extra text. Original description: {original_text} """ def _enhance_prompt_r2v(prompt: str, image_paths: list[str]) -> str | None: """Call grok-4.3 via xAI API to enhance prompt for r2v. Returns None on failure.""" api_key = os.environ.get("XAI_API_KEY") if not api_key: print("[enhancer] XAI_API_KEY not set, skipping enhancement", flush=True) return None try: import base64 import json as _json from openai import OpenAI client = OpenAI(api_key=api_key, base_url="https://api.x.ai/v1") image_num = len(image_paths) user_text = R2V_TEMPLATE.format(image_num=max(image_num, 1), original_text=prompt) content: list = [{"type": "text", "text": user_text}] for i, path in enumerate(image_paths): if not path or not os.path.exists(path): continue with open(path, "rb") as f: b64 = base64.b64encode(f.read()).decode("utf-8") content.append({"type": "text", "text": f"\n[Image {i}]:"}) content.append({"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}}) messages = [ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": content}, ] resp = client.chat.completions.create( model="grok-4.3", messages=messages, max_completion_tokens=8192, response_format={"type": "json_object"}, ) text = resp.choices[0].message.content or "" enhanced = _json.loads(text).get("rewritten_text", "").strip() if enhanced: print(f"[enhancer] enhanced prompt ({len(enhanced)} chars)", flush=True) return enhanced return None except Exception as e: print(f"[enhancer] failed: {e}", flush=True) return None def _load_workflow() -> dict[str, Any]: wf_path = ROOT / WORKFLOW_FILE return json.loads(wf_path.read_text(encoding="utf-8")) NODE_OUTPUT = 67 def _convert_visual_to_api(visual: dict) -> dict[str, Any]: import nodes as comfy_nodes link_map: dict[int, Any] = {} for link in visual.get("links", []): link_id, src_node, src_slot, *_ = link link_map[int(link_id)] = [str(src_node), src_slot] BYPASS_TYPES = {"PathchSageAttentionKJ"} for node in visual.get("nodes", []): if node.get("type") not in BYPASS_TYPES: continue src_ref = None for inp in node.get("inputs") or []: lid = inp.get("link") if lid is not None and lid in link_map: src_ref = link_map[lid] break if src_ref is None: continue for out in node.get("outputs") or []: for out_lid in (out.get("links") or []): if out_lid in link_map: link_map[out_lid] = list(src_ref) set_sources: dict[str, Any] = {} set_node_sources: dict[int, Any] = {} for node in visual.get("nodes", []): if node.get("type") not in {"SetNode", "SetNodeAny"}: continue name = (node.get("widgets_values") or [""])[0] for inp in node.get("inputs") or []: lid = inp.get("link") if lid in link_map: set_sources[name] = link_map[lid] set_node_sources[int(node["id"])] = link_map[lid] changed = True while changed: changed = False for lid, src in list(link_map.items()): if isinstance(src, list): try: sid = int(src[0]) except (ValueError, TypeError): continue if sid in set_node_sources: rep = set_node_sources[sid] if link_map[lid] != rep: link_map[lid] = rep changed = True for node in visual.get("nodes", []): if node.get("type") not in {"GetNode", "GetNodeAny"}: continue name = (node.get("widgets_values") or [""])[0] if name not in set_sources: continue for lid, src in list(link_map.items()): if isinstance(src, list) and src[0] == str(node["id"]): rep = set_sources[name] if link_map[lid] != rep: link_map[lid] = rep changed = True skip_types = {"Note", "NoteNode", "MarkdownNote", "GetNode", "GetNodeAny", "SetNode", "SetNodeAny", "PathchSageAttentionKJ"} MUTED_MODE = 4 api: dict[str, Any] = {} for node in visual.get("nodes", []): node_id = int(node["id"]) class_type = node.get("type", "") if class_type in skip_types: continue if node.get("mode") == MUTED_MODE: print(f"[workflow] skipping muted node {class_type} id={node_id}", flush=True) continue if class_type not in comfy_nodes.NODE_CLASS_MAPPINGS: print(f"[workflow] skipping unknown node {class_type} id={node_id}", flush=True) continue inputs: dict[str, Any] = {} for inp in node.get("inputs") or []: lid = inp.get("link") if lid is not None and lid in link_map: inputs[inp["name"]] = link_map[lid] widgets = node.get("widgets_values") or [] if widgets: if isinstance(widgets, dict): for k, v in widgets.items(): if k != "videopreview": inputs.setdefault(k, v) else: param_names = [ inp["name"] for inp in node.get("inputs") or [] if isinstance(inp.get("widget"), dict) and inp["widget"].get("name") ] if not param_names: cls = comfy_nodes.NODE_CLASS_MAPPINGS[class_type] try: cls_inp = cls.INPUT_TYPES() for grp in ("required", "optional"): for pname, spec in cls_inp.get(grp, {}).items(): typ = spec[0] if isinstance(spec, (tuple, list)) and spec else spec if isinstance(typ, (list, tuple)) or str(typ).upper() in {"FLOAT", "INT", "STRING", "BOOLEAN", "COMBO"}: param_names.append(pname) except Exception: pass CTRL = {'randomize', 'fixed', 'increment', 'decrement'} wi = 0 for pname in param_names: if wi >= len(widgets): break inputs.setdefault(pname, widgets[wi]) wi += 1 if wi < len(widgets) and isinstance(widgets[wi], str) and widgets[wi] in CTRL: wi += 1 api[str(node_id)] = {"class_type": class_type, "inputs": inputs} return api _LORA_CHAIN = [ { "id": 2003, "chain": "high", "file": "DR34ML4Y_I2V_14B_HIGH.safetensors", "default": 0.3, }, { "id": 2001, "chain": "high", "file": "wamu_v3_lora_high_noise_r128.safetensors", "default": 1.0, }, { "id": 2002, "chain": "high", "file": "lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors", "default": -1.0, }, { "id": 2004, "chain": "low", "file": "wamu_v3_lora_low_noise_r128.safetensors", "default": 0.5, }, { "id": 2005, "chain": "low", "file": "lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors", "default": -1.0, }, { "id": 2006, "chain": "low", "file": "DR34ML4Y_I2V_14B_LOW.safetensors", "default": 0.5, }, ] LORA_DEFAULTS = {l["id"]: l["default"] for l in _LORA_CHAIN} RIFE_MULTIPLIERS = {"no rife": 1, "2x rife": 2, "4x rife": 4} def _patch_api_workflow( api: dict[str, Any], prompt: str, negative: str, image_names: list[str], seed: int, width: int, height: int, num_steps: int = 6, num_frames: int = 145, sampler_name: str = "uni_pc", base_fps: int = 15, rife_mode: str = "no rife", loras_enabled: bool = False, lora_strengths: dict[int, float] | None = None, ipnc_enabled: bool = False, ipnc_strengths: dict[str, float] | None = None, ) -> dict[str, Any]: if ipnc_strengths is None: ipnc_strengths = {"wamu_h": 100, "dreamly_h": 100, "wamu_l": 100, "dreamly_l": 100, "svicamera_h": 100, "svicamera_l": 100} def _set(node_id: int, key: str, val: Any) -> None: k = str(node_id) if k in api: api[k]["inputs"][key] = val _set(6, "text", f"You are a helpful assistant specialized in subject-to-video generation. {prompt}") _set(7, "text", negative) _set(112, "value", width) _set(114, "value", height) _set(128, "value", seed) _set(57, "noise_seed", seed) _set(58, "noise_seed", 0) _set(132, "height", height) _set(132, "width", width) _set(132, "length", num_frames) _set(119, "image", image_names[0]) split = _compute_split(num_steps) _set(57, "steps", num_steps) _set(57, "start_at_step", 0) _set(57, "end_at_step", split) _set(57, "sampler_name", sampler_name) _set(58, "steps", num_steps) _set(58, "start_at_step", split) _set(58, "end_at_step", num_steps) _set(58, "sampler_name", sampler_name) for i, name in enumerate(image_names[1:], 1): nid = 5000 + i api[str(nid)] = { "class_type": "LoadImage", "inputs": {"image": name, "upload": "image"}, } api["136"]["inputs"][f"images.image{i}"] = [str(nid), 0] if loras_enabled: strengths = lora_strengths or LORA_DEFAULTS high_src = ["71", 0] low_src = ["56", 0] # IDs 2001-2006: main loras (existing) # IDs 2011-2014: IPNC loras injected after specific mains # Chain HIGH: # DR34ML4Y(2003) -> DR34ML4Y_IPNC(2011) -> wamu(2001) -> # lightx2v_I2V(2002) -> wamu_IPNC(2012) -> lightx2v_T2V(baked) # Chain LOW: # wamu(2004) -> lightx2v_I2V(2005) -> wamu_IPNC(2013) -> # lightx2v_T2V(baked) -> DR34ML4Y(2006) -> DR34ML4Y_IPNC(2014) s_wamu_h_val = strengths.get(2001, 1.0) s_dreamly_h_val = strengths.get(2003, 0.3) s_wamu_l_val = strengths.get(2004, 0.5) s_dreamly_l_val = strengths.get(2006, 0.5) s_svicamera_h_val = strengths.get(2021, 0.0) s_svicamera_l_val = strengths.get(2023, 0.0) def _add_lora(nid, src, fname, strength): api[str(nid)] = { "class_type": "LoraLoaderModelOnly", "inputs": {"model": src, "lora_name": fname, "strength_model": strength}, } return [str(nid), 0] # ── HIGH chain ────────────────────────────────────────────── # DR34ML4Y HIGH high_src = _add_lora(2003, high_src, "DR34ML4Y_I2V_14B_HIGH.safetensors", s_dreamly_h_val) # DR34ML4Y_IPNC HIGH if ipnc_enabled and ipnc_strengths.get("dreamly_h", 0) > 0: ipnc_s = s_dreamly_h_val * ipnc_strengths["dreamly_h"] / 100.0 high_src = _add_lora(2011, high_src, "Wan22_BerniniR_DR34ML4Y_HIGH_Rank1_IPNC.safetensors", ipnc_s) # wamu HIGH high_src = _add_lora(2001, high_src, "wamu_v3_lora_high_noise_r128.safetensors", s_wamu_h_val) # lightx2v_I2V HIGH (negation) high_src = _add_lora(2002, high_src, "lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors", -s_wamu_h_val) # wamu_IPNC HIGH (after lightx2v_I2V negation) if ipnc_enabled and ipnc_strengths.get("wamu_h", 0) > 0: ipnc_s = s_wamu_h_val * ipnc_strengths["wamu_h"] / 100.0 high_src = _add_lora(2012, high_src, "Wan22_BerniniR_wamu_v3_HIGH_Rank1_IPNC.safetensors", ipnc_s) # svicamera HIGH high_src = _add_lora(2021, high_src, "nsfwsvicamera_lora_high_r128.safetensors", s_svicamera_h_val) # svicamera_IPNC HIGH if ipnc_enabled and ipnc_strengths.get("svicamera_h", 0) > 0: ipnc_s = s_svicamera_h_val * ipnc_strengths["svicamera_h"] / 100.0 high_src = _add_lora(2022, high_src, "Wan22_BerniniR_nsfwsvicamera_HIGH_Rank1_IPNC.safetensors", ipnc_s) # ── LOW chain ─────────────────────────────────────────────── # wamu LOW low_src = _add_lora(2004, low_src, "wamu_v3_lora_low_noise_r128.safetensors", s_wamu_l_val) # lightx2v_I2V LOW (negation) low_src = _add_lora(2005, low_src, "lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors", -s_wamu_l_val) # wamu_IPNC LOW (after lightx2v_I2V negation) if ipnc_enabled and ipnc_strengths.get("wamu_l", 0) > 0: ipnc_s = s_wamu_l_val * ipnc_strengths["wamu_l"] / 100.0 low_src = _add_lora(2013, low_src, "Wan22_BerniniR_wamu_v3_LOW_Rank1_IPNC.safetensors", ipnc_s) # DR34ML4Y LOW low_src = _add_lora(2006, low_src, "DR34ML4Y_I2V_14B_LOW.safetensors", s_dreamly_l_val) # DR34ML4Y_IPNC LOW if ipnc_enabled and ipnc_strengths.get("dreamly_l", 0) > 0: ipnc_s = s_dreamly_l_val * ipnc_strengths["dreamly_l"] / 100.0 low_src = _add_lora(2014, low_src, "Wan22_BerniniR_DR34ML4Y_LOW_Rank1_IPNC.safetensors", ipnc_s) # svicamera LOW low_src = _add_lora(2023, low_src, "nsfwsvicamera_lora_low_r128.safetensors", s_svicamera_l_val) # svicamera_IPNC LOW if ipnc_enabled and ipnc_strengths.get("svicamera_l", 0) > 0: ipnc_s = s_svicamera_l_val * ipnc_strengths["svicamera_l"] / 100.0 low_src = _add_lora(2024, low_src, "Wan22_BerniniR_nsfwsvicamera_LOW_Rank1_IPNC.safetensors", ipnc_s) api["63"]["inputs"]["model"] = high_src api["64"]["inputs"]["model"] = low_src multiplier = RIFE_MULTIPLIERS.get(rife_mode, 1) output_fps = base_fps * multiplier if multiplier > 1: api["3001"] = { "class_type": "RIFE VFI", "inputs": { "frames": ["8", 0], "ckpt_name": "rife49.pth", "clear_cache_after_n_frames": 10, "multiplier": multiplier, "fast_mode": True, "ensemble": True, "scale_factor": 1, "dtype": "float32", "torch_compile": False, "batch_size": 1, }, } api["67"]["inputs"]["images"] = ["3001", 0] if "67" in api and isinstance(api["67"]["inputs"].get("frame_rate"), (int, float)): api["67"]["inputs"]["frame_rate"] = output_fps return api def _execute_workflow(api: dict[str, Any], output_node: int | str = None) -> str: import execution, server as comfy_server loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) srv = comfy_server.PromptServer(loop) try: executor = execution.PromptExecutor( srv, cache_type=execution.CacheType.RAM_PRESSURE, cache_args={"lru": 0, "ram": 2.0, "ram_inactive": 8.0}, ) except Exception: executor = execution.PromptExecutor(srv) prompt_id = str(uuid.uuid4()) loop.run_until_complete(executor.execute_async(api, prompt_id, extra_data={}, execute_outputs=[str(output_node if output_node is not None else NODE_OUTPUT)])) if not getattr(executor, "success", True): msgs = getattr(executor, "status_messages", []) msg = msgs[-1] if msgs else "comfy execution failed" raise RuntimeError(str(msg)) for output in (executor.history_result or {}).get("outputs", {}).values(): for items in output.values(): if not isinstance(items, list): continue for item in items: fname = item.get("filename") if isinstance(item, dict) else None if not fname: continue subfolder = item.get("subfolder", "") kind = item.get("type", "output") base = OUTPUT if kind == "output" else COMFY / kind candidate = base / subfolder / fname if subfolder else base / fname if candidate.exists(): return str(candidate) return "" # No video produced (intermediate chunk step — latent saved to /tmp) def _estimate_duration(num_steps, duration_secs, base_fps, aspect_ratio, use_ref_aspect, rife_mode): BASE_FRAMES = 45 BASE_STEP_S = 5.3 STEP_EXP = 1.2 n_frames = max(1, round(float(duration_secs) * int(base_fps))) gen_w, gen_h = ASPECT_PRESETS.get(str(aspect_ratio), (848, 480)) frame_factor = (n_frames * gen_w * gen_h) / (BASE_FRAMES * 848 * 480) step_time = BASE_STEP_S * frame_factor ** STEP_EXP sampler_t = int(num_steps) * step_time swap_t = 12 + 6 * frame_factor ** 0.5 overhead_t = 20 + 8 * max(0, frame_factor - 1) rife_mult = {"2x rife": 2, "4x rife": 4}.get(str(rife_mode), 1) rife_t = n_frames * (rife_mult - 1) * 0.05 if rife_mult > 1 else 0 total = (sampler_t + swap_t + overhead_t + rife_t) * 1.05 return max(30, int(total)) def _get_duration( prompt, negative, image_input, seed, aspect_ratio, use_ref_aspect, gpu_budget, num_steps, duration_secs, sampler_name, base_fps, rife_mode, *args, **kwargs, ): if gpu_budget and int(gpu_budget) > 0: return int(gpu_budget) return _estimate_duration(num_steps, duration_secs, base_fps, aspect_ratio, use_ref_aspect, rife_mode) def _chunk_get_duration( prompt, negative, image_input, seed, aspect_ratio, use_ref_aspect, gpu_budget, num_steps, duration_secs, sampler_name, base_fps, rife_mode, *args, **kwargs, ): """Each chunk is self-contained — request 120s for the GPU.""" if gpu_budget and int(gpu_budget) > 0: return int(gpu_budget) return 120 def _coerce_gallery(g) -> list: paths = [] for item in (g or []): if isinstance(item, str): paths.append(item) elif isinstance(item, dict): p = item.get("path") or item.get("name") if p: paths.append(p) elif isinstance(item, (list, tuple)) and item: p = item[0] if isinstance(p, str): paths.append(p) elif isinstance(p, dict): paths.append(p.get("path","")) return [p for p in paths if p and os.path.exists(p)][:5] @spaces.GPU(duration=_chunk_get_duration) def generate_chunk_handler( prompt: str, negative: str, image_input: Any, seed: int, aspect_ratio: str = "16:9", use_ref_aspect: bool = False, gpu_budget: int = 120, num_steps: int = 6, duration_secs: float = 5.0, sampler_name: str = "uni_pc", base_fps: int = 15, rife_mode: str = "no rife", loras_enabled: bool = False, s_wamu_h: float = 1.0, s_dreamly_h: float = 0.3, s_wamu_l: float = 0.5, s_dreamly_l: float = 0.5, enhance_prompt: bool = False, ipnc_enabled: bool = False, ipnc_wamu_h: float = 100.0, ipnc_dreamly_h: float = 100.0, ipnc_wamu_l: float = 100.0, ipnc_dreamly_l: float = 100.0, s_svicamera_h: float = 0.0, s_svicamera_l: float = 0.0, ipnc_svicamera_h: float = 100.0, ipnc_svicamera_l: float = 100.0, # chunking params session_id: str = "", steps_per_chunk: str = "2", chunk_step: str = "0", progress=gr.Progress(track_tqdm=True), ): """Run ONE denoising step for chunked long-video generation. Returns: (None, "PARTIAL:{session_id}:{next_step}") if more steps remain (video_path, "Done: ...") on the final step """ import traceback as _tb try: if not (prompt or "").strip(): return None, "enter a prompt" final_seed = int(seed) if seed else random.randint(0, MAX_SEED) negative = negative or DEFAULT_NEGATIVE image_paths = _coerce_gallery(image_input) if not image_paths: return None, "upload at least one reference image" INPUT.mkdir(parents=True, exist_ok=True) dest_names = [] for p in image_paths: dn = f"ref_{uuid.uuid4().hex[:8]}_{os.path.basename(p)}" shutil.copy(p, INPUT / dn) dest_names.append(dn) gen_w, gen_h = _compute_dims(str(aspect_ratio), image_paths[0], bool(use_ref_aspect)) n_frames = max(1, round(float(duration_secs) * int(base_fps))) steps = int(num_steps) STEPS_PER_CHUNK = max(1, int(float(steps_per_chunk or 2))) chunk_idx = int(float(chunk_step or 0)) step_start = chunk_idx * STEPS_PER_CHUNK step_end = min(step_start + STEPS_PER_CHUNK, steps) is_final = (step_end == steps) print(f"[chunk] session={session_id} chunk={chunk_idx} steps=[{step_start}-{step_end})/{steps}", flush=True) progress(step_end / steps, desc=f"chunk {chunk_idx+1} — steps {step_start+1}-{step_end}/{steps}") # Load base workflow and patch it identically visual_wf = _load_workflow() api_wf = _convert_visual_to_api(visual_wf) api_wf = _patch_api_workflow( api_wf, prompt, negative, dest_names, final_seed, gen_w, gen_h, num_steps=steps, num_frames=n_frames, sampler_name=str(sampler_name), base_fps=int(base_fps), rife_mode=str(rife_mode), loras_enabled=bool(loras_enabled), lora_strengths={ 2001: float(s_wamu_h), 2002: -float(s_wamu_h), 2003: float(s_dreamly_h), 2004: float(s_wamu_l), 2005: -float(s_wamu_l), 2006: float(s_dreamly_l), 2021: float(s_svicamera_h), 2023: float(s_svicamera_l), }, ipnc_enabled=bool(ipnc_enabled), ipnc_strengths={ "wamu_h": float(ipnc_wamu_h), "dreamly_h": float(ipnc_dreamly_h), "wamu_l": float(ipnc_wamu_l), "dreamly_l": float(ipnc_dreamly_l), "svicamera_h": float(ipnc_svicamera_h), "svicamera_l": float(ipnc_svicamera_l), }, ) split = _compute_split(steps) use_high = (step_start < split) sampler_id = "57" if use_high else "58" # Inject chunk nodes: save/load latent, save/load conditioning # Batch multiple steps per chunk (set by the caller) api_wf[sampler_id]["inputs"]["steps"] = steps api_wf[sampler_id]["inputs"]["start_at_step"] = step_start api_wf[sampler_id]["inputs"]["end_at_step"] = step_end api_wf[sampler_id]["inputs"]["return_with_leftover_noise"] = "disable" if is_final else "enable" if chunk_idx == 0: # first chunk only # Step 0: add noise, save conditioning + latent api_wf[sampler_id]["inputs"]["add_noise"] = "enable" api_wf[sampler_id]["inputs"]["noise_seed"] = final_seed # Save cond from node 132 (BerniniConditioning) to capture full visual context latents # Find the actual original connections for the sampler orig_pos = api_wf[sampler_id]["inputs"]["positive"] orig_neg = api_wf[sampler_id]["inputs"]["negative"] api_wf["901"] = {"class_type": "BerniniChunkSaveCond", "inputs": { "positive": orig_pos, "negative": orig_neg, "session_id": session_id, }} # Save output latent api_wf["902"] = {"class_type": "BerniniChunkSaveLatent", "inputs": { "samples": [sampler_id, 0], "session_id": session_id, }} # On step 0, sampler keeps its original positive/negative connections to node 132 else: # Step k>0: no noise, load saved cond + latent api_wf[sampler_id]["inputs"]["add_noise"] = "disable" # Load saved conditioning (fallback to node 132 if file missing) orig_pos = api_wf[sampler_id]["inputs"]["positive"] orig_neg = api_wf[sampler_id]["inputs"]["negative"] api_wf["901"] = {"class_type": "BerniniChunkLoadCond", "inputs": { "fallback_pos": orig_pos, "fallback_neg": orig_neg, "session_id": session_id, }} # Overwrite sampler inputs with the loaded conditioning api_wf[sampler_id]["inputs"]["positive"] = ["901", 0] api_wf[sampler_id]["inputs"]["negative"] = ["901", 1] # Load previous latent, feed into sampler api_wf["902"] = {"class_type": "BerniniChunkLoadLatent", "inputs": { "fallback": ["132", 2], # fallback to noise latent from conditioning "session_id": session_id, }} api_wf[sampler_id]["inputs"]["latent_image"] = ["902", 0] # Save output latent (for next step, or final decode) api_wf["903"] = {"class_type": "BerniniChunkSaveLatent", "inputs": { "samples": [sampler_id, 0], "session_id": session_id, }} # Output node for this step — determines what the executor returns exec_node = NODE_OUTPUT if is_final else (902 if chunk_idx == 0 else 903) output_video = _execute_workflow(api_wf, exec_node) except Exception as e: tb = _tb.format_exc() print(f"[chunk] EXCEPTION: {tb}", flush=True) return None, f"Generation failed: {type(e).__name__}: {e}" if is_final and output_video: ts = time.strftime("%Y%m%d_%H%M%S") out_path = os.path.join(SAVE_BASE, f"bernini_chunk_{ts}.mp4") shutil.copy(output_video, out_path) return out_path, f"Done: {out_path}" return None, f"PARTIAL:{session_id}:{chunk_idx+1}" @spaces.GPU(duration=_get_duration) def generate_handler( prompt: str, negative: str, image_input: Any, seed: int, aspect_ratio: str = "16:9", use_ref_aspect: bool = False, gpu_budget: int = 0, # 0 = auto-estimate num_steps: int = 6, duration_secs: float = 10.0, sampler_name: str = "uni_pc", base_fps: int = 15, rife_mode: str = "no rife", loras_enabled: bool = False, s_wamu_h: float = 1.0, s_dreamly_h: float = 0.3, s_wamu_l: float = 0.5, s_dreamly_l: float = 0.5, enhance_prompt: bool = False, ipnc_enabled: bool = False, ipnc_wamu_h: float = 100.0, ipnc_dreamly_h: float = 100.0, ipnc_wamu_l: float = 100.0, ipnc_dreamly_l: float = 100.0, s_svicamera_h: float = 0.0, s_svicamera_l: float = 0.0, ipnc_svicamera_h: float = 100.0, ipnc_svicamera_l: float = 100.0, progress=gr.Progress(track_tqdm=True), ): import traceback as _tb try: if not (prompt or "").strip(): return None, "enter a prompt" final_seed = int(seed) if seed else random.randint(0, MAX_SEED) negative = negative or DEFAULT_NEGATIVE image_paths = _coerce_gallery(image_input) if not image_paths: return None, "upload at least one reference image" INPUT.mkdir(parents=True, exist_ok=True) dest_names = [] for p in image_paths: dn = f"ref_{uuid.uuid4().hex[:8]}_{os.path.basename(p)}" shutil.copy(p, INPUT / dn) dest_names.append(dn) gen_w, gen_h = _compute_dims(str(aspect_ratio), image_paths[0], bool(use_ref_aspect)) enhanced_prompt_text = None if enhance_prompt: progress(0.05, desc="enhancing prompt...") enhanced_prompt_text = _enhance_prompt_r2v(prompt, image_paths) if enhanced_prompt_text: prompt = enhanced_prompt_text progress(0.1, desc="building workflow...") visual_wf = _load_workflow() api_wf = _convert_visual_to_api(visual_wf) api_wf = _patch_api_workflow( api_wf, prompt, negative, dest_names, final_seed, gen_w, gen_h, num_steps=int(num_steps), num_frames=max(1, round(float(duration_secs) * int(base_fps))), sampler_name=str(sampler_name), base_fps=int(base_fps), rife_mode=str(rife_mode), loras_enabled=bool(loras_enabled), lora_strengths={ 2001: float(s_wamu_h), 2002: -float(s_wamu_h), 2003: float(s_dreamly_h), 2004: float(s_wamu_l), 2005: -float(s_wamu_l), 2006: float(s_dreamly_l), 2021: float(s_svicamera_h), 2023: float(s_svicamera_l), }, ipnc_enabled=bool(ipnc_enabled), ipnc_strengths={ "wamu_h": float(ipnc_wamu_h), "dreamly_h": float(ipnc_dreamly_h), "wamu_l": float(ipnc_wamu_l), "dreamly_l": float(ipnc_dreamly_l), "svicamera_h": float(ipnc_svicamera_h), "svicamera_l": float(ipnc_svicamera_l), }, ) progress(0.3, desc="generating...") output_video = _execute_workflow(api_wf) except Exception as e: tb = _tb.format_exc() print(f"[generate] EXCEPTION: {tb}", flush=True) return None, f"Generation failed: {type(e).__name__}: {e}" ts = time.strftime("%Y%m%d_%H%M%S") out_path = os.path.join(SAVE_BASE, f"r2v_{ts}.mp4") shutil.copy(output_video, out_path) status = f"Done: {out_path}" if enhanced_prompt_text: status += f"\n\n--- Enhanced prompt ---\n{enhanced_prompt_text}" return out_path, status with gr.Blocks(title="Bernini-R Wan 2.2 R2V Lightning") as demo: gr.Markdown("# Bernini-R Wan 2.2 R2V Lightning") with gr.Row(): with gr.Column(scale=1): image_input = gr.Gallery( label="Reference images (up to 5)", columns=5, type="filepath", height=160, ) prompt = gr.Textbox( label="Prompt", lines=3, placeholder="Describe the subject's action in detail. Reference uploaded images by name (e.g. image0, image1). Example: the person in image0, wearing the outfit from image1, walks confidently through a busy train station.", value="Keeping the exact identity and appearance the same as in image0, the person in image0 dances in a supermarket.", ) with gr.Accordion("Negative prompt", open=False): negative = gr.Textbox( label="", lines=2, value=DEFAULT_NEGATIVE, ) with gr.Group(): aspect_ratio = gr.Radio( choices=list(ASPECT_PRESETS.keys()), value="16:9", label="Aspect ratio", ) use_ref_aspect = gr.Checkbox( label="use first reference image aspect ratio", value=False, ) with gr.Group(elem_id="loras_9999"): loras_enabled = gr.Checkbox(label="optional loras", value=False) with gr.Column(visible=False) as loras_section: with gr.Accordion("loras", open=True): with gr.Group(): gr.Markdown("
High
") with gr.Row(): s_wamu_h = gr.Slider(-2, 2, value=0.9, step=0.05, label="wamu") s_dreamly_h = gr.Slider(-2, 2, value=0.9, step=0.05, label="dreamly") s_svicamera_h = gr.Slider(-2, 2, value=0.0, step=0.05, label="svicamera") with gr.Group(): gr.Markdown("
Low
") with gr.Row(): s_wamu_l = gr.Slider(-2, 2, value=0.5, step=0.05, label="wamu") s_dreamly_l = gr.Slider(-2, 2, value=0.7, step=0.05, label="dreamly") s_svicamera_l = gr.Slider(-2, 2, value=0.0, step=0.05, label="svicamera") gr.Markdown("
") ipnc_enabled = gr.Checkbox(label="IPNC", value=False) with gr.Column(visible=False) as ipnc_section: with gr.Group(): gr.Markdown("
High
") with gr.Row(): ipnc_wamu_h = gr.Slider(0, 200, value=100, step=1, label="wamu") ipnc_dreamly_h = gr.Slider(0, 200, value=100, step=1, label="dreamly") ipnc_svicamera_h = gr.Slider(0, 200, value=100, step=1, label="svicamera") with gr.Group(): gr.Markdown("
Low
") with gr.Row(): ipnc_wamu_l = gr.Slider(0, 200, value=100, step=1, label="wamu") ipnc_dreamly_l = gr.Slider(0, 200, value=100, step=1, label="dreamly") ipnc_svicamera_l = gr.Slider(0, 200, value=100, step=1, label="svicamera") loras_enabled.change( fn=lambda x: gr.update(visible=x), inputs=loras_enabled, outputs=loras_section, ) ipnc_enabled.change( fn=lambda x: gr.update(visible=x), inputs=ipnc_enabled, outputs=ipnc_section, ) with gr.Row(): seed = gr.Number(value=0, precision=0, label="Seed (0=random)") gpu_budget = gr.Slider(0, 540, value=0, step=10, label="ZeroGPU budget (0=auto)") with gr.Row(): num_steps = gr.Slider(4, 20, value=6, step=1, label="Steps") duration_secs = gr.Slider(1, 25, value=5, step=0.5, label="Duration (s)") sampler_name = gr.Dropdown(choices=["uni_pc", "lcm"], value="lcm", label="Sampler") base_fps = gr.Number(value=15, precision=0, label="Combine FPS") rife_mode = gr.Dropdown( choices=["no rife", "2x rife", "4x rife"], value="no rife", label="RIFE interpolation", ) enhance_prompt = gr.Checkbox(label="enhance prompt", value=False) generate_btn = gr.Button("Generate", variant="primary", size="lg") with gr.Column(scale=1): output_video = gr.Video(label="Generated video") output_status = gr.Textbox(label="Status", interactive=False) generate_btn.click( fn=generate_handler, inputs=[ prompt, negative, image_input, seed, aspect_ratio, use_ref_aspect, gpu_budget, num_steps, duration_secs, sampler_name, base_fps, rife_mode, loras_enabled, s_wamu_h, s_dreamly_h, s_wamu_l, s_dreamly_l, enhance_prompt, ipnc_enabled, ipnc_wamu_h, ipnc_dreamly_h, ipnc_wamu_l, ipnc_dreamly_l, s_svicamera_h, s_svicamera_l, ipnc_svicamera_h, ipnc_svicamera_l, ], outputs=[output_video, output_status], ) _chunk_session_id = gr.Textbox(visible=False, value="") _chunk_spc_txt = gr.Textbox(visible=False, value="2") _chunk_step_txt = gr.Textbox(visible=False, value="0") _chunk_btn = gr.Button(visible=False) _chunk_btn.click( fn=generate_chunk_handler, inputs=[ prompt, negative, image_input, seed, aspect_ratio, use_ref_aspect, gpu_budget, num_steps, duration_secs, sampler_name, base_fps, rife_mode, loras_enabled, s_wamu_h, s_dreamly_h, s_wamu_l, s_dreamly_l, enhance_prompt, ipnc_enabled, ipnc_wamu_h, ipnc_dreamly_h, ipnc_wamu_l, ipnc_dreamly_l, s_svicamera_h, s_svicamera_l, ipnc_svicamera_h, ipnc_svicamera_l, _chunk_session_id, _chunk_spc_txt, _chunk_step_txt, ], outputs=[output_video, output_status], api_name="generate_chunk", ) if __name__ == "__main__": _ensure_comfy() _ensure_models() _init_comfy_nodes() demo.queue().launch()