| import gradio as gr |
| import torch |
| from diffusers import Flux2Pipeline |
| from diffusers.utils import load_image |
| from PIL import Image |
| import os |
| import zipfile |
| import py7zr |
| import tempfile |
| import shutil |
| from pathlib import Path |
| import numpy as np |
| from skimage.metrics import peak_signal_noise_ratio, structural_similarity |
| from skimage.util import img_as_float |
| import io |
|
|
| |
| print("Loading Flux model...") |
| pipe = Flux2Pipeline.from_pretrained( |
| "diffusers/FLUX.2-dev-bnb-4bit", torch_dtype=torch.bfloat16, fix_mistral_regex=True |
| ) |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| pipe.to(device) |
| print("Model loaded successfully") |
|
|
| |
| DEFAULT_PROMPT = "enhance microscopy image with subtle improvements, gently increase cellular boundary clarity, preserve original morphological structure, maintain authentic texture patterns, minimal noise reduction while keeping fine details intact" |
|
|
| |
| GUIDANCE_SCALE = 2.0 |
| NUM_INFERENCE_STEPS = 30 |
|
|
| |
| IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"} |
|
|
|
|
| def calculate_psnr_ssim(original, enhanced): |
| """Calculate PSNR and SSIM between original and enhanced images""" |
| |
| orig_float = img_as_float(np.array(original)) |
| enhanced_float = img_as_float(np.array(enhanced)) |
|
|
| |
| if orig_float.shape != enhanced_float.shape: |
| min_h = min(orig_float.shape[0], enhanced_float.shape[0]) |
| min_w = min(orig_float.shape[1], enhanced_float.shape[1]) |
| orig_float = orig_float[:min_h, :min_w] |
| enhanced_float = enhanced_float[:min_h, :min_w] |
|
|
| |
| psnr = peak_signal_noise_ratio(orig_float, enhanced_float, data_range=1.0) |
|
|
| |
| if len(orig_float.shape) == 3: |
| ssim = structural_similarity( |
| orig_float, enhanced_float, data_range=1.0, channel_axis=-1 |
| ) |
| else: |
| ssim = structural_similarity(orig_float, enhanced_float, data_range=1.0) |
|
|
| return psnr, ssim |
|
|
|
|
| def extract_archive(archive_path, extract_to): |
| """Extract zip or 7z archive""" |
| file_ext = Path(archive_path).suffix.lower() |
|
|
| if file_ext == ".zip": |
| with zipfile.ZipFile(archive_path, "r") as zip_ref: |
| zip_ref.extractall(extract_to) |
| elif file_ext == ".7z": |
| with py7zr.SevenZipFile(archive_path, mode="r") as archive: |
| archive.extractall(path=extract_to) |
| else: |
| raise ValueError(f"Unsupported archive format: {file_ext}") |
|
|
|
|
| def find_images(directory): |
| """Recursively find all images in directory""" |
| image_files = [] |
| for root, dirs, files in os.walk(directory): |
| for file in files: |
| if Path(file).suffix.lower() in IMAGE_EXTENSIONS: |
| image_files.append(os.path.join(root, file)) |
| return image_files |
|
|
|
|
| def enhance_single_image(image, prompt, guidance_scale, num_steps): |
| """Enhance a single image""" |
| if isinstance(image, str): |
| input_image = load_image(image) |
| else: |
| input_image = image |
|
|
| enhanced_image = pipe( |
| image=input_image, |
| prompt=prompt, |
| guidance_scale=guidance_scale, |
| num_inference_steps=num_steps, |
| ).images[0] |
|
|
| return input_image, enhanced_image |
|
|
|
|
| def process_images(files, prompt, guidance_scale, num_steps, progress=gr.Progress()): |
| """Process uploaded files (images or archives)""" |
| if not files: |
| return None, None, "Please upload at least one file." |
|
|
| if not prompt or prompt.strip() == "": |
| prompt = DEFAULT_PROMPT |
|
|
| |
| temp_dir = tempfile.mkdtemp() |
| output_dir = tempfile.mkdtemp() |
|
|
| try: |
| all_images = [] |
| results = [] |
| metrics_summary = [] |
|
|
| progress(0, desc="Processing files...") |
|
|
| |
| for file_obj in files: |
| file_path = file_obj.name if hasattr(file_obj, "name") else file_obj |
| file_ext = Path(file_path).suffix.lower() |
|
|
| |
| if file_ext in [".zip", ".7z"]: |
| progress(0.1, desc=f"Extracting archive: {Path(file_path).name}...") |
| extract_dir = os.path.join(temp_dir, Path(file_path).stem) |
| extract_archive(file_path, extract_dir) |
|
|
| |
| images = find_images(extract_dir) |
|
|
| for img_path in images: |
| |
| rel_path = os.path.relpath(img_path, extract_dir) |
| all_images.append((img_path, rel_path, extract_dir)) |
|
|
| |
| elif file_ext in IMAGE_EXTENSIONS: |
| all_images.append((file_path, Path(file_path).name, None)) |
|
|
| if not all_images: |
| return None, None, "No valid images found in uploaded files." |
|
|
| total_images = len(all_images) |
| progress(0.2, desc=f"Found {total_images} images. Starting enhancement...") |
|
|
| |
| for idx, (img_path, rel_path, base_dir) in enumerate(all_images): |
| progress( |
| (0.2 + 0.7 * idx / total_images), |
| desc=f"Processing {idx + 1}/{total_images}: {Path(img_path).name}...", |
| ) |
|
|
| |
| original, enhanced = enhance_single_image( |
| img_path, prompt, guidance_scale, num_steps |
| ) |
|
|
| |
| psnr, ssim = calculate_psnr_ssim(original, enhanced) |
|
|
| |
| if base_dir: |
| |
| output_rel_path = rel_path |
| output_path = os.path.join(output_dir, output_rel_path) |
| else: |
| |
| output_path = os.path.join(output_dir, rel_path) |
|
|
| |
| os.makedirs(os.path.dirname(output_path), exist_ok=True) |
|
|
| |
| output_name = Path(output_path).stem + "_flux" + Path(output_path).suffix |
| output_path = os.path.join(os.path.dirname(output_path), output_name) |
|
|
| |
| enhanced.save(output_path) |
|
|
| results.append( |
| { |
| "original": original, |
| "enhanced": enhanced, |
| "filename": rel_path, |
| "output_path": output_path, |
| "psnr": psnr, |
| "ssim": ssim, |
| } |
| ) |
|
|
| metrics_summary.append(f"{rel_path}: PSNR={psnr:.2f} dB, SSIM={ssim:.4f}") |
|
|
| progress(0.9, desc="Creating output package...") |
|
|
| |
| output_zip_path = os.path.join( |
| tempfile.gettempdir(), "enhanced_images_flux.zip" |
| ) |
| with zipfile.ZipFile(output_zip_path, "w", zipfile.ZIP_DEFLATED) as zipf: |
| for root, dirs, files in os.walk(output_dir): |
| for file in files: |
| file_path = os.path.join(root, file) |
| arcname = os.path.relpath(file_path, output_dir) |
| zipf.write(file_path, arcname) |
|
|
| |
| avg_psnr = np.mean([r["psnr"] for r in results]) |
| avg_ssim = np.mean([r["ssim"] for r in results]) |
|
|
| |
| summary = f"✅ Processing completed!\n\n" |
| summary += f"Total images processed: {total_images}\n" |
| summary += f"Average PSNR: {avg_psnr:.2f} dB\n" |
| summary += f"Average SSIM: {avg_ssim:.4f}\n\n" |
| summary += "Individual metrics:\n" |
| summary += "\n".join(metrics_summary) |
|
|
| progress(1.0, desc="Done!") |
|
|
| |
| gallery_images = [] |
| for result in results[:10]: |
| gallery_images.append( |
| (result["original"], f"Original: {result['filename']}") |
| ) |
| gallery_images.append( |
| ( |
| result["enhanced"], |
| f"Enhanced: {result['filename']}\nPSNR: {result['psnr']:.2f} dB, SSIM: {result['ssim']:.4f}", |
| ) |
| ) |
|
|
| return gallery_images, output_zip_path, summary |
|
|
| except Exception as e: |
| return None, None, f"Error during processing: {str(e)}" |
|
|
| finally: |
| |
| if os.path.exists(temp_dir): |
| shutil.rmtree(temp_dir) |
|
|
|
|
| |
| with gr.Blocks(title="Flux Microscopy Image Enhancement") as demo: |
| gr.Markdown( |
| """ |
| # 🔬 Flux Microscopy Image Enhancement |
| |
| Upload microscopy images (individual files or compressed archives) for AI-powered enhancement. |
| |
| **Supported formats:** |
| - Images: JPG, PNG, BMP, TIFF |
| - Archives: ZIP, 7Z (will process all images inside) |
| |
| **Features:** |
| - Batch processing support |
| - Custom enhancement prompts |
| - Quality metrics (PSNR & SSIM) |
| - Download results as ZIP with `_flux` suffix |
| """ |
| ) |
|
|
| with gr.Row(): |
| with gr.Column(scale=1): |
| |
| file_input = gr.File( |
| label="Upload Images or Archive (ZIP/7Z)", |
| file_count="multiple", |
| file_types=["image", ".zip", ".7z"], |
| ) |
|
|
| |
| prompt_input = gr.Textbox( |
| label="Enhancement Prompt", |
| placeholder="Enter custom prompt or leave empty for default", |
| value=DEFAULT_PROMPT, |
| lines=3, |
| ) |
|
|
| |
| gr.Markdown("### Enhancement Parameters") |
| guidance_scale_input = gr.Slider( |
| minimum=1.0, |
| maximum=5.0, |
| value=GUIDANCE_SCALE, |
| step=0.1, |
| label="Guidance Scale", |
| info="Controls enhancement strength (lower = more conservative, higher = more creative)", |
| ) |
|
|
| num_steps_input = gr.Slider( |
| minimum=10, |
| maximum=50, |
| value=NUM_INFERENCE_STEPS, |
| step=1, |
| label="Inference Steps", |
| info="Number of processing steps (more steps = better quality but slower)", |
| ) |
|
|
| |
| process_btn = gr.Button("🚀 Enhance Images", variant="primary", size="lg") |
|
|
| |
| gr.Markdown("### Example") |
| gr.Examples( |
| examples=[ |
| [None, DEFAULT_PROMPT, GUIDANCE_SCALE, NUM_INFERENCE_STEPS], |
| ], |
| inputs=[ |
| file_input, |
| prompt_input, |
| guidance_scale_input, |
| num_steps_input, |
| ], |
| ) |
|
|
| with gr.Column(scale=2): |
| |
| gallery_output = gr.Gallery( |
| label="Results Preview (Original vs Enhanced)", |
| columns=2, |
| rows=2, |
| height="auto", |
| object_fit="contain", |
| ) |
|
|
| |
| download_output = gr.File(label="📥 Download All Enhanced Images (ZIP)") |
|
|
| |
| summary_output = gr.Textbox( |
| label="Processing Summary & Metrics", lines=10, max_lines=20 |
| ) |
|
|
| |
| process_btn.click( |
| fn=process_images, |
| inputs=[file_input, prompt_input, guidance_scale_input, num_steps_input], |
| outputs=[gallery_output, download_output, summary_output], |
| ) |
|
|
| gr.Markdown( |
| """ |
| --- |
| ### Default Parameters |
| - **Guidance Scale**: 2.0 (conservative for natural enhancement) |
| - **Inference Steps**: 30 (balanced quality and speed) |
| |
| 💡 You can adjust these parameters above to customize the enhancement process. |
| |
| ### Quality Metrics |
| - **PSNR** (Peak Signal-to-Noise Ratio): Higher is better (>30 dB is good) |
| - **SSIM** (Structural Similarity Index): Closer to 1.0 is better (>0.9 is excellent) |
| """ |
| ) |
|
|
| |
| if __name__ == "__main__": |
| demo.launch(share=False) |
|
|