#!/usr/bin/env python3
"""
Scira AI Reverse API Server
FastAPI server that mimics Scira AI API endpoints

Exposes a pass-through endpoint (``POST /api/search``) that forwards the
request to the upstream Scira search URL and relays the streamed reply, and
OpenAI-shaped endpoints (``POST /v1/chat/completions``, ``GET /v1/models``)
that translate to and from the Scira message format.
"""

import asyncio
import json
import logging
import random
import time
import uuid
from typing import Any, AsyncIterator, Dict, List, Optional

import requests
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field

# Import configuration
try:
    from config import (
        PROXY_CONFIGS,
        RATE_LIMITING,
        USER_AGENTS,
        SEC_CH_UA_OPTIONS,
        PLATFORMS,
        ACCEPT_LANGUAGES,
        SESSION_TOKENS,
        POSTHOG_TOKENS,
        BYPASS_CONFIG,
        SCIRA_CONFIG,
        SERVER_CONFIG,
        get_primary_proxy
    )
except ImportError:
    # Fallback configuration if config.py is not available
    print("⚠️ config.py not found, using fallback configuration")
    PROXY_CONFIGS = []
    RATE_LIMITING = {"min_delay": 0.5, "max_delay": 3.0, "request_timeout": 120}
    USER_AGENTS = ["Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"]
    SEC_CH_UA_OPTIONS = ['"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"']
    PLATFORMS = ['"Windows"']
    ACCEPT_LANGUAGES = ["en-US,en;q=0.9"]
    # NOTE(review): session/analytics cookie values are hard-coded below;
    # consider loading them from the environment instead of source control.
    SESSION_TOKENS = ["QZkIjAJihMAA2Eju4uV54BpqwOFScwUl.Ptbu8Y%2FO%2BU4%2BimLLh6unpkg%2FMGdvFThciJvInvNAV8Y%3D"]
    POSTHOG_TOKENS = ["%7B%22distinct_id%22%3A%2201970149-15a6-74b8-8abd-93a5932c0b14%22%2C%22%24sesid%22%3A%5B1754648371696%2C%220198891e-aee2-7b9f-962c-305a2214e83c%22%2C1754647146210%5D%2C%22%24epp%22%3Atrue%2C%22%24initial_person_info%22%3A%7B%22r%22%3A%22https%3A%2F%2Fscira.ai%2F%22%2C%22u%22%3A%22https%3A%2F%2Fscira.ai%2F%22%7D%7D"]
    BYPASS_CONFIG = {"proxy_rotation": {"enabled": False}}
    SCIRA_CONFIG = {"base_url": "https://scira.ai/api", "search_endpoint": "/search"}
    SERVER_CONFIG = {"host": "0.0.0.0", "port": 8080}

    def get_primary_proxy():
        """Fallback used when config.py is absent: no proxy is configured."""
        return None

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Scira AI API Configuration
SCIRA_BASE_URL = SCIRA_CONFIG["base_url"]
SCIRA_SEARCH_URL = f"{SCIRA_BASE_URL}{SCIRA_CONFIG['search_endpoint']}"

# Rate limiting settings from config
MIN_DELAY = RATE_LIMITING["min_delay"]
MAX_DELAY = RATE_LIMITING["max_delay"]
REQUEST_TIMEOUT = RATE_LIMITING["request_timeout"]

# Request tracking for rate limiting (mutated by apply_rate_limiting)
last_request_time = 0
request_count = 0

# Sentinel returned by next() in a worker thread once the upstream stream is
# exhausted; a StopIteration cannot be propagated through an executor future.
_STREAM_END = object()


def get_proxy_config():
    """Get proxy configuration for requests.

    Returns:
        A ``{"http": url, "https": url}`` mapping built from the dict returned
        by ``get_primary_proxy()``, or ``None`` when no proxy is configured.
    """
    proxy = get_primary_proxy()
    if not proxy:
        return None

    proxy_url = f"http://{proxy['username']}:{proxy['password']}@{proxy['host']}:{proxy['port']}"
    return {
        "http": proxy_url,
        "https": proxy_url
    }


def get_random_headers():
    """Generate randomized headers for rate limit bypass.

    User agent, client hints, platform and accept-language are drawn at random
    from the configured pools; a random dotted-quad is used for the
    forwarded-for style headers.

    Returns:
        Dict of HTTP request headers.
    """
    user_agent = random.choice(USER_AGENTS)
    sec_ch_ua = random.choice(SEC_CH_UA_OPTIONS)
    platform = random.choice(PLATFORMS)
    accept_language = random.choice(ACCEPT_LANGUAGES)

    # Generate random X-Forwarded-For IP
    x_forwarded_for = f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}"

    headers = {
        "accept": "text/event-stream",
        "accept-encoding": "gzip, deflate, br, zstd",
        "accept-language": accept_language,
        "cache-control": "no-cache",
        "content-type": "application/json",
        "origin": "https://scira.ai",
        "priority": "u=1, i",
        "referer": "https://scira.ai/",
        "sec-ch-ua": sec_ch_ua,
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": platform,
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "user-agent": user_agent,
        "x-forwarded-for": x_forwarded_for,
        "x-real-ip": x_forwarded_for
    }

    # Add additional spoofing headers if enabled
    if BYPASS_CONFIG.get("ip_spoofing", {}).get("x_originating_ip", False):
        headers["x-originating-ip"] = x_forwarded_for

    return headers


async def apply_rate_limiting():
    """Apply rate limiting with random delays.

    Sleeps for a random duration in ``[MIN_DELAY, MAX_DELAY]``, extended by the
    shortfall when the previous request was less than ``MIN_DELAY`` seconds
    ago, then updates the module-level ``last_request_time`` and
    ``request_count``.
    """
    global last_request_time, request_count

    current_time = time.time()
    time_since_last = current_time - last_request_time

    # Calculate delay based on request frequency
    delay = random.uniform(MIN_DELAY, MAX_DELAY)

    # Add extra delay if requests are too frequent
    if time_since_last < MIN_DELAY:
        delay += (MIN_DELAY - time_since_last)

    if delay > 0:
        logger.info(f"Rate limiting: waiting {delay:.2f} seconds")
        await asyncio.sleep(delay)

    last_request_time = time.time()
    request_count += 1
    logger.info(f"Request #{request_count} - Rate limiting applied")


# Request Models
class MessagePart(BaseModel):
    """One part of a Scira message (type plus text)."""

    type: str = Field(..., description="Part type")
    text: str = Field(..., description="Text content")


class Message(BaseModel):
    """A Scira-format chat message."""

    role: str = Field(..., description="Message role")
    parts: List[MessagePart] = Field(..., description="Message parts")
    id: str = Field(..., description="Message ID")


class SciraSearchRequest(BaseModel):
    """Request body accepted by ``POST /api/search``."""

    id: str = Field(..., description="Search ID")
    messages: List[Message] = Field(..., description="Messages")
    model: str = Field(default="scira-5-mini", description="Model name")
    group: str = Field(default="chat", description="Group type")
    timezone: str = Field(default="Asia/Calcutta", description="Timezone")
    isCustomInstructionsEnabled: bool = Field(default=True, description="Custom instructions enabled")


# OpenAI-compatible models
class OpenAIMessage(BaseModel):
    """An OpenAI-format chat message."""

    role: str = Field(description="Role: system, user, assistant, or tool")
    # Optional: assistant messages that carry tool_calls may send null content.
    content: Optional[str] = Field(None, description="Message content")
    name: Optional[str] = Field(None, description="Name of the participant")
    tool_calls: Optional[List[Dict[str, Any]]] = Field(None, description="Tool calls made by assistant")
    tool_call_id: Optional[str] = Field(None, description="Tool call ID for tool responses")


class OpenAIFunction(BaseModel):
    """Function definition nested inside an OpenAI tool."""

    name: str = Field(description="Function name")
    description: Optional[str] = Field(None, description="Function description")
    parameters: Optional[Dict[str, Any]] = Field(None, description="Function parameters schema")


class OpenAITool(BaseModel):
    """An OpenAI tool entry."""

    type: str = Field(default="function", description="Tool type")
    function: OpenAIFunction = Field(description="Function definition")


class OpenAIStreamOptions(BaseModel):
    """OpenAI ``stream_options`` object."""

    include_usage: Optional[bool] = Field(False, description="Include usage in stream")


class OpenAIChatRequest(BaseModel):
    """Request body accepted by ``POST /v1/chat/completions``."""

    model: str = Field(default="scira-5-mini", description="Model to use")
    messages: List[OpenAIMessage] = Field(description="List of messages")
    max_tokens: Optional[int] = Field(None, description="Maximum tokens to generate")
    temperature: Optional[float] = Field(None, description="Sampling temperature")
    top_p: Optional[float] = Field(None, description="Nucleus sampling parameter")
    n: Optional[int] = Field(1, description="Number of completions")
    stream: Optional[bool] = Field(False, description="Enable streaming")
    stop: Optional[List[str]] = Field(None, description="Stop sequences")
    presence_penalty: Optional[float] = Field(None, description="Presence penalty")
    frequency_penalty: Optional[float] = Field(None, description="Frequency penalty")
    logit_bias: Optional[Dict[str, float]] = Field(None, description="Logit bias")
    user: Optional[str] = Field(None, description="User identifier")
    tools: Optional[List[OpenAITool]] = Field(None, description="Available tools")
    tool_choice: Optional[str] = Field(None, description="Tool choice strategy")
    stream_options: Optional[OpenAIStreamOptions] = Field(None, description="Stream options")


# OpenAI Response models
class OpenAIUsage(BaseModel):
    """Token usage block of a chat completion."""

    prompt_tokens: int = Field(description="Tokens in prompt")
    completion_tokens: int = Field(description="Tokens in completion")
    total_tokens: int = Field(description="Total tokens used")


class OpenAIChoice(BaseModel):
    """One choice of a non-streaming chat completion."""

    index: int = Field(description="Choice index")
    message: OpenAIMessage = Field(description="Generated message")
    finish_reason: Optional[str] = Field(None, description="Reason for finishing")


class OpenAIChatResponse(BaseModel):
    """Non-streaming chat completion response."""

    id: str = Field(description="Response ID")
    object: str = Field(default="chat.completion", description="Object type")
    created: int = Field(description="Creation timestamp")
    model: str = Field(description="Model used")
    choices: List[OpenAIChoice] = Field(description="Generated choices")
    usage: Optional[OpenAIUsage] = Field(None, description="Token usage")


class OpenAIStreamChoice(BaseModel):
    """One choice of a streaming chat completion chunk."""

    index: int = Field(description="Choice index")
    delta: Dict[str, Any] = Field(description="Delta content")
    finish_reason: Optional[str] = Field(None, description="Reason for finishing")


class OpenAIStreamResponse(BaseModel):
    """Streaming chat completion chunk."""

    id: str = Field(description="Response ID")
    object: str = Field(default="chat.completion.chunk", description="Object type")
    created: int = Field(description="Creation timestamp")
    model: str = Field(description="Model used")
    choices: List[OpenAIStreamChoice] = Field(description="Stream choices")


# FastAPI App
app = FastAPI(
    title="Scira AI Reverse API",
    version="1.0.0",
    description="Reverse engineered Scira AI API"
)

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def get_scira_cookies():
    """Get cookies for Scira API requests with optional randomization.

    The session token is picked at random only when
    ``BYPASS_CONFIG["session_rotation"]["enabled"]`` is truthy; otherwise the
    first configured token is used. The PostHog token is picked at random when
    more than one is configured.

    Returns:
        Dict mapping cookie names to values.
    """
    rotation_enabled = BYPASS_CONFIG.get("session_rotation", {}).get("enabled", False)
    session_token = random.choice(SESSION_TOKENS) if rotation_enabled else SESSION_TOKENS[0]
    posthog_token = random.choice(POSTHOG_TOKENS) if len(POSTHOG_TOKENS) > 1 else POSTHOG_TOKENS[0]

    return {
        'better-auth.session_token': session_token,
        'ph_phc_iy7AsVPrSER6rEqnru1DlRr1rIy0GVYRilMbTIGUYrK_posthog': posthog_token
    }


def get_session_tokens():
    """Get list of available session tokens for rotation."""
    return SESSION_TOKENS


def get_random_session_token():
    """Get a random session token from available tokens."""
    return random.choice(SESSION_TOKENS)


# OpenAI conversion functions
def openai_to_scira_messages(openai_messages: List[OpenAIMessage]) -> List[Message]:
    """Convert OpenAI messages to Scira format.

    Each message becomes a single text part. Tool calls, when present, are
    rendered as a plain-text list appended to the content. A ``None`` content
    is treated as an empty string.

    Args:
        openai_messages: Messages in OpenAI chat format.

    Returns:
        Scira ``Message`` objects with freshly generated ids, in input order.
    """
    scira_messages = []

    for i, msg in enumerate(openai_messages):
        content = msg.content or ""

        if msg.tool_calls:
            # Add tool calls to content for Scira
            tool_calls_text = "\n\nTool calls:\n"
            for tool_call in msg.tool_calls:
                function = tool_call.get('function', {})
                tool_calls_text += f"- {function.get('name', 'unknown')}: {function.get('arguments', '')}\n"
            content += tool_calls_text

        scira_messages.append(
            Message(
                role=msg.role,
                parts=[MessagePart(type="text", text=content)],
                id=f"msg-{i}-{uuid.uuid4().hex[:8]}"
            )
        )

    return scira_messages


def scira_to_openai_response(scira_data: str, request_id: str, model: str,
                             is_stream: bool = False) -> Optional[Dict[str, Any]]:
    """Convert one line of a Scira stream to an OpenAI stream chunk.

    Args:
        scira_data: One decoded line from the upstream response.
        request_id: Id placed in the chunk's ``id`` field.
        model: Model name placed in the chunk's ``model`` field.
        is_stream: Only the streaming conversion is implemented.

    Returns:
        A ``chat.completion.chunk`` dict when ``is_stream`` is true:
        a content delta for ``"reasoning-delta"`` events, ``finish_reason``
        ``"stop"`` for ``"end"`` events, and an empty delta for anything else
        (non-``data:`` lines, invalid JSON, non-object payloads, other types).
        ``None`` when ``is_stream`` is false.
    """
    if not is_stream:
        # No non-streaming conversion exists; callers only use is_stream=True.
        return None

    delta: Dict[str, Any] = {}
    finish_reason = None

    if scira_data.startswith("data: "):
        try:
            data = json.loads(scira_data[6:])  # Remove "data: " prefix
        except json.JSONDecodeError:
            data = None

        # Guard against valid JSON that is not an object (e.g. a list or a
        # bare string), which has no .get().
        if isinstance(data, dict):
            event_type = data.get("type")
            # NOTE(review): only "reasoning-delta" is mapped to content;
            # confirm whether other upstream event types carry answer text.
            if event_type == "reasoning-delta":
                delta = {"content": data.get("delta", "")}
            elif event_type == "end":
                finish_reason = "stop"

    return {
        "id": request_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [{
            "index": 0,
            "delta": delta,
            "finish_reason": finish_reason
        }]
    }


@app.get("/")
async def root():
    """Root endpoint"""
    return {
        "message": "Scira AI Reverse API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": [
            "/api/search",
            "/health"
        ]
    }


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "timestamp": int(time.time()),
        "service": "scira-reverse-api"
    }


def _messages_to_payload(messages: List[Message]) -> List[Dict[str, Any]]:
    """Serialize Scira messages to the plain dicts sent upstream."""
    return [
        {
            "role": msg.role,
            "parts": [{"type": part.type, "text": part.text} for part in msg.parts],
            "id": msg.id
        }
        for msg in messages
    ]


def _post_scira_request(request_data: dict, request_id: str) -> requests.Response:
    """Send the blocking streaming POST to Scira; meant to run in a worker thread."""
    # Get randomized headers, proxy config and session cookies
    headers = get_random_headers()
    proxy_config = get_proxy_config()
    cookies = get_scira_cookies()

    # Log proxy and header information
    proxy = get_primary_proxy()
    if proxy:
        logger.info(f"[{request_id}] Using proxy: {proxy['host']}:{proxy['port']}")
    logger.info(f"[{request_id}] User-Agent: {headers['user-agent']}")
    logger.info(f"[{request_id}] X-Forwarded-For: {headers['x-forwarded-for']}")

    return requests.post(
        SCIRA_SEARCH_URL,
        headers=headers,
        cookies=cookies,
        json=request_data,
        stream=True,
        timeout=REQUEST_TIMEOUT,
        proxies=proxy_config
    )


async def _open_scira_stream(request_data: dict, request_id: str) -> requests.Response:
    """Rate-limit, then open the upstream stream without blocking the event loop."""
    await apply_rate_limiting()
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(None, _post_scira_request, request_data, request_id)
    logger.info(f"[{request_id}] Scira API response: {response.status_code}")
    return response


async def _aiter_response_lines(response: requests.Response,
                                keep_blank: bool = False) -> AsyncIterator[str]:
    """Yield decoded lines of a streamed response, reading each in a worker thread."""
    loop = asyncio.get_running_loop()
    lines = response.iter_lines()
    while True:
        # next() blocks on the socket, so it must not run on the event loop.
        line = await loop.run_in_executor(None, next, lines, _STREAM_END)
        if line is _STREAM_END:
            return
        if line or keep_blank:
            yield line.decode('utf-8')


def _sse_error_event(message: str) -> str:
    """Format an error as an SSE data event with a properly escaped JSON body."""
    return f"data: {json.dumps({'error': message})}\n\n"


def _openai_error_chunk(request_id: str, model: str, message: str) -> Dict[str, Any]:
    """Build the OpenAI-style chunk used to report an error inside a stream."""
    return {
        "id": request_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [{
            "index": 0,
            "delta": {},
            "finish_reason": "error"
        }],
        "error": {"message": message}
    }


async def stream_scira_response(request_data: dict, request_id: str) -> AsyncIterator[str]:
    """Stream response from Scira API with rate limiting and proxy support.

    Upstream lines are relayed verbatim, including blank lines, so the
    blank-line event separators of the SSE stream are preserved. A non-200
    upstream status or any exception is reported as a single
    ``data: {"error": ...}`` event. The upstream connection is always closed.

    Args:
        request_data: JSON body to send upstream.
        request_id: Id used to tag log lines.

    Yields:
        Text fragments of the SSE stream.
    """
    response = None
    try:
        response = await _open_scira_stream(request_data, request_id)

        if response.status_code == 200:
            # Stream the exact response from Scira
            async for line_text in _aiter_response_lines(response, keep_blank=True):
                yield f"{line_text}\n"
        else:
            logger.error(f"[{request_id}] Scira API error: {response.status_code}")
            yield _sse_error_event(f"Scira API error: {response.status_code}")

    except Exception as e:
        logger.error(f"[{request_id}] Streaming error: {e}")
        yield _sse_error_event(f"Streaming error: {str(e)}")
    finally:
        # Release the pooled connection even if the client disconnects early.
        if response is not None:
            response.close()


async def stream_openai_response(request_data: dict, request_id: str, model: str) -> AsyncIterator[str]:
    """Stream OpenAI-compatible response from Scira API.

    Every non-empty upstream line is converted with
    ``scira_to_openai_response`` and emitted as a ``data:`` event. Failures are
    emitted as a chunk with ``finish_reason`` ``"error"`` and an ``error``
    object. The stream ends with ``data: [DONE]``.

    Args:
        request_data: JSON body to send upstream.
        request_id: Id used for chunk ids and log lines.
        model: Model name echoed in each chunk.

    Yields:
        SSE events, each terminated by a blank line.
    """
    response = None
    try:
        response = await _open_scira_stream(request_data, request_id)

        if response.status_code == 200:
            # Stream OpenAI-compatible response
            async for line_text in _aiter_response_lines(response):
                # Convert Scira response to OpenAI format
                openai_chunk = scira_to_openai_response(line_text, request_id, model, is_stream=True)
                yield f"data: {json.dumps(openai_chunk)}\n\n"
        else:
            logger.error(f"[{request_id}] Scira API error: {response.status_code}")
            error_chunk = _openai_error_chunk(request_id, model, f"Scira API error: {response.status_code}")
            yield f"data: {json.dumps(error_chunk)}\n\n"

    except Exception as e:
        logger.error(f"[{request_id}] Streaming error: {e}")
        error_chunk = _openai_error_chunk(request_id, model, f"Streaming error: {str(e)}")
        yield f"data: {json.dumps(error_chunk)}\n\n"
    finally:
        # Release the pooled connection even if the client disconnects early.
        if response is not None:
            response.close()

    # OpenAI-style streams are terminated by this literal marker. It is
    # emitted outside the try block so it is never yielded during generator
    # shutdown.
    yield "data: [DONE]\n\n"


@app.post("/api/search")
async def scira_search(request: SciraSearchRequest):
    """
    Scira AI search endpoint - forwards requests to actual Scira API
    Returns SSE stream exactly as Scira API does
    """
    request_id = f"req-{uuid.uuid4().hex[:8]}"

    logger.info(f"[{request_id}] Scira search request: model={request.model}")

    try:
        # Prepare request data exactly as in the curl
        request_data = {
            "id": request.id,
            "messages": _messages_to_payload(request.messages),
            "model": request.model,
            "group": request.group,
            "timezone": request.timezone,
            "isCustomInstructionsEnabled": request.isCustomInstructionsEnabled
        }

        logger.info(f"[{request_id}] Forwarding to Scira API")
        logger.info(f"[{request_id}] Request data: {json.dumps(request_data, indent=2)}")

        # Return streaming response
        return StreamingResponse(
            stream_scira_response(request_data, request_id),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Headers": "*",
            }
        )

    except Exception as e:
        logger.error(f"[{request_id}] Unexpected error: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error: {str(e)}"
        )


# OpenAI-compatible endpoints
@app.post("/v1/chat/completions")
async def openai_chat_completions(request: OpenAIChatRequest):
    """
    OpenAI-compatible chat completions endpoint
    Supports streaming, function calling, and all OpenAI parameters

    Tools are not passed upstream as structured data: their names,
    descriptions and parameter schemas are appended as text to the last
    message. In non-streaming mode the stream is consumed and its content
    deltas concatenated; an error chunk in the stream yields HTTP 502.
    """
    request_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"

    logger.info(f"[{request_id}] OpenAI chat completion: model={request.model}, stream={request.stream}")

    try:
        # Convert OpenAI messages to Scira format
        scira_messages = openai_to_scira_messages(request.messages)

        # Prepare Scira request data
        scira_request_data = {
            "id": request_id,
            "messages": _messages_to_payload(scira_messages),
            "model": request.model,
            "group": "chat",
            "timezone": "Asia/Calcutta",
            "isCustomInstructionsEnabled": True
        }

        # Add tool information to the last message if tools are provided
        if request.tools:
            tools_text = "\n\nAvailable tools:\n"
            for tool in request.tools:
                func = tool.function
                tools_text += f"- {func.name}: {func.description or 'No description'}\n"
                if func.parameters:
                    tools_text += f" Parameters: {json.dumps(func.parameters)}\n"

            # Append to the first part of the last message, whatever its role
            if scira_request_data["messages"]:
                last_msg = scira_request_data["messages"][-1]
                if last_msg["parts"]:
                    last_msg["parts"][0]["text"] += tools_text

        logger.info(f"[{request_id}] Converted to Scira format")
        logger.info(f"[{request_id}] Tools provided: {len(request.tools) if request.tools else 0}")

        if request.stream:
            # Return OpenAI-compatible streaming response
            return StreamingResponse(
                stream_openai_response(scira_request_data, request_id, request.model),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                    "Access-Control-Allow-Origin": "*",
                    "Access-Control-Allow-Headers": "*",
                    "Access-Control-Allow-Methods": "*"
                }
            )

        # Non-streaming response (collect all chunks and return as single response)
        content_parts = []
        async for chunk_data in stream_openai_response(scira_request_data, request_id, request.model):
            if not chunk_data.startswith("data: "):
                continue
            payload = chunk_data[6:].strip()
            if payload == "[DONE]":
                continue
            try:
                chunk_json = json.loads(payload)
            except json.JSONDecodeError:
                continue

            # Surface upstream failures instead of returning an empty 200.
            error = chunk_json.get("error")
            if error:
                raise HTTPException(
                    status_code=502,
                    detail=error.get("message", "Upstream error")
                )

            choices = chunk_json.get("choices")
            if choices and choices[0].get("delta", {}).get("content"):
                content_parts.append(choices[0]["delta"]["content"])

        full_content = "".join(content_parts)

        # Return complete response
        prompt_chars = len(str(request.messages))
        return OpenAIChatResponse(
            id=request_id,
            object="chat.completion",
            created=int(time.time()),
            model=request.model,
            choices=[
                OpenAIChoice(
                    index=0,
                    message=OpenAIMessage(role="assistant", content=full_content),
                    finish_reason="stop"
                )
            ],
            usage=OpenAIUsage(
                prompt_tokens=prompt_chars // 4,  # Rough estimate
                completion_tokens=len(full_content) // 4,  # Rough estimate
                total_tokens=(prompt_chars + len(full_content)) // 4
            )
        )

    except HTTPException:
        # Keep the intended status code instead of rewrapping it as a 500.
        raise
    except Exception as e:
        logger.error(f"[{request_id}] Error processing OpenAI request: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error: {str(e)}"
        )


@app.get("/v1/models")
async def openai_models():
    """OpenAI-compatible models endpoint"""
    return {
        "object": "list",
        "data": [
            {
                "id": "scira-5-mini",
                "object": "model",
                "created": int(time.time()),
                "owned_by": "scira",
                "permission": [],
                "root": "scira-5-mini",
                "parent": None
            }
        ]
    }


if __name__ == "__main__":
    try:
        import uvicorn

        # Use configuration for server settings
        host = SERVER_CONFIG["host"]
        port = SERVER_CONFIG["port"]

        logger.info(f"🚀 Starting Scira Reverse API server on {host}:{port}")
        logger.info("📡 Endpoints available:")
        logger.info(" - POST /api/search - Scira AI search with SSE streaming")
        logger.info(" - GET /health - Health check")
        logger.info(" - GET / - API information")

        # Log configuration status
        proxy = get_primary_proxy()
        if proxy:
            logger.info(f"🔒 Proxy enabled: {proxy['host']}:{proxy['port']} ({proxy['type']})")
        else:
            logger.warning("⚠️ No proxy configured - using direct connection")

        logger.info(f"⏱️ Rate limiting: {MIN_DELAY}s - {MAX_DELAY}s delays")
        logger.info(f"🎭 User agents available: {len(USER_AGENTS)}")
        logger.info(f"🔑 Session tokens available: {len(SESSION_TOKENS)}")

        uvicorn.run(
            app,
            host=host,
            port=port,
            reload=False,
            log_level="info",
            access_log=True
        )
    except ImportError:
        logger.error("❌ uvicorn not installed. Install with: pip install uvicorn")
    except Exception as e:
        logger.error(f"❌ Server startup error: {e}")