| from fastapi import FastAPI, HTTPException |
| from fastapi.responses import JSONResponse |
| from pydantic import BaseModel, Field |
| from typing import List, Optional |
| import json |
| import time |
| import uuid |
| import logging |
| import traceback |
| import base64 |
| import io |
| from curl_cffi import CurlError |
| from curl_cffi.requests import Session |
| import os |
|
|
| |
# Module-wide logging: INFO level on the root logger, one named logger for this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Default Hugging Face API token used when the generator is built without an
# explicit ``api_token``.
#
# SECURITY: the fallback below is a credential embedded in the source (base64
# is obfuscation, not protection). The ``HF_TOKEN`` environment variable now
# takes precedence so deployments can supply their own secret; the embedded
# value is kept only so existing setups keep working.
# TODO(review): rotate the embedded token and remove the fallback.
_config = os.getenv("HF_TOKEN") or base64.b64decode(
    b'aGZfdmhRZmZYRWxJcXBvV25rU3lJUFJJUktYQ1hoekFFSmpLQw=='
).decode('utf-8')
|
|
class HuggingFaceImageGenerator:
    """Client for the qwen-image prediction endpoint behind the Hugging Face router.

    The client owns one HTTP session carrying the bearer token and offers two
    entry points:

    * ``generate_image`` creates a prediction and waits for the image bytes.
    * ``create_prediction_only`` creates a prediction and returns the decoded
      JSON response without waiting.

    Both entry points swallow failures, log them, and return ``None``.
    """

    # Maps the ``size`` strings accepted by the API to the aspect-ratio labels
    # sent upstream; unknown sizes fall back to '1:1'.
    _ASPECT_RATIOS = {
        '1024x1024': '1:1',
        '1792x1024': '16:9',
        '1024x1792': '9:16',
        '1365x1024': '4:3',
        '1024x1365': '3:4',
    }

    def __init__(self, api_token=None):
        """Create the HTTP session.

        Args:
            api_token: Bearer token for the API. Falls back to the
                module-level ``_config`` value when omitted or empty.
        """
        self.api_url = "https://router.huggingface.co/replicate/v1/models/qwen/qwen-image/predictions"
        self.api_token = api_token or _config
        self.session = Session()
        self.session.headers.update({
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0"
        })
        # Up to ``max_retries`` attempts; the wait doubles after each failure,
        # starting at ``retry_delay`` seconds.
        self.max_retries = 3
        self.retry_delay = 2.0

    @classmethod
    def _build_payload(cls, prompt, **kwargs) -> dict:
        """Build the JSON body for a prediction request.

        Recognised keyword options are ``size``, ``quality`` and ``style``;
        anything else is ignored. ``quality == "hd"`` turns off ``go_fast`` and
        raises output quality and inference steps; ``style == "vivid"`` turns
        on prompt enhancement.
        """
        size = kwargs.get('size', '1024x1024')
        quality = kwargs.get('quality', 'standard')
        style = kwargs.get('style', 'vivid')
        is_hd = quality == "hd"

        return {
            "input": {
                "prompt": prompt,
                "go_fast": not is_hd,
                "image_size": "optimize_for_quality",
                "aspect_ratio": cls._ASPECT_RATIOS.get(size, '1:1'),
                "output_format": "webp",
                "enhance_prompt": style == "vivid",
                "output_quality": 100 if is_hd else 80,
                "num_inference_steps": 50 if is_hd else 30,
            }
        }

    @staticmethod
    def _image_url_from_output(output):
        """Return the image URL from a prediction ``output`` value.

        A non-empty list yields its first element and a string is returned
        as-is.

        Raises:
            Exception: If ``output`` is neither of those shapes.
        """
        if isinstance(output, list) and output:
            return output[0]
        if isinstance(output, str):
            return output
        raise Exception("No valid image URL in stream response")

    def _retry_request(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)``, retrying on transport errors.

        Only ``CurlError``, ``ConnectionError`` and ``TimeoutError`` are
        retried, with exponential backoff. Any other exception aborts
        immediately.

        Returns:
            Whatever ``func`` returns on the first successful attempt.

        Raises:
            The last exception seen, or a generic ``Exception`` if ``func``
            was never called (``max_retries`` <= 0).
        """
        last_exception = None

        for attempt in range(self.max_retries):
            try:
                return func(*args, **kwargs)
            except (CurlError, ConnectionError, TimeoutError) as e:
                last_exception = e
                if attempt < self.max_retries - 1:
                    wait_time = self.retry_delay * (2 ** attempt)
                    logger.warning(f"Image generation request failed (attempt {attempt + 1}), retrying in {wait_time}s: {e}")
                    time.sleep(wait_time)
                else:
                    logger.error(f"Image generation failed after {self.max_retries} attempts: {e}")
            except Exception as e:
                # Not a transport problem: retrying would not help.
                logger.error(f"Unexpected error in image generation: {e}")
                last_exception = e
                break

        raise last_exception or Exception("Image generation failed after retries")

    def _check_stream_once(self, stream_url: str) -> Optional[bytes]:
        """Poll the stream URL once.

        Returns:
            Image bytes when the response is an image (or is JSON whose
            ``output`` points at a downloadable image), otherwise ``None``
            meaning "not ready yet".

        Raises:
            Exception: If the JSON output has no usable URL or the image
                download does not return HTTP 200.
        """
        stream_response = self.session.get(stream_url, timeout=30)
        if stream_response.status_code != 200:
            return None

        content_type = stream_response.headers.get('content-type', '')
        if len(stream_response.content) > 1000 and content_type.startswith('image/'):
            logger.info(f"Image ready from stream: {len(stream_response.content)} bytes")
            return stream_response.content

        # Not an image body: it may be a JSON document describing the output.
        try:
            stream_data = stream_response.json()
        except json.JSONDecodeError:
            logger.debug("Stream response is neither an image nor JSON yet")
            return None

        if 'output' not in stream_data or not stream_data['output']:
            return None

        image_url = self._image_url_from_output(stream_data['output'])
        logger.info(f"Got image URL from stream: {image_url}")

        img_response = self.session.get(image_url, timeout=60)
        if img_response.status_code != 200:
            raise Exception(f"Failed to download image: {img_response.status_code}")

        logger.info(f"Image downloaded: {len(img_response.content)} bytes")
        return img_response.content

    def _poll_stream(self, stream_url: str, max_attempts: int = 24, poll_interval: float = 5) -> bytes:
        """Poll the stream URL until image bytes are available.

        Per-attempt failures are logged at debug level and do not abort the
        loop; the caller only sees the final timeout.

        Raises:
            Exception: If no image was obtained after ``max_attempts`` polls.
        """
        logger.info("Waiting for image generation via stream URL...")

        for attempt in range(max_attempts):
            logger.info(f"Stream check attempt {attempt + 1}/{max_attempts}")
            try:
                image = self._check_stream_once(stream_url)
                if image is not None:
                    return image
            except Exception as e:
                logger.debug(f"Stream attempt {attempt + 1} failed: {e}")

            # No sleep after the final attempt.
            if attempt < max_attempts - 1:
                time.sleep(poll_interval)

        raise Exception("Image generation timed out - no result from stream URL")

    def generate_image(self, prompt, **kwargs):
        """Generate an image and return its raw bytes.

        Args:
            prompt: Text description of the image; must be non-blank.
            **kwargs: Optional ``size``, ``quality`` and ``style`` (see
                ``_build_payload``).

        Returns:
            The image bytes, or ``None`` if the prompt is blank or any step
            fails (failures are logged, not raised).
        """
        if not prompt or not prompt.strip():
            logger.error("Empty prompt provided for image generation")
            return None

        def _generate():
            data = self._build_payload(prompt, **kwargs)

            logger.info(f"Generating image with prompt: {prompt[:50]}...")

            response = self.session.post(
                self.api_url,
                json=data,
                timeout=30
            )

            if response.status_code not in [200, 201]:
                raise Exception(f"Image generation failed: {response.status_code} - {response.text}")

            try:
                prediction = response.json()
            except json.JSONDecodeError as e:
                raise Exception("Invalid JSON response from prediction API") from e

            logger.info(f"Prediction created: {prediction.get('id', 'unknown')}")

            # ``urls`` may be missing or null; treat both as "no stream URL".
            stream_url = (prediction.get('urls') or {}).get('stream')
            if not stream_url:
                raise Exception("No stream URL available for image generation")

            # First try a single long-timeout fetch.
            # NOTE(review): this accepts any 200 body over 1000 bytes without
            # checking Content-Type, unlike the polling path below — confirm
            # the stream endpoint never returns a large non-image body.
            logger.info(f"Trying stream URL: {stream_url}")
            try:
                stream_response = self.session.get(stream_url, timeout=120)
                if stream_response.status_code == 200 and len(stream_response.content) > 1000:
                    logger.info(f"Got image from stream: {len(stream_response.content)} bytes")
                    return stream_response.content
            except Exception as e:
                logger.warning(f"Stream URL failed: {e}")

            return self._poll_stream(stream_url)

        try:
            return self._retry_request(_generate)
        except Exception as e:
            logger.error(f"Image generation failed: {e}")
            logger.error(f"Prompt was: {prompt[:100]}...")
            return None

    def create_prediction_only(self, prompt, **kwargs):
        """Create a prediction and return the decoded response without waiting.

        Args:
            prompt: Text description of the image; must be non-blank.
            **kwargs: Optional ``size``, ``quality`` and ``style`` (see
                ``_build_payload``).

        Returns:
            The decoded JSON response of the prediction request, or ``None``
            if the prompt is blank or the request fails (failures are logged,
            not raised).
        """
        if not prompt or not prompt.strip():
            logger.error("Empty prompt provided for prediction creation")
            return None

        def _create_prediction():
            data = self._build_payload(prompt, **kwargs)

            logger.info(f"Creating prediction with prompt: {prompt[:50]}...")

            response = self.session.post(
                self.api_url,
                json=data,
                timeout=30
            )

            if response.status_code not in [200, 201]:
                raise Exception(f"Prediction creation failed: {response.status_code} - {response.text}")

            prediction = response.json()
            logger.info(f"Prediction created successfully: {prediction.get('id', 'unknown')}")
            return prediction

        try:
            return self._retry_request(_create_prediction)
        except Exception as e:
            logger.error(f"Prediction creation failed: {e}")
            logger.error(f"Prompt was: {prompt[:100]}...")
            return None
|
|
| |
class ImageGenerationRequest(BaseModel):
    # Request body shared by both image-generation endpoints. Only ``prompt``
    # is required; every other field carries a default.
    prompt: str = Field(..., description="A text description of the desired image(s)")
    model: Optional[str] = Field("qwen-image", description="The model to use for image generation")
    # Constrained to the inclusive range 1..10.
    n: Optional[int] = Field(1, ge=1, le=10, description="Number of images to generate")
    quality: Optional[str] = Field("standard", description="Quality of the image")
    response_format: Optional[str] = Field("url", description="Format of the response")
    size: Optional[str] = Field("1024x1024", description="Size of the generated images")
    style: Optional[str] = Field("vivid", description="Style of the generated images")
    user: Optional[str] = Field(None, description="A unique identifier for the user")
|
|
class ImageObject(BaseModel):
    # One generated image in a response. All fields are optional and default
    # to None.
    b64_json: Optional[str] = Field(default=None)
    url: Optional[str] = Field(default=None)
    revised_prompt: Optional[str] = Field(default=None)
|
|
class ImageGenerationResponse(BaseModel):
    # Response envelope: an integer ``created`` value plus the list of images.
    # Both fields are required.
    created: int = Field(...)
    data: List[ImageObject] = Field(...)
|
|
| |
# Process-wide service state, read and updated by the endpoints below.
startup_time = time.time()          # import-time timestamp, used for uptime
image_generator = None              # assigned by the lifespan handler at startup
request_count = error_count = 0     # counters reported by /health
|
|
| from contextlib import asynccontextmanager |
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the image generator at startup and release it at shutdown.

    A failure to build the generator is logged rather than raised, so the
    application still starts; the module-level ``image_generator`` is then
    left as ``None``.

    On shutdown the generator's HTTP session is closed so its connections are
    not leaked, and ``image_generator`` is reset to ``None``.
    """
    global image_generator
    try:
        logger.info("Initializing Hugging Face image generator...")
        image_generator = HuggingFaceImageGenerator()
        logger.info("Image generator initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize image generator: {e}")
        image_generator = None

    try:
        yield
    finally:
        logger.info("Shutting down image generator...")
        generator, image_generator = image_generator, None
        session = getattr(generator, "session", None)
        if session is not None:
            try:
                session.close()
            except Exception as e:
                # Shutdown is best-effort: never let cleanup mask the real exit path.
                logger.warning(f"Error while closing HTTP session: {e}")
|
|
| |
# The ASGI application; startup and shutdown work is delegated to ``lifespan``.
app = FastAPI(
    lifespan=lifespan,
    title="Qwen Image Generation API",
    description="OpenAI-compatible API for image generation using Qwen Image model via Hugging Face",
    version="1.0.0",
)
|
|
| |
# Register CORS middleware that allows every origin, method and header,
# with credentials enabled. Registration is skipped if the middleware module
# cannot be imported.
try:
    from fastapi.middleware.cors import CORSMiddleware
except ImportError:
    pass
else:
    _cors_options = {
        "allow_origins": ["*"],
        "allow_credentials": True,
        "allow_methods": ["*"],
        "allow_headers": ["*"],
    }
    app.add_middleware(CORSMiddleware, **_cors_options)
|
|
| |
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc: HTTPException):
    """Log an ``HTTPException`` and render it as an ``{"error": {...}}`` JSON body."""
    status, detail = exc.status_code, exc.detail
    logger.error(f"HTTP error: {status} - {detail}")

    error_body = {
        "message": detail,
        "type": "api_error",
        "code": status,
    }
    return JSONResponse(status_code=status, content={"error": error_body})
|
|
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    """Log any other exception with a traceback and answer with a generic 500.

    The response never includes the exception text, only a fixed message.
    """
    stack = traceback.format_exc()
    logger.error(f"Unexpected error: {exc}\n{stack}")

    error_body = {
        "message": "Internal server error",
        "type": "server_error",
        "code": 500,
    }
    return JSONResponse(status_code=500, content={"error": error_body})
|
|
@app.get("/")
async def root():
    """Return a static banner describing the service."""
    banner = {
        "message": "Qwen Image Generation API",
        "version": "1.0.0",
        "status": "running",
        "model": "qwen-image",
    }
    return banner
|
|
@app.get("/v1/models")
async def list_models():
    """Return the model catalogue, which holds the single ``qwen-image`` entry."""
    qwen_entry = {
        "id": "qwen-image",
        "object": "model",
        # Stamped with the time of the call.
        "created": int(time.time()),
        "owned_by": "qwen",
    }
    return {"object": "list", "data": [qwen_entry]}
|
|
@app.get("/models")
async def list_models_alt():
    """Serve the same catalogue as ``/v1/models`` under an unversioned path."""
    catalogue = await list_models()
    return catalogue
|
|
@app.get("/health")
async def health_check():
    """Report uptime, generator readiness and request/error counters.

    The overall status is "degraded" while no generator is available and
    "healthy" otherwise.
    """
    uptime = time.time() - startup_time
    generator_ready = image_generator is not None

    stats = {
        "total_requests": request_count,
        "total_errors": error_count,
        # max(..., 1) avoids division by zero before the first request.
        "error_rate": error_count / max(request_count, 1),
    }
    return {
        "status": "healthy" if generator_ready else "degraded",
        "timestamp": int(time.time()),
        "uptime_seconds": int(uptime),
        "generator_status": "ready" if generator_ready else "not_initialized",
        "stats": stats,
    }
|
|
@app.post("/v1/images/generations")
async def create_image(request: ImageGenerationRequest):
    """Create a prediction and return the upstream response unchanged.

    Responds with 503 when the generator is unavailable, 400 for an
    unsupported model, size or quality, and 500 when prediction creation
    fails.

    Every failed request increments ``error_count`` exactly once, in the
    ``except`` clauses below. The upstream call is blocking, so it runs in the
    thread pool instead of on the event loop.
    """
    global request_count, error_count, image_generator

    # Imported locally so this handler does not depend on a file-level import change.
    from fastapi.concurrency import run_in_threadpool

    request_count += 1
    request_id = f"img-{uuid.uuid4().hex[:8]}"
    logger.info(f"[{request_id}] Image generation request: prompt='{request.prompt[:50]}...', size={request.size}, quality={request.quality}")

    try:
        if image_generator is None:
            logger.error(f"[{request_id}] Image generator not initialized")
            raise HTTPException(status_code=503, detail="Image generation service temporarily unavailable")

        if request.model and request.model != "qwen-image":
            raise HTTPException(status_code=400, detail="Only 'qwen-image' model is supported")

        valid_sizes = ['1024x1024', '1792x1024', '1024x1792', '1365x1024', '1024x1365']
        if request.size not in valid_sizes:
            raise HTTPException(status_code=400, detail=f"Invalid size. Must be one of: {valid_sizes}")

        valid_qualities = ['standard', 'hd']
        if request.quality not in valid_qualities:
            raise HTTPException(status_code=400, detail=f"Invalid quality. Must be one of: {valid_qualities}")

        raw_prediction = await run_in_threadpool(
            image_generator.create_prediction_only,
            request.prompt,
            size=request.size,
            quality=request.quality,
            style=request.style,
        )

        if raw_prediction is None:
            logger.error(f"[{request_id}] Failed to create prediction")
            raise HTTPException(status_code=500, detail="Failed to create image prediction")

        logger.info(f"[{request_id}] Prediction created: {raw_prediction.get('id', 'unknown')}")
        return raw_prediction

    except HTTPException:
        # Single accounting point for all deliberate HTTP errors raised above.
        error_count += 1
        raise
    except Exception as e:
        error_count += 1
        logger.error(f"[{request_id}] Unexpected error: {e}\n{traceback.format_exc()}")
        raise HTTPException(status_code=500, detail="Internal server error occurred") from e
|
|
@app.get("/v1/predictions/{prediction_id}")
async def get_prediction(prediction_id: str):
    """Return a placeholder status for a prediction.

    No upstream lookup is made: the reply always says "processing" and points
    the caller at the stream URL from the original prediction response.
    Responds with 503 when the generator is unavailable.
    """
    if image_generator is None:
        raise HTTPException(status_code=503, detail="Image generation service temporarily unavailable")

    try:
        logger.info(f"Getting prediction status for: {prediction_id}")
        placeholder = {
            "id": prediction_id,
            "status": "processing",
            "message": "Use the stream URL from the original prediction response to get the image",
        }
        return placeholder
    except Exception as e:
        logger.error(f"Error getting prediction {prediction_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to get prediction status")
|
|
@app.post("/v1/images/generations/openai")
async def create_image_openai_format(request: ImageGenerationRequest):
    """Generate an image and return it in an OpenAI-style response.

    The image is returned base64-encoded: in ``b64_json`` when
    ``response_format == "b64_json"``, otherwise as a ``data:image/webp``
    URL in ``url``. One image is produced per request.

    Responds with 503 when the generator is unavailable and 500 when
    generation fails. Every failed request increments ``error_count`` exactly
    once, in the ``except`` clauses below. Generation polls with blocking
    sleeps, so it runs in the thread pool instead of on the event loop.
    """
    global request_count, error_count, image_generator

    # Imported locally so this handler does not depend on a file-level import change.
    from fastapi.concurrency import run_in_threadpool

    request_count += 1
    request_id = f"img-{uuid.uuid4().hex[:8]}"
    logger.info(f"[{request_id}] OpenAI format image generation: prompt='{request.prompt[:50]}...'")

    try:
        if image_generator is None:
            raise HTTPException(status_code=503, detail="Image generation service temporarily unavailable")

        image_data = await run_in_threadpool(
            image_generator.generate_image,
            request.prompt,
            size=request.size,
            quality=request.quality,
            style=request.style,
        )

        if image_data is None:
            raise HTTPException(status_code=500, detail="Failed to generate image")

        logger.info(f"[{request_id}] Image generated successfully, size: {len(image_data)} bytes")

        # Encode once; both response formats carry the same base64 text.
        b64_data = base64.b64encode(image_data).decode('utf-8')
        if request.response_format == "b64_json":
            image = ImageObject(b64_json=b64_data, revised_prompt=request.prompt)
        else:
            image = ImageObject(
                url=f"data:image/webp;base64,{b64_data}",
                revised_prompt=request.prompt,
            )

        return ImageGenerationResponse(created=int(time.time()), data=[image])

    except HTTPException:
        # Single accounting point for all deliberate HTTP errors raised above.
        error_count += 1
        raise
    except Exception as e:
        error_count += 1
        logger.error(f"[{request_id}] Unexpected error: {e}")
        raise HTTPException(status_code=500, detail="Internal server error occurred") from e
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn, taking HOST and PORT
    # from the environment. Startup failures are logged instead of raised.
    try:
        import uvicorn

        host = os.getenv("HOST", "0.0.0.0")
        port = int(os.getenv("PORT", 7860))
        server_options = {
            "host": host,
            "port": port,
            "reload": False,
            "log_level": "info",
            "access_log": True,
        }

        logger.info(f"Starting image generation server on {host}:{port}")
        uvicorn.run(app, **server_options)
    except ImportError:
        logger.error("uvicorn not installed. Install with: pip install uvicorn")
    except Exception as e:
        logger.error(f"Failed to start server: {e}")