""" EU AI Act Compliance Auditor — HTTP Client. HTTP-based client that works through HF Space proxy (no WebSocket timeout issues). Same API as Maverick98's SREIncidentEnvHTTP: reset(), list_tools(), call_tool(). """ from __future__ import annotations import json from typing import Any, Dict, List, Optional class ComplianceAuditorHTTP: """HTTP-based client for the Compliance Auditor environment. Each call is a short HTTP round-trip — no WebSocket needed. Works reliably through HF Space reverse proxy. """ def __init__(self, base_url: str, timeout: float = 120.0): self.base_url = base_url.rstrip("/") if self.base_url.startswith("ws://"): self.base_url = self.base_url.replace("ws://", "http://", 1) elif self.base_url.startswith("wss://"): self.base_url = self.base_url.replace("wss://", "https://", 1) self.timeout = timeout self._client = None self._session_id: Optional[str] = None self._tools_cache: Optional[List[Dict]] = None self._last_done: bool = False self._last_reward: float = 0.0 async def __aenter__(self): import httpx self._client = httpx.AsyncClient(timeout=self.timeout) return self async def __aexit__(self, *args): if self._session_id and self._client: try: await self._client.post( f"{self.base_url}/api/close", json={"session_id": self._session_id}, ) except Exception: pass if self._client: await self._client.aclose() self._client = None async def reset(self, **kwargs) -> Dict[str, Any]: """Reset environment and start a new audit session.""" if self._client is None: import httpx self._client = httpx.AsyncClient(timeout=self.timeout) resp = await self._client.post( f"{self.base_url}/api/reset", json={ "difficulty": kwargs.get("difficulty", "medium"), "scenario_id": kwargs.get("scenario_id"), "seed": kwargs.get("seed"), }, ) resp.raise_for_status() data = resp.json() self._session_id = data["session_id"] self._tools_cache = data.get("tools", []) self._last_done = False self._last_reward = 0.0 return data["observation"] async def list_tools(self) -> List[Dict]: """Return cached tool list from last reset().""" return self._tools_cache or [] async def call_tool(self, name: str, **kwargs) -> Any: """Call an audit tool. Returns the tool's result (JSON string).""" if not self._session_id: raise RuntimeError("No active session. Call reset() first.") resp = await self._client.post( f"{self.base_url}/api/call_tool", json={ "session_id": self._session_id, "tool_name": name, "arguments": kwargs, }, ) resp.raise_for_status() data = resp.json() self._last_done = data.get("done", False) self._last_reward = data.get("reward", 0.0) return data.get("result", "")