# --- hosting-page residue captured together with this file (not program code) ---
# signsur4739379373's picture
# duration max 25, steps_per_chunk param
# 7c6328c
# Raw
# History Blame Contribute Delete
# 56.9 kB
from __future__ import annotations
import asyncio
import json
import os
import pathlib
import random
import re
import shutil
import subprocess
import sys
import tempfile
import time
import uuid
from typing import Any
from PIL import Image as _PILImage
# Pixel budget (width * height) targeted when dimensions are derived from a
# reference image; numerically the area of the "16:9" preset below.
_TARGET_MP = 848 * 480
# Fixed (width, height) pairs keyed by aspect-ratio label.
ASPECT_PRESETS: dict[str, tuple[int, int]] = {
    "16:9": (848, 480),
    "9:16": (480, 848),
    "1:1": (640, 640),
    "4:3": (736, 560),
    "3:4": (560, 736),
}
def _compute_dims(aspect_ratio: str, first_image_path: str | None, use_ref: bool) -> tuple[int, int]:
    """Choose the generation size as ``(width, height)``.

    When ``use_ref`` is true and ``first_image_path`` exists, the size follows
    the reference image's aspect ratio at roughly ``_TARGET_MP`` pixels, with
    both sides rounded to multiples of 16 and floored at 16.  In every other
    case, or if anything goes wrong while reading the image, the preset for
    ``aspect_ratio`` is returned (``(848, 480)`` for unknown labels).

    Args:
        aspect_ratio: Key of ``ASPECT_PRESETS``.
        first_image_path: Path of the first reference image, or ``None``.
        use_ref: Whether to derive the size from the reference image.

    Returns:
        The ``(width, height)`` pair to generate at.
    """
    if use_ref and first_image_path and os.path.exists(first_image_path):
        try:
            with _PILImage.open(first_image_path) as im:
                iw, ih = im.size
            ratio = iw / ih
            h = round((_TARGET_MP / ratio) ** 0.5 / 16) * 16
            # A height that rounds to 0 (extremely wide image) raises
            # ZeroDivisionError here and is handled by the fallback below.
            w = round((_TARGET_MP / h / 16)) * 16
            h = max(16, h)
            w = max(16, w)
            return w, h
        except Exception as exc:
            # Best effort: keep the preset fallback, but say why the
            # reference image was not used instead of failing silently.
            print(
                f"[dims] could not derive size from {first_image_path!r}: {exc}; using preset",
                flush=True,
            )
    return ASPECT_PRESETS.get(aspect_ratio, (848, 480))
def _setup_cuda_lib_path() -> None:
    """Prepend the first existing CUDA library directory to ``LD_LIBRARY_PATH``.

    A fixed list of well-known directories is tried in order.  Before that,
    ``/cuda-image/usr/local`` and ``/usr/local`` are searched recursively for
    ``libcudart.so.13*``; the directory of the first hit under each base is
    pushed to the front of the list.  If no candidate directory exists, a
    warning is printed and the environment is left untouched.

    NOTE(review): the dynamic loader normally reads ``LD_LIBRARY_PATH`` when a
    process starts, so this presumably only helps child processes or loaders
    that re-read the variable -- confirm against the deployment environment.
    """
    candidates = [
        "/cuda-image/usr/local/cuda-13.0/targets/x86_64-linux/lib",
        "/cuda-image/usr/local/cuda-13.0/lib64",
        "/usr/local/cuda-13.0/targets/x86_64-linux/lib",
        "/usr/local/cuda-13.0/lib64",
        "/usr/local/cuda/targets/x86_64-linux/lib",
        "/usr/local/cuda/lib64",
    ]
    for base in ("/cuda-image/usr/local", "/usr/local"):
        bp = pathlib.Path(base)
        if bp.exists():
            for found in bp.rglob("libcudart.so.13*"):
                candidates.insert(0, str(found.parent))
                break  # one hit per base directory is enough
    for p in candidates:
        if pathlib.Path(p).is_dir():
            cur = os.environ.get("LD_LIBRARY_PATH", "")
            # Join only when there is an existing value: a trailing ":" is an
            # empty entry, which the loader treats as the current directory.
            os.environ["LD_LIBRARY_PATH"] = f"{p}:{cur}" if cur else p
            print(f"[cuda] LD_LIBRARY_PATH set to include {p}", flush=True)
            return
    print("[cuda] WARNING: could not find libcudart.so.13 directory", flush=True)


# Runs at import time, before the gradio / spaces / torch imports that follow.
_setup_cuda_lib_path()
import gradio as gr
import spaces
import torch
# Directory containing this file; everything else is laid out beneath it.
ROOT = pathlib.Path(__file__).resolve().parent
# ComfyUI checkout and the sub-directories this module reads and writes.
COMFY = ROOT / "ComfyUI"
MODELS = COMFY / "models"
INPUT = COMFY / "input"
OUTPUT = COMFY / "output"
# Workflow JSON, resolved relative to ROOT by _load_workflow().
WORKFLOW_FILE = "bernini_r2v_base.json"
# ComfyUI commit that _ensure_comfy() asks git to fetch and check out.
COMFY_COMMIT = "4e1f7cb1db1c26bb9ee61cf1875776517e2abae8"
# (directory name, git URL) pairs installed under ComfyUI/custom_nodes.
# A URL of None marks a node that is copied from ROOT/custom_nodes instead of
# being cloned.
CUSTOM_NODES = [
    ("bernini_chunk", None),
    ("ComfyUI-WanVideoWrapper", "https://github.com/kijai/ComfyUI-WanVideoWrapper.git"),
    ("ComfyUI-KJNodes", "https://github.com/kijai/ComfyUI-KJNodes.git"),
    ("ComfyUI-RH-Bernini", "https://github.com/RH-RunningHub/ComfyUI-RH-Bernini.git"),
    ("ComfyUI-VideoHelperSuite", "https://github.com/Kosinkadink/ComfyUI-VideoHelperSuite.git"),
    ("ComfyUI-Frame-Interpolation", "https://github.com/Fannovel16/ComfyUI-Frame-Interpolation.git"),
]
# Model files fetched by _ensure_models().  Each entry gives:
#   "repo"  - Hugging Face repository id passed to hf_hub_download
#   "file"  - path of the file inside that repository
#   "dest"  - local path the file is copied to (may be renamed on the way)
#   "label" - short name used only in startup log messages
DOWNLOADS = [
    # --- base diffusion models, text encoder, VAE, CLIP vision ---
    {
        "repo": "Comfy-Org/Bernini-R",
        "file": "diffusion_models/wan2.2_bernini_r_high_noise_fp8_scaled.safetensors",
        "dest": MODELS / "diffusion_models" / "Wan22_Bernini_HIGH_fp8_e4m3fn_scaled.safetensors",
        "label": "bernini HIGH fp8",
    },
    {
        "repo": "Comfy-Org/Bernini-R",
        "file": "diffusion_models/wan2.2_bernini_r_low_noise_fp8_scaled.safetensors",
        "dest": MODELS / "diffusion_models" / "Wan22_Bernini_LOW_fp8_e4m3fn_scaled.safetensors",
        "label": "bernini LOW fp8",
    },
    {
        "repo": "Osrivers/nsfw_wan_umt5-xxl_fp8_scaled.safetensors",
        "file": "nsfw_wan_umt5-xxl_fp8_scaled.safetensors",
        "dest": MODELS / "text_encoders" / "umt5_xxl_fp8_e4m3fn_scaled.safetensors",
        "label": "t5 text encoder fp8",
    },
    {
        "repo": "Comfy-Org/Wan_2.1_ComfyUI_repackaged",
        "file": "split_files/vae/wan_2.1_vae.safetensors",
        "dest": MODELS / "vae" / "wan_2.1_vae.safetensors",
        "label": "wan vae",
    },
    {
        "repo": "Kijai/WanVideo_comfy",
        "file": "Lightx2v/lightx2v_T2V_14B_cfg_step_distill_v2_lora_rank64_bf16.safetensors",
        "dest": MODELS / "loras" / "lightx2v_T2V_14B_cfg_step_distill_v2_lora_rank64_bf16.safetensors",
        "label": "lightx2v lora",
    },
    {
        "repo": "Comfy-Org/Wan_2.1_ComfyUI_repackaged",
        "file": "split_files/clip_vision/clip_vision_h.safetensors",
        "dest": MODELS / "clip_vision" / "clip_vision_h.safetensors",
        "label": "clip vision h",
    },
    # --- LoRAs and their "IPNC" companions ---
    {
        "repo": "signsur4739379373/archive",
        "file": "wan22/wamu_v3_lightning_lora_high_noise_r128.safetensors",
        "dest": MODELS / "loras" / "wamu_v3_lora_high_noise_r128.safetensors",
        "label": "wamu high lora",
    },
    {
        "repo": "signsur4739379373/archive",
        "file": "wan22/wamu_v3_lightning_lora_low_noise_r128.safetensors",
        "dest": MODELS / "loras" / "wamu_v3_lora_low_noise_r128.safetensors",
        "label": "wamu low lora",
    },
    {
        "repo": "Kijai/WanVideo_comfy",
        "file": "Lightx2v/lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors",
        "dest": MODELS / "loras" / "lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors",
        "label": "lightx2v I2V lora",
    },
    {
        "repo": "yeqiu168182/DR34ML4Y_I2V_14B_HIGH",
        "file": "DR34ML4Y_I2V_14B_HIGH.safetensors",
        "dest": MODELS / "loras" / "DR34ML4Y_I2V_14B_HIGH.safetensors",
        "label": "dreamly high lora",
    },
    {
        "repo": "yeqiu168182/DR34ML4Y_I2V_14B_LOW",
        "file": "DR34ML4Y_I2V_14B_LOW.safetensors",
        "dest": MODELS / "loras" / "DR34ML4Y_I2V_14B_LOW.safetensors",
        "label": "dreamly low lora",
    },
    {
        "repo": "signsur4739379373/ipnc_antiloras_archive",
        "file": "Wan22_BerniniR_DR34ML4Y_HIGH_Rank1_IPNC.safetensors",
        "dest": MODELS / "loras" / "Wan22_BerniniR_DR34ML4Y_HIGH_Rank1_IPNC.safetensors",
        "label": "dreamly high IPNC",
    },
    {
        "repo": "signsur4739379373/ipnc_antiloras_archive",
        "file": "Wan22_BerniniR_DR34ML4Y_LOW_Rank1_IPNC.safetensors",
        "dest": MODELS / "loras" / "Wan22_BerniniR_DR34ML4Y_LOW_Rank1_IPNC.safetensors",
        "label": "dreamly low IPNC",
    },
    {
        "repo": "signsur4739379373/ipnc_antiloras_archive",
        "file": "Wan22_BerniniR_wamu_v3_HIGH_Rank1_IPNC.safetensors",
        "dest": MODELS / "loras" / "Wan22_BerniniR_wamu_v3_HIGH_Rank1_IPNC.safetensors",
        "label": "wamu high IPNC",
    },
    {
        "repo": "signsur4739379373/ipnc_antiloras_archive",
        "file": "Wan22_BerniniR_wamu_v3_LOW_Rank1_IPNC.safetensors",
        "dest": MODELS / "loras" / "Wan22_BerniniR_wamu_v3_LOW_Rank1_IPNC.safetensors",
        "label": "wamu low IPNC",
    },
    {
        "repo": "signsur4739379373/archive",
        "file": "wan22/nsfwsvicamera_lora_high_r128.safetensors",
        "dest": MODELS / "loras" / "nsfwsvicamera_lora_high_r128.safetensors",
        "label": "svicamera high lora",
    },
    {
        "repo": "signsur4739379373/archive",
        "file": "wan22/nsfwsvicamera_lora_low_r128.safetensors",
        "dest": MODELS / "loras" / "nsfwsvicamera_lora_low_r128.safetensors",
        "label": "svicamera low lora",
    },
    {
        "repo": "signsur4739379373/ipnc_antiloras_archive",
        "file": "Wan22_BerniniR_nsfwsvicamera_HIGH_Rank1_IPNC.safetensors",
        "dest": MODELS / "loras" / "Wan22_BerniniR_nsfwsvicamera_HIGH_Rank1_IPNC.safetensors",
        "label": "svicamera high IPNC",
    },
    {
        "repo": "signsur4739379373/ipnc_antiloras_archive",
        "file": "Wan22_BerniniR_nsfwsvicamera_LOW_Rank1_IPNC.safetensors",
        "dest": MODELS / "loras" / "Wan22_BerniniR_nsfwsvicamera_LOW_Rank1_IPNC.safetensors",
        "label": "svicamera low IPNC",
    },
    # --- frame-interpolation checkpoint, stored inside the custom node ---
    {
        "repo": "VMTamashii/rife49",
        "file": "rife49.pth",
        "dest": COMFY / "custom_nodes" / "ComfyUI-Frame-Interpolation" / "ckpts" / "rife" / "rife49.pth",
        "label": "rife49 model",
    },
]
# Scratch directory created once at import time.  mkdtemp() already creates
# the directory, so the makedirs() call below is a no-op safeguard.
SAVE_BASE = tempfile.mkdtemp(prefix="bernini_comfy_")
os.makedirs(SAVE_BASE, exist_ok=True)
# Upper bound (inclusive) for randomly drawn seeds: 2**31 - 1.
MAX_SEED = 2_147_483_647
# One-shot guards: each is flipped to True by its setup function
# (_ensure_comfy, _init_comfy_nodes, _ensure_models) so the work runs once.
_comfy_ready = False
_nodes_ready = False
_models_ready = False
def _run(cmd: list[str], cwd: pathlib.Path | None = None, check: bool = True) -> subprocess.CompletedProcess:
    """Log a command under the ``[setup]`` tag, run it, and return the result.

    Args:
        cmd: Argument vector; items are stringified only for the log line.
        cwd: Working directory for the child process, or ``None`` to inherit.
        check: Forwarded to ``subprocess.run``; when true a non-zero exit
            raises ``subprocess.CalledProcessError``.
    """
    rendered = " ".join(map(str, cmd))
    print("[setup]", rendered, flush=True)
    workdir = None if not cwd else str(cwd)
    return subprocess.run(cmd, cwd=workdir, check=check)
def _pip_install(args: list[str], check: bool = True) -> None:
    """Run ``pip install --no-cache-dir`` with ``args`` using this interpreter.

    Args:
        args: Extra command-line arguments (requirement specifiers, options).
        check: Forwarded to ``_run``; when false a failing install is ignored.
    """
    command = [sys.executable, "-m", "pip", "install", "--no-cache-dir"]
    command.extend(args)
    _run(command, check=check)
def _install_filtered_requirements(req_path: pathlib.Path) -> None:
    """Install a requirements file, minus packages this app must not touch.

    Lines naming a blocked package (torch, torchvision, torchaudio,
    transformers, huggingface-hub, accelerate) are dropped.  Everything else
    is passed to ``pip install`` in a single, non-fatal call.  Nothing happens
    when the file is missing or no installable line remains.

    Args:
        req_path: Path to a ``requirements.txt``-style file.
    """
    if not req_path.exists():
        return
    blocked = {"torch", "torchvision", "torchaudio", "transformers", "huggingface-hub", "accelerate"}
    safe: list[str] = []
    for line in req_path.read_text(encoding="utf-8", errors="ignore").splitlines():
        # The lines become command-line arguments, where pip does not strip
        # requirements-file comments.  Remove "#..." when it starts the line
        # or follows whitespace; a "#" inside a URL fragment is kept.
        item = re.sub(r"(^|\s+)#.*$", "", line).strip()
        if not item:
            continue
        low = item.lower().replace("_", "-")
        # The name ends at the first version operator, marker, extra, "@" of
        # a direct reference, or whitespace.
        package = re.split(r"[<>=!~;@\[\s]", low, maxsplit=1)[0]
        if package in blocked:
            continue
        safe.append(item)
    if safe:
        _pip_install(safe, check=False)
def _ensure_repo(path: pathlib.Path, url: str, commit: str | None = None) -> None:
    """Make sure a git checkout exists at ``path``, optionally at ``commit``.

    A shallow clone of ``url`` is made when ``path`` does not exist (a failing
    clone raises).  When ``commit`` is given, it is fetched and checked out on
    a best-effort basis: failures of those two git commands are ignored.
    """
    already_cloned = path.exists()
    if not already_cloned:
        _run(["git", "clone", "--depth", "1", url, str(path)])
    if not commit:
        return
    for git_args in (["fetch", "--depth", "1", "origin", commit], ["checkout", commit]):
        _run(["git", *git_args], cwd=path, check=False)
def _apply_comfy_utils_namespace_fix() -> None:
    """Rename ComfyUI's ``utils`` directory to ``utilities`` and fix imports.

    The directory is renamed only when ``utils`` exists and ``utilities`` does
    not.  Afterwards every ``*.py`` file under the ComfyUI checkout (skipping
    ``__pycache__`` and files that are not valid UTF-8) has its
    ``from utils`` / ``import utils`` statements rewritten to ``utilities``.
    A file is written back only if its text actually changed.
    """
    old_pkg = COMFY / "utils"
    new_pkg = COMFY / "utilities"
    if not new_pkg.exists() and old_pkg.exists():
        old_pkg.rename(new_pkg)

    from_pattern = re.compile(r"(^|\n)(\s*)from utils(\s|\.)")
    import_pattern = re.compile(r"(^|\n)(\s*)import utils(\s|\.|$)")

    for source_file in COMFY.rglob("*.py"):
        if "__pycache__" in source_file.parts:
            continue
        try:
            original = source_file.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            continue
        rewritten = from_pattern.sub(r"\1\2from utilities\3", original)
        rewritten = import_pattern.sub(r"\1\2import utilities\3", rewritten)
        # Plain-text pass for occurrences the anchored patterns do not reach.
        rewritten = rewritten.replace("from utils import", "from utilities import")
        if rewritten != original:
            source_file.write_text(rewritten, encoding="utf-8")
def _download_to_dest(repo: str, file: str, dest: pathlib.Path, token: str | None = None) -> None:
    """Fetch ``file`` from Hugging Face repo ``repo`` and place it at ``dest``.

    Nothing is downloaded when ``dest`` already exists.  Otherwise the file is
    obtained with ``hf_hub_download`` and copied to a sibling ``*.part`` file
    that is then renamed onto ``dest``.  An interrupted copy therefore never
    leaves a truncated ``dest`` that later existence checks would accept.

    Args:
        repo: Hugging Face repository id.
        file: Path of the file inside the repository.
        dest: Final local path.
        token: Optional access token forwarded to ``hf_hub_download``.
    """
    if dest.exists():
        return
    # Imported lazily, and only once a download is actually needed.
    from huggingface_hub import hf_hub_download

    dest.parent.mkdir(parents=True, exist_ok=True)
    downloaded = hf_hub_download(repo_id=repo, filename=file, token=token)
    partial = dest.with_name(dest.name + ".part")
    try:
        shutil.copy(downloaded, partial)
        os.replace(partial, dest)
    finally:
        # Only present here if the copy or the rename failed.
        partial.unlink(missing_ok=True)
def _ensure_comfy() -> None:
    """Install ComfyUI and its custom nodes once per process.

    Clones ComfyUI at ``COMFY_COMMIT``, installs its filtered requirements,
    then prepares every entry of ``CUSTOM_NODES``: entries with a URL are
    cloned, entries without one are copied from ``ROOT/custom_nodes`` (if
    present there and not already installed).  Each node's requirements are
    installed, and finally the ``utils`` namespace fix is applied.
    """
    global _comfy_ready
    if _comfy_ready:
        return

    _ensure_repo(COMFY, "https://github.com/Comfy-Org/ComfyUI.git", commit=COMFY_COMMIT)
    _install_filtered_requirements(COMFY / "requirements.txt")

    nodes_dir = COMFY / "custom_nodes"
    nodes_dir.mkdir(parents=True, exist_ok=True)
    for node_name, repo_url in CUSTOM_NODES:
        target = nodes_dir / node_name
        if repo_url is not None:
            _ensure_repo(target, repo_url)
        else:
            # Node shipped alongside this file: copy it instead of cloning.
            bundled = ROOT / "custom_nodes" / node_name
            if bundled.exists() and not target.exists():
                shutil.copytree(str(bundled), str(target))
        _install_filtered_requirements(target / "requirements.txt")

    _apply_comfy_utils_namespace_fix()
    _comfy_ready = True
    print("[startup] ComfyUI + custom nodes ready.", flush=True)
def _ensure_models() -> None:
    """Download every ``DOWNLOADS`` entry whose destination is missing (once).

    The access token comes from ``HF_TOKEN`` or, failing that,
    ``HUGGINGFACE_HUB_TOKEN``.  Entries whose ``dest`` already exists are
    skipped without a log line.
    """
    global _models_ready
    if _models_ready:
        return
    hub_token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_HUB_TOKEN")
    for entry in DOWNLOADS:
        target = pathlib.Path(entry["dest"])
        if not target.exists():
            print(f"[startup] downloading {entry['label']}...", flush=True)
            _download_to_dest(entry["repo"], entry["file"], target, hub_token)
    _models_ready = True
    print("[startup] all models ready.", flush=True)
def _init_comfy_nodes() -> None:
    """Import ComfyUI in-process and initialise its nodes (once per process).

    Side effects: puts the ComfyUI checkout first on ``sys.path``, drops any
    cached ``utils`` modules, changes the working directory to ``COMFY``,
    registers placeholder ``torchaudio`` modules if none are loaded, installs
    a new asyncio event loop as the current one, and creates a
    ``PromptServer`` / ``PromptQueue`` pair.
    """
    global _nodes_ready
    if _nodes_ready:
        return
    comfy_path = str(COMFY)
    # Move the ComfyUI checkout to the front of sys.path (no duplicates).
    sys.path = [p for p in sys.path if p != comfy_path]
    sys.path.insert(0, comfy_path)
    # Forget any previously imported top-level `utils` package so a later
    # import resolves against the new sys.path order.
    for module_name in list(sys.modules):
        if module_name == "utils" or module_name.startswith("utils."):
            del sys.modules[module_name]
    os.chdir(COMFY)
    import types as _types, importlib.util as _ilu
    # Register empty stand-in modules for torchaudio and a few submodules.
    # setdefault() keeps a real torchaudio if one is already in sys.modules.
    # presumably needed because torchaudio is excluded from installs -- confirm
    _ta = _types.ModuleType("torchaudio")
    _ta.__spec__ = _ilu.spec_from_loader("torchaudio", loader=None)
    _ta.__version__ = "0.0.0"
    sys.modules.setdefault("torchaudio", _ta)
    for _sub in ["torchaudio.functional", "torchaudio.transforms", "torchaudio._extension"]:
        _m = _types.ModuleType(_sub)
        _m.__spec__ = _ilu.spec_from_loader(_sub, loader=None)
        sys.modules.setdefault(_sub, _m)
    # ComfyUI modules, importable only after the sys.path change above.
    import execution
    import nodes
    import server
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    server_instance = server.PromptServer(loop)
    # The queue object is not kept; presumably constructing it attaches it to
    # the server -- verify against ComfyUI's PromptQueue.
    execution.PromptQueue(server_instance)
    loop.run_until_complete(nodes.init_extra_nodes())
    _nodes_ready = True
    print("[startup] ComfyUI nodes initialized.", flush=True)
import numpy as _np
def _compute_split(total_steps, flow_shift=3.0, boundary_ratio=0.9, num_train_timesteps=1000):
    """Return the step index at which sampling hands over between samplers.

    ``total_steps`` sigmas are spaced evenly from 1 towards 0 (0 excluded),
    shifted with ``flow_shift * s / (1 + (flow_shift - 1) * s)`` and scaled
    to timesteps by ``num_train_timesteps - 1``.  The result is
    ``total_steps`` minus the number of timesteps that are at or above
    ``boundary_ratio * num_train_timesteps``.

    NOTE(review): the count of steps at/above the boundary is subtracted from
    the total rather than returned directly -- confirm this is the intended
    hand-off index for the two samplers.
    """
    base_sigmas = _np.linspace(1, 0, total_steps + 1)[:-1]
    shifted = flow_shift * base_sigmas / (1 + (flow_shift - 1) * base_sigmas)
    step_timesteps = shifted * (num_train_timesteps - 1)
    cutoff = boundary_ratio * num_train_timesteps
    steps_at_or_above = int(_np.count_nonzero(step_timesteps >= cutoff))
    return total_steps - steps_at_or_above
# Negative prompt used when the caller supplies none.  The text is a
# comma-separated list written in Chinese and is passed through verbatim.
DEFAULT_NEGATIVE = (
    "色调艳丽,过曝,静态,场景切换,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,"
    "最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,"
    "畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走"
)
# Instruction text sent to the prompt-enhancer model.  It is filled with
# str.format() using three fields: {image_num}, {extra_rule} and
# {original_text}; any other literal brace added here would need doubling.
R2V_TEMPLATE = """You are an expert at writing subject-driven video generation prompts. I'm providing you with:
1. {image_num} reference image(s) of the subject(s) (referred to as image0, image1, ... in order).
2. An original video description text.
Rewrite the description into TWO concatenated parts:
Part 1 - Short: A concise sentence describing who appears (reference as image0/image1/etc.), where, and what key action/motion.
Part 2 - Long: A detailed "Generate a video where..." paragraph with full appearance details referencing each subject as "the person from image0" etc., detailed scene/environment, and step-by-step temporal motion sequence.
Requirements:
- Reference each subject as image0/image1/etc., base appearance on what you see in the images (no hallucination), output entirely in English.
- For every action or scenario described, identify and explicitly state all implied visual elements that are not mentioned but must be present for the scene to exist: who or what else is in the frame, what the subject is wearing or holding that the activity requires, what the environment necessarily contains, any other participants or objects involved.
- For each action, explicitly state all visually relevant body states that are not mentioned but would be visible on camera: hand positions (open, clenched, raised, at side), facial expression, direction of gaze, posture, weight distribution, foot placement. Do not assume any body state is obvious -- state it explicitly.
- The final prompt must be detailed enough that a complete mental image of the scene can be formed without seeing the reference images. Every visual element a film director would need to brief their crew on -- blocking, attire, props, environment, participant positions -- must appear in the text.
- Do not robotically enumerate biomechanics. Write naturally while ensuring no visual element is left implicit.
{extra_rule}
- For any close physical interaction or insertion, you MUST explicitly define the spatial occlusion. State exactly what is physically connected, what is penetrating or entering a space, and what is visually hidden inside the other object/body versus what remains visible outside.
Return ONLY a JSON object with one key: "rewritten_text".
Original description:
{original_text}
"""
# Text substituted for {extra_rule} in R2V_TEMPLATE when the enhancer is
# called with no_redescribe=True.
ENHANCE_EXTRA_RULE_NOREDESCRIBE = (
    "Do not describe the inherent visual appearance of subjects from the reference images "
    "(their face, hair, body type, baseline clothing) -- the model already sees those. "
    "However, DO describe any scene-specific additions to reference subjects' appearance: "
    "equipment, props, or attire added for this scene that would not be present in the reference image."
)
# Text substituted for {extra_rule} otherwise.
ENHANCE_EXTRA_RULE_DEFAULT = (
    "Include full appearance details of reference subjects."
)
def _enhance_prompt_r2v(prompt: str, image_paths: list[str], no_redescribe: bool = False) -> str | None:
    """Rewrite ``prompt`` with grok-4.3 via the xAI chat-completions API.

    At most the first five entries of ``image_paths`` are considered, and only
    those that exist on disk are attached.  Attached images are labelled
    ``[Image 0]``, ``[Image 1]``, ... in order, and the image count in the
    instruction text matches the number actually attached (minimum 1).

    Args:
        prompt: Original description to rewrite.
        image_paths: Paths of the reference images.
        no_redescribe: Selects ``ENHANCE_EXTRA_RULE_NOREDESCRIBE`` instead of
            ``ENHANCE_EXTRA_RULE_DEFAULT`` for the template's extra rule.

    Returns:
        The rewritten prompt, or ``None`` when ``XAI_API_KEY`` is unset, the
        request fails, or the reply holds no usable ``rewritten_text``.
        Failures are logged and never raised.
    """
    import base64
    import json as _json
    import mimetypes as _mt

    api_key = os.environ.get("XAI_API_KEY", "")
    if not api_key:
        # Without a key the request can only be rejected; skip the round trip.
        print("[enhancer] failed: XAI_API_KEY is not set", flush=True)
        return None
    try:
        # One filtered list drives both the count in the text and the
        # attachments, so the two cannot disagree.
        attached = [p for p in image_paths[:5] if p and os.path.exists(p)]
        extra_rule = ENHANCE_EXTRA_RULE_NOREDESCRIBE if no_redescribe else ENHANCE_EXTRA_RULE_DEFAULT
        user_text = R2V_TEMPLATE.format(
            image_num=max(len(attached), 1),
            extra_rule=extra_rule,
            original_text=prompt,
        )
        content_msgs: list = [{"type": "text", "text": user_text}]
        for i, path in enumerate(attached):
            with open(path, "rb") as f:
                b64 = base64.b64encode(f.read()).decode("utf-8")
            mime, _ = _mt.guess_type(path)
            mime = mime or "image/jpeg"
            # A real newline; the previous "\\n" sent a literal backslash-n.
            content_msgs.append({"type": "text", "text": f"\n[Image {i}]:"})
            content_msgs.append({"type": "image_url", "image_url": {"url": f"data:{mime};base64,{b64}"}})
        payload = {
            "model": "grok-4.3",
            "messages": [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": content_msgs},
            ],
            "reasoning_effort": "xhigh",
            "response_format": {"type": "json_object"},
        }
        import requests as _req

        r = _req.post(
            "https://api.x.ai/v1/chat/completions",
            json=payload,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            timeout=300,
        )
        r.raise_for_status()
        text = r.json()["choices"][0]["message"]["content"]
        enhanced = str(_json.loads(text).get("rewritten_text") or "").strip()
        if enhanced:
            print(f"[enhancer] enhanced ({len(enhanced)} chars)", flush=True)
            return enhanced
        return None
    except Exception as e:
        # Enhancement is optional: report the failure and signal it with None.
        print(f"[enhancer] failed: {e}", flush=True)
        return None
def _load_workflow() -> dict[str, Any]:
    """Read and parse the workflow JSON stored next to this file."""
    with open(ROOT / WORKFLOW_FILE, encoding="utf-8") as handle:
        return json.load(handle)
# Workflow node id executed by default in _execute_workflow() when the caller
# does not name an output node.
NODE_OUTPUT = 67
def _convert_visual_to_api(visual: dict) -> dict[str, Any]:
    """Convert a ComfyUI editor ("visual") workflow into the API prompt format.

    Steps, in order:
      1. Build ``link_map``: link id -> ``[source node id (str), source slot]``.
      2. Short-circuit bypassed node types: links leaving such a node are
         redirected to the source of the node's first connected input.
      3. Resolve SetNode/GetNode pairs so links coming out of them point at
         the value originally wired into the matching SetNode.
      4. Emit one API entry per remaining node, combining linked inputs with
         widget values.

    Muted nodes (mode 4), helper/note node types and node types missing from
    ``NODE_CLASS_MAPPINGS`` are left out of the result.

    Args:
        visual: Parsed workflow JSON with ``"nodes"`` and ``"links"`` lists.

    Returns:
        Mapping of node-id string to ``{"class_type": ..., "inputs": {...}}``.
    """
    import nodes as comfy_nodes
    # Each link is unpacked as (id, source node, source slot, ...rest ignored).
    link_map: dict[int, Any] = {}
    for link in visual.get("links", []):
        link_id, src_node, src_slot, *_ = link
        link_map[int(link_id)] = [str(src_node), src_slot]
    # Node types treated as pass-through: consumers are rewired to whatever
    # feeds the node's first connected input.
    BYPASS_TYPES = {"PathchSageAttentionKJ"}
    for node in visual.get("nodes", []):
        if node.get("type") not in BYPASS_TYPES:
            continue
        src_ref = None
        for inp in node.get("inputs") or []:
            lid = inp.get("link")
            if lid is not None and lid in link_map:
                src_ref = link_map[lid]
                break
        if src_ref is None:
            continue
        for out in node.get("outputs") or []:
            for out_lid in (out.get("links") or []):
                if out_lid in link_map:
                    link_map[out_lid] = list(src_ref)
    # Record what each SetNode stores, both by variable name (first widget
    # value) and by the SetNode's own id.
    set_sources: dict[str, Any] = {}
    set_node_sources: dict[int, Any] = {}
    for node in visual.get("nodes", []):
        if node.get("type") not in {"SetNode", "SetNodeAny"}:
            continue
        name = (node.get("widgets_values") or [""])[0]
        for inp in node.get("inputs") or []:
            lid = inp.get("link")
            if lid in link_map:
                set_sources[name] = link_map[lid]
                set_node_sources[int(node["id"])] = link_map[lid]
    # Repeat until nothing changes, so chains of Set/Get nodes resolve fully.
    changed = True
    while changed:
        changed = False
        # Links whose source is a SetNode -> point at the SetNode's own input.
        for lid, src in list(link_map.items()):
            if isinstance(src, list):
                try:
                    sid = int(src[0])
                except (ValueError, TypeError):
                    continue
                if sid in set_node_sources:
                    rep = set_node_sources[sid]
                    if link_map[lid] != rep:
                        link_map[lid] = rep
                        changed = True
        # Links whose source is a GetNode -> point at the matching SetNode's
        # stored source.
        # NOTE(review): the flattened source does not show whether this pass
        # sits inside the while loop; it is placed inside because it assigns
        # `changed` -- confirm against the original file.
        for node in visual.get("nodes", []):
            if node.get("type") not in {"GetNode", "GetNodeAny"}:
                continue
            name = (node.get("widgets_values") or [""])[0]
            if name not in set_sources:
                continue
            for lid, src in list(link_map.items()):
                if isinstance(src, list) and src[0] == str(node["id"]):
                    rep = set_sources[name]
                    if link_map[lid] != rep:
                        link_map[lid] = rep
                        changed = True
    # Node types that never become API entries.
    skip_types = {"Note", "NoteNode", "MarkdownNote", "GetNode", "GetNodeAny", "SetNode", "SetNodeAny",
                  "PathchSageAttentionKJ"}
    MUTED_MODE = 4
    api: dict[str, Any] = {}
    for node in visual.get("nodes", []):
        node_id = int(node["id"])
        class_type = node.get("type", "")
        if class_type in skip_types:
            continue
        if node.get("mode") == MUTED_MODE:
            print(f"[workflow] skipping muted node {class_type} id={node_id}", flush=True)
            continue
        if class_type not in comfy_nodes.NODE_CLASS_MAPPINGS:
            print(f"[workflow] skipping unknown node {class_type} id={node_id}", flush=True)
            continue
        # Linked inputs first; widget values below only fill names still
        # unset (setdefault), so a link always wins over a widget value.
        inputs: dict[str, Any] = {}
        for inp in node.get("inputs") or []:
            lid = inp.get("link")
            if lid is not None and lid in link_map:
                inputs[inp["name"]] = link_map[lid]
        widgets = node.get("widgets_values") or []
        if widgets:
            if isinstance(widgets, dict):
                # Dict-style widget values are already keyed by input name.
                for k, v in widgets.items():
                    if k != "videopreview":
                        inputs.setdefault(k, v)
            else:
                # List-style widget values are positional.  Prefer the names
                # of inputs that declare a widget ...
                param_names = [
                    inp["name"]
                    for inp in node.get("inputs") or []
                    if isinstance(inp.get("widget"), dict) and inp["widget"].get("name")
                ]
                if not param_names:
                    # ... otherwise derive them from the node class: inputs
                    # whose type is a choice list or a primitive type name.
                    cls = comfy_nodes.NODE_CLASS_MAPPINGS[class_type]
                    try:
                        cls_inp = cls.INPUT_TYPES()
                        for grp in ("required", "optional"):
                            for pname, spec in cls_inp.get(grp, {}).items():
                                typ = spec[0] if isinstance(spec, (tuple, list)) and spec else spec
                                if isinstance(typ, (list, tuple)) or str(typ).upper() in {"FLOAT", "INT", "STRING", "BOOLEAN", "COMBO"}:
                                    param_names.append(pname)
                    except Exception:
                        # Best effort: with no names, widget values are dropped.
                        pass
                # A value from this set directly after a consumed widget
                # value is skipped rather than assigned to the next name.
                CTRL = {'randomize', 'fixed', 'increment', 'decrement'}
                wi = 0
                for pname in param_names:
                    if wi >= len(widgets):
                        break
                    inputs.setdefault(pname, widgets[wi])
                    wi += 1
                    if wi < len(widgets) and isinstance(widgets[wi], str) and widgets[wi] in CTRL:
                        wi += 1
        api[str(node_id)] = {"class_type": class_type, "inputs": inputs}
    return api
# LoRA loader descriptions: node id, chain it belongs to ("high" or "low"),
# LoRA file name and default strength.  Listed in the original table order.
_LORA_CHAIN = [
    {"id": node_id, "chain": chain, "file": filename, "default": strength}
    for node_id, chain, filename, strength in (
        (2003, "high", "DR34ML4Y_I2V_14B_HIGH.safetensors", 0.3),
        (2001, "high", "wamu_v3_lora_high_noise_r128.safetensors", 1.0),
        (2002, "high", "lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors", -1.0),
        (2004, "low", "wamu_v3_lora_low_noise_r128.safetensors", 0.5),
        (2005, "low", "lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors", -1.0),
        (2006, "low", "DR34ML4Y_I2V_14B_LOW.safetensors", 0.5),
    )
]
# Default strength per LoRA node id, derived from the table above.
LORA_DEFAULTS = {entry["id"]: entry["default"] for entry in _LORA_CHAIN}
# Frame-count multiplier for each interpolation mode label.
RIFE_MULTIPLIERS = {
    "no rife": 1,
    "2x rife": 2,
    "4x rife": 4,
}
def _patch_api_workflow(
    api: dict[str, Any],
    prompt: str,
    negative: str,
    image_names: list[str],
    seed: int,
    width: int,
    height: int,
    num_steps: int = 6,
    num_frames: int = 145,
    sampler_name: str = "uni_pc",
    base_fps: int = 15,
    rife_mode: str = "no rife",
    loras_enabled: bool = False,
    lora_strengths: dict[int, float] | None = None,
    ipnc_enabled: bool = False,
    ipnc_strengths: dict[str, float] | None = None,
) -> dict[str, Any]:
    """Write the per-request values into an API-format workflow.

    ``api`` is modified in place and also returned.  Existing nodes are
    addressed by numeric id; what each id stands for is defined by the
    workflow JSON, which is not visible from this function.

    Args:
        api: API-format graph keyed by node-id strings.
        prompt: Positive prompt; a fixed instruction sentence is prepended.
        negative: Negative prompt text.
        image_names: Reference image names.  The first entry is read
            unconditionally; every further entry gets its own ``LoadImage``
            node (ids 5001, 5002, ...) wired into node 136.
        seed: Written to node 128 and to ``noise_seed`` of node 57.
        width: Written to nodes 112 and 132.
        height: Written to nodes 114 and 132.
        num_steps: Total step count, divided between nodes 57 and 58 at the
            index returned by ``_compute_split``.
        num_frames: Written to node 132 as ``length``.
        sampler_name: Written to nodes 57 and 58.
        base_fps: Frame rate before interpolation.
        rife_mode: Key of ``RIFE_MULTIPLIERS``; unknown keys mean 1x.
        loras_enabled: When true, build the two LoRA loader chains.
        lora_strengths: Strength per LoRA node id; ``LORA_DEFAULTS`` is used
            when this is ``None`` or empty.  Entries for 2002 and 2005 are
            not read: those loaders always get the negated wamu strength.
        ipnc_enabled: When true, IPNC loaders are added for each LoRA whose
            percentage is above 0.
        ipnc_strengths: Percentage per IPNC LoRA; the loader strength is the
            main LoRA strength times ``percentage / 100``.  Defaults to 100
            for every key.

    Returns:
        The same ``api`` mapping.

    Raises:
        IndexError: ``image_names`` is empty.
        KeyError: a node that is patched without a presence check is missing
            (136 with extra images, 63/64 with LoRAs, 67 with interpolation).
    """
    if ipnc_strengths is None:
        ipnc_strengths = {"wamu_h": 100, "dreamly_h": 100, "wamu_l": 100, "dreamly_l": 100, "svicamera_h": 100, "svicamera_l": 100}

    def _set(node_id: int, key: str, val: Any) -> None:
        # Set one input on an existing node; silently skip absent nodes.
        k = str(node_id)
        if k in api:
            api[k]["inputs"][key] = val

    # Text, size and seed inputs.
    _set(6, "text", f"You are a helpful assistant specialized in subject-to-video generation. {prompt}")
    _set(7, "text", negative)
    _set(112, "value", width)
    _set(114, "value", height)
    _set(128, "value", seed)
    _set(57, "noise_seed", seed)
    _set(58, "noise_seed", 0)
    _set(132, "height", height)
    _set(132, "width", width)
    _set(132, "length", num_frames)
    _set(119, "image", image_names[0])
    # Node 57 runs steps [0, split), node 58 runs [split, num_steps).
    split = _compute_split(num_steps)
    _set(57, "steps", num_steps)
    _set(57, "start_at_step", 0)
    _set(57, "end_at_step", split)
    _set(57, "sampler_name", sampler_name)
    _set(58, "steps", num_steps)
    _set(58, "start_at_step", split)
    _set(58, "end_at_step", num_steps)
    _set(58, "sampler_name", sampler_name)
    # Extra reference images: one LoadImage node each, attached to node 136.
    for i, name in enumerate(image_names[1:], 1):
        nid = 5000 + i
        api[str(nid)] = {
            "class_type": "LoadImage",
            "inputs": {"image": name, "upload": "image"},
        }
        api["136"]["inputs"][f"images.image{i}"] = [str(nid), 0]
    if loras_enabled:
        strengths = lora_strengths or LORA_DEFAULTS
        # Starting points of the two chains (outputs of nodes 71 and 56).
        high_src = ["71", 0]
        low_src = ["56", 0]
        # IDs 2001-2006: main LoRAs
        # IDs 2011-2014: IPNC LoRAs injected after specific mains
        # IDs 2021/2023: svicamera LoRAs, 2022/2024: their IPNC LoRAs
        # Chain HIGH (as built below):
        #   DR34ML4Y(2003) -> DR34ML4Y_IPNC(2011) -> wamu(2001) ->
        #   lightx2v_I2V(2002) -> wamu_IPNC(2012) ->
        #   svicamera(2021) -> svicamera_IPNC(2022)
        # Chain LOW (as built below):
        #   wamu(2004) -> lightx2v_I2V(2005) -> wamu_IPNC(2013) ->
        #   DR34ML4Y(2006) -> DR34ML4Y_IPNC(2014) ->
        #   svicamera(2023) -> svicamera_IPNC(2024)
        s_wamu_h_val = strengths.get(2001, 1.0)
        s_dreamly_h_val = strengths.get(2003, 0.3)
        s_wamu_l_val = strengths.get(2004, 0.5)
        s_dreamly_l_val = strengths.get(2006, 0.5)
        s_svicamera_h_val = strengths.get(2021, 0.0)
        s_svicamera_l_val = strengths.get(2023, 0.0)

        def _add_lora(nid, src, fname, strength):
            # Append a LoraLoaderModelOnly node fed by `src` and return the
            # reference to its output so the next loader can chain onto it.
            api[str(nid)] = {
                "class_type": "LoraLoaderModelOnly",
                "inputs": {"model": src, "lora_name": fname,
                           "strength_model": strength},
            }
            return [str(nid), 0]

        # ── HIGH chain ──────────────────────────────────────────────
        # DR34ML4Y HIGH
        high_src = _add_lora(2003, high_src,
                             "DR34ML4Y_I2V_14B_HIGH.safetensors", s_dreamly_h_val)
        # DR34ML4Y_IPNC HIGH
        if ipnc_enabled and ipnc_strengths.get("dreamly_h", 0) > 0:
            ipnc_s = s_dreamly_h_val * ipnc_strengths["dreamly_h"] / 100.0
            high_src = _add_lora(2011, high_src,
                                 "Wan22_BerniniR_DR34ML4Y_HIGH_Rank1_IPNC.safetensors", ipnc_s)
        # wamu HIGH
        high_src = _add_lora(2001, high_src,
                             "wamu_v3_lora_high_noise_r128.safetensors", s_wamu_h_val)
        # lightx2v_I2V HIGH (negation)
        high_src = _add_lora(2002, high_src,
                             "lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors",
                             -s_wamu_h_val)
        # wamu_IPNC HIGH (after lightx2v_I2V negation)
        if ipnc_enabled and ipnc_strengths.get("wamu_h", 0) > 0:
            ipnc_s = s_wamu_h_val * ipnc_strengths["wamu_h"] / 100.0
            high_src = _add_lora(2012, high_src,
                                 "Wan22_BerniniR_wamu_v3_HIGH_Rank1_IPNC.safetensors", ipnc_s)
        # svicamera HIGH
        high_src = _add_lora(2021, high_src, "nsfwsvicamera_lora_high_r128.safetensors",
                             s_svicamera_h_val)
        # svicamera_IPNC HIGH
        if ipnc_enabled and ipnc_strengths.get("svicamera_h", 0) > 0:
            ipnc_s = s_svicamera_h_val * ipnc_strengths["svicamera_h"] / 100.0
            high_src = _add_lora(2022, high_src,
                                 "Wan22_BerniniR_nsfwsvicamera_HIGH_Rank1_IPNC.safetensors", ipnc_s)
        # ── LOW chain ───────────────────────────────────────────────
        # wamu LOW
        low_src = _add_lora(2004, low_src,
                            "wamu_v3_lora_low_noise_r128.safetensors", s_wamu_l_val)
        # lightx2v_I2V LOW (negation)
        low_src = _add_lora(2005, low_src,
                            "lightx2v_I2V_14B_480p_cfg_step_distill_rank128_bf16.safetensors",
                            -s_wamu_l_val)
        # wamu_IPNC LOW (after lightx2v_I2V negation)
        if ipnc_enabled and ipnc_strengths.get("wamu_l", 0) > 0:
            ipnc_s = s_wamu_l_val * ipnc_strengths["wamu_l"] / 100.0
            low_src = _add_lora(2013, low_src,
                                "Wan22_BerniniR_wamu_v3_LOW_Rank1_IPNC.safetensors", ipnc_s)
        # DR34ML4Y LOW
        low_src = _add_lora(2006, low_src,
                            "DR34ML4Y_I2V_14B_LOW.safetensors", s_dreamly_l_val)
        # DR34ML4Y_IPNC LOW
        if ipnc_enabled and ipnc_strengths.get("dreamly_l", 0) > 0:
            ipnc_s = s_dreamly_l_val * ipnc_strengths["dreamly_l"] / 100.0
            low_src = _add_lora(2014, low_src,
                                "Wan22_BerniniR_DR34ML4Y_LOW_Rank1_IPNC.safetensors", ipnc_s)
        # svicamera LOW
        low_src = _add_lora(2023, low_src, "nsfwsvicamera_lora_low_r128.safetensors",
                            s_svicamera_l_val)
        # svicamera_IPNC LOW
        if ipnc_enabled and ipnc_strengths.get("svicamera_l", 0) > 0:
            ipnc_s = s_svicamera_l_val * ipnc_strengths["svicamera_l"] / 100.0
            low_src = _add_lora(2024, low_src,
                                "Wan22_BerniniR_nsfwsvicamera_LOW_Rank1_IPNC.safetensors", ipnc_s)
        # Feed the end of each chain into nodes 63 (high) and 64 (low).
        api["63"]["inputs"]["model"] = high_src
        api["64"]["inputs"]["model"] = low_src
    multiplier = RIFE_MULTIPLIERS.get(rife_mode, 1)
    output_fps = base_fps * multiplier
    if multiplier > 1:
        # Insert a frame-interpolation node between node 8 and node 67.
        api["3001"] = {
            "class_type": "RIFE VFI",
            "inputs": {
                "frames": ["8", 0],
                "ckpt_name": "rife49.pth",
                "clear_cache_after_n_frames": 10,
                "multiplier": multiplier,
                "fast_mode": True,
                "ensemble": True,
                "scale_factor": 1,
                "dtype": "float32",
                "torch_compile": False,
                "batch_size": 1,
            },
        }
        api["67"]["inputs"]["images"] = ["3001", 0]
    # Only overwrite a literal frame rate; a linked (list) value is kept.
    if "67" in api and isinstance(api["67"]["inputs"].get("frame_rate"), (int, float)):
        api["67"]["inputs"]["frame_rate"] = output_fps
    return api
def _execute_workflow(api: dict[str, Any], output_node: int | str = None) -> str:
    """Execute an API-format workflow in-process and return an output path.

    A new asyncio event loop is created and installed for the run.  The
    executor is first built with RAM-pressure cache settings; if that
    constructor call fails, it is built with defaults instead.

    Args:
        api: API-format workflow to execute.
        output_node: Id of the node to execute; ``NODE_OUTPUT`` when ``None``.

    Returns:
        Path of the first reported output file that exists on disk, or ``""``
        when the run produced no such file.

    Raises:
        RuntimeError: the executor reports failure; the message is its last
            status message, if there is one.
    """
    import execution
    import server as comfy_server

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    srv = comfy_server.PromptServer(loop)
    try:
        executor = execution.PromptExecutor(
            srv,
            cache_type=execution.CacheType.RAM_PRESSURE,
            cache_args={"lru": 0, "ram": 2.0, "ram_inactive": 8.0},
        )
    except Exception:
        executor = execution.PromptExecutor(srv)

    run_id = str(uuid.uuid4())
    target_node = NODE_OUTPUT if output_node is None else output_node
    loop.run_until_complete(
        executor.execute_async(api, run_id, extra_data={}, execute_outputs=[str(target_node)])
    )

    if not getattr(executor, "success", True):
        messages = getattr(executor, "status_messages", [])
        raise RuntimeError(str(messages[-1] if messages else "comfy execution failed"))

    history = executor.history_result or {}
    for node_output in history.get("outputs", {}).values():
        for entries in node_output.values():
            if not isinstance(entries, list):
                continue
            for entry in entries:
                if not isinstance(entry, dict):
                    continue
                filename = entry.get("filename")
                if not filename:
                    continue
                subfolder = entry.get("subfolder", "")
                kind = entry.get("type", "output")
                root = OUTPUT if kind == "output" else COMFY / kind
                candidate = root / subfolder / filename if subfolder else root / filename
                if candidate.exists():
                    return str(candidate)
    # No video produced (intermediate chunk step — latent saved to /tmp)
    return ""
def _estimate_duration(num_steps, duration_secs, base_fps, aspect_ratio, use_ref_aspect, rife_mode):
    """Estimate the run time of one generation as a whole number, minimum 30.

    The estimate scales with the pixel-frame volume relative to a 45-frame
    848x480 clip and sums a sampling term, a swap term, a fixed overhead and
    an interpolation term, then adds a 5% margin.  ``use_ref_aspect`` is
    accepted but not used: the size always comes from ``ASPECT_PRESETS``.
    The constants are presumably empirical timings in seconds -- confirm.
    """
    reference_frames = 45
    reference_step_cost = 5.3
    scaling_exponent = 1.2

    frame_count = max(1, round(float(duration_secs) * int(base_fps)))
    width, height = ASPECT_PRESETS.get(str(aspect_ratio), (848, 480))
    load = (frame_count * width * height) / (reference_frames * 848 * 480)

    per_step = reference_step_cost * load ** scaling_exponent
    sampling = int(num_steps) * per_step
    swapping = 12 + 6 * load ** 0.5
    overhead = 20 + 8 * max(0, load - 1)

    rife_factor = {"2x rife": 2, "4x rife": 4}.get(str(rife_mode), 1)
    if rife_factor > 1:
        interpolation = frame_count * (rife_factor - 1) * 0.05
    else:
        interpolation = 0

    estimate = (sampling + swapping + overhead + interpolation) * 1.05
    return max(30, int(estimate))
def _get_duration(
    prompt, negative, image_input, seed, aspect_ratio, use_ref_aspect,
    gpu_budget, num_steps, duration_secs, sampler_name, base_fps, rife_mode,
    ipnc_enabled, ipnc_wamu_h, ipnc_dreamly_h, ipnc_wamu_l, ipnc_dreamly_l,
    ipnc_svicamera_h, ipnc_svicamera_l,
    *args, **kwargs,
):
    """Return the GPU time budget for a request.

    A truthy ``gpu_budget`` that converts to a positive integer is returned
    as that integer; otherwise the value comes from ``_estimate_duration``.
    Only ``gpu_budget``, ``num_steps``, ``duration_secs``, ``base_fps``,
    ``aspect_ratio``, ``use_ref_aspect`` and ``rife_mode`` are read; the other
    parameters exist so the function accepts the handler's arguments.
    """
    if gpu_budget:
        budget = int(gpu_budget)
        if budget > 0:
            return budget
    return _estimate_duration(num_steps, duration_secs, base_fps, aspect_ratio, use_ref_aspect, rife_mode)
def _chunk_coerce_gallery(gallery: Any) -> list[str]:
    """Flatten a Gradio gallery payload into at most five existing file paths.

    Accepts plain path strings, dicts carrying ``"path"`` or ``"name"``, and
    ``(item, caption)`` style sequences whose first element is either of
    those. Uses the same rules as the coercion closure inside
    ``generate_handler``; it lives at module level so the chunk handler does
    not depend on a name that is local to another function.
    """
    candidates: list[str] = []
    for item in (gallery or []):
        if isinstance(item, str):
            candidates.append(item)
        elif isinstance(item, dict):
            path = item.get("path") or item.get("name")
            if path:
                candidates.append(path)
        elif isinstance(item, (list, tuple)) and item:
            head = item[0]
            if isinstance(head, str):
                candidates.append(head)
            elif isinstance(head, dict):
                candidates.append(head.get("path", ""))
    return [p for p in candidates if p and os.path.exists(p)][:5]


@spaces.GPU(duration=_get_duration)
def generate_chunk_handler(
    prompt: str,
    negative: str,
    image_input: Any,
    seed: int,
    aspect_ratio: str = "16:9",
    use_ref_aspect: bool = False,
    gpu_budget: int = 120,
    num_steps: int = 6,
    duration_secs: float = 5.0,
    sampler_name: str = "lcm",
    base_fps: int = 15,
    rife_mode: str = "2x rife",
    loras_enabled: bool = False,
    s_wamu_h: float = 1.0,
    s_dreamly_h: float = 1.0,
    s_wamu_l: float = 0.5,
    s_dreamly_l: float = 0.7,
    s_svicamera_h: float = 0.0,
    s_svicamera_l: float = 0.0,
    ipnc_enabled: bool = True,
    ipnc_wamu_h: float = 100.0,
    ipnc_dreamly_h: float = 100.0,
    ipnc_wamu_l: float = 100.0,
    ipnc_dreamly_l: float = 100.0,
    ipnc_svicamera_h: float = 100.0,
    ipnc_svicamera_l: float = 100.0,
    # chunking params
    session_id: str = "",
    steps_per_chunk: str = "2",
    chunk_step: str = "0",
    progress=gr.Progress(track_tqdm=True),
):
    """Run one chunk of sampler steps for chunked long-video generation.

    A call covers sampler steps
    ``[chunk_step * steps_per_chunk, chunk_step * steps_per_chunk + steps_per_chunk)``,
    clamped to ``num_steps``. The first chunk enables noise and adds
    ``BerniniChunkSaveCond`` / ``BerniniChunkSaveLatent`` nodes keyed by
    ``session_id``; later chunks add the matching load nodes, feed them into
    the sampler, and save the resulting latent again.

    Args:
        session_id: Key handed to the chunk save/load nodes.
        steps_per_chunk: Steps per call, as text; values below 1 become 1.
        chunk_step: Zero-based chunk index, as text.
        (remaining arguments mirror ``generate_handler``)

    Returns:
        ``(None, "PARTIAL:{session_id}:{next_chunk}")`` if more steps remain,
        ``(video_path, "Done: ...")`` after the final chunk, or
        ``(None, message)`` when validation or generation fails.
    """
    import traceback as _tb
    try:
        if not (prompt or "").strip():
            return None, "enter a prompt"
        # NOTE(review): with seed == 0 every chunk call draws a fresh random
        # seed; only chunk 0 writes it to noise_seed below, but it is still
        # passed to _patch_api_workflow on each call — confirm this is intended.
        final_seed = int(seed) if seed else random.randint(0, MAX_SEED)
        negative = negative or DEFAULT_NEGATIVE
        image_paths = _chunk_coerce_gallery(image_input)
        if not image_paths:
            return None, "upload at least one reference image"

        steps = int(num_steps)
        per_chunk = max(1, int(float(steps_per_chunk or 2)))
        chunk_idx = int(float(chunk_step or 0))
        step_start = chunk_idx * per_chunk
        # Reject ranges the sampler cannot run (start beyond the schedule,
        # negative index, or no steps at all) instead of submitting them.
        if steps < 1:
            return None, f"invalid step count: {steps}"
        if not 0 <= step_start < steps:
            return None, f"chunk {chunk_idx} is out of range for {steps} steps"
        step_end = min(step_start + per_chunk, steps)
        is_final = (step_end == steps)

        INPUT.mkdir(parents=True, exist_ok=True)
        dest_names = []
        for p in image_paths:
            # Random prefix keeps repeated uploads of the same filename apart.
            dn = f"ref_{uuid.uuid4().hex[:8]}_{os.path.basename(p)}"
            shutil.copy(p, INPUT / dn)
            dest_names.append(dn)
        gen_w, gen_h = _compute_dims(str(aspect_ratio), image_paths[0], bool(use_ref_aspect))
        n_frames = max(1, round(float(duration_secs) * int(base_fps)))

        print(f"[chunk] session={session_id} chunk={chunk_idx} steps=[{step_start}-{step_end})/{steps}", flush=True)
        progress(step_end / steps, desc=f"chunk {chunk_idx+1} — steps {step_start+1}-{step_end}/{steps}")

        # Load base workflow and patch it exactly as the one-shot handler does.
        visual_wf = _load_workflow()
        api_wf = _convert_visual_to_api(visual_wf)
        api_wf = _patch_api_workflow(
            api_wf, prompt, negative, dest_names,
            final_seed, gen_w, gen_h,
            num_steps=steps,
            num_frames=n_frames,
            sampler_name=str(sampler_name),
            base_fps=int(base_fps),
            rife_mode=str(rife_mode),
            loras_enabled=bool(loras_enabled),
            lora_strengths={
                2001: float(s_wamu_h), 2002: -float(s_wamu_h),
                2003: float(s_dreamly_h), 2004: float(s_wamu_l),
                2005: -float(s_wamu_l), 2006: float(s_dreamly_l),
                2021: float(s_svicamera_h), 2023: float(s_svicamera_l),
            },
            ipnc_enabled=bool(ipnc_enabled),
            ipnc_strengths={
                "wamu_h": float(ipnc_wamu_h), "dreamly_h": float(ipnc_dreamly_h),
                "wamu_l": float(ipnc_wamu_l), "dreamly_l": float(ipnc_dreamly_l),
                "svicamera_h": float(ipnc_svicamera_h), "svicamera_l": float(ipnc_svicamera_l),
            },
        )

        # The sampler node is chosen from where the chunk STARTS relative to
        # the split point.
        # NOTE(review): a chunk that straddles the split runs entirely on the
        # sampler picked here — confirm that is acceptable.
        split = _compute_split(steps)
        use_high = (step_start < split)
        sampler_id = "57" if use_high else "58"
        sampler_inputs = api_wf[sampler_id]["inputs"]

        # Restrict the sampler to this chunk's step window; keep leftover
        # noise unless this is the last chunk.
        sampler_inputs["steps"] = steps
        sampler_inputs["start_at_step"] = step_start
        sampler_inputs["end_at_step"] = step_end
        sampler_inputs["return_with_leftover_noise"] = "disable" if is_final else "enable"

        # Conditioning links as wired by the patched workflow.
        orig_pos = sampler_inputs["positive"]
        orig_neg = sampler_inputs["negative"]

        if chunk_idx == 0:  # first chunk only
            # Add noise, then save conditioning + output latent for later chunks.
            sampler_inputs["add_noise"] = "enable"
            sampler_inputs["noise_seed"] = final_seed
            api_wf["901"] = {"class_type": "BerniniChunkSaveCond", "inputs": {
                "positive": orig_pos,
                "negative": orig_neg,
                "session_id": session_id,
            }}
            api_wf["902"] = {"class_type": "BerniniChunkSaveLatent", "inputs": {
                "samples": [sampler_id, 0],
                "session_id": session_id,
            }}
            # The sampler keeps its original positive/negative connections here.
        else:
            # Later chunks: no fresh noise; load saved conditioning + latent.
            sampler_inputs["add_noise"] = "disable"
            api_wf["901"] = {"class_type": "BerniniChunkLoadCond", "inputs": {
                "fallback_pos": orig_pos,
                "fallback_neg": orig_neg,
                "session_id": session_id,
            }}
            # Overwrite sampler inputs with the loaded conditioning.
            sampler_inputs["positive"] = ["901", 0]
            sampler_inputs["negative"] = ["901", 1]
            # Load the previous latent and feed it into the sampler.
            api_wf["902"] = {"class_type": "BerniniChunkLoadLatent", "inputs": {
                "fallback": ["132", 2],  # fallback to noise latent from conditioning
                "session_id": session_id,
            }}
            sampler_inputs["latent_image"] = ["902", 0]
            # Save the output latent (for the next chunk, or final decode).
            api_wf["903"] = {"class_type": "BerniniChunkSaveLatent", "inputs": {
                "samples": [sampler_id, 0],
                "session_id": session_id,
            }}

        # Output node for this chunk — determines what the executor returns.
        # NOTE(review): the injected nodes use string keys ("902"/"903") while
        # these ids are ints; presumably _execute_workflow normalises them —
        # verify.
        exec_node = NODE_OUTPUT if is_final else (902 if chunk_idx == 0 else 903)
        output_video = _execute_workflow(api_wf, exec_node)

        if not is_final:
            return None, f"PARTIAL:{session_id}:{chunk_idx+1}"
        if not output_video:
            # Reporting PARTIAL here would send the caller to a chunk index
            # past the end of the schedule.
            return None, "Generation failed: final chunk produced no output video"
        ts = time.strftime("%Y%m%d_%H%M%S")
        out_path = os.path.join(SAVE_BASE, f"bernini_chunk_{ts}.mp4")
        shutil.copy(output_video, out_path)
        return out_path, f"Done: {out_path}"
    except Exception as e:
        # Top-level boundary for the UI: log the traceback, return a status.
        tb = _tb.format_exc()
        print(f"[chunk] EXCEPTION: {tb}", flush=True)
        return None, f"Generation failed: {type(e).__name__}: {e}"
@spaces.GPU(duration=_get_duration)
def generate_handler(
    prompt: str,
    negative: str,
    image_input: Any,
    seed: int,
    aspect_ratio: str = "16:9",
    use_ref_aspect: bool = False,
    gpu_budget: int = 0,
    num_steps: int = 6,
    duration_secs: float = 10.0,
    sampler_name: str = "lcm",
    base_fps: int = 15,
    rife_mode: str = "2x rife",
    loras_enabled: bool = False,
    s_wamu_h: float = 1.0,
    s_dreamly_h: float = 1.0,
    s_wamu_l: float = 0.5,
    s_dreamly_l: float = 0.7,
    s_svicamera_h: float = 0.0,
    s_svicamera_l: float = 0.0,
    ipnc_enabled: bool = True,
    ipnc_wamu_h: float = 100.0,
    ipnc_dreamly_h: float = 100.0,
    ipnc_wamu_l: float = 100.0,
    ipnc_dreamly_l: float = 100.0,
    ipnc_svicamera_h: float = 100.0,
    ipnc_svicamera_l: float = 100.0,
    progress=gr.Progress(track_tqdm=True),
):
    """Generate a video in a single workflow run.

    Copies up to five reference images into the ComfyUI input directory,
    patches the base workflow with the UI settings, executes it, and copies
    the result to ``SAVE_BASE``.

    Returns:
        ``(video_path, "Seed: {seed}\\n{video_path}")`` on success, or
        ``(None, message)`` when validation or generation fails. Failures
        are reported through the status string rather than raised.
    """
    import traceback as _tb
    try:
        if not (prompt or "").strip():
            return None, "enter a prompt"
        # A falsy seed (0 / None) means "pick one at random".
        final_seed = int(seed) if seed else random.randint(0, MAX_SEED)
        negative = negative or DEFAULT_NEGATIVE

        def _coerce_gallery(g):
            """Flatten a gallery payload into at most five existing paths."""
            paths = []
            for item in (g or []):
                if isinstance(item, str):
                    paths.append(item)
                elif isinstance(item, dict):
                    p = item.get("path") or item.get("name")
                    if p:
                        paths.append(p)
                elif isinstance(item, (list, tuple)) and item:
                    p = item[0]
                    if isinstance(p, str):
                        paths.append(p)
                    elif isinstance(p, dict):
                        paths.append(p.get("path", ""))
            return [p for p in paths if p and os.path.exists(p)][:5]

        image_paths = _coerce_gallery(image_input)
        if not image_paths:
            return None, "upload at least one reference image"

        INPUT.mkdir(parents=True, exist_ok=True)
        dest_names = []
        for p in image_paths:
            # Random prefix keeps repeated uploads of the same filename apart.
            dn = f"ref_{uuid.uuid4().hex[:8]}_{os.path.basename(p)}"
            shutil.copy2(p, INPUT / dn)
            dest_names.append(dn)
        gen_w, gen_h = _compute_dims(str(aspect_ratio), image_paths[0], bool(use_ref_aspect))
        print(prompt)

        progress(0.1, desc="building workflow...")
        visual_wf = _load_workflow()
        api_wf = _convert_visual_to_api(visual_wf)
        api_wf = _patch_api_workflow(
            api_wf, prompt, negative,
            dest_names,
            final_seed, gen_w, gen_h,
            num_steps=int(num_steps),
            num_frames=max(1, round(float(duration_secs) * int(base_fps))),
            sampler_name=str(sampler_name),
            base_fps=int(base_fps),
            rife_mode=str(rife_mode),
            loras_enabled=bool(loras_enabled),
            lora_strengths={
                2001: float(s_wamu_h),
                2002: -float(s_wamu_h),
                2003: float(s_dreamly_h),
                2004: float(s_wamu_l),
                2005: -float(s_wamu_l),
                2006: float(s_dreamly_l),
                2021: float(s_svicamera_h),
                2023: float(s_svicamera_l),
            },
            ipnc_enabled=bool(ipnc_enabled),
            ipnc_strengths={
                "wamu_h": float(ipnc_wamu_h),
                "dreamly_h": float(ipnc_dreamly_h),
                "wamu_l": float(ipnc_wamu_l),
                "dreamly_l": float(ipnc_dreamly_l),
                "svicamera_h": float(ipnc_svicamera_h),
                "svicamera_l": float(ipnc_svicamera_l),
            },
        )

        progress(0.3, desc="generating...")
        output_video = _execute_workflow(api_wf)
        if not output_video:
            # Without this guard the copy below fails with an unrelated
            # TypeError when the workflow yields nothing.
            return None, "Generation failed: workflow produced no output video"

        # Saving stays inside the try so copy errors are reported via the
        # status string like every other failure.
        ts = time.strftime("%Y%m%d_%H%M%S")
        out_path = os.path.join(SAVE_BASE, f"r2v_{ts}.mp4")
        shutil.copy2(output_video, out_path)
        return out_path, f"Seed: {final_seed}\n{out_path}"
    except Exception as e:
        # Top-level boundary for the UI: log the traceback, return a status.
        tb = _tb.format_exc()
        print(f"[generate] EXCEPTION: {tb}", flush=True)
        return None, f"Generation failed: {type(e).__name__}: {e}"
# UI definition. Components are created in the same order as before; the only
# functional change is that the hidden chunk button is wired AFTER the output
# components exist (it previously referenced them before assignment).
# NOTE(review): the container nesting below was reconstructed from the
# statement order — confirm the intended layout, especially which controls
# sit inside the "Advanced" accordion.
with gr.Blocks(title="Bernini-R Wan 2.2 R2V Lightning") as demo:
    gr.Markdown("# Bernini-R Wan 2.2 R2V Lightning")
    with gr.Row():
        with gr.Column(scale=1):
            image_input = gr.Gallery(
                label="Reference images (up to 5)",
                columns=5,
                type="filepath",
                height=160,
            )
            prompt = gr.Textbox(
                label="Prompt",
                lines=3,
                placeholder="Describe the subject's action in detail...",
                value="Keeping the exact identity and appearance the same as in image0, the person in image0 dances in a supermarket.",
            )
            with gr.Row():
                enhance_btn = gr.Button("Enhance prompt", variant="secondary", size="sm")
                no_redescribe = gr.Checkbox(
                    label="don't redescribe reference subjects",
                    value=False,
                )
            with gr.Group():
                aspect_ratio = gr.Radio(
                    choices=list(ASPECT_PRESETS.keys()),
                    value="16:9",
                    label="Aspect ratio",
                )
                use_ref_aspect = gr.Checkbox(
                    label="use first reference image aspect ratio",
                    value=False,
                )
            with gr.Row():
                duration_secs = gr.Slider(1, 20, value=5, step=0.5, label="Duration (s)")
                base_fps = gr.Number(value=15, precision=0, label="Base FPS")
            with gr.Row():
                seed = gr.Number(value=0, precision=0, label="Seed (0=random)")
                gpu_budget = gr.Slider(0, 540, value=0, step=10, label="ZeroGPU budget (0=auto)")
            with gr.Group(elem_id="loras_9999"):
                loras_enabled = gr.Checkbox(label="optional loras", value=False)
                with gr.Column(visible=False) as loras_section:
                    with gr.Accordion("loras", open=True):
                        with gr.Group():
                            gr.Markdown("<div style='padding-left:8px'>High</div>")
                            with gr.Row():
                                s_wamu_h = gr.Slider(-2, 2, value=1.0, step=0.05, label="wamu")
                                s_dreamly_h = gr.Slider(-2, 2, value=1.0, step=0.05, label="dreamly")
                                s_svicamera_h = gr.Slider(-2, 2, value=0.0, step=0.05, label="svicamera")
                        with gr.Group():
                            gr.Markdown("<div style='padding-left:8px'>Low</div>")
                            with gr.Row():
                                s_wamu_l = gr.Slider(-2, 2, value=0.5, step=0.05, label="wamu")
                                s_dreamly_l = gr.Slider(-2, 2, value=0.7, step=0.05, label="dreamly")
                                s_svicamera_l = gr.Slider(-2, 2, value=0.0, step=0.05, label="svicamera")
                # Show the strength sliders only while the checkbox is ticked.
                loras_enabled.change(
                    fn=lambda x: gr.update(visible=x),
                    inputs=loras_enabled,
                    outputs=loras_section,
                )
            with gr.Accordion("Advanced", open=False):
                with gr.Row():
                    num_steps = gr.Slider(4, 20, value=6, step=1, label="Steps")
                    sampler_name = gr.Dropdown(choices=["uni_pc", "lcm"], value="lcm", label="Sampler")
                rife_mode = gr.Dropdown(
                    choices=["no rife", "2x rife", "4x rife"],
                    value="2x rife",
                    label="RIFE interpolation",
                )
                with gr.Accordion("Negative prompt", open=False):
                    negative = gr.Textbox(
                        label="",
                        lines=2,
                        value=DEFAULT_NEGATIVE,
                    )
                with gr.Accordion("IPNC", open=False):
                    ipnc_enabled = gr.Checkbox(label="Enable", value=True)
                    with gr.Column(visible=True) as ipnc_section:
                        with gr.Group():
                            gr.Markdown("<div style='padding-left:8px'>High</div>")
                            with gr.Row():
                                ipnc_wamu_h = gr.Slider(0, 200, value=100, step=1, label="wamu")
                                ipnc_dreamly_h = gr.Slider(0, 200, value=100, step=1, label="dreamly")
                                ipnc_svicamera_h = gr.Slider(0, 200, value=100, step=1, label="svicamera")
                        with gr.Group():
                            gr.Markdown("<div style='padding-left:8px'>Low</div>")
                            with gr.Row():
                                ipnc_wamu_l = gr.Slider(0, 200, value=100, step=1, label="wamu")
                                ipnc_dreamly_l = gr.Slider(0, 200, value=100, step=1, label="dreamly")
                                ipnc_svicamera_l = gr.Slider(0, 200, value=100, step=1, label="svicamera")
                    # Show the IPNC sliders only while "Enable" is ticked.
                    ipnc_enabled.change(
                        fn=lambda x: gr.update(visible=x),
                        inputs=ipnc_enabled,
                        outputs=ipnc_section,
                    )
            generate_btn = gr.Button("Generate", variant="primary", size="lg")
        with gr.Column(scale=1):
            # hidden: chunked generation endpoint
            chunk_session_id = gr.Textbox(visible=False, value="")
            chunk_steps_per = gr.Textbox(visible=False, value="2")
            chunk_step_idx = gr.Textbox(visible=False, value="0")
            chunk_btn = gr.Button(visible=False)
            output_video = gr.Video(label="Generated video")
            output_status = gr.Textbox(label="Status", interactive=False, lines=4)

    # Inputs shared by both generation endpoints, in handler-parameter order.
    generation_inputs = [
        prompt, negative, image_input, seed, aspect_ratio, use_ref_aspect, gpu_budget,
        num_steps, duration_secs, sampler_name, base_fps, rife_mode,
        loras_enabled,
        s_wamu_h, s_dreamly_h,
        s_wamu_l, s_dreamly_l,
        s_svicamera_h, s_svicamera_l,
        ipnc_enabled,
        ipnc_wamu_h, ipnc_dreamly_h,
        ipnc_wamu_l, ipnc_dreamly_l,
        ipnc_svicamera_h, ipnc_svicamera_l,
    ]

    # Registered before the enhance/generate events so the event order is
    # unchanged, but only once output_video / output_status are defined.
    chunk_btn.click(
        fn=generate_chunk_handler,
        inputs=generation_inputs + [chunk_session_id, chunk_steps_per, chunk_step_idx],
        outputs=[output_video, output_status],
    )

    # ── enhance handler ──────────────────────────────────────────────────
    def enhance_handler(prompt: str, image_input: Any, no_redescribe: bool,
                        progress=gr.Progress()):
        """Rewrite the prompt via ``_enhance_prompt_r2v`` using the references.

        Raises ``gr.Error`` when no usable reference image or prompt is
        given, or when the enhancer returns a falsy result.
        """
        paths = []
        for item in (image_input or []):
            if isinstance(item, str) and os.path.exists(item):
                paths.append(item)
            elif isinstance(item, dict):
                p = item.get("path") or item.get("name")
                if p and os.path.exists(p):
                    paths.append(p)
            elif isinstance(item, (list, tuple)) and item:
                # First element of the pair is either a path or a file dict.
                first = item[0]
                if isinstance(first, str):
                    p = first
                elif isinstance(first, dict):
                    p = first.get("path")
                else:
                    p = None
                if p and os.path.exists(p):
                    paths.append(p)
        paths = [p for p in paths if p and os.path.exists(p)][:5]
        if not paths:
            raise gr.Error("upload at least one reference image")
        if not (prompt or "").strip():
            raise gr.Error("enter a prompt")
        result = _enhance_prompt_r2v(prompt, paths, no_redescribe=bool(no_redescribe))
        if result:
            return result
        raise gr.Error("enhancement failed")

    enhance_btn.click(
        fn=enhance_handler,
        inputs=[prompt, image_input, no_redescribe],
        outputs=[prompt],
    )
    generate_btn.click(
        fn=generate_handler,
        inputs=generation_inputs,
        outputs=[output_video, output_status],
    )
if __name__ == "__main__":
    # Run the setup helpers in their original order, then serve the queued app.
    for _prepare in (_ensure_comfy, _ensure_models, _init_comfy_nodes):
        _prepare()
    demo.queue().launch()