Blackbox / blackbox_server.py
Samfy001's picture
Upload 5 files
8754765 verified
raw
history blame contribute delete
16.3 kB
#!/usr/bin/env python3
"""
Blackbox.ai Reverse OpenAI-Compatible API Server
This server accepts OpenAI-compatible Chat Completions requests and forwards them
to Blackbox.ai's chat endpoint using the exact required payload structure and headers.
It streams responses back as OpenAI-style SSE chunks or returns a non-streaming
OpenAI-compatible response.
"""
import json
import time
import uuid
import logging
import requests
from typing import List, Dict, Any, Optional, Union, AsyncGenerator
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field
try:
from .config import (
BLACKBOX_HEADERS,
BLACKBOX_COOKIES_STRING,
MODEL_MAPPING,
SERVER_CONFIG,
BLACKBOX_VALIDATED,
BLACKBOX_STATIC_SESSION,
BLACKBOX_STATIC_SUBSCRIPTION,
BLACKBOX_STATIC_BASE_FIELDS,
)
except ImportError:
# Allow running as a module or from root
try:
from config import (
BLACKBOX_HEADERS,
BLACKBOX_COOKIES_STRING,
MODEL_MAPPING,
SERVER_CONFIG,
BLACKBOX_VALIDATED,
BLACKBOX_STATIC_SESSION,
BLACKBOX_STATIC_SUBSCRIPTION,
BLACKBOX_STATIC_BASE_FIELDS,
)
except ImportError:
raise
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
BLACKBOX_CHAT_URL = "https://www.blackbox.ai/api/chat"
REQUEST_TIMEOUT = 120
# ===== OpenAI-compatible Pydantic models =====
class OpenAIMessage(BaseModel):
role: str = Field(description="Role: system, user, assistant, function, or tool")
content: Optional[Union[str, List[Dict[str, Any]]]] = Field(None, description="Message content")
name: Optional[str] = None
function_call: Optional[Dict[str, Any]] = None
tool_calls: Optional[List[Dict[str, Any]]] = None
tool_call_id: Optional[str] = None
def get_text(self) -> str:
if isinstance(self.content, str):
return self.content
if isinstance(self.content, list):
text_parts: List[str] = []
for item in self.content:
if isinstance(item, dict):
if item.get("type") == "text":
text_parts.append(item.get("text", ""))
elif item.get("type") == "image_url":
text_parts.append("[Image]")
return "".join(text_parts)
return str(self.content) if self.content is not None else ""
class Config:
extra = "allow"
class OpenAIFunction(BaseModel):
name: str
description: Optional[str] = None
parameters: Optional[Dict[str, Any]] = None
class Config:
extra = "allow"
class OpenAITool(BaseModel):
type: str = Field(default="function")
function: Optional[OpenAIFunction] = None
class Config:
extra = "allow"
class OpenAIChatRequest(BaseModel):
model: str = Field(..., description="OpenAI model name")
messages: List[OpenAIMessage]
max_tokens: Optional[int] = None
temperature: Optional[float] = None
top_p: Optional[float] = None
n: Optional[int] = 1
stream: Optional[bool] = False
stop: Optional[Union[str, List[str]]] = None
presence_penalty: Optional[float] = None
frequency_penalty: Optional[float] = None
logit_bias: Optional[Dict[str, float]] = None
user: Optional[str] = None
tools: Optional[List[OpenAITool]] = None
tool_choice: Optional[Union[str, Dict[str, Any]]] = None
functions: Optional[List[OpenAIFunction]] = None
function_call: Optional[Union[str, Dict[str, Any]]] = None
class Config:
extra = "allow"
class OpenAIChoice(BaseModel):
index: int = 0
message: Dict[str, Any]
finish_reason: Optional[str] = None
class OpenAIChatResponse(BaseModel):
id: str
object: str = "chat.completion"
created: int
model: str
choices: List[OpenAIChoice]
usage: Optional[Dict[str, int]] = None
class OpenAIStreamChoice(BaseModel):
index: int = 0
delta: Dict[str, Any]
finish_reason: Optional[str] = None
class OpenAIStreamChunk(BaseModel):
id: str
object: str = "chat.completion.chunk"
created: int
model: str
choices: List[OpenAIStreamChoice]
def _parse_cookie_string(cookie_string: str) -> Dict[str, str]:
cookies: Dict[str, str] = {}
if not cookie_string:
return cookies
parts = [p.strip() for p in cookie_string.split(";") if p.strip()]
for part in parts:
if "=" in part:
name, value = part.split("=", 1)
cookies[name.strip()] = value.strip()
return cookies
def _blackbox_headers() -> Dict[str, str]:
# Return a copy so we can safely mutate per-request if needed
return dict(BLACKBOX_HEADERS)
def _build_blackbox_payload(openai_req: OpenAIChatRequest) -> Dict[str, Any]:
"""
Build the exact Blackbox payload including all fields from the provided curl.
We only map OpenAI messages/model into the appropriate fields and keep all other
fields present so the upstream accepts the request.
"""
# Generate short ids
def short_id() -> str:
return uuid.uuid4().hex[:7]
# Use user-provided top-level id if supplied as an extra field
extra_top = getattr(openai_req, "model_extra", {}) or {}
req_id = str(extra_top.get("id")) if extra_top.get("id") else short_id()
# Map OpenAI messages to Blackbox format (id, content, role)
bb_messages: List[Dict[str, Any]] = []
first = True
for msg in openai_req.messages:
msg_role = str(getattr(msg, "role", "")).strip() or "user"
# Coerce unknown roles to 'user' to avoid upstream rejection
if msg_role not in {"system", "user", "assistant"}:
msg_role = "user"
msg_content = msg.get_text()
# Ensure minimally valid defaults to avoid 501 from upstream on malformed content
if not msg_content:
msg_content = ""
# Reuse provided message.id if present in extras; ensure first message id matches top-level id
msg_extra = getattr(msg, "model_extra", {}) or {}
msg_id = str(msg_extra.get("id")) if msg_extra.get("id") else (req_id if first else short_id())
bb_messages.append({
"id": msg_id,
"content": msg_content,
"role": msg_role,
})
first = False
# Determine Blackbox agent from model mapping
agent = MODEL_MAPPING.get(openai_req.model, MODEL_MAPPING.get("default"))
payload: Dict[str, Any] = {
"messages": bb_messages,
"agentMode": {
"name": agent["name"],
"id": agent["id"],
"mode": True,
},
"id": req_id,
# Include all base fields exactly
**BLACKBOX_STATIC_BASE_FIELDS,
# Ensure these fields reflect user-provided values
"maxTokens": (openai_req.max_tokens if (openai_req.max_tokens is not None and openai_req.max_tokens > 0) else 1024),
"userSelectedModel": agent["name"],
"userSelectedAgent": "VscodeAgent",
# Use validated token as provided
"validated": BLACKBOX_VALIDATED,
# Session/subscription from provided curl
"session": BLACKBOX_STATIC_SESSION,
"subscriptionCache": {
**BLACKBOX_STATIC_SUBSCRIPTION
},
# Blackbox expects streaming
"stream": True,
}
return payload
def _extract_blackbox_text(line: str) -> Optional[str]:
"""Try to extract textual delta from a Blackbox SSE line."""
# Lines may be plain or prefixed with 'data: '
if line.startswith("data: "):
data_part = line[6:].strip()
elif line.startswith("data:"):
data_part = line[5:].strip()
else:
data_part = line.strip()
if not data_part or data_part == "[DONE]":
return None
# Try JSON
try:
obj = json.loads(data_part)
# Common patterns: {"type": "response"|"delta"|..., "data"|"text"|"delta": "..."}
for key in ("data", "text", "delta", "output"):
val = obj.get(key)
if isinstance(val, str) and val:
return val
# Some responses nest under obj["message"]["content"] etc.
message = obj.get("message")
if isinstance(message, dict):
content = message.get("content")
if isinstance(content, str) and content:
return content
except Exception:
# Not JSON - possibly plain text token
if data_part:
return data_part
return None
async def _stream_openai_chunks(openai_req: OpenAIChatRequest, request_id: str) -> AsyncGenerator[str, None]:
"""Forward to Blackbox with streaming, convert to OpenAI SSE chunks."""
headers = _blackbox_headers()
cookies = _parse_cookie_string(BLACKBOX_COOKIES_STRING)
payload = _build_blackbox_payload(openai_req)
with requests.Session() as sess:
try:
resp = sess.post(
BLACKBOX_CHAT_URL,
headers=headers,
cookies=cookies,
json=payload,
stream=True,
timeout=REQUEST_TIMEOUT
)
except requests.RequestException as e:
raise HTTPException(status_code=502, detail=f"Upstream connection error: {e}")
if resp.status_code != 200:
detail = resp.text[:500]
raise HTTPException(status_code=502, detail=f"Upstream error {resp.status_code}: {detail}")
created = int(time.time())
# Initial role chunk
initial = OpenAIStreamChunk(
id=request_id,
created=created,
model=openai_req.model,
choices=[OpenAIStreamChoice(index=0, delta={"role": "assistant"}, finish_reason=None)]
)
yield f"data: {initial.model_dump_json()}\n\n"
for raw_line in resp.iter_lines():
if not raw_line:
continue
try:
line = raw_line.decode("utf-8", errors="ignore")
except Exception:
continue
text = _extract_blackbox_text(line)
if text is None:
continue
chunk = OpenAIStreamChunk(
id=request_id,
created=created,
model=openai_req.model,
choices=[OpenAIStreamChoice(index=0, delta={"content": text}, finish_reason=None)]
)
yield f"data: {chunk.model_dump_json()}\n\n"
# Final chunk
final = OpenAIStreamChunk(
id=request_id,
created=created,
model=openai_req.model,
choices=[OpenAIStreamChoice(index=0, delta={}, finish_reason="stop")]
)
yield f"data: {final.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
def _complete_non_streaming(openai_req: OpenAIChatRequest) -> str:
"""Forward to Blackbox, collect all streamed text, and return a single string."""
headers = _blackbox_headers()
cookies = _parse_cookie_string(BLACKBOX_COOKIES_STRING)
payload = _build_blackbox_payload(openai_req)
# Force stream true upstream; we will aggregate locally
payload["stream"] = True
with requests.Session() as sess:
try:
resp = sess.post(
BLACKBOX_CHAT_URL,
headers=headers,
cookies=cookies,
json=payload,
stream=True,
timeout=REQUEST_TIMEOUT
)
except requests.RequestException as e:
raise HTTPException(status_code=502, detail=f"Upstream connection error: {e}")
if resp.status_code != 200:
detail = resp.text[:500]
raise HTTPException(status_code=502, detail=f"Upstream error {resp.status_code}: {detail}")
full = []
for raw_line in resp.iter_lines():
if not raw_line:
continue
try:
line = raw_line.decode("utf-8", errors="ignore")
except Exception:
continue
text = _extract_blackbox_text(line)
if text:
full.append(text)
return "".join(full)
# ===== FastAPI App =====
app = FastAPI(
title="Blackbox.ai Reverse OpenAI API",
version="1.0.0",
description="OpenAI-compatible API that proxies to Blackbox.ai chat"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def root():
return {
"message": "Blackbox.ai Reverse OpenAI API",
"version": "1.0.0",
"status": "running",
"endpoints": ["/v1/chat/completions", "/v1/models", "/health"],
}
@app.get("/health")
async def health():
return {
"status": "healthy",
"timestamp": int(time.time()),
"upstream": "blackbox.ai",
}
@app.get("/v1/models")
async def list_models():
created = int(time.time())
data = []
# Expose mapped models (keys of MODEL_MAPPING excluding 'default')
for model_name in [k for k in MODEL_MAPPING.keys() if k != "default"]:
data.append({
"id": model_name,
"object": "model",
"created": created,
"owned_by": "blackbox",
})
if not data:
data.append({
"id": "gpt-5",
"object": "model",
"created": created,
"owned_by": "blackbox",
})
return {"object": "list", "data": data}
@app.post("/v1/chat/completions")
async def chat_completions(request: OpenAIChatRequest):
request_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
logger.info(f"[{request_id}] Chat request: model={request.model}, stream={request.stream}")
# Streamed
if bool(request.stream):
return StreamingResponse(
_stream_openai_chunks(request, request_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "*",
},
)
# Non-streaming
content = _complete_non_streaming(request)
created = int(time.time())
response = OpenAIChatResponse(
id=request_id,
created=created,
model=request.model,
choices=[OpenAIChoice(index=0, message={"role": "assistant", "content": content}, finish_reason="stop")],
usage={
"prompt_tokens": len(" ".join([m.get_text() for m in request.messages]).split()),
"completion_tokens": len(content.split()),
"total_tokens": len(" ".join([m.get_text() for m in request.messages]).split()) + len(content.split()),
},
)
return response
@app.post("/v1/chat/completions/raw")
async def raw_validator(req: Request):
body = await req.body()
try:
obj = json.loads(body)
_ = OpenAIChatRequest(**obj)
return {"valid": True}
except Exception as e:
return JSONResponse(status_code=422, content={"valid": False, "error": str(e)})
if __name__ == "__main__":
try:
import uvicorn
host = SERVER_CONFIG.get("host", "0.0.0.0")
port = int(SERVER_CONFIG.get("port", 8090))
logger.info(f"Starting Blackbox Reverse API on {host}:{port}")
uvicorn.run(app, host=host, port=port, log_level="info")
except Exception as e:
logger.error(f"Failed to start server: {e}")