yichuan-huang's picture
Initial upload
dd1ad6f
raw
history blame
12.5 kB
import gradio as gr
import torch
from diffusers import Flux2Pipeline
from diffusers.utils import load_image
from PIL import Image
import os
import zipfile
import py7zr
import tempfile
import shutil
from pathlib import Path
import numpy as np
from skimage.metrics import peak_signal_noise_ratio, structural_similarity
from skimage.util import img_as_float
import io
# Load Flux model
print("Loading Flux model...")
pipe = Flux2Pipeline.from_pretrained(
"diffusers/FLUX.2-dev-bnb-4bit", torch_dtype=torch.bfloat16, fix_mistral_regex=True
)
device = "cuda" if torch.cuda.is_available() else "cpu"
pipe.to(device)
print("Model loaded successfully")
# Default enhancement prompt
DEFAULT_PROMPT = "enhance microscopy image with subtle improvements, gently increase cellular boundary clarity, preserve original morphological structure, maintain authentic texture patterns, minimal noise reduction while keeping fine details intact"
# Enhancement parameters
GUIDANCE_SCALE = 2.0
NUM_INFERENCE_STEPS = 30
# Supported image extensions
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"}
def calculate_psnr_ssim(original, enhanced):
"""Calculate PSNR and SSIM between original and enhanced images"""
# Convert images to float arrays
orig_float = img_as_float(np.array(original))
enhanced_float = img_as_float(np.array(enhanced))
# Ensure both images have the same shape
if orig_float.shape != enhanced_float.shape:
min_h = min(orig_float.shape[0], enhanced_float.shape[0])
min_w = min(orig_float.shape[1], enhanced_float.shape[1])
orig_float = orig_float[:min_h, :min_w]
enhanced_float = enhanced_float[:min_h, :min_w]
# Calculate PSNR
psnr = peak_signal_noise_ratio(orig_float, enhanced_float, data_range=1.0)
# Calculate SSIM
if len(orig_float.shape) == 3: # Color image
ssim = structural_similarity(
orig_float, enhanced_float, data_range=1.0, channel_axis=-1
)
else: # Grayscale image
ssim = structural_similarity(orig_float, enhanced_float, data_range=1.0)
return psnr, ssim
def extract_archive(archive_path, extract_to):
"""Extract zip or 7z archive"""
file_ext = Path(archive_path).suffix.lower()
if file_ext == ".zip":
with zipfile.ZipFile(archive_path, "r") as zip_ref:
zip_ref.extractall(extract_to)
elif file_ext == ".7z":
with py7zr.SevenZipFile(archive_path, mode="r") as archive:
archive.extractall(path=extract_to)
else:
raise ValueError(f"Unsupported archive format: {file_ext}")
def find_images(directory):
"""Recursively find all images in directory"""
image_files = []
for root, dirs, files in os.walk(directory):
for file in files:
if Path(file).suffix.lower() in IMAGE_EXTENSIONS:
image_files.append(os.path.join(root, file))
return image_files
def enhance_single_image(image, prompt, guidance_scale, num_steps):
"""Enhance a single image"""
if isinstance(image, str):
input_image = load_image(image)
else:
input_image = image
enhanced_image = pipe(
image=input_image,
prompt=prompt,
guidance_scale=guidance_scale,
num_inference_steps=num_steps,
).images[0]
return input_image, enhanced_image
def process_images(files, prompt, guidance_scale, num_steps, progress=gr.Progress()):
"""Process uploaded files (images or archives)"""
if not files:
return None, None, "Please upload at least one file."
if not prompt or prompt.strip() == "":
prompt = DEFAULT_PROMPT
# Create temporary directories
temp_dir = tempfile.mkdtemp()
output_dir = tempfile.mkdtemp()
try:
all_images = []
results = []
metrics_summary = []
progress(0, desc="Processing files...")
# Process each uploaded file
for file_obj in files:
file_path = file_obj.name if hasattr(file_obj, "name") else file_obj
file_ext = Path(file_path).suffix.lower()
# Check if it's an archive
if file_ext in [".zip", ".7z"]:
progress(0.1, desc=f"Extracting archive: {Path(file_path).name}...")
extract_dir = os.path.join(temp_dir, Path(file_path).stem)
extract_archive(file_path, extract_dir)
# Find all images in extracted directory
images = find_images(extract_dir)
for img_path in images:
# Get relative path to maintain directory structure
rel_path = os.path.relpath(img_path, extract_dir)
all_images.append((img_path, rel_path, extract_dir))
# Check if it's an image
elif file_ext in IMAGE_EXTENSIONS:
all_images.append((file_path, Path(file_path).name, None))
if not all_images:
return None, None, "No valid images found in uploaded files."
total_images = len(all_images)
progress(0.2, desc=f"Found {total_images} images. Starting enhancement...")
# Process each image
for idx, (img_path, rel_path, base_dir) in enumerate(all_images):
progress(
(0.2 + 0.7 * idx / total_images),
desc=f"Processing {idx + 1}/{total_images}: {Path(img_path).name}...",
)
# Enhance image
original, enhanced = enhance_single_image(
img_path, prompt, guidance_scale, num_steps
)
# Calculate metrics
psnr, ssim = calculate_psnr_ssim(original, enhanced)
# Prepare output path
if base_dir:
# For archive images, maintain directory structure
output_rel_path = rel_path
output_path = os.path.join(output_dir, output_rel_path)
else:
# For standalone images
output_path = os.path.join(output_dir, rel_path)
# Create output directory if needed
os.makedirs(os.path.dirname(output_path), exist_ok=True)
# Add _flux suffix to filename
output_name = Path(output_path).stem + "_flux" + Path(output_path).suffix
output_path = os.path.join(os.path.dirname(output_path), output_name)
# Save enhanced image
enhanced.save(output_path)
results.append(
{
"original": original,
"enhanced": enhanced,
"filename": rel_path,
"output_path": output_path,
"psnr": psnr,
"ssim": ssim,
}
)
metrics_summary.append(f"{rel_path}: PSNR={psnr:.2f} dB, SSIM={ssim:.4f}")
progress(0.9, desc="Creating output package...")
# Create output zip file
output_zip_path = os.path.join(
tempfile.gettempdir(), "enhanced_images_flux.zip"
)
with zipfile.ZipFile(output_zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(output_dir):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, output_dir)
zipf.write(file_path, arcname)
# Calculate average metrics
avg_psnr = np.mean([r["psnr"] for r in results])
avg_ssim = np.mean([r["ssim"] for r in results])
# Create summary text
summary = f"✅ Processing completed!\n\n"
summary += f"Total images processed: {total_images}\n"
summary += f"Average PSNR: {avg_psnr:.2f} dB\n"
summary += f"Average SSIM: {avg_ssim:.4f}\n\n"
summary += "Individual metrics:\n"
summary += "\n".join(metrics_summary)
progress(1.0, desc="Done!")
# For display in gallery, show first few results
gallery_images = []
for result in results[:10]: # Show first 10 results
gallery_images.append(
(result["original"], f"Original: {result['filename']}")
)
gallery_images.append(
(
result["enhanced"],
f"Enhanced: {result['filename']}\nPSNR: {result['psnr']:.2f} dB, SSIM: {result['ssim']:.4f}",
)
)
return gallery_images, output_zip_path, summary
except Exception as e:
return None, None, f"Error during processing: {str(e)}"
finally:
# Cleanup temporary directory
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
# Create Gradio interface
with gr.Blocks(title="Flux Microscopy Image Enhancement") as demo:
gr.Markdown(
"""
# 🔬 Flux Microscopy Image Enhancement
Upload microscopy images (individual files or compressed archives) for AI-powered enhancement.
**Supported formats:**
- Images: JPG, PNG, BMP, TIFF
- Archives: ZIP, 7Z (will process all images inside)
**Features:**
- Batch processing support
- Custom enhancement prompts
- Quality metrics (PSNR & SSIM)
- Download results as ZIP with `_flux` suffix
"""
)
with gr.Row():
with gr.Column(scale=1):
# File upload
file_input = gr.File(
label="Upload Images or Archive (ZIP/7Z)",
file_count="multiple",
file_types=["image", ".zip", ".7z"],
)
# Prompt input
prompt_input = gr.Textbox(
label="Enhancement Prompt",
placeholder="Enter custom prompt or leave empty for default",
value=DEFAULT_PROMPT,
lines=3,
)
# Parameter controls
gr.Markdown("### Enhancement Parameters")
guidance_scale_input = gr.Slider(
minimum=1.0,
maximum=5.0,
value=GUIDANCE_SCALE,
step=0.1,
label="Guidance Scale",
info="Controls enhancement strength (lower = more conservative, higher = more creative)",
)
num_steps_input = gr.Slider(
minimum=10,
maximum=50,
value=NUM_INFERENCE_STEPS,
step=1,
label="Inference Steps",
info="Number of processing steps (more steps = better quality but slower)",
)
# Process button
process_btn = gr.Button("🚀 Enhance Images", variant="primary", size="lg")
# Example
gr.Markdown("### Example")
gr.Examples(
examples=[
[None, DEFAULT_PROMPT, GUIDANCE_SCALE, NUM_INFERENCE_STEPS],
],
inputs=[
file_input,
prompt_input,
guidance_scale_input,
num_steps_input,
],
)
with gr.Column(scale=2):
# Gallery for results
gallery_output = gr.Gallery(
label="Results Preview (Original vs Enhanced)",
columns=2,
rows=2,
height="auto",
object_fit="contain",
)
# Download button
download_output = gr.File(label="📥 Download All Enhanced Images (ZIP)")
# Metrics summary
summary_output = gr.Textbox(
label="Processing Summary & Metrics", lines=10, max_lines=20
)
# Process button click
process_btn.click(
fn=process_images,
inputs=[file_input, prompt_input, guidance_scale_input, num_steps_input],
outputs=[gallery_output, download_output, summary_output],
)
gr.Markdown(
"""
---
### Default Parameters
- **Guidance Scale**: 2.0 (conservative for natural enhancement)
- **Inference Steps**: 30 (balanced quality and speed)
💡 You can adjust these parameters above to customize the enhancement process.
### Quality Metrics
- **PSNR** (Peak Signal-to-Noise Ratio): Higher is better (>30 dB is good)
- **SSIM** (Structural Similarity Index): Closer to 1.0 is better (>0.9 is excellent)
"""
)
# Launch the app
if __name__ == "__main__":
demo.launch(share=False)