Shubham-Rasal
Fix row-count mismatch in extract_episode for PhAIL signals
073a3e5
"""
Robot Policy Evaluation Harness
Interactive HuggingFace Space β€” Bayesian + SPARC + STL on real robot data.
Based on: Kress-Gazit et al. (TRI/Cornell) arXiv:2409.09491
"""
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
from scipy.fft import rfft, rfftfreq
from scipy import stats
from datasets import load_dataset
from huggingface_hub import hf_hub_download, list_repo_files
import gradio as gr
import io, json, os, tempfile
# ── constants ────────────────────────────────────────────────────────────────
PALETTE = ["#60A5FA", "#FB923C", "#F87171", "#34D399", "#A78BFA"]
BG = "#0F172A"
CARD = "#1E293B"
BORDER = "#334155"
TEXT = "#F1F5F9"
SUBTEXT = "#94A3B8"
ACCENT = "#38BDF8"
PLOTLY_LAYOUT = dict(
paper_bgcolor=CARD, plot_bgcolor=CARD,
font=dict(color=TEXT, family="Inter, sans-serif"),
margin=dict(l=40, r=20, t=50, b=40),
legend=dict(bgcolor="rgba(0,0,0,0)", bordercolor=BORDER),
xaxis=dict(gridcolor=BORDER, zerolinecolor=BORDER),
yaxis=dict(gridcolor=BORDER, zerolinecolor=BORDER),
)
FS = 50 # Hz β€” ALOHA dataset sampling rate
# ── signal extraction ─────────────────────────────────────────────────────────
def extract_episode(states, actions):
states = np.array(states, dtype=float)
actions = np.array(actions, dtype=float)
dq = np.diff(states, axis=0)
speed = np.linalg.norm(dq, axis=1) * FS
# align both rows and dims before subtracting
min_rows = min(len(states), len(actions))
min_dim = min(states.shape[1], actions.shape[1])
effort = np.linalg.norm(
actions[:min_rows, :min_dim] - states[:min_rows, :min_dim], axis=1)
raw_z = states[:, 1]
z = (raw_z - raw_z.min()) / (raw_z.max() - raw_z.min() + 1e-9)
return speed, effort, z
# ── SPARC ─────────────────────────────────────────────────────────────────────
def sparc(speed, fs=FS, padlevel=4, fc=10.0, amp_th=0.05):
speed = np.asarray(speed, dtype=float)
if speed.max() == 0:
return 0.0
nfft = int(pow(2, np.ceil(np.log2(len(speed))) + padlevel))
freqs = rfftfreq(nfft, d=1.0 / fs)
Mf = np.abs(rfft(speed, n=nfft)); Mf /= Mf.max()
inx = np.where((freqs <= fc) & (Mf >= amp_th))[0]
fc_used = freqs[inx[-1]] if len(inx) else fc
inx = np.where(freqs <= fc_used)[0]
dMf = np.diff(Mf[inx]) / (freqs[1] - freqs[0])
_trapz = getattr(np, "trapezoid", None) or getattr(np, "trapz")
return float(-np.sqrt(_trapz(dMf**2 + 1, freqs[inx[:-1]])))
# ── STL manual robustness ─────────────────────────────────────────────────────
def stl_robustness(effort, z, threshold):
"""min over t of: if effort>threshold then z>0.3"""
high = effort > threshold
if high.any():
return float((z[high] - 0.3).min())
return float((threshold - effort).min())
# ── Bayesian helpers ──────────────────────────────────────────────────────────
def posterior(s, n, N=200_000):
return np.random.beta(1 + s, 1 + n - s, N)
def ci_lower(s, n, N=100_000):
return float(np.percentile(np.random.beta(1 + s, 1 + n - s, N), 5))
# ── load ALOHA demo data ──────────────────────────────────────────────────────
_cache = {}
# ── load PhAIL sample (4 autonomous VLA policies on Franka) ──────────────────
PHAIL_POLICIES = {
"act": "ACT",
"groot": "GR00T N1.6",
"openpi": "Ο€0.5",
"smolvla": "SmolVLA",
}
def load_phail_sample(progress=None):
"""
Load PhAIL sample β€” path structure:
sample/inference/<batch_id>/<episode_id>/<signal>.parquet
sample/inference/<batch_id>/<episode_id>/static.json ← model name + outcome
Per-episode signals used:
robot_state.q.parquet β€” 7-DOF Franka joint positions (states)
robot_commands.pose.parquet β€” commanded EE pose (actions proxy)
"""
if "phail" in _cache:
return _cache["phail"]
def _prog(frac, desc=""):
if progress is not None:
progress(frac, desc=desc)
_prog(0.05, "Listing PhAIL sample files…")
all_files = list(list_repo_files("phail-anon/phail-v1.0", repo_type="dataset"))
# Collect episode dirs that have static.json
static_files = sorted([f for f in all_files
if f.startswith("sample/inference/") and f.endswith("/static.json")])
print(f"[PhAIL] found {len(static_files)} episodes (static.json)")
if not static_files:
raise ValueError("No static.json files found under sample/inference/")
_prog(0.1, f"Found {len(static_files)} episodes β€” loading…")
policy_data = {} # built dynamically from actual model names in static.json
for i, sf in enumerate(static_files):
_prog(0.1 + 0.8 * (i / len(static_files)), f"Episode {i+1}/{len(static_files)}…")
ep_dir = sf[: -len("/static.json")] # e.g. sample/inference/000.../000.../
# ── load metadata ──────────────────────────────────────────────────────
try:
meta_local = hf_hub_download(repo_id="phail-anon/phail-v1.0",
filename=sf, repo_type="dataset")
with open(meta_local) as f:
meta = json.load(f)
except Exception as exc:
print(f"[PhAIL] skip {sf}: {exc}")
continue
model = meta.get("model", meta.get("source", "unknown"))
outcome = meta.get("eval", {}).get("outcome", meta.get("outcome", ""))
success = 1 if outcome == "Success" else 0
# Map model key β†’ display label
label = PHAIL_POLICIES.get(model, model)
# ── load joint positions (states) ──────────────────────────────────────
q_path = ep_dir + "/robot_state.q.parquet"
try:
q_local = hf_hub_download(repo_id="phail-anon/phail-v1.0",
filename=q_path, repo_type="dataset")
q_df = pd.read_parquet(q_local)
except Exception as exc:
print(f"[PhAIL] no robot_state.q for {ep_dir}: {exc}")
continue
# ── load commands (actions proxy) ──────────────────────────────────────
cmd_path = ep_dir + "/robot_commands.pose.parquet"
try:
cmd_local = hf_hub_download(repo_id="phail-anon/phail-v1.0",
filename=cmd_path, repo_type="dataset")
cmd_df = pd.read_parquet(cmd_local)
except Exception:
cmd_df = q_df # fall back to state = action (effort β‰ˆ 0)
states = q_df.select_dtypes(include=[np.number]).values.astype(float)
actions = cmd_df.select_dtypes(include=[np.number]).values.astype(float)
if len(states) < 4:
print(f"[PhAIL] skip {ep_dir} β€” only {len(states)} rows")
continue
speed, effort, z = extract_episode(states, actions)
if label not in policy_data:
policy_data[label] = {"trials": [], "speeds": [], "efforts": [], "zs": []}
policy_data[label]["trials"].append(success)
policy_data[label]["speeds"].append(speed)
policy_data[label]["efforts"].append(effort)
policy_data[label]["zs"].append(z)
print(f"[PhAIL] loaded: { {k: len(v['trials']) for k, v in policy_data.items()} }")
if not policy_data:
raise ValueError("PhAIL: no episodes loaded β€” check logs for path/schema details.")
_prog(0.95, "Finalising…")
_cache["phail"] = policy_data
return policy_data
def run_phail(progress=gr.Progress()):
progress(0, desc="Connecting to HuggingFace Hub…")
try:
policy_data = load_phail_sample(progress)
except ValueError as e:
return [None]*8 + [f"❌ {e}"]
progress(0.9, desc="Running Bayesian + SPARC + STL analysis…")
results = run_analysis(policy_data)
progress(1.0, desc="Done!")
return results
def load_aloha():
if "aloha" in _cache:
return _cache["aloha"]
raw = load_dataset("lerobot/aloha_static_cups_open", split="train")
episodes = {}
for row in raw:
ei = row["episode_index"]
if ei not in episodes:
episodes[ei] = {"states": [], "actions": []}
episodes[ei]["states"].append(row["observation.state"])
episodes[ei]["actions"].append(row["action"])
ep_ids = sorted(episodes.keys())
extracted = {}
for ei in ep_ids:
speed, effort, z = extract_episode(episodes[ei]["states"], episodes[ei]["actions"])
extracted[ei] = {"speed": speed, "effort": effort, "z": z}
_cache["aloha"] = (ep_ids, extracted)
return ep_ids, extracted
# ── core analysis ─────────────────────────────────────────────────────────────
def run_analysis(policy_data):
"""
policy_data: dict of name β†’ {trials, speeds, efforts, zs}
Returns dict of figures + report text.
"""
names = list(policy_data.keys())
colors = {n: PALETTE[i] for i, n in enumerate(names)}
all_efforts = np.concatenate([e for d in policy_data.values() for e in d["efforts"]])
effort_thresh = float(np.percentile(all_efforts, 75))
# ── compute metrics ───────────────────────────────────────────────────────
metrics = {}
for name, d in policy_data.items():
s = sum(d["trials"]); n = len(d["trials"])
sc = [sparc(sp) for sp in d["speeds"]]
st = [stl_robustness(ef, z, effort_thresh)
for ef, z in zip(d["efforts"], d["zs"])]
metrics[name] = {
"s": s, "n": n,
"sparc": sc,
"stl": st,
"ci_lo": ci_lower(s, n),
"safe": sum(1 for x in st if x >= 0) / len(st),
}
# ── fig 1: bayesian posteriors ────────────────────────────────────────────
x = np.linspace(0, 1, 400)
fig1 = go.Figure()
for name in names:
m = metrics[name]
a, b = 1 + m["s"], 1 + m["n"] - m["s"]
y = stats.beta.pdf(x, a, b)
fig1.add_trace(go.Scatter(
x=x, y=y, mode="lines", name=f"Policy {name} ({m['s']}/{m['n']})",
line=dict(color=colors[name], width=2.5),
fill="tozeroy", fillcolor=colors[name].replace(")", ",0.1)").replace("rgb", "rgba"),
))
fig1.add_vline(x=m["ci_lo"], line_color=colors[name],
line_dash="dot", line_width=1.5)
fig1.update_layout(**PLOTLY_LAYOUT,
title="β‘  Bayesian Posteriors (dotted = 95 % CI lower bound)",
xaxis_title="Success probability p", yaxis_title="Density")
# pairwise matrix
mat = np.zeros((len(names), len(names)))
samps = {n: posterior(metrics[n]["s"], metrics[n]["n"]) for n in names}
for i, a in enumerate(names):
for j, b in enumerate(names):
mat[i, j] = (samps[a] > samps[b]).mean()
fig1b = go.Figure(go.Heatmap(
z=mat, x=[f"Policy {n}" for n in names], y=[f"Policy {n}" for n in names],
colorscale="RdYlGn", zmin=0, zmax=1,
text=[[f"{mat[i,j]:.2f}" for j in range(len(names))] for i in range(len(names))],
texttemplate="%{text}", textfont=dict(size=14),
))
fig1b.update_layout(**PLOTLY_LAYOUT, title="P(row beats col)")
# ── fig 2: SPARC ──────────────────────────────────────────────────────────
fig2 = go.Figure()
for name in names:
sc = metrics[name]["sparc"]
fig2.add_trace(go.Box(
y=sc, name=f"Policy {name}",
marker_color=colors[name], line_color=colors[name],
boxmean=True, fillcolor=colors[name].replace(")", ",0.3)").replace("rgb", "rgba"),
))
fig2.add_hline(y=np.mean([v for m in metrics.values() for v in m["sparc"]]),
line_dash="dot", line_color=SUBTEXT, annotation_text="global mean")
fig2.update_layout(**PLOTLY_LAYOUT,
title="β‘‘ SPARC Smoothness (less negative = smoother)",
yaxis_title="SPARC score")
# sample speed profiles
fig2b = go.Figure()
for name in names:
sp = policy_data[name]["speeds"][0]
t = np.arange(len(sp)) / FS
fig2b.add_trace(go.Scatter(x=t, y=sp, mode="lines",
name=f"Policy {name}", line=dict(color=colors[name], width=1.8)))
fig2b.update_layout(**PLOTLY_LAYOUT,
title="Joint-Space Speed Profile (first episode per policy)",
xaxis_title="Time (s)", yaxis_title="Speed (rad/s)")
# ── fig 3: STL ────────────────────────────────────────────────────────────
fig3 = go.Figure()
for name in names:
st = metrics[name]["stl"]
fig3.add_trace(go.Scatter(
x=[f"Policy {name}"] * len(st),
y=st, mode="markers",
name=f"Policy {name}",
marker=dict(color=colors[name], size=9, opacity=0.7,
line=dict(color="white", width=0.5)),
))
fig3.add_trace(go.Scatter(
x=[f"Policy {name}", f"Policy {name}"],
y=[np.mean(st), np.mean(st)],
mode="lines", line=dict(color=colors[name], width=4),
showlegend=False,
))
fig3.add_hline(y=0, line_dash="dash", line_color="white", line_width=1.5,
annotation_text="violation boundary")
fig3.update_layout(**PLOTLY_LAYOUT,
title="β‘’ STL Safety Robustness (positive = constraint satisfied)",
yaxis_title="Robustness score")
# violation bar
viols = [sum(1 for x in metrics[n]["stl"] if x < 0) for n in names]
totals = [metrics[n]["n"] for n in names]
fig3b = go.Figure(go.Bar(
x=[f"Policy {n}" for n in names], y=viols,
marker_color=[colors[n] for n in names],
text=[f"{v}/{t}" for v, t in zip(viols, totals)],
textposition="outside",
))
fig3b.update_layout(**PLOTLY_LAYOUT,
title="Constraint Violations per Policy",
yaxis_title="# violations")
# ── fig 4: composite radar + bar ──────────────────────────────────────────
def normalize(vals):
lo, hi = min(vals), max(vals)
return [(v - lo) / (hi - lo + 1e-9) for v in vals]
sparc_norm = normalize([-metrics[n]["ci_lo"] for n in names]) # invert (less neg = better)
sparc_norm = [1 - v for v in normalize([-metrics[n]["ci_lo"] for n in names])]
sparc_norm = normalize([-np.mean(metrics[n]["sparc"]) for n in names])
sparc_norm = [1 - v for v in sparc_norm]
composite = {}
for i, name in enumerate(names):
m = metrics[name]
composite[name] = (
0.40 * m["ci_lo"] +
0.20 * sparc_norm[i] +
0.25 * m["safe"] +
0.15 * (m["s"] / m["n"])
)
cats = ["Success<br>(CI lb)", "Smoothness", "Safety<br>(STL)", "Success<br>rate"]
fig4 = go.Figure()
for i, name in enumerate(names):
m = metrics[name]
vals = [m["ci_lo"], sparc_norm[i], m["safe"], m["s"] / m["n"]]
vals += vals[:1]
theta = cats + [cats[0]]
fig4.add_trace(go.Scatterpolar(
r=vals, theta=theta, fill="toself", name=f"Policy {name}",
line=dict(color=colors[name], width=2),
fillcolor=colors[name].replace(")", ",0.15)").replace("rgb", "rgba"),
))
fig4.update_layout(
paper_bgcolor=CARD, font=dict(color=TEXT),
polar=dict(
bgcolor=CARD,
radialaxis=dict(visible=True, range=[0, 1], gridcolor=BORDER, color=SUBTEXT),
angularaxis=dict(gridcolor=BORDER, color=TEXT),
),
title="β‘£ Composite Radar",
legend=dict(bgcolor="rgba(0,0,0,0)"),
margin=dict(l=60, r=60, t=60, b=40),
)
cv = [composite[n] for n in names]
fig4b = go.Figure(go.Bar(
x=[f"Policy {n}" for n in names], y=cv,
marker_color=[colors[n] for n in names],
text=[f"{v:.3f}" for v in cv],
textposition="outside",
))
winner = names[int(np.argmax(cv))]
fig4b.update_layout(**PLOTLY_LAYOUT,
title=f"β‘£ Final Ranking (winner: Policy {winner})",
yaxis_title="Composite score",
yaxis_range=[0, max(cv) * 1.3])
# ── scorecard text ────────────────────────────────────────────────────────
rows = ["| Metric | " + " | ".join(f"Policy {n}" for n in names) + " |",
"|" + "---|" * (len(names) + 1)]
defs = [
("Episodes", lambda n: str(metrics[n]["n"])),
("Successes", lambda n: f"{metrics[n]['s']}/{metrics[n]['n']} ({metrics[n]['s']/metrics[n]['n']:.0%})"),
("95% CI lower", lambda n: f"{metrics[n]['ci_lo']:.1%}"),
("Mean SPARC", lambda n: f"{np.mean(metrics[n]['sparc']):.3f}"),
("Safe fraction", lambda n: f"{metrics[n]['safe']:.0%}"),
("Composite", lambda n: f"**{composite[n]:.3f}**"),
]
for label, fn in defs:
rows.append("| " + label + " | " + " | ".join(fn(n) for n in names) + " |")
rows.append(f"\nπŸ† **Recommended policy: {winner}**")
rows.append(f"\nEffort threshold used for STL: `{effort_thresh:.4f}`")
return fig1, fig1b, fig2, fig2b, fig3, fig3b, fig4, fig4b, "\n".join(rows)
# ── demo analysis (ALOHA) ─────────────────────────────────────────────────────
def run_demo(n_A, n_B, n_C, sr_A, sr_B, sr_C, progress=gr.Progress()):
progress(0, desc="Loading ALOHA dataset from HuggingFace…")
ep_ids, extracted = load_aloha()
total = n_A + n_B + n_C
if total > len(ep_ids):
n_A = min(n_A, len(ep_ids) // 3)
n_B = min(n_B, len(ep_ids) // 3)
n_C = len(ep_ids) - n_A - n_B
progress(0.3, desc="Extracting signals…")
ids_A = ep_ids[:n_A]
ids_B = ep_ids[n_A:n_A + n_B]
ids_C = ep_ids[n_A + n_B:n_A + n_B + n_C]
def make_policy(eids, sr):
n = len(eids)
ns = int(round(sr * n))
t = [1]*ns + [0]*(n - ns); np.random.shuffle(t)
return {
"trials": t,
"speeds": [extracted[ei]["speed"] for ei in eids],
"efforts": [extracted[ei]["effort"] for ei in eids],
"zs": [extracted[ei]["z"] for ei in eids],
}
policy_data = {
"A": make_policy(ids_A, sr_A / 100),
"B": make_policy(ids_B, sr_B / 100),
"C": make_policy(ids_C, sr_C / 100),
}
progress(0.6, desc="Running Bayesian + SPARC + STL analysis…")
results = run_analysis(policy_data)
progress(1.0, desc="Done!")
return results
# ── upload analysis ────────────────────────────────────────────────────────────
def run_upload(file):
if file is None:
return [None]*8 + ["⚠️ Please upload a CSV file."]
df = pd.read_csv(file.name)
required = {"episode_id", "success"}
state_cols = [c for c in df.columns if c.startswith("state_")]
action_cols = [c for c in df.columns if c.startswith("action_")]
if not required.issubset(df.columns):
return [None]*8 + [f"⚠️ CSV must have columns: episode_id, success, state_0…state_N, action_0…action_N\nFound: {list(df.columns)}"]
if not state_cols:
return [None]*8 + ["⚠️ No state columns found (expected state_0, state_1, …)"]
# Group by episode
policy_data = {"A": {"trials": [], "speeds": [], "efforts": [], "zs": []}}
for ei, grp in df.groupby("episode_id"):
states = grp[state_cols].values
actions = grp[action_cols].values if action_cols else states
speed, effort, z = extract_episode(states, actions)
policy_data["A"]["trials"].append(int(grp["success"].iloc[-1]))
policy_data["A"]["speeds"].append(speed)
policy_data["A"]["efforts"].append(effort)
policy_data["A"]["zs"].append(z)
# If policy_name column exists, split into multiple policies
if "policy_name" in df.columns:
policy_data = {}
for pname, pdf in df.groupby("policy_name"):
pd_ = {"trials": [], "speeds": [], "efforts": [], "zs": []}
for ei, grp in pdf.groupby("episode_id"):
states = grp[state_cols].values
actions = grp[action_cols].values if action_cols else states
speed, effort, z = extract_episode(states, actions)
pd_["trials"].append(int(grp["success"].iloc[-1]))
pd_["speeds"].append(speed)
pd_["efforts"].append(effort)
pd_["zs"].append(z)
policy_data[str(pname)] = pd_
return run_analysis(policy_data)
# ── CSV template + sample downloads ───────────────────────────────────────────
def make_template():
rows = []
for ep in range(3):
for frame in range(20):
row = {"episode_id": ep, "policy_name": ["A","B","C"][ep],
"success": int(frame == 19)}
for i in range(7):
row[f"state_{i}"] = round(np.random.randn() * 0.5, 4)
row[f"action_{i}"] = round(np.random.randn() * 0.5, 4)
rows.append(row)
df = pd.DataFrame(rows)
path = "/tmp/robot_eval_template.csv"
df.to_csv(path, index=False)
return path
SAMPLE_DATASETS = {
"ALOHA bimanual β€” cup opening (14-DOF)":
("lerobot/aloha_static_cups_open", "observation.state", "action", 20),
"Push-T real robot β€” tabletop push (8-DOF)":
("lerobot/columbia_cairlab_pusht_real", "observation.state", "action", 20),
"Franka Panda β€” free-play manipulation (13-DOF)":
("lerobot/nyu_franka_play_dataset", "observation.state", "action", 20),
"Unitree H1 humanoid β€” warehouse (19-DOF / 40-DOF action)":
("lerobot/unitreeh1_warehouse", "observation.state", "action", 12),
}
def download_sample(choice, progress=gr.Progress()):
if not choice:
return None
progress(0.1, desc=f"Loading {choice}…")
hf, sc, ac, max_eps = SAMPLE_DATASETS[choice]
ds = load_dataset(hf, split="train")
df_raw = ds.to_pandas()
ep_ids = sorted(df_raw["episode_index"].unique())[:max_eps]
rows = []
policy_name = choice.split("β€”")[0].strip()
progress(0.4, desc="Extracting episodes…")
for ei in ep_ids:
grp = df_raw[df_raw["episode_index"] == ei].reset_index(drop=True)
success = int(grp["next.reward"].max() > 0) if "next.reward" in grp.columns else 1
states = np.vstack(grp[sc].values)
actions = np.vstack(grp[ac].values)
for fi, (s, a) in enumerate(zip(states, actions)):
row = {"episode_id": int(ei), "policy_name": policy_name,
"frame_id": fi, "success": success}
for i, v in enumerate(s): row[f"state_{i}"] = round(float(v), 6)
for i, v in enumerate(a): row[f"action_{i}"] = round(float(v), 6)
rows.append(row)
path = f"/tmp/sample_{hf.split('/')[-1]}.csv"
pd.DataFrame(rows).to_csv(path, index=False)
progress(1.0, desc="Ready!")
return path
# ── UI ────────────────────────────────────────────────────────────────────────
CSS = f"""
:root {{
--bg: {BG}; --card: {CARD}; --border: {BORDER};
--text: {TEXT}; --sub: {SUBTEXT}; --accent: {ACCENT};
}}
body, .gradio-container {{ background: var(--bg) !important; color: var(--text) !important; }}
.gr-box, .gr-panel {{ background: var(--card) !important; border-color: var(--border) !important; }}
.gr-button-primary {{ background: var(--accent) !important; color: #0F172A !important; font-weight: 700; }}
.gr-button {{ border-color: var(--border) !important; color: var(--text) !important; }}
footer {{ display: none !important; }}
h1, h2, h3 {{ color: var(--text) !important; }}
label {{ color: var(--sub) !important; }}
.tab-nav button {{ color: var(--sub) !important; }}
.tab-nav button.selected {{ color: var(--accent) !important; border-color: var(--accent) !important; }}
"""
HEADER = """
<div style="text-align:center; padding: 2rem 1rem 1rem; background: linear-gradient(135deg, #0F172A 0%, #1E3A5F 100%); border-radius: 12px; margin-bottom: 1.5rem;">
<h1 style="font-size: 2.2rem; font-weight: 800; color: #F1F5F9; margin: 0; letter-spacing: -0.5px;">
πŸ€– Robot Policy Evaluation Harness
</h1>
<p style="color: #94A3B8; margin: 0.6rem 0 0; font-size: 1.05rem;">
Bayesian statistics Β· SPARC smoothness Β· STL safety constraints
</p>
<p style="color: #64748B; margin: 0.4rem 0 0; font-size: 0.85rem;">
Based on Kress-Gazit et al. (TRI/Cornell) Β· arXiv:2409.09491
</p>
</div>
"""
FORMAT_HINT = """
**Expected CSV format:**
```
episode_id, policy_name, success, state_0, state_1, ..., state_N, action_0, action_1, ..., action_N
0, A, 0, -0.001, -0.963, 1.173, ...
0, A, 0, -0.013, -0.952, 1.168, ...
...
0, A, 1, ... ← last frame: success=1
1, B, 0, ...
```
- `episode_id`: integer, groups frames into one rollout
- `policy_name`: string, used to group into comparison groups (omit for single-policy)
- `success`: 0 or 1 (use the value on the **last frame** of the episode)
- `state_N`: joint position at each timestep (any number of joints)
- `action_N`: commanded joint position (optional β€” if absent, effort will be zero)
"""
def build_ui():
with gr.Blocks(css=CSS, title="Robot Policy Eval Harness") as demo:
gr.HTML(HEADER)
with gr.Tabs():
# ── TAB 1: PhAIL β€” 4 autonomous VLA policies ──────────────────────
with gr.Tab("πŸ† PhAIL: 4 VLA Policies Head-to-Head"):
gr.Markdown("""
**Dataset**: [`phail-anon/phail-v1.0`](https://huggingface.co/datasets/phail-anon/phail-v1.0)
β€” 20 stratified episodes from **4 real VLA policies** running autonomously on a **Franka Research 3** robot.
No GPU needed β€” we're scoring pre-recorded rollouts, not running the policies.
| Policy | Type | Developer |
|--------|------|-----------|
| ACT | Action Chunking Transformer | Academic (Chi et al.) |
| GR00T N1.6 | Foundation model | NVIDIA |
| Ο€0.5 | Diffusion policy VLA | Physical Intelligence |
| SmolVLA | Compact VLA | HuggingFace |
Task: **bin-to-bin pick-and-place** (batteries, scissors, towels, wooden spoons).
Success labels are human-verified from gripper telemetry.
""")
phail_btn = gr.Button("β–Ά Load & Analyse PhAIL Sample", variant="primary", size="lg")
with gr.Row():
ph_bayes = gr.Plot(label="Bayesian Posteriors")
ph_bayes_mat = gr.Plot(label="P(row beats col)")
with gr.Row():
ph_sparc = gr.Plot(label="SPARC Smoothness")
ph_speed = gr.Plot(label="Speed Profiles")
with gr.Row():
ph_stl = gr.Plot(label="STL Robustness")
ph_viols = gr.Plot(label="Violations")
with gr.Row():
ph_radar = gr.Plot(label="Composite Radar")
ph_rank = gr.Plot(label="Final Ranking")
ph_scorecard = gr.Markdown(label="Scorecard")
phail_btn.click(
fn=run_phail,
inputs=[],
outputs=[ph_bayes, ph_bayes_mat,
ph_sparc, ph_speed,
ph_stl, ph_viols,
ph_radar, ph_rank,
ph_scorecard],
)
# ── TAB 2: DEMO DATA ──────────────────────────────────────────────
with gr.Tab("🦾 Try with Real ALOHA Data"):
gr.Markdown("""
**Dataset**: [`lerobot/aloha_static_cups_open`](https://huggingface.co/datasets/lerobot/aloha_static_cups_open)
β€” 50 real episodes of a bimanual ALOHA robot opening a cup lid, collected via human teleoperation.
Adjust the sliders to configure policies, then run the analysis.
""")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("#### Policy A")
n_A = gr.Slider(5, 20, value=15, step=1, label="Episodes")
sr_A = gr.Slider(10, 100, value=80, step=5, label="Success rate (%)")
with gr.Column(scale=1):
gr.Markdown("#### Policy B")
n_B = gr.Slider(5, 20, value=15, step=1, label="Episodes")
sr_B = gr.Slider(10, 100, value=60, step=5, label="Success rate (%)")
with gr.Column(scale=1):
gr.Markdown("#### Policy C")
n_C = gr.Slider(3, 10, value=10, step=1, label="Episodes")
sr_C = gr.Slider(10, 100, value=40, step=5, label="Success rate (%)")
run_btn = gr.Button("β–Ά Run Analysis", variant="primary", size="lg")
with gr.Row():
fig_bayes = gr.Plot(label="Bayesian Posteriors")
fig_bayes_mat = gr.Plot(label="P(row beats col)")
with gr.Row():
fig_sparc = gr.Plot(label="SPARC Smoothness")
fig_speed = gr.Plot(label="Speed Profiles")
with gr.Row():
fig_stl = gr.Plot(label="STL Robustness")
fig_viols = gr.Plot(label="Violations")
with gr.Row():
fig_radar = gr.Plot(label="Composite Radar")
fig_rank = gr.Plot(label="Final Ranking")
scorecard = gr.Markdown(label="Scorecard")
run_btn.click(
fn=run_demo,
inputs=[n_A, n_B, n_C, sr_A, sr_B, sr_C],
outputs=[fig_bayes, fig_bayes_mat,
fig_sparc, fig_speed,
fig_stl, fig_viols,
fig_radar, fig_rank,
scorecard],
)
# ── TAB 2: UPLOAD ─────────────────────────────────────────────────
with gr.Tab("πŸ“‚ Upload Your Own Data"):
gr.Markdown("### Try a real dataset β€” or upload your own rollouts")
with gr.Row():
with gr.Column(scale=2):
gr.Markdown("""
**Step 1 β€” pick a real robot dataset to download as a ready-to-use CSV:**
| Dataset | Robot | DOF | Task |
|---|---|---|---|
| ALOHA bimanual | Stanford ALOHA (2Γ— ViperX) | 14 | Cup opening |
| Push-T real | Columbia delta robot | 8 | Push block to goal |
| Franka Panda | NYU Franka Emika Panda | 13 | Free-play manipulation |
| Unitree H1 | Full-size humanoid | 19 state / 40 action | Warehouse pick-place |
Then upload the downloaded CSV below and hit **Analyse**.
""")
with gr.Column(scale=1):
sample_picker = gr.Dropdown(
choices=list(SAMPLE_DATASETS.keys()),
label="Real robot dataset",
value=None,
)
dl_btn = gr.Button("⬇ Download as CSV", variant="secondary")
dl_file = gr.File(label="Downloaded CSV (upload below ↓)")
dl_btn.click(fn=download_sample, inputs=[sample_picker], outputs=[dl_file])
gr.Markdown("---")
tmpl_btn = gr.Button("⬇ Blank template CSV", variant="secondary")
tmpl_file = gr.File(label="Template")
tmpl_btn.click(fn=make_template, outputs=tmpl_file)
with gr.Accordion("CSV format reference", open=False):
gr.Markdown(FORMAT_HINT)
upload = gr.File(label="⬆ Upload rollout CSV (your own or downloaded above)", file_types=[".csv"])
run_upload_btn = gr.Button("β–Ά Analyse Uploaded Data", variant="primary", size="lg")
with gr.Row():
uf_bayes = gr.Plot(label="Bayesian Posteriors")
uf_bayes_mat = gr.Plot(label="P(row beats col)")
with gr.Row():
uf_sparc = gr.Plot(label="SPARC Smoothness")
uf_speed = gr.Plot(label="Speed Profiles")
with gr.Row():
uf_stl = gr.Plot(label="STL Robustness")
uf_viols = gr.Plot(label="Violations")
with gr.Row():
uf_radar = gr.Plot(label="Composite Radar")
uf_rank = gr.Plot(label="Final Ranking")
uf_scorecard = gr.Markdown(label="Scorecard")
run_upload_btn.click(
fn=run_upload,
inputs=[upload],
outputs=[uf_bayes, uf_bayes_mat,
uf_sparc, uf_speed,
uf_stl, uf_viols,
uf_radar, uf_rank,
uf_scorecard],
)
# ── TAB 3: ABOUT ──────────────────────────────────────────────────
with gr.Tab("πŸ“– About"):
gr.Markdown("""
## What this is
A lightweight evaluation harness for robot manipulation policies, based on best practices
from [Kress-Gazit et al. (TRI / Cornell), arXiv:2409.09491](https://arxiv.org/abs/2409.09491).
The field almost universally reports bare success rate from a handful of trials with no
statistical analysis. This tool replaces that with three complementary methods:
---
### β‘  Bayesian Bernoulli Analysis
Models each policy's success probability as a **Beta distribution** rather than a point
estimate. Shows the full posterior, 95% credible interval lower bound, and the probability
that one policy is genuinely better than another β€” not just luckier.
> *"P(A > B) = 0.83" is very different from "A scored 80%, B scored 60%".*
### β‘‘ SPARC Smoothness
Computes the **SPectral ARC length** of the robot's joint-space speed profile.
Two policies can have identical success rates but completely different motion quality.
A policy that succeeds jerkily is unsafe near people and hard on hardware.
### β‘’ STL Safety Constraints
Encodes behavioral requirements as **Signal Temporal Logic** formulas and automatically
scores every rollout β€” no human video review required.
> Example: *"Whenever the robot is straining (high tracking error), the arm must stay above table height."*
---
### Uploading your own data
Any robot with joint-position logging works. The CSV format is:
```
episode_id, policy_name, success, state_0 … state_N, action_0 … action_N
```
---
### Citation
```bibtex
@article{kressgazit2024robot,
title = {Robot Learning as an Empirical Science},
author = {Kress-Gazit, Hadas and others},
journal = {arXiv preprint arXiv:2409.09491},
year = {2024}
}
```
""")
return demo
if __name__ == "__main__":
build_ui().launch()