"""Mesh Network tab — live topology from the capability bus registry.
Shows all peers this node has discovered, their capabilities, and an SVG
topology graph. Data is sourced exclusively from bus.registry.all_remote()
and bus.registry.all_local() — no hardcoded or simulated nodes.
Spec: docs/M02-discovery.md, docs/M03-bus.md §4 (registry)
"""
from __future__ import annotations
import html as html_lib
import math
def _topology_svg(this_node: str, peers: list[dict]) -> str:
"""Build an SVG graph from live registry data. No fake data."""
all_nodes = [{"id": this_node[:24], "role": "this node", "is_self": True}] + [
{
"id": p["node_id"][:24],
"role": f"{p['capability_count']} caps",
"is_self": False,
}
for p in peers
]
if len(all_nodes) == 1:
return (
"
"
"
No peers discovered yet.
"
"
Start a second HearthNet node and run net.mesh_discover(),"
" or enable mDNS/UDP discovery.
"
"
See docs/HOWTO.md ยง3 for step-by-step instructions.
"
"
"
)
n = len(all_nodes)
cx, cy, r_orbit = 250, 220, 150
items: list[tuple[float, float, dict]] = []
for i, node in enumerate(all_nodes):
angle = (i / n) * math.tau - math.pi / 2
x = cx + r_orbit * math.cos(angle)
y = cy + r_orbit * math.sin(angle)
items.append((x, y, node))
lines: list[str] = []
circles: list[str] = []
labels: list[str] = []
# Lines from this node to each peer
self_x, self_y = items[0][0], items[0][1]
for x, y, _ in items[1:]:
lines.append(
f''
)
for x, y, node in items:
fill = "#4CAF50" if node["is_self"] else "#2196F3"
circles.append(f'')
labels.append(
f''
f"{html_lib.escape(node['id'])}"
)
labels.append(
f'{html_lib.escape(node["role"])}'
)
return (
'"
'
'
"๐ข this node | ๐ต peers | "
"dashed lines = active capability-bus connections
"
)
def build_mesh_tab(bus=None, node=None):
    """Build the "Mesh Network" tab: relay join form, topology view, stats.

    Creates the Gradio components and wires three event sources:

    * "Join Relay" button -> ``node.join_relay(url)``
    * "Refresh Mesh" button -> snapshot of ``bus.registry``
    * a 10-second ``gr.Timer`` (when available) -> the same snapshot

    NOTE(review): presumably called inside an active ``gr.Blocks`` context —
    verify against the caller.

    Args:
        bus: Capability bus. The refresh handler reads
            ``bus.registry.all_remote()`` and ``bus.registry.all_local()``
            (entries exposing ``node_id`` and ``descriptor.name`` /
            ``descriptor.version``) and, if present, ``bus.node_id_full``.
            With ``None`` the view shows a "Bus not connected" placeholder.
        node: Local node. The join handler awaits ``node.join_relay(url)``
            and reads the ``"roster"`` key of its result. With ``None`` the
            join form reports that the node is not available.

    Returns:
        None. The components are created as a side effect.
    """
    import gradio as gr

    # Single source of truth for the public relay (textbox default + aliases).
    hf_relay_url = "https://build-small-hackathon-hearthnet.hf.space"

    def cap_label(entry) -> str:
        """Format a registry entry as ``name@major.minor``."""
        descriptor = entry.descriptor
        return f"{descriptor.name}@{descriptor.version[0]}.{descriptor.version[1]}"

    with gr.Column():
        # NOTE(review): this HTML literal is empty in the reviewed source; any
        # markup it once carried is not visible here — restore it from version
        # control if the tab depends on it.
        gr.HTML("\n")
        gr.Markdown(
            "### 🌐 Mesh Network\n"
            "\n"
            "Live view of every node this HearthNet instance has discovered.\n"
            "Each entry is a real peer registered in the capability bus — no simulated data.\n"
            "\n"
            "**How peers appear here:**\n"
            "\n"
            "1. Run a second HearthNet node on the same LAN — or join the internet relay below\n"
            "2. Both nodes auto-discover each other via mDNS/UDP (M02) or the relay hub\n"
            "3. Each node advertises its capabilities on the bus (M03)\n"
            "4. Click **Refresh** to pull the current registry snapshot\n"
        )

        with gr.Accordion("🔗 Join Internet Relay (NAT mesh)", open=True):
            gr.Markdown(
                "Connect this node to a relay hub so it meshes with nodes over the internet. "
                "Use `hf` to join the public HuggingFace Space relay."
            )
            with gr.Row():
                relay_input = gr.Textbox(
                    value=hf_relay_url,
                    label="Relay URL or 'hf'",
                    scale=4,
                )
                join_btn = gr.Button("Join Relay", variant="primary", scale=1)
            relay_status = gr.Markdown("", visible=False)

            async def do_join(relay_url: str):
                """Join the relay at ``relay_url`` and report the outcome."""
                if node is None:
                    return gr.update(value="⚠️ Node not available.", visible=True)
                url = relay_url.strip()
                if url in ("hf", "space"):
                    url = hf_relay_url
                if not url.startswith(("http://", "https://")):
                    return gr.update(value=f"⚠️ Invalid URL: `{url}`", visible=True)
                try:
                    result = await node.join_relay(url)
                    count = len(result.get("roster", []))
                except Exception as exc:
                    # UI boundary: surface any failure in the status line
                    # instead of letting the event handler crash.
                    return gr.update(
                        value=f"❌ Join failed: {html_lib.escape(str(exc))}",
                        visible=True,
                    )
                return gr.update(
                    value=f"✅ Joined relay! **{count}** other member(s) in the mesh.",
                    visible=True,
                )

            join_btn.click(do_join, inputs=[relay_input], outputs=[relay_status])

        with gr.Row():
            refresh_btn = gr.Button("🔄 Refresh Mesh", variant="primary", scale=2)
        mesh_html = gr.HTML(
            value=(
                "<div style='text-align:center;padding:40px;color:#888'>"
                "Click Refresh to load live mesh topology."
                "</div>"
            )
        )
        with gr.Row():
            stats_out = gr.JSON(label="Mesh Statistics", visible=False, scale=2)
            caps_out = gr.JSON(label="Capability Matrix", visible=False, scale=3)

        async def get_mesh():
            """Return ``(topology_html, stats_update, matrix_update)``."""
            if bus is None:
                placeholder = (
                    "<div style='text-align:center;padding:40px;color:#888'>"
                    "<p>"
                    "Bus not connected. Run as a real HearthNet node to see live mesh topology."
                    "</p>"
                    "</div>"
                )
                return placeholder, gr.update(visible=False), gr.update(visible=False)
            try:
                remote_entries = list(bus.registry.all_remote())
                local_entries = list(bus.registry.all_local())

                # Group remote capabilities by the node that advertises them.
                peer_caps: dict[str, list[str]] = {}
                for entry in remote_entries:
                    peer_caps.setdefault(entry.node_id, []).append(cap_label(entry))
                peers = [
                    {
                        "node_id": nid,
                        "capabilities": caps,
                        "capability_count": len(caps),
                    }
                    for nid, caps in peer_caps.items()
                ]

                this_node = getattr(bus, "node_id_full", "this-node")
                local_caps = [cap_label(entry) for entry in local_entries]
                svg = _topology_svg(this_node, peers)
                stats = {
                    "this_node": this_node,
                    "peer_count": len(peers),
                    "local_capabilities": len(local_caps),
                    "total_mesh_capabilities": len(local_caps)
                    + sum(p["capability_count"] for p in peers),
                }

                # Capability matrix: which node has what. Sets give O(1)
                # membership tests; the column order is sorted once.
                local_set = set(local_caps)
                all_cap_names: set[str] = set(local_set)
                for p in peers:
                    all_cap_names.update(p["capabilities"])
                columns = sorted(all_cap_names)
                matrix = {"this_node": {c: (c in local_set) for c in columns}}
                for p in peers:
                    peer_set = set(p["capabilities"])
                    matrix[p["node_id"][:20]] = {c: (c in peer_set) for c in columns}

                return (
                    svg,
                    gr.update(visible=True, value=stats),
                    gr.update(visible=True, value=matrix),
                )
            except Exception as exc:
                # UI boundary: show the error in the topology pane and hide
                # the (now stale) JSON panels.
                err = (
                    "<p style='color:red;text-align:center'>"
                    f"Error loading mesh: {html_lib.escape(str(exc))}"
                    "</p>"
                )
                return err, gr.update(visible=False), gr.update(visible=False)

        refresh_btn.click(get_mesh, outputs=[mesh_html, stats_out, caps_out])

        # Auto-refresh every 10 s so peer joins appear without a manual click.
        # gr.Timer fires `tick` on an interval; active=True starts it immediately.
        try:
            auto_timer = gr.Timer(value=10, active=True)
            auto_timer.tick(fn=get_mesh, outputs=[mesh_html, stats_out, caps_out])
        except AttributeError:
            # Gradio releases without gr.Timer land here; this is a deliberate
            # best-effort — manual refresh still works.
            pass