|
|
| """
|
| Blackbox.ai Reverse OpenAI-Compatible API Server
|
|
|
| This server accepts OpenAI-compatible Chat Completions requests and forwards them
|
| to Blackbox.ai's chat endpoint using the exact required payload structure and headers.
|
| It streams responses back as OpenAI-style SSE chunks or returns a non-streaming
|
| OpenAI-compatible response.
|
| """
|
import json
import logging
import time
import uuid
from typing import List, Dict, Any, Optional, Union, AsyncGenerator

import requests
from fastapi import FastAPI, HTTPException, Request
from fastapi.concurrency import iterate_in_threadpool, run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, ConfigDict, Field
|
|
|
# Load settings from the package-relative ``config`` module when this file is
# imported as part of a package; otherwise fall back to a top-level ``config``
# module. If the fallback import fails too, its ImportError propagates.
try:
    from .config import (
        BLACKBOX_COOKIES_STRING,
        BLACKBOX_HEADERS,
        BLACKBOX_STATIC_BASE_FIELDS,
        BLACKBOX_STATIC_SESSION,
        BLACKBOX_STATIC_SUBSCRIPTION,
        BLACKBOX_VALIDATED,
        MODEL_MAPPING,
        SERVER_CONFIG,
    )
except ImportError:
    from config import (
        BLACKBOX_COOKIES_STRING,
        BLACKBOX_HEADERS,
        BLACKBOX_STATIC_BASE_FIELDS,
        BLACKBOX_STATIC_SESSION,
        BLACKBOX_STATIC_SUBSCRIPTION,
        BLACKBOX_VALIDATED,
        MODEL_MAPPING,
        SERVER_CONFIG,
    )
|
|
|
# Root logging is configured at import time with INFO verbosity.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Upstream endpoint that every chat request is POSTed to.
BLACKBOX_CHAT_URL: str = "https://www.blackbox.ai/api/chat"

# Passed as ``timeout=`` to ``requests`` for each upstream call (seconds).
REQUEST_TIMEOUT: int = 120
|
|
|
|
|
|
|
class OpenAIMessage(BaseModel):
    """A single chat message of an OpenAI-compatible request.

    Keys that are not declared below are accepted and kept on the model
    (``extra="allow"``) instead of failing validation.
    """

    # Pydantic v2 configuration; replaces the deprecated inner ``class Config``.
    model_config = ConfigDict(extra="allow")

    role: str = Field(description="Role: system, user, assistant, function, or tool")
    content: Optional[Union[str, List[Dict[str, Any]]]] = Field(None, description="Message content")
    name: Optional[str] = None
    function_call: Optional[Dict[str, Any]] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None
    tool_call_id: Optional[str] = None

    def get_text(self) -> str:
        """Flatten ``content`` into one plain string.

        Returns:
            * the string itself when ``content`` is a ``str``;
            * for a list of parts, the concatenation (no separator) of every
              ``"text"`` part's text plus the placeholder ``"[Image]"`` for
              every ``"image_url"`` part; other parts are ignored;
            * ``""`` when ``content`` is ``None``.
        """
        content = self.content
        if content is None:
            return ""
        if isinstance(content, str):
            return content
        if not isinstance(content, list):
            return str(content)

        pieces: List[str] = []
        for part in content:
            if not isinstance(part, dict):
                continue
            part_type = part.get("type")
            if part_type == "text":
                text = part.get("text")
                # A null or non-string "text" value would make "".join()
                # raise TypeError, so skip None and coerce anything else.
                if isinstance(text, str):
                    pieces.append(text)
                elif text is not None:
                    pieces.append(str(text))
            elif part_type == "image_url":
                pieces.append("[Image]")
        return "".join(pieces)
|
|
|
|
|
class OpenAIFunction(BaseModel):
    """Function definition as sent in ``functions`` or inside a tool entry.

    Undeclared keys are accepted and kept (``extra="allow"``).
    """

    # Pydantic v2 configuration; replaces the deprecated inner ``class Config``.
    model_config = ConfigDict(extra="allow")

    name: str
    description: Optional[str] = None
    parameters: Optional[Dict[str, Any]] = None
|
|
|
|
|
class OpenAITool(BaseModel):
    """One entry of the request's ``tools`` list.

    ``type`` defaults to ``"function"``. Undeclared keys are accepted and kept
    (``extra="allow"``).
    """

    # Pydantic v2 configuration; replaces the deprecated inner ``class Config``.
    model_config = ConfigDict(extra="allow")

    type: str = Field(default="function")
    function: Optional[OpenAIFunction] = None
|
|
|
|
|
class OpenAIChatRequest(BaseModel):
    """Request body of ``POST /v1/chat/completions``.

    Undeclared keys are accepted and kept (``extra="allow"``), so clients may
    send additional parameters without triggering a validation error.
    """

    # Pydantic v2 configuration; replaces the deprecated inner ``class Config``.
    model_config = ConfigDict(extra="allow")

    model: str = Field(..., description="OpenAI model name")
    messages: List[OpenAIMessage]
    max_tokens: Optional[int] = None
    temperature: Optional[float] = None
    top_p: Optional[float] = None
    n: Optional[int] = 1
    stream: Optional[bool] = False
    stop: Optional[Union[str, List[str]]] = None
    presence_penalty: Optional[float] = None
    frequency_penalty: Optional[float] = None
    logit_bias: Optional[Dict[str, float]] = None
    user: Optional[str] = None
    tools: Optional[List[OpenAITool]] = None
    tool_choice: Optional[Union[str, Dict[str, Any]]] = None
    functions: Optional[List[OpenAIFunction]] = None
    function_call: Optional[Union[str, Dict[str, Any]]] = None
|
|
|
|
|
class OpenAIChoice(BaseModel):
    """One completion alternative in a non-streaming response."""

    index: int = Field(default=0)
    message: Dict[str, Any]
    finish_reason: Optional[str] = Field(default=None)
|
|
|
|
|
class OpenAIChatResponse(BaseModel):
    """Body returned for a non-streaming chat completion."""

    id: str
    object: str = Field(default="chat.completion")
    created: int
    model: str
    choices: List[OpenAIChoice]
    usage: Optional[Dict[str, int]] = Field(default=None)
|
|
|
|
|
class OpenAIStreamChoice(BaseModel):
    """One choice inside a streamed chunk; ``delta`` carries the increment."""

    index: int = Field(default=0)
    delta: Dict[str, Any]
    finish_reason: Optional[str] = Field(default=None)
|
|
|
|
|
class OpenAIStreamChunk(BaseModel):
    """Envelope serialized into each ``data:`` line of a streamed response."""

    id: str
    object: str = Field(default="chat.completion.chunk")
    created: int
    model: str
    choices: List[OpenAIStreamChoice]
|
|
|
|
|
| def _parse_cookie_string(cookie_string: str) -> Dict[str, str]:
|
| cookies: Dict[str, str] = {}
|
| if not cookie_string:
|
| return cookies
|
| parts = [p.strip() for p in cookie_string.split(";") if p.strip()]
|
| for part in parts:
|
| if "=" in part:
|
| name, value = part.split("=", 1)
|
| cookies[name.strip()] = value.strip()
|
| return cookies
|
|
|
|
|
def _blackbox_headers() -> Dict[str, str]:
    """Return a fresh copy of the configured upstream request headers.

    A new dict is built on every call so callers can modify the result
    without touching ``BLACKBOX_HEADERS``.
    """
    headers: Dict[str, str] = {}
    headers.update(BLACKBOX_HEADERS)
    return headers
|
|
|
|
|
def _build_blackbox_payload(openai_req: OpenAIChatRequest) -> Dict[str, Any]:
    """Translate an OpenAI chat request into the JSON body sent upstream.

    Only the messages, the model (via ``MODEL_MAPPING``) and ``max_tokens``
    come from the request; every other field is taken from the static
    configuration constants so the upstream accepts the request.

    Args:
        openai_req: The validated OpenAI-compatible request.

    Returns:
        The payload dict. ``"stream"`` is always ``True``.

    Raises:
        HTTPException: 400 when the requested model has no entry in
            ``MODEL_MAPPING`` and no ``"default"`` entry exists either.
    """

    def short_id() -> str:
        # 7 hex characters taken from a random UUID.
        return uuid.uuid4().hex[:7]

    # A client-supplied "id" (an extra, undeclared field) wins over a random one.
    extra_top = getattr(openai_req, "model_extra", None) or {}
    req_id = str(extra_top["id"]) if extra_top.get("id") else short_id()

    upstream_roles = {"system", "user", "assistant"}
    bb_messages: List[Dict[str, Any]] = []
    for position, msg in enumerate(openai_req.messages):
        role = str(msg.role).strip() or "user"
        # Any other role (e.g. function / tool) is sent as a user message.
        if role not in upstream_roles:
            role = "user"

        msg_extra = getattr(msg, "model_extra", None) or {}
        if msg_extra.get("id"):
            msg_id = str(msg_extra["id"])
        elif position == 0:
            # The first message shares its id with the request itself.
            msg_id = req_id
        else:
            msg_id = short_id()

        bb_messages.append({
            "id": msg_id,
            "content": msg.get_text(),
            "role": role,
        })

    agent = MODEL_MAPPING.get(openai_req.model, MODEL_MAPPING.get("default"))
    if agent is None:
        # Without this guard the subscripting below fails with an opaque
        # TypeError that surfaces to the client as a 500.
        raise HTTPException(
            status_code=400,
            detail=f"Unknown model '{openai_req.model}' and no default model mapping is configured",
        )

    max_tokens = openai_req.max_tokens
    if max_tokens is None or max_tokens <= 0:
        max_tokens = 1024

    payload: Dict[str, Any] = {
        "messages": bb_messages,
        "agentMode": {
            "name": agent["name"],
            "id": agent["id"],
            "mode": True,
        },
        "id": req_id,
        # Spread here on purpose: static fields may override the keys above,
        # while the keys below always take precedence over them.
        **BLACKBOX_STATIC_BASE_FIELDS,
        "maxTokens": max_tokens,
        "userSelectedModel": agent["name"],
        "userSelectedAgent": "VscodeAgent",
        "validated": BLACKBOX_VALIDATED,
        "session": BLACKBOX_STATIC_SESSION,
        "subscriptionCache": {
            **BLACKBOX_STATIC_SUBSCRIPTION
        },
        "stream": True,
    }

    return payload
|
|
|
|
|
| def _extract_blackbox_text(line: str) -> Optional[str]:
|
| """Try to extract textual delta from a Blackbox SSE line."""
|
|
|
| if line.startswith("data: "):
|
| data_part = line[6:].strip()
|
| elif line.startswith("data:"):
|
| data_part = line[5:].strip()
|
| else:
|
| data_part = line.strip()
|
|
|
| if not data_part or data_part == "[DONE]":
|
| return None
|
|
|
|
|
| try:
|
| obj = json.loads(data_part)
|
|
|
| for key in ("data", "text", "delta", "output"):
|
| val = obj.get(key)
|
| if isinstance(val, str) and val:
|
| return val
|
|
|
| message = obj.get("message")
|
| if isinstance(message, dict):
|
| content = message.get("content")
|
| if isinstance(content, str) and content:
|
| return content
|
| except Exception:
|
|
|
| if data_part:
|
| return data_part
|
| return None
|
|
|
|
|
async def _stream_openai_chunks(openai_req: OpenAIChatRequest, request_id: str) -> AsyncGenerator[str, None]:
    """Forward to Blackbox with streaming, convert to OpenAI SSE chunks.

    The generator yields, in order: one chunk announcing the assistant role,
    one chunk per text fragment extracted from the upstream response, a final
    chunk with ``finish_reason="stop"`` and the ``data: [DONE]`` sentinel.

    This generator is consumed after the HTTP response has already started,
    so an upstream failure can no longer be turned into an HTTP error
    status. Failures are therefore reported in-band as a
    ``data: {"error": {...}}`` event followed by ``data: [DONE]``.

    All blocking ``requests`` calls run in the thread pool so the event loop
    stays free for other requests.

    Args:
        openai_req: The validated OpenAI-compatible request.
        request_id: Identifier copied into the ``id`` of every chunk.

    Yields:
        SSE-formatted strings, each terminated by a blank line.
    """
    done_event = "data: [DONE]\n\n"

    def error_event(message: str, code: int = 502, kind: str = "upstream_error") -> str:
        body = {"error": {"message": message, "type": kind, "code": code}}
        return f"data: {json.dumps(body)}\n\n"

    try:
        headers = _blackbox_headers()
        cookies = _parse_cookie_string(BLACKBOX_COOKIES_STRING)
        payload = _build_blackbox_payload(openai_req)
    except HTTPException as exc:
        logger.error("[%s] Cannot build upstream request: %s", request_id, exc.detail)
        yield error_event(str(exc.detail), exc.status_code, "invalid_request_error")
        yield done_event
        return

    with requests.Session() as sess:
        try:
            resp = await run_in_threadpool(
                sess.post,
                BLACKBOX_CHAT_URL,
                headers=headers,
                cookies=cookies,
                json=payload,
                stream=True,
                timeout=REQUEST_TIMEOUT,
            )
        except requests.RequestException as exc:
            logger.error("[%s] Upstream connection error: %s", request_id, exc)
            yield error_event(f"Upstream connection error: {exc}")
            yield done_event
            return

        # ``with resp`` releases the upstream connection on every exit path,
        # including the generator being closed when the client disconnects.
        with resp:
            if resp.status_code != 200:
                # Reading the error body is blocking I/O as well.
                detail = await run_in_threadpool(lambda: resp.text[:500])
                logger.error("[%s] Upstream error %s: %s", request_id, resp.status_code, detail)
                yield error_event(f"Upstream error {resp.status_code}: {detail}")
                yield done_event
                return

            created = int(time.time())

            def chunk_event(delta: Dict[str, Any], finish_reason: Optional[str]) -> str:
                chunk = OpenAIStreamChunk(
                    id=request_id,
                    created=created,
                    model=openai_req.model,
                    choices=[OpenAIStreamChoice(index=0, delta=delta, finish_reason=finish_reason)],
                )
                return f"data: {chunk.model_dump_json()}\n\n"

            yield chunk_event({"role": "assistant"}, None)

            try:
                # Each next() on iter_lines() blocks on the socket, so the
                # iterator is driven from the thread pool.
                async for raw_line in iterate_in_threadpool(resp.iter_lines()):
                    if not raw_line:
                        continue
                    text = _extract_blackbox_text(raw_line.decode("utf-8", errors="ignore"))
                    if text is None:
                        continue
                    yield chunk_event({"content": text}, None)
            except requests.RequestException as exc:
                logger.error("[%s] Upstream stream interrupted: %s", request_id, exc)
                yield error_event(f"Upstream stream interrupted: {exc}")
                yield done_event
                return

            yield chunk_event({}, "stop")
            yield done_event
|
|
|
|
|
def _complete_non_streaming(openai_req: OpenAIChatRequest) -> str:
    """Forward to Blackbox, collect all streamed text, and return a single string.

    The upstream is always queried in streaming mode; the text fragments
    extracted from its lines are concatenated without a separator.

    This function performs blocking network I/O.

    Args:
        openai_req: The validated OpenAI-compatible request.

    Returns:
        The complete assistant text (possibly empty).

    Raises:
        HTTPException: 502 when the upstream cannot be reached, answers with
            a non-200 status, or the connection fails while reading.
    """
    headers = _blackbox_headers()
    cookies = _parse_cookie_string(BLACKBOX_COOKIES_STRING)
    payload = _build_blackbox_payload(openai_req)

    # Force upstream streaming regardless of what the payload builder set.
    payload["stream"] = True

    with requests.Session() as sess:
        try:
            resp = sess.post(
                BLACKBOX_CHAT_URL,
                headers=headers,
                cookies=cookies,
                json=payload,
                stream=True,
                timeout=REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            raise HTTPException(status_code=502, detail=f"Upstream connection error: {e}") from e

        # With stream=True the connection is only released once the body is
        # fully consumed or the response is closed; ``with`` guarantees the
        # latter on every exit path.
        with resp:
            if resp.status_code != 200:
                detail = resp.text[:500]
                raise HTTPException(status_code=502, detail=f"Upstream error {resp.status_code}: {detail}")

            fragments: List[str] = []
            try:
                for raw_line in resp.iter_lines():
                    if not raw_line:
                        continue
                    text = _extract_blackbox_text(raw_line.decode("utf-8", errors="ignore"))
                    if text:
                        fragments.append(text)
            except requests.RequestException as e:
                # A connection drop while reading is an upstream fault too.
                raise HTTPException(status_code=502, detail=f"Upstream stream error: {e}") from e

    return "".join(fragments)
|
|
|
|
|
|
|
# ASGI application; the route handlers below register themselves on it.
app = FastAPI(
    description="OpenAI-compatible API that proxies to Blackbox.ai chat",
    version="1.0.0",
    title="Blackbox.ai Reverse OpenAI API",
)

# CORS is fully open: every origin, method and header, credentials allowed.
# NOTE(review): a wildcard origin combined with credentials is very
# permissive; confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
|
|
|
|
|
@app.get("/")
async def root():
    """Return a static banner with the service name, version and endpoints."""
    endpoints = ["/v1/chat/completions", "/v1/models", "/health"]
    banner = {
        "message": "Blackbox.ai Reverse OpenAI API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": endpoints,
    }
    return banner
|
|
|
|
|
@app.get("/health")
async def health():
    """Report a static "healthy" status with the current Unix timestamp.

    The upstream is not contacted; only the local process is reported on.
    """
    now = int(time.time())
    return {"status": "healthy", "timestamp": now, "upstream": "blackbox.ai"}
|
|
|
|
|
@app.get("/v1/models")
async def list_models():
    """List the configured model names in OpenAI ``/v1/models`` format.

    Every key of ``MODEL_MAPPING`` except ``"default"`` becomes one entry.
    When that leaves nothing, a single ``"gpt-5"`` entry is returned.
    """
    now = int(time.time())
    model_ids = [name for name in MODEL_MAPPING if name != "default"] or ["gpt-5"]
    entries = [
        {
            "id": model_id,
            "object": "model",
            "created": now,
            "owned_by": "blackbox",
        }
        for model_id in model_ids
    ]
    return {"object": "list", "data": entries}
|
|
|
|
|
@app.post("/v1/chat/completions")
async def chat_completions(request: OpenAIChatRequest):
    """Handle an OpenAI-compatible chat completion request.

    With ``stream`` set, an SSE ``StreamingResponse`` is returned. Otherwise
    the upstream answer is collected in full and wrapped in an
    ``OpenAIChatResponse``.

    The ``usage`` numbers are whitespace-separated word counts, not real
    token counts.

    Raises:
        HTTPException: Propagated from the non-streaming upstream call.
    """
    request_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
    logger.info("[%s] Chat request: model=%s, stream=%s", request_id, request.model, request.stream)

    if request.stream:
        return StreamingResponse(
            _stream_openai_chunks(request, request_id),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Headers": "*",
            },
        )

    # The helper does blocking network I/O; calling it directly from this
    # coroutine would stall the event loop for the whole upstream round trip.
    content = await run_in_threadpool(_complete_non_streaming, request)

    # Compute the prompt estimate once and reuse it for the total.
    prompt_tokens = len(" ".join(m.get_text() for m in request.messages).split())
    completion_tokens = len(content.split())

    return OpenAIChatResponse(
        id=request_id,
        created=int(time.time()),
        model=request.model,
        choices=[OpenAIChoice(index=0, message={"role": "assistant", "content": content}, finish_reason="stop")],
        usage={
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        },
    )
|
|
|
|
|
@app.post("/v1/chat/completions/raw")
async def raw_validator(req: Request):
    """Check whether the raw request body is a valid chat request.

    Returns ``{"valid": True}`` when the body parses as JSON and validates
    against ``OpenAIChatRequest``; otherwise a 422 response carrying the
    error text. Nothing is forwarded upstream.
    """
    raw_body = await req.body()
    try:
        OpenAIChatRequest(**json.loads(raw_body))
    except Exception as exc:
        # Any parsing or validation failure is reported back, not raised.
        return JSONResponse(status_code=422, content={"valid": False, "error": str(exc)})
    return {"valid": True}
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn using SERVER_CONFIG,
    # defaulting to 0.0.0.0:8090.
    try:
        import uvicorn

        host = SERVER_CONFIG.get("host", "0.0.0.0")
        port = int(SERVER_CONFIG.get("port", 8090))
        logger.info(f"Starting Blackbox Reverse API on {host}:{port}")
        uvicorn.run(app, host=host, port=port, log_level="info")
    except Exception as e:
        # Keep the traceback and exit non-zero so a supervisor or shell can
        # tell that startup failed instead of seeing a clean exit.
        logger.exception("Failed to start server: %s", e)
        raise SystemExit(1) from e
|
|
|
|
|
|
|