Itachi-1824
fix: brutal audit β reset tool_call_counts, date dedup, unused vars, playground overhaul with scenario picker + status dashboard
b4d7ce3 | """FastAPI application for EU AI Act Compliance Auditor. | |
| Architecture (modeled on Maverick98's winning pattern): | |
| - create_app() for standard OpenEnv endpoints (/reset, /step, /state, /health, /ws) | |
| - Custom HTTP session API (/api/reset, /api/call_tool, /api/close) | |
| - Custom Gradio landing mounted at '/' replacing default inspector | |
| """ | |
| import inspect | |
| import json | |
| import uuid | |
| import asyncio | |
| import uvicorn | |
| from typing import Any, Dict, Optional | |
| from fastapi import Body, HTTPException | |
| from pydantic import BaseModel | |
| from openenv.core.env_server import create_app | |
| from models import ComplianceAction, ComplianceObservation | |
| from server.environment import ComplianceAuditorEnvironment, QUERY_BUDGET | |
| from scenarios.registry import SCENARIO_LIST, DIFFICULTY_TIERS | |
| # ββ Create base OpenEnv app βββββββββββββββββββββββββββββββββββββ | |
| app = create_app( | |
| ComplianceAuditorEnvironment, | |
| ComplianceAction, | |
| ComplianceObservation, | |
| env_name="compliance_auditor_env", | |
| max_concurrent_envs=5, | |
| ) | |
| # ββ /tasks endpoint (hackathon validator) βββββββββββββββββββββββ | |
| def list_tasks(): | |
| return {"tasks": SCENARIO_LIST} | |
| # ββ HTTP Session API ββββββββββββββββββββββββββββββββββββββββββββ | |
| _sessions: Dict[str, ComplianceAuditorEnvironment] = {} | |
| _session_lock = asyncio.Lock() | |
| class GraderBody(BaseModel): | |
| task_id: str = "easy" | |
| episode_id: Optional[str] = None | |
| seed: Optional[int] = None | |
| classification: str = "" | |
| findings: list = [] | |
| remediation: list = [] | |
| tool_sequence: list = [] | |
| steps_taken: int = 10 | |
| async def grader_endpoint(body: GraderBody): | |
| """Grade a completed episode. Returns score in [0.001, 0.999].""" | |
| from server.engine import compute_reward | |
| from scenarios.registry import get_scenario | |
| # Resolve scenario β supports fixed IDs, difficulty tiers, and procedural | |
| try: | |
| sc = get_scenario(body.task_id, body.seed) | |
| except ValueError: | |
| # Fallback: treat task_id as difficulty tier, pick first scenario | |
| tier_map = {"easy": "easy_chatbot_transparency_001", "medium": "medium_hiring_bias_001", "hard": "hard_social_scoring_prohibited_001"} | |
| sc = get_scenario(tier_map.get(body.task_id, "easy_chatbot_transparency_001"), body.seed) | |
| breakdown = compute_reward( | |
| scenario=sc, | |
| classification_submitted=body.classification, | |
| findings_submitted=body.findings, | |
| remediation_submitted=body.remediation, | |
| tool_sequence=body.tool_sequence, | |
| steps_taken=body.steps_taken, | |
| ) | |
| return {"score": breakdown.total(), "breakdown": breakdown.to_dict()} | |
| class ResetBody(BaseModel): | |
| difficulty: str = "medium" | |
| scenario_id: Optional[str] = None | |
| seed: Optional[int] = None | |
| class CallToolBody(BaseModel): | |
| session_id: str | |
| tool_name: str | |
| arguments: Dict[str, Any] = {} | |
| class CloseBody(BaseModel): | |
| session_id: str | |
| async def api_reset(body: ResetBody = Body(default_factory=ResetBody)): | |
| """Create session, reset env, return session_id + tools + observation.""" | |
| env = ComplianceAuditorEnvironment() | |
| obs = env.reset( | |
| seed=body.seed, | |
| difficulty=body.difficulty, | |
| scenario_id=body.scenario_id, | |
| ) | |
| session_id = str(uuid.uuid4()) | |
| async with _session_lock: | |
| _sessions[session_id] = env | |
| # Build tool schemas from _tool_fns | |
| tools = [] | |
| for name, fn in env._tool_fns.items(): | |
| sig = inspect.signature(fn) | |
| props = {} | |
| required = [] | |
| for pname, param in sig.parameters.items(): | |
| ptype = "string" | |
| if param.annotation == int: | |
| ptype = "integer" | |
| props[pname] = {"type": ptype} | |
| if param.default is inspect.Parameter.empty: | |
| required.append(pname) | |
| tools.append({ | |
| "name": name, | |
| "description": (fn.__doc__ or "").strip().split("\n")[0], | |
| "inputSchema": {"type": "object", "properties": props, "required": required}, | |
| }) | |
| return { | |
| "session_id": session_id, | |
| "observation": obs.metadata if hasattr(obs, "metadata") else {}, | |
| "done": obs.done, | |
| "reward": obs.reward, | |
| "tools": tools, | |
| } | |
| async def api_call_tool(body: CallToolBody): | |
| """Call a tool on an existing session.""" | |
| async with _session_lock: | |
| env = _sessions.get(body.session_id) | |
| if env is None: | |
| raise HTTPException(404, f"Session not found: {body.session_id}") | |
| fn = env._tool_fns.get(body.tool_name) | |
| if fn is None: | |
| raise HTTPException(400, f"Tool not found: {body.tool_name}. Available: {list(env._tool_fns.keys())}") | |
| try: | |
| result = fn(**body.arguments) | |
| except Exception as e: | |
| return {"result": json.dumps({"error": str(e)}), "done": env._done, "reward": env._reward} | |
| return {"result": result, "done": env._done, "reward": env._reward} | |
| async def api_close(body: CloseBody): | |
| """Close and clean up a session.""" | |
| async with _session_lock: | |
| env = _sessions.pop(body.session_id, None) | |
| if env: | |
| env.close() | |
| return {"closed": True, "session_id": body.session_id} | |
| # ββ Mount Gradio landing at '/' βββββββββββββββββββββββββββββββββ | |
| try: | |
| import gradio as gr | |
| from server.gradio_landing import create_landing_app | |
| _landing = create_landing_app() | |
| # Mount at / β exactly like Maverick98's working pattern | |
| app = gr.mount_gradio_app(app, _landing, path="/") | |
| print(f"[gradio_landing] mounted at / β gradio {gr.__version__}", flush=True) | |
| except Exception as e: | |
| import sys | |
| import traceback | |
| print(f"[gradio_landing] MOUNT FAILED: {e}", file=sys.stderr, flush=True) | |
| traceback.print_exc(file=sys.stderr) | |
| # ββ Entry point βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def main(host: str = "0.0.0.0", port: int = 7860): | |
| uvicorn.run(app, host=host, port=port, ws_ping_interval=None, ws_ping_timeout=None) | |
| if __name__ == "__main__": | |
| main() | |