GitHub Actions
fix(ui): disable broken tabs + add @spaces.GPU + MiniCPM trust_remote_code note
fb17651
Raw
History Blame Contribute Delete
15.8 kB
"""Ask tab — LLM + RAG via capability bus.
The request flow is:
UI → bus.call("rag.query") [optional, if corpus selected]
→ bus.call("llm.chat") [routes to best available node]
The routing trace shows exactly which node answered and why.
No hardcoded responses. If no LLM is configured, an UnavailableBackend
error is surfaced directly rather than fabricating an answer.
LLM Models:
- MiniCPM3-4B (OpenBMB default) requires trust_remote_code=True when loading via
transformers.from_pretrained() — the model repo contains custom modeling code.
HF Transformers backend (app.py) passes this flag; local-first vLLM/llama.cpp
endpoints do not need it (they handle the model internally).
Spec: docs/M04-llm.md, docs/M05-rag.md, docs/M03-bus.md §4
"""
from __future__ import annotations
def _route_badge_html(trace: dict) -> str:
"""Render a compact routing-trace badge.
Shows which node served each leg (RAG + LLM) with locality icon and colour.
Displayed instead of a raw JSON dump so judges see the mesh story at a glance.
"""
if not trace:
return ""
_BADGE = (
"display:inline-block;padding:3px 10px;border-radius:12px;"
"font-size:12px;font-weight:600;margin:2px 4px;"
)
_LOCAL_STYLE = f"{_BADGE}background:#1b4332;color:#4CAF50;border:1px solid #4CAF50"
_REMOTE_STYLE = f"{_BADGE}background:#0d2137;color:#64b5f6;border:1px solid #2196F3"
_ERR_STYLE = f"{_BADGE}background:#2d0f0f;color:#ef5350;border:1px solid #ef5350"
def _via_badge(via: str, prefix: str) -> str:
if not via or via in ("local", "") or via.startswith("local"):
return f'<span style="{_LOCAL_STYLE}">🏠 {prefix} · Local</span>'
short = via[:20] + ("…" if len(via) > 20 else "")
return f'<span style="{_REMOTE_STYLE}">🌐 {prefix} · {short}</span>'
parts: list[str] = []
rag = trace.get("rag")
if rag:
if "error" in rag:
parts.append(f'<span style="{_ERR_STYLE}">❌ RAG error</span>')
else:
chunks = rag.get("chunks_found", 0)
via = rag.get("routed_via", "local")
badge = _via_badge(via, f"RAG ({chunks} chunks)")
parts.append(badge)
llm = trace.get("llm")
if llm:
if "error" in llm:
parts.append(f'<span style="{_ERR_STYLE}">❌ LLM error</span>')
else:
via = llm.get("routed_via", "local")
parts.append(_via_badge(via, "LLM"))
if not parts:
return ""
inner = "".join(parts)
return (
f'<div style="margin-top:6px;padding:6px 8px;background:#0a1a14;'
f'border-radius:8px;border-left:3px solid #4CAF50">'
f'<span style="color:#888;font-size:11px;margin-right:6px">🛣️ Routed via:</span>'
f"{inner}</div>"
)
def _msg_text(content) -> str:
"""Coerce Gradio chat-message content to a plain string.
``gr.Chatbot(type="messages")`` can round-trip content back as a structured
list/dict (e.g. ``[{'text': '...'}]``). Flatten it so the LLM prompt never
receives that structure verbatim.
"""
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, dict):
return str(content.get("text") or content.get("content") or "")
if isinstance(content, list | tuple):
parts: list[str] = []
for p in content:
if isinstance(p, dict):
parts.append(str(p.get("text") or p.get("content") or ""))
elif isinstance(p, str):
parts.append(p)
return " ".join(x for x in parts if x).strip()
return str(content)
def _get_corpora_sync(bus) -> list[str]:
"""Scan the bus registry synchronously for all rag.query corpus names.
This is safe to call at build time (no event loop needed).
"""
if bus is None:
return []
corpora: list[str] = []
try:
all_entries = list(bus.registry.all())
for entry in all_entries:
if entry.descriptor.name == "rag.query":
corpus = (entry.descriptor.params or {}).get("corpus")
if corpus and corpus not in corpora:
corpora.append(corpus)
except Exception:
pass
return corpora
async def _get_corpora_async(bus) -> list[str]:
"""Fetch corpora via rag.list_corpora capability, falling back to registry scan."""
if bus is None:
return []
try:
r = await bus.call("rag.list_corpora", (1, 0), {"input": {}})
corpora = r.get("output", {}).get("corpora", [])
if corpora:
return corpora
except Exception:
pass
return _get_corpora_sync(bus)
# Backward compat alias used at module load
def _get_corpora(bus) -> list[str]:
return _get_corpora_sync(bus)
def build_ask_tab(bus=None):
import gradio as gr
corpora = _get_corpora(bus)
corpus_choices = ["(none)", *corpora]
with gr.Column():
gr.HTML("""
<div style="background:linear-gradient(135deg,#1e1b4b,#312e81);
border-radius:10px;padding:16px 20px;margin-bottom:8px;
border:1px solid #4f46e5">
<h3 style="color:#fff;margin:0">💬 Ask the Mesh</h3>
<p style="color:rgba(255,255,255,.7);margin:4px 0 0;font-size:.85em">
RAG-augmented Q&amp;A · routes to best available LLM node · local or peer · offline-first
</p>
</div>
""")
gr.Markdown("""### 💬 Ask the Mesh
Send a question to the **HearthNet capability bus**. The bus routes the request
to the best available LLM node — either on this device or on a peer.
**How it works:**
- **(none) corpus** → question goes directly to the LLM
- **Select a corpus** → RAG retrieval runs first; top chunks become system context
- **Model: auto** → bus picks highest-scoring available node (local first, then peer)
- **Model: name** → routes only to nodes that advertise that exact model
**Routing is transparent** — the trace below every response shows which node answered.
""")
with gr.Row():
corpus_selector = gr.Dropdown(
label="RAG Corpus (leave blank for direct LLM)",
choices=corpus_choices,
value=corpus_choices[0],
scale=3,
)
model_selector = gr.Dropdown(
label="Model (auto = bus picks best node)",
choices=["auto"],
value="auto",
scale=2,
)
refresh_corpora_btn = gr.Button("🔄 Refresh Corpora", size="sm", scale=1)
agent_toggle = gr.Checkbox(
label="🤖 Agent mode — the model plans and calls mesh tools "
"(search_corpus, list_marketplace, translate, route_expert, …) over several steps",
value=False,
)
chatbot = gr.Chatbot(
label="Conversation",
height=440,
show_label=True,
)
with gr.Row():
msg_input = gr.Textbox(
label="Your message",
placeholder="e.g. What is HearthNet? / How do I filter rainwater? / List my neighbours' capabilities.",
lines=2,
scale=8,
)
send_btn = gr.Button("Send", scale=1, variant="primary")
with gr.Row():
sources_out = gr.JSON(label="📚 RAG Sources", visible=False, scale=2)
# Routing trace: shown as a visual badge (HTML) for judge-friendly display.
route_out = gr.HTML(visible=False)
agent_out = gr.JSON(label="🧠 Agent Steps (Thought → Tool → Observation)", visible=False)
def refresh_corpora():
choices = ["(none)", *_get_corpora_sync(bus)]
return gr.update(choices=choices, value=choices[0])
async def handle_send(message: str, history: list, corpus: str, model: str, agent: bool):
if not message.strip():
return (
history,
"",
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
)
history = history or []
history.append({"role": "user", "content": message})
if bus is None:
history.append(
{
"role": "assistant",
"content": "⚠️ Bus not connected — run as a real HearthNet node.",
}
)
return (
history,
"",
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
)
params: dict = {}
if model and model != "auto":
params["model"] = model
# --- Agent mode: multi-step ReAct loop over real mesh tools --------
if agent:
from hearthnet.services.llm.tools import default_tool_set
async def _call_llm(msgs: list) -> str:
res = await bus.call(
"llm.chat",
(1, 0),
{"params": dict(params), "input": {"messages": msgs}},
)
if isinstance(res, dict) and "error" in res:
raise RuntimeError(res.get("message", res.get("error", "llm error")))
out = res.get("output", {}) if isinstance(res, dict) else {}
return (
out.get("message", {}).get("content")
or out.get("text")
or ""
)
executor = default_tool_set(bus)
loop_history = [
{"role": h["role"], "content": _msg_text(h["content"])}
for h in history[:-1]
]
try:
agent_result = await executor.run_react_loop(
message,
_call_llm,
history=loop_history,
max_iterations=6,
)
reply = agent_result["final"] or "(agent produced no final answer)"
history.append({"role": "assistant", "content": reply})
return (
history,
"",
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=True, value=agent_result["steps"]),
)
except Exception as exc:
history.append({"role": "assistant", "content": f"❌ Agent error: {exc}"})
return (
history,
"",
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=True, value=[{"type": "error", "text": str(exc)}]),
)
trace: dict = {"rag": None, "llm": None, "routed_to": None}
try:
context = ""
sources: list = []
if corpus and corpus != "(none)":
try:
rag_result = await bus.call(
"rag.query",
(1, 0),
{
"params": {"corpus": corpus},
"input": {"query": message, "k": 3},
},
)
chunks = rag_result.get("output", {}).get("chunks", [])
routed_via_rag = rag_result.get("_routed_via", "local")
trace["rag"] = {
"capability": "rag.query",
"corpus": corpus,
"chunks_found": len(chunks),
"routed_via": routed_via_rag,
}
if chunks:
context = "\n\n".join(c["text"] for c in chunks[:3])
sources = [
{
"rank": c.get("rank", i),
"text": c["text"][:120],
"source": c.get("metadata", {}).get("doc_title", "unknown"),
}
for i, c in enumerate(chunks)
]
except Exception as rag_exc:
trace["rag"] = {"error": str(rag_exc)}
llm_messages: list = []
if context:
# Truncate RAG context to prevent prompt-injection via doc content (LLM01)
_safe_ctx = context[:4000].replace("\x00", "")
llm_messages.append({"role": "system", "content": f"Context:\n{_safe_ctx}"})
llm_messages.extend(
{"role": h["role"], "content": _msg_text(h["content"])} for h in history
)
result = await bus.call(
"llm.chat",
(1, 0),
{"params": params, "input": {"messages": llm_messages}},
)
# Surface errors clearly instead of showing "No response"
if "error" in result:
err_msg = result.get("message", result.get("error", "unknown error"))
reply = f"⚠️ LLM error: {err_msg}"
trace["llm"] = {"error": err_msg}
else:
reply = (
result.get("output", {}).get("message", {}).get("content")
or result.get("output", {}).get("text")
or "(empty response — model may still be loading)"
)
routed_via_llm = result.get("_routed_via", "local")
trace["llm"] = {
"capability": "llm.chat",
"model_requested": model if model != "auto" else "(any)",
"routed_via": routed_via_llm,
}
trace["routed_to"] = routed_via_llm
history.append({"role": "assistant", "content": reply})
return (
history,
"",
gr.update(visible=bool(sources), value=sources),
gr.update(visible=True, value=_route_badge_html(trace)),
gr.update(visible=False),
)
except Exception as exc:
history.append({"role": "assistant", "content": f"❌ Error: {exc}"})
trace["error"] = str(exc)
return (
history,
"",
gr.update(visible=False),
gr.update(visible=True, value=_route_badge_html(trace)),
gr.update(visible=False),
)
refresh_corpora_btn.click(refresh_corpora, outputs=corpus_selector)
send_btn.click(
handle_send,
inputs=[msg_input, chatbot, corpus_selector, model_selector, agent_toggle],
outputs=[chatbot, msg_input, sources_out, route_out, agent_out],
)
msg_input.submit(
handle_send,
inputs=[msg_input, chatbot, corpus_selector, model_selector, agent_toggle],
outputs=[chatbot, msg_input, sources_out, route_out, agent_out],
)