GitHub Actions
Quality improvements: Unicode chars, Token class, imports, type hints, formatting
3f78ea8
Raw
History Blame
10.4 kB
"""M31 — Civil Defense (NRW Bevölkerungsschutz pilot, experimental Phase 3).
Bridges HearthNet with THW/DRK/Feuerwehr/KatS role structures.
Produces tamper-evident audit trails for incident coordination.
Gated by config.research.civil_defense = True.
"""
from __future__ import annotations
import hashlib
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Any
# NRW role taxonomy: maps an ASCII role key to its German display label.
# The labels are user-facing text and therefore carry real umlauts; the
# original values were mis-decoded UTF-8 (e.g. "Gruppenführer" showed up as
# mojibake), which is repaired here. The keys are unchanged.
NRW_ROLES = {
    "thw_helferin": "THW Helferin/Helfer",
    "thw_gruppenfuehrer": "THW Gruppenführer",
    "drk_ersthelfer": "DRK Ersthelfer",
    "drk_sanitaeter": "DRK Sanitäter",
    "feuerwehr_angehoeriger": "Feuerwehr-Angehöriger",
    "feuerwehr_fuehrungskraft": "Feuerwehr-Führungskraft",
    "kats_koordinator": "KatS-Koordinator",
    "kats_leiterin": "KatS-Leiterin",
    "bevoelkerungsschutz_beauftragte": "Bevölkerungsschutzbeauftragte(r)",
}
@dataclass(frozen=True)
class AlertSeverity:
    """Namespace holding the severity strings an alert may carry.

    The values are plain class-level string constants (none of them is
    annotated, so the dataclass has no instance fields); read them directly
    from the class, e.g. ``AlertSeverity.WARNING``.
    """

    INFORMATION = "information"
    WARNING = "warning"
    ALERT = "alert"
    EMERGENCY = "emergency"
@dataclass(frozen=True)
class RoleCertificate:
    """A role certificate issued by an authority for a community member.

    Instances are immutable (frozen dataclass). Timestamps are compared
    against ``time.time()``, i.e. they are seconds since the epoch.
    """

    cert_id: str
    role_key: str  # key from NRW_ROLES
    role_label: str  # fallback label, used when role_key is not in NRW_ROLES
    holder_node_id: str
    issuer_node_id: str
    community_id: str
    region: str = "NRW"
    issued_at: float = field(default_factory=time.time)  # creation time by default
    expires_at: float | None = None  # None: the certificate never expires
    issuer_signature: bytes = b""

    def is_expired(self, now: float | None = None) -> bool:
        """Return True when the certificate's expiry time has passed.

        Args:
            now: Reference timestamp; the current time is used when None.

        Returns:
            False if ``expires_at`` is None, otherwise whether the reference
            time is strictly greater than ``expires_at``.
        """
        if self.expires_at is None:
            return False
        # Compare against None explicitly: ``now or time.time()`` would treat
        # a legitimate reference time of 0.0 as "not given".
        reference = time.time() if now is None else now
        return reference > self.expires_at

    def role_name(self) -> str:
        """Return the NRW_ROLES label for ``role_key``, else ``role_label``."""
        return NRW_ROLES.get(self.role_key, self.role_label)
@dataclass(frozen=True)
class Alert:
    """Immutable record of one civil-defense alert and where it came from."""

    # --- identity and content ---
    alert_id: str
    severity: str  # one of the AlertSeverity constants
    title: str
    body: str
    area_description: str  # free text, e.g. "Issum, Kreis Kleve, NRW"

    # --- provenance ---
    issuer_node_id: str
    issuer_role_cert_id: str | None
    community_id: str
    event_log_id: str | None = None  # optional backlink to an event log entry

    # --- validity window (timestamps as produced by time.time()) ---
    issued_at: float = field(default_factory=time.time)
    expires_at: float | None = None

    # Empty bytes unless a signature is supplied by the caller.
    issuer_signature: bytes = b""
class AuditChain:
    """Tamper-evident append-only audit log for civil-defense operations.

    Each entry is a dict that is hashed in its JSON-serialised form and
    carries the hash of its predecessor (``prev_hash``), forming a chain.

    For production use, entries should be stored in the event log (X02) with
    Ed25519 signatures to satisfy legal audit retention requirements.
    """

    # ``prev_hash`` of the very first entry: 64 zeros, the same length as a
    # SHA-256 hex digest.
    _GENESIS_HASH = "0" * 64

    def __init__(self) -> None:
        """Start with an empty chain whose head is the genesis hash."""
        self._entries: list[dict] = []
        self._head_hash: str = self._GENESIS_HASH

    def _hash_entry(self, entry: dict) -> str:
        """Return the SHA-256 hex digest of *entry* serialised as JSON.

        Keys are sorted so the digest does not depend on dict ordering.

        Raises:
            TypeError: If *entry* contains a value ``json.dumps`` cannot encode.
        """
        serialised = json.dumps(entry, sort_keys=True, ensure_ascii=True)
        return hashlib.sha256(serialised.encode()).hexdigest()

    def append(self, entry_type: str, actor_node_id: str, payload: dict[str, Any]) -> str:
        """Append a new entry linked to the current head and return its hash.

        Args:
            entry_type: Label for the kind of event being recorded.
            actor_node_id: Node id stored under the ``actor`` key.
            payload: JSON-serialisable event data. It is stored by reference,
                so mutating it afterwards makes ``verify_integrity`` fail.

        Returns:
            The hex digest of the new entry, which becomes the chain head.
        """
        entry = {
            "entry_id": str(uuid.uuid4()),
            "entry_type": entry_type,
            "actor": actor_node_id,
            "payload": payload,
            "timestamp": time.time(),
            "prev_hash": self._head_hash,
        }
        # The hash covers every field except itself; it is added afterwards.
        entry_hash = self._hash_entry(entry)
        entry["hash"] = entry_hash
        self._entries.append(entry)
        self._head_hash = entry_hash
        return entry_hash

    def verify_integrity(self) -> bool:
        """Walk the chain and verify all backlinks and entry hashes.

        Returns:
            True only if every entry links to its predecessor, every stored
            hash matches the recomputed one, and the last entry's hash equals
            the recorded chain head. Damaged entries (missing ``hash``,
            values that can no longer be serialised) yield False rather than
            raising.
        """
        prev = self._GENESIS_HASH
        for entry in self._entries:
            if entry.get("prev_hash") != prev:
                return False
            recorded = entry.get("hash")
            if recorded is None:
                # A stripped hash is tampering, not a programming error.
                return False
            unhashed = {k: v for k, v in entry.items() if k != "hash"}
            try:
                recomputed = self._hash_entry(unhashed)
            except (TypeError, ValueError):
                # Entry was altered into something JSON cannot encode.
                return False
            if recomputed != recorded:
                return False
            prev = recorded
        # The walk must end at the recorded head, otherwise the newest
        # entries have been removed.
        return prev == self._head_hash

    def export(self) -> list[dict]:
        """Return a new list containing the entry dicts (shallow copy)."""
        return list(self._entries)

    def length(self) -> int:
        """Return the number of entries in the chain."""
        return len(self._entries)
class CivilDefenseService:
    """Civil-defense pilot service for NRW.

    Registers capabilities:
        civdef.alert.issue@1.0  — issue an alert and record it in the audit chain
        civdef.alert.list@1.0   — list active alerts
        civdef.cert.verify@1.0  — verify a role certificate
        civdef.audit.export@1.0 — export tamper-evident audit chain

    Intended to be active only when config.research.civil_defense = True.

    NOTE(review): alerts created here keep the default empty
    ``issuer_signature``; no signing happens in this class.
    """

    def __init__(self, keypair=None, bus=None) -> None:
        """Create a service with empty alert/certificate stores and a new audit chain.

        Args:
            keypair: Optional object whose ``node_id_short`` attribute is used
                as the issuer node id of alerts.
            bus: Optional capability bus; replaced by ``register``.
        """
        self._keypair = keypair
        self._bus = bus
        self._alerts: dict[str, Alert] = {}
        self._certs: dict[str, RoleCertificate] = {}
        self._audit = AuditChain()

    @staticmethod
    def _as_text(value: Any) -> str:
        """Coerce a request field to ``str``, mapping None to the empty string."""
        # str(None) would yield the literal "None", which then passes the
        # "required field" checks.
        return "" if value is None else str(value)

    def issue_alert(
        self,
        severity: str,
        title: str,
        body: str,
        area: str,
        role_cert_id: str | None = None,
        community_id: str = "",
        expires_in_hours: float | None = 24.0,
    ) -> Alert:
        """Create an alert, store it, and append an ``alert.issued`` audit entry.

        Args:
            severity: Severity string (an AlertSeverity constant).
            title: Alert headline.
            body: Alert text.
            area: Free-text description of the affected area.
            role_cert_id: Optional id of the issuer's role certificate; it is
                recorded on the alert but not checked here.
            community_id: Community the alert belongs to.
            expires_in_hours: Lifetime in hours. A falsy value (None or 0)
                produces an alert without expiry.

        Returns:
            The newly created Alert.
        """
        # Falls back to "unknown" when no keypair (or no such attribute) is set.
        node_id = getattr(self._keypair, "node_id_short", "unknown")
        alert = Alert(
            alert_id=str(uuid.uuid4()),
            severity=severity,
            title=title,
            body=body,
            area_description=area,
            issuer_node_id=node_id,
            issuer_role_cert_id=role_cert_id,
            community_id=community_id,
            expires_at=time.time() + expires_in_hours * 3600 if expires_in_hours else None,
        )
        self._alerts[alert.alert_id] = alert
        # Only id, severity and title go into the audit payload.
        self._audit.append(
            "alert.issued",
            node_id,
            {
                "alert_id": alert.alert_id,
                "severity": alert.severity,
                "title": alert.title,
            },
        )
        return alert

    def list_active_alerts(self, now: float | None = None) -> list[Alert]:
        """Return alerts that have no expiry or expire strictly after *now*.

        Args:
            now: Reference timestamp; the current time is used when None.
        """
        # Explicit None test so that a reference time of 0.0 is honoured.
        reference = time.time() if now is None else now
        return [
            alert
            for alert in self._alerts.values()
            if alert.expires_at is None or alert.expires_at > reference
        ]

    def register_cert(self, cert: RoleCertificate) -> None:
        """Store *cert* under its id and append a ``cert.registered`` audit entry."""
        self._certs[cert.cert_id] = cert
        self._audit.append(
            "cert.registered",
            cert.issuer_node_id,
            {"cert_id": cert.cert_id, "role": cert.role_key, "holder": cert.holder_node_id},
        )

    def verify_cert(self, cert_id: str) -> dict:
        """Look up a registered certificate and report whether it is usable.

        Returns:
            ``{"valid": False, "reason": ...}`` when the certificate is
            unknown or expired, otherwise a dict with ``valid`` True plus the
            role name, holder and expiry.
        """
        cert = self._certs.get(cert_id)
        if cert is None:
            return {"valid": False, "reason": "cert_not_found"}
        if cert.is_expired():
            return {"valid": False, "reason": "cert_expired", "cert_id": cert_id}
        return {
            "valid": True,
            "role": cert.role_name(),
            "holder": cert.holder_node_id,
            "expires_at": cert.expires_at,
        }

    def export_audit(self) -> dict:
        """Return the audit entries together with chain validity and length."""
        return {
            "entries": self._audit.export(),
            "chain_valid": self._audit.verify_integrity(),
            "length": self._audit.length(),
        }

    # ── Capability-bus adapter (registered only under research=True) ────────
    name = "civdef"
    version = "1.0"

    def capabilities(self) -> list[tuple]:
        """Return ``(descriptor, handler, predicate)`` triples for the bus.

        The predicate slot is None for every capability.
        """
        # Imported lazily so the module can be loaded without the bus package.
        from hearthnet.bus.capability import CapabilityDescriptor

        return [
            (
                CapabilityDescriptor(
                    name="civdef.alert.issue",
                    version=(1, 0),
                    stability="experimental",
                    trust_required="trusted",
                ),
                self.handle_issue,
                None,
            ),
            (
                CapabilityDescriptor(
                    name="civdef.alert.list",
                    version=(1, 0),
                    stability="experimental",
                    idempotent=True,
                ),
                self.handle_list,
                None,
            ),
            (
                CapabilityDescriptor(
                    name="civdef.cert.verify",
                    version=(1, 0),
                    stability="experimental",
                    idempotent=True,
                ),
                self.handle_verify,
                None,
            ),
            (
                CapabilityDescriptor(
                    name="civdef.audit.export",
                    version=(1, 0),
                    stability="experimental",
                    idempotent=True,
                ),
                self.handle_audit,
                None,
            ),
        ]

    def register(self, bus: Any) -> None:
        """Remember *bus* and register every capability on it."""
        self._bus = bus
        for cap, handler, predicate in self.capabilities():
            bus.register_capability(cap, handler, predicate)

    @staticmethod
    def _alert_to_dict(alert: Alert) -> dict[str, Any]:
        """Convert an alert to the dict shape returned by the bus handlers.

        The role certificate id, event log id and signature are not included.
        """
        return {
            "alert_id": alert.alert_id,
            "severity": alert.severity,
            "title": alert.title,
            "body": alert.body,
            "area": alert.area_description,
            "issuer_node_id": alert.issuer_node_id,
            "community_id": alert.community_id,
            "issued_at": alert.issued_at,
            "expires_at": alert.expires_at,
        }

    async def handle_issue(self, req: Any) -> dict:
        """Bus handler: validate the request input and issue an alert.

        Reads ``req.body["input"]``. Returns a ``bad_request`` error dict when
        ``title``/``area`` are missing or ``expires_in_hours`` is not numeric;
        otherwise ``{"output": {"alert": ...}, "meta": {}}``.
        """
        # ``or {}`` also covers an explicit null input.
        inp = req.body.get("input") or {}
        title = self._as_text(inp.get("title"))
        body = self._as_text(inp.get("body"))
        area = self._as_text(inp.get("area"))
        if not title or not area:
            return {"error": "bad_request", "message": "title and area are required"}

        # The lifetime comes straight from the request; coerce it before it
        # reaches the arithmetic in issue_alert, where a string would raise.
        expires_in_hours = inp.get("expires_in_hours", 24.0)
        if expires_in_hours is not None:
            try:
                expires_in_hours = float(expires_in_hours)
            except (TypeError, ValueError):
                expires_in_hours = float("nan")
            if expires_in_hours != expires_in_hours:  # NaN: unusable lifetime
                return {
                    "error": "bad_request",
                    "message": "expires_in_hours must be a number or null",
                }

        raw_severity = inp.get("severity")
        severity = AlertSeverity.WARNING if raw_severity is None else str(raw_severity)
        alert = self.issue_alert(
            severity=severity,
            title=title,
            body=body,
            area=area,
            role_cert_id=inp.get("role_cert_id"),
            community_id=self._as_text(inp.get("community_id")),
            expires_in_hours=expires_in_hours,
        )
        return {"output": {"alert": self._alert_to_dict(alert)}, "meta": {}}

    async def handle_list(self, req: Any) -> dict:
        """Bus handler: return all currently active alerts and their count."""
        alerts = [self._alert_to_dict(a) for a in self.list_active_alerts()]
        return {"output": {"alerts": alerts}, "meta": {"count": len(alerts)}}

    async def handle_verify(self, req: Any) -> dict:
        """Bus handler: verify the certificate named by ``input.cert_id``."""
        inp = req.body.get("input") or {}
        cert_id = self._as_text(inp.get("cert_id"))
        return {"output": self.verify_cert(cert_id), "meta": {}}

    async def handle_audit(self, req: Any) -> dict:
        """Bus handler: return the exported audit chain."""
        return {"output": self.export_audit(), "meta": {}}