Itachi-1824
fix: brutal audit β€” reset tool_call_counts, date dedup, unused vars, playground overhaul with scenario picker + status dashboard
b4d7ce3
"""FastAPI application for EU AI Act Compliance Auditor.
Architecture (modeled on Maverick98's winning pattern):
- create_app() for standard OpenEnv endpoints (/reset, /step, /state, /health, /ws)
- Custom HTTP session API (/api/reset, /api/call_tool, /api/close)
- Custom Gradio landing mounted at '/' replacing default inspector
"""
import inspect
import json
import uuid
import asyncio
import uvicorn
from typing import Any, Dict, Optional
from fastapi import Body, HTTPException
from pydantic import BaseModel
from openenv.core.env_server import create_app
from models import ComplianceAction, ComplianceObservation
from server.environment import ComplianceAuditorEnvironment, QUERY_BUDGET
from scenarios.registry import SCENARIO_LIST, DIFFICULTY_TIERS
# ── Create base OpenEnv app ─────────────────────────────────────
app = create_app(
ComplianceAuditorEnvironment,
ComplianceAction,
ComplianceObservation,
env_name="compliance_auditor_env",
max_concurrent_envs=5,
)
# ── /tasks endpoint (hackathon validator) ───────────────────────
@app.get("/tasks")
def list_tasks():
return {"tasks": SCENARIO_LIST}
# ── HTTP Session API ────────────────────────────────────────────
_sessions: Dict[str, ComplianceAuditorEnvironment] = {}
_session_lock = asyncio.Lock()
class GraderBody(BaseModel):
task_id: str = "easy"
episode_id: Optional[str] = None
seed: Optional[int] = None
classification: str = ""
findings: list = []
remediation: list = []
tool_sequence: list = []
steps_taken: int = 10
@app.post("/grader")
async def grader_endpoint(body: GraderBody):
"""Grade a completed episode. Returns score in [0.001, 0.999]."""
from server.engine import compute_reward
from scenarios.registry import get_scenario
# Resolve scenario β€” supports fixed IDs, difficulty tiers, and procedural
try:
sc = get_scenario(body.task_id, body.seed)
except ValueError:
# Fallback: treat task_id as difficulty tier, pick first scenario
tier_map = {"easy": "easy_chatbot_transparency_001", "medium": "medium_hiring_bias_001", "hard": "hard_social_scoring_prohibited_001"}
sc = get_scenario(tier_map.get(body.task_id, "easy_chatbot_transparency_001"), body.seed)
breakdown = compute_reward(
scenario=sc,
classification_submitted=body.classification,
findings_submitted=body.findings,
remediation_submitted=body.remediation,
tool_sequence=body.tool_sequence,
steps_taken=body.steps_taken,
)
return {"score": breakdown.total(), "breakdown": breakdown.to_dict()}
class ResetBody(BaseModel):
difficulty: str = "medium"
scenario_id: Optional[str] = None
seed: Optional[int] = None
class CallToolBody(BaseModel):
session_id: str
tool_name: str
arguments: Dict[str, Any] = {}
class CloseBody(BaseModel):
session_id: str
@app.post("/api/reset")
async def api_reset(body: ResetBody = Body(default_factory=ResetBody)):
"""Create session, reset env, return session_id + tools + observation."""
env = ComplianceAuditorEnvironment()
obs = env.reset(
seed=body.seed,
difficulty=body.difficulty,
scenario_id=body.scenario_id,
)
session_id = str(uuid.uuid4())
async with _session_lock:
_sessions[session_id] = env
# Build tool schemas from _tool_fns
tools = []
for name, fn in env._tool_fns.items():
sig = inspect.signature(fn)
props = {}
required = []
for pname, param in sig.parameters.items():
ptype = "string"
if param.annotation == int:
ptype = "integer"
props[pname] = {"type": ptype}
if param.default is inspect.Parameter.empty:
required.append(pname)
tools.append({
"name": name,
"description": (fn.__doc__ or "").strip().split("\n")[0],
"inputSchema": {"type": "object", "properties": props, "required": required},
})
return {
"session_id": session_id,
"observation": obs.metadata if hasattr(obs, "metadata") else {},
"done": obs.done,
"reward": obs.reward,
"tools": tools,
}
@app.post("/api/call_tool")
async def api_call_tool(body: CallToolBody):
"""Call a tool on an existing session."""
async with _session_lock:
env = _sessions.get(body.session_id)
if env is None:
raise HTTPException(404, f"Session not found: {body.session_id}")
fn = env._tool_fns.get(body.tool_name)
if fn is None:
raise HTTPException(400, f"Tool not found: {body.tool_name}. Available: {list(env._tool_fns.keys())}")
try:
result = fn(**body.arguments)
except Exception as e:
return {"result": json.dumps({"error": str(e)}), "done": env._done, "reward": env._reward}
return {"result": result, "done": env._done, "reward": env._reward}
@app.post("/api/close")
async def api_close(body: CloseBody):
"""Close and clean up a session."""
async with _session_lock:
env = _sessions.pop(body.session_id, None)
if env:
env.close()
return {"closed": True, "session_id": body.session_id}
# ── Mount Gradio landing at '/' ─────────────────────────────────
try:
import gradio as gr
from server.gradio_landing import create_landing_app
_landing = create_landing_app()
# Mount at / β€” exactly like Maverick98's working pattern
app = gr.mount_gradio_app(app, _landing, path="/")
print(f"[gradio_landing] mounted at / β€” gradio {gr.__version__}", flush=True)
except Exception as e:
import sys
import traceback
print(f"[gradio_landing] MOUNT FAILED: {e}", file=sys.stderr, flush=True)
traceback.print_exc(file=sys.stderr)
# ── Entry point ─────────────────────────────────────────────────
def main(host: str = "0.0.0.0", port: int = 7860):
uvicorn.run(app, host=host, port=port, ws_ping_interval=None, ws_ping_timeout=None)
if __name__ == "__main__":
main()