#!/usr/bin/env python3
"""
Blackbox.ai Reverse OpenAI-Compatible API Server

This server accepts OpenAI-compatible Chat Completions requests and forwards
them to Blackbox.ai's chat endpoint using the exact required payload structure
and headers. It streams responses back as OpenAI-style SSE chunks or returns a
non-streaming OpenAI-compatible response.
"""

import asyncio
import json
import time
import uuid
import logging
import requests
from typing import List, Dict, Any, Optional, Union, AsyncGenerator

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field

try:
    from .config import (
        BLACKBOX_HEADERS,
        BLACKBOX_COOKIES_STRING,
        MODEL_MAPPING,
        SERVER_CONFIG,
        BLACKBOX_VALIDATED,
        BLACKBOX_STATIC_SESSION,
        BLACKBOX_STATIC_SUBSCRIPTION,
        BLACKBOX_STATIC_BASE_FIELDS,
    )
except ImportError:
    # Allow running as a module or from root. A failure of this fallback
    # import propagates unchanged.
    from config import (
        BLACKBOX_HEADERS,
        BLACKBOX_COOKIES_STRING,
        MODEL_MAPPING,
        SERVER_CONFIG,
        BLACKBOX_VALIDATED,
        BLACKBOX_STATIC_SESSION,
        BLACKBOX_STATIC_SUBSCRIPTION,
        BLACKBOX_STATIC_BASE_FIELDS,
    )

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

BLACKBOX_CHAT_URL = "https://www.blackbox.ai/api/chat"
REQUEST_TIMEOUT = 120

# Roles forwarded as-is; every other role is coerced to "user".
_FORWARDED_ROLES = frozenset({"system", "user", "assistant"})

# Terminator event of an OpenAI-style SSE stream.
_SSE_DONE = "data: [DONE]\n\n"


# ===== OpenAI-compatible Pydantic models =====

class OpenAIMessage(BaseModel):
    """A single chat message in OpenAI Chat Completions format."""

    role: str = Field(description="Role: system, user, assistant, function, or tool")
    content: Optional[Union[str, List[Dict[str, Any]]]] = Field(None, description="Message content")
    name: Optional[str] = None
    function_call: Optional[Dict[str, Any]] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None
    tool_call_id: Optional[str] = None

    def get_text(self) -> str:
        """Return the message content flattened to plain text.

        String content is returned as-is. For list content, the ``text`` of
        every ``{"type": "text"}`` part is concatenated and every
        ``{"type": "image_url"}`` part contributes the placeholder
        ``"[Image]"``; other parts are ignored. ``None`` yields ``""``.
        """
        if isinstance(self.content, str):
            return self.content
        if isinstance(self.content, list):
            text_parts: List[str] = []
            for item in self.content:
                if not isinstance(item, dict):
                    continue
                item_type = item.get("type")
                if item_type == "text":
                    text_parts.append(item.get("text", ""))
                elif item_type == "image_url":
                    text_parts.append("[Image]")
            return "".join(text_parts)
        return str(self.content) if self.content is not None else ""

    class Config:
        extra = "allow"


class OpenAIFunction(BaseModel):
    """Function definition as used by OpenAI ``functions`` / ``tools``."""

    name: str
    description: Optional[str] = None
    parameters: Optional[Dict[str, Any]] = None

    class Config:
        extra = "allow"


class OpenAITool(BaseModel):
    """Tool wrapper around an :class:`OpenAIFunction`."""

    type: str = Field(default="function")
    function: Optional[OpenAIFunction] = None

    class Config:
        extra = "allow"


class OpenAIChatRequest(BaseModel):
    """Request body of ``POST /v1/chat/completions``; unknown fields are kept."""

    model: str = Field(..., description="OpenAI model name")
    messages: List[OpenAIMessage]
    max_tokens: Optional[int] = None
    temperature: Optional[float] = None
    top_p: Optional[float] = None
    n: Optional[int] = 1
    stream: Optional[bool] = False
    stop: Optional[Union[str, List[str]]] = None
    presence_penalty: Optional[float] = None
    frequency_penalty: Optional[float] = None
    logit_bias: Optional[Dict[str, float]] = None
    user: Optional[str] = None
    tools: Optional[List[OpenAITool]] = None
    tool_choice: Optional[Union[str, Dict[str, Any]]] = None
    functions: Optional[List[OpenAIFunction]] = None
    function_call: Optional[Union[str, Dict[str, Any]]] = None

    class Config:
        extra = "allow"


class OpenAIChoice(BaseModel):
    """One choice of a non-streaming chat completion."""

    index: int = 0
    message: Dict[str, Any]
    finish_reason: Optional[str] = None


class OpenAIChatResponse(BaseModel):
    """Non-streaming chat completion response."""

    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: List[OpenAIChoice]
    usage: Optional[Dict[str, int]] = None


class OpenAIStreamChoice(BaseModel):
    """One choice of a streamed chat completion chunk."""

    index: int = 0
    delta: Dict[str, Any]
    finish_reason: Optional[str] = None


class OpenAIStreamChunk(BaseModel):
    """Streamed chat completion chunk (one SSE ``data:`` event)."""

    id: str
    object: str = "chat.completion.chunk"
    created: int
    model: str
    choices: List[OpenAIStreamChoice]


def _parse_cookie_string(cookie_string: str) -> Dict[str, str]:
    """Parse a ``"name=value; name2=value2"`` cookie header into a dict.

    Segments without ``=`` are skipped; names and values are stripped of
    surrounding whitespace. An empty input yields an empty dict.
    """
    cookies: Dict[str, str] = {}
    if not cookie_string:
        return cookies
    for part in cookie_string.split(";"):
        part = part.strip()
        if "=" not in part:
            continue
        name, value = part.split("=", 1)
        cookies[name.strip()] = value.strip()
    return cookies


def _blackbox_headers() -> Dict[str, str]:
    """Return a copy of the configured headers, safe to mutate per request."""
    return dict(BLACKBOX_HEADERS)


def _build_blackbox_payload(openai_req: OpenAIChatRequest) -> Dict[str, Any]:
    """
    Build the exact Blackbox payload including all fields from the provided curl.

    We only map OpenAI messages/model into the appropriate fields and keep all
    other fields present so the upstream accepts the request.

    Raises:
        HTTPException: 400 if the model is not in ``MODEL_MAPPING`` and no
            ``"default"`` entry is configured.
    """
    def short_id() -> str:
        return uuid.uuid4().hex[:7]

    # Use user-provided top-level id if supplied as an extra field
    extra_top = getattr(openai_req, "model_extra", {}) or {}
    req_id = str(extra_top.get("id")) if extra_top.get("id") else short_id()

    # Map OpenAI messages to Blackbox format (id, content, role)
    bb_messages: List[Dict[str, Any]] = []
    for index, msg in enumerate(openai_req.messages):
        msg_role = str(getattr(msg, "role", "")).strip() or "user"
        # Coerce unknown roles to 'user' to avoid upstream rejection
        if msg_role not in _FORWARDED_ROLES:
            msg_role = "user"

        # Reuse provided message.id if present in extras; otherwise the first
        # message shares the top-level request id.
        msg_extra = getattr(msg, "model_extra", {}) or {}
        if msg_extra.get("id"):
            msg_id = str(msg_extra.get("id"))
        elif index == 0:
            msg_id = req_id
        else:
            msg_id = short_id()

        bb_messages.append({
            "id": msg_id,
            "content": msg.get_text(),
            "role": msg_role,
        })

    # Determine Blackbox agent from model mapping
    agent = MODEL_MAPPING.get(openai_req.model, MODEL_MAPPING.get("default"))
    if agent is None:
        # Without this guard the subscript below fails with a TypeError,
        # which surfaces to the client as an opaque 500.
        raise HTTPException(
            status_code=400,
            detail=f"Unknown model '{openai_req.model}' and no default model mapping is configured",
        )

    max_tokens = openai_req.max_tokens
    if max_tokens is None or max_tokens <= 0:
        max_tokens = 1024

    payload: Dict[str, Any] = {
        "messages": bb_messages,
        "agentMode": {
            "name": agent["name"],
            "id": agent["id"],
            "mode": True,
        },
        "id": req_id,
        # Include all base fields exactly
        **BLACKBOX_STATIC_BASE_FIELDS,
        # Ensure these fields reflect user-provided values (they intentionally
        # come after the base fields so they override them)
        "maxTokens": max_tokens,
        "userSelectedModel": agent["name"],
        "userSelectedAgent": "VscodeAgent",
        # Use validated token as provided
        "validated": BLACKBOX_VALIDATED,
        # Session/subscription from provided curl
        "session": BLACKBOX_STATIC_SESSION,
        "subscriptionCache": {**BLACKBOX_STATIC_SUBSCRIPTION},
        # Blackbox expects streaming
        "stream": True,
    }
    return payload


def _extract_blackbox_text(line: str) -> Optional[str]:
    """Try to extract textual delta from a Blackbox SSE line.

    The line may be plain or prefixed with ``data:``. Returns ``None`` for
    empty lines, the ``[DONE]`` marker, and JSON objects that carry no
    recognised text field. Anything that is not a JSON object (invalid JSON,
    or a JSON scalar/array) is treated as a plain-text token and returned
    stripped.
    """
    # "data: x" and "data:x" both reduce to "x" once stripped.
    data_part = (line[5:] if line.startswith("data:") else line).strip()
    if not data_part or data_part == "[DONE]":
        return None

    try:
        obj = json.loads(data_part)
    except (ValueError, RecursionError):
        # Not JSON - possibly plain text token
        return data_part
    if not isinstance(obj, dict):
        # Valid JSON but not an object (e.g. "42", "true"): plain text token
        return data_part

    # Common patterns: {"type": "response"|"delta"|..., "data"|"text"|"delta": "..."}
    for key in ("data", "text", "delta", "output"):
        val = obj.get(key)
        if isinstance(val, str) and val:
            return val

    # Some responses nest under obj["message"]["content"]
    message = obj.get("message")
    if isinstance(message, dict):
        content = message.get("content")
        if isinstance(content, str) and content:
            return content
    return None


def _line_to_text(raw_line: bytes) -> Optional[str]:
    """Decode one raw upstream line and extract its text, or ``None`` to skip.

    NOTE(review): ``iter_lines()`` drops line terminators and blank lines are
    skipped here, so if the upstream sends plain multi-line text rather than
    SSE events, newlines are not reproduced in the output - confirm the
    upstream format.
    """
    if not raw_line:
        return None
    # errors="ignore" makes decoding infallible for bytes input.
    return _extract_blackbox_text(raw_line.decode("utf-8", errors="ignore"))


def _open_blackbox_stream(sess: requests.Session, openai_req: OpenAIChatRequest) -> requests.Response:
    """POST the request to Blackbox and return the open streaming response.

    This call blocks; async callers must run it in a worker thread. The
    caller owns the returned response and must close it.

    Raises:
        HTTPException: 400 from payload building, or 502 when the upstream is
            unreachable or answers with a non-200 status.
    """
    payload = _build_blackbox_payload(openai_req)
    try:
        resp = sess.post(
            BLACKBOX_CHAT_URL,
            headers=_blackbox_headers(),
            cookies=_parse_cookie_string(BLACKBOX_COOKIES_STRING),
            json=payload,
            stream=True,
            timeout=REQUEST_TIMEOUT,
        )
    except requests.RequestException as e:
        raise HTTPException(status_code=502, detail=f"Upstream connection error: {e}") from e

    if resp.status_code != 200:
        try:
            detail = resp.text[:500]
        except requests.RequestException:
            # Reading the error body is best-effort only.
            detail = ""
        finally:
            resp.close()
        raise HTTPException(status_code=502, detail=f"Upstream error {resp.status_code}: {detail}")
    return resp


def _sse_error(message: str, code: int) -> str:
    """Format an error as a single SSE ``data:`` event."""
    body = {"error": {"message": message, "type": "upstream_error", "code": code}}
    return f"data: {json.dumps(body)}\n\n"


async def _stream_openai_chunks(openai_req: OpenAIChatRequest, request_id: str) -> AsyncGenerator[str, None]:
    """Forward to Blackbox with streaming, convert to OpenAI SSE chunks.

    Yields an initial role chunk, one content chunk per extracted text
    fragment, a final chunk with ``finish_reason="stop"`` and ``[DONE]``.

    Failures are reported in-band as an SSE error event followed by
    ``[DONE]``: by the time this generator runs, the HTTP status line has
    already been sent, so raising ``HTTPException`` here could not produce an
    error response. All blocking ``requests`` calls run in the default
    executor so the event loop stays responsive.
    """
    loop = asyncio.get_running_loop()

    with requests.Session() as sess:
        try:
            resp = await loop.run_in_executor(None, _open_blackbox_stream, sess, openai_req)
        except HTTPException as exc:
            logger.error("[%s] %s", request_id, exc.detail)
            yield _sse_error(str(exc.detail), exc.status_code)
            yield _SSE_DONE
            return

        created = int(time.time())

        def sse_chunk(delta: Dict[str, Any], finish_reason: Optional[str] = None) -> str:
            chunk = OpenAIStreamChunk(
                id=request_id,
                created=created,
                model=openai_req.model,
                choices=[OpenAIStreamChoice(index=0, delta=delta, finish_reason=finish_reason)],
            )
            return f"data: {chunk.model_dump_json()}\n\n"

        try:
            # Initial role chunk
            yield sse_chunk({"role": "assistant"})

            lines = resp.iter_lines()
            while True:
                try:
                    # next() with a default: StopIteration cannot be carried
                    # through a Future, so None marks end of stream.
                    raw_line = await loop.run_in_executor(None, next, lines, None)
                except requests.RequestException as exc:
                    logger.error("[%s] Upstream stream error: %s", request_id, exc)
                    yield _sse_error(f"Upstream stream error: {exc}", 502)
                    yield _SSE_DONE
                    return
                if raw_line is None:
                    break
                text = _line_to_text(raw_line)
                if text is None:
                    continue
                yield sse_chunk({"content": text})

            # Final chunk
            yield sse_chunk({}, finish_reason="stop")
            yield _SSE_DONE
        finally:
            # Also runs when the generator is closed early.
            resp.close()


def _complete_non_streaming(openai_req: OpenAIChatRequest) -> str:
    """Forward to Blackbox, collect all streamed text, and return a single string.

    This call blocks until the upstream stream ends.

    Raises:
        HTTPException: 400/502 as raised by ``_open_blackbox_stream``, or 502
            if the upstream connection fails while the body is being read.
    """
    with requests.Session() as sess:
        resp = _open_blackbox_stream(sess, openai_req)
        try:
            parts = [text for text in map(_line_to_text, resp.iter_lines()) if text]
        except requests.RequestException as e:
            raise HTTPException(status_code=502, detail=f"Upstream stream error: {e}") from e
        finally:
            resp.close()
    return "".join(parts)


# ===== FastAPI App =====

app = FastAPI(
    title="Blackbox.ai Reverse OpenAI API",
    version="1.0.0",
    description="OpenAI-compatible API that proxies to Blackbox.ai chat"
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/")
async def root():
    """Return basic service information and the list of endpoints."""
    return {
        "message": "Blackbox.ai Reverse OpenAI API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": ["/v1/chat/completions", "/v1/models", "/health"],
    }


@app.get("/health")
async def health():
    """Liveness probe; does not contact the upstream."""
    return {
        "status": "healthy",
        "timestamp": int(time.time()),
        "upstream": "blackbox.ai",
    }


@app.get("/v1/models")
async def list_models():
    """List the mapped model names in OpenAI ``/v1/models`` format."""
    created = int(time.time())
    # Expose mapped models (keys of MODEL_MAPPING excluding 'default')
    data = [
        {
            "id": model_name,
            "object": "model",
            "created": created,
            "owned_by": "blackbox",
        }
        for model_name in MODEL_MAPPING
        if model_name != "default"
    ]
    if not data:
        # Fallback entry so clients always see at least one model.
        data.append({
            "id": "gpt-5",
            "object": "model",
            "created": created,
            "owned_by": "blackbox",
        })
    return {"object": "list", "data": data}


@app.post("/v1/chat/completions")
async def chat_completions(request: OpenAIChatRequest):
    """Handle an OpenAI-compatible chat completion, streamed or not.

    Usage figures in the non-streaming response are whitespace-separated word
    counts, not real token counts.
    """
    request_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
    logger.info("[%s] Chat request: model=%s, stream=%s", request_id, request.model, request.stream)

    # Streamed
    if bool(request.stream):
        return StreamingResponse(
            _stream_openai_chunks(request, request_id),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Headers": "*",
            },
        )

    # Non-streaming: the upstream call blocks, so run it in a worker thread
    # instead of stalling the event loop for the whole completion.
    loop = asyncio.get_running_loop()
    content = await loop.run_in_executor(None, _complete_non_streaming, request)

    created = int(time.time())
    prompt_words = len(" ".join([m.get_text() for m in request.messages]).split())
    completion_words = len(content.split())
    response = OpenAIChatResponse(
        id=request_id,
        created=created,
        model=request.model,
        choices=[OpenAIChoice(index=0, message={"role": "assistant", "content": content}, finish_reason="stop")],
        usage={
            "prompt_tokens": prompt_words,
            "completion_tokens": completion_words,
            "total_tokens": prompt_words + completion_words,
        },
    )
    return response


@app.post("/v1/chat/completions/raw")
async def raw_validator(req: Request):
    """Validate a raw request body against ``OpenAIChatRequest``.

    Returns ``{"valid": True}`` on success, otherwise a 422 response with the
    error text. Nothing is forwarded upstream.
    """
    body = await req.body()
    try:
        obj = json.loads(body)
        _ = OpenAIChatRequest(**obj)
        return {"valid": True}
    except Exception as e:
        # Deliberately broad: any parse/validation failure means "invalid".
        return JSONResponse(status_code=422, content={"valid": False, "error": str(e)})


if __name__ == "__main__":
    try:
        import uvicorn

        host = SERVER_CONFIG.get("host", "0.0.0.0")
        port = int(SERVER_CONFIG.get("port", 8090))
        logger.info("Starting Blackbox Reverse API on %s:%s", host, port)
        uvicorn.run(app, host=host, port=port, log_level="info")
    except Exception as e:
        # Keep the traceback and signal failure to the calling process.
        logger.exception("Failed to start server: %s", e)
        raise SystemExit(1) from e