Spaces:
Running on Zero
Running on Zero
GitHub Actions
feat: Phase 2 (M14-M25, X05-X07) + Phase 3 experimental (M26-M31) + E2E tests + docs
"""M31 — Civil Defense (NRW Bevölkerungsschutz pilot, experimental Phase 3).

Bridges HearthNet with THW/DRK/Feuerwehr/KatS role structures.
Produces tamper-evident audit trails for incident coordination.
Gated by config.research.civil_defense = True.
"""
from __future__ import annotations

import copy
import hashlib
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Any
# NRW role taxonomy: maps a stable role key (as stored in
# RoleCertificate.role_key) to its human-readable German label.
NRW_ROLES = {
    "thw_helferin": "THW Helferin/Helfer",
    "thw_gruppenfuehrer": "THW Gruppenführer",
    "drk_ersthelfer": "DRK Ersthelfer",
    "drk_sanitaeter": "DRK Sanitäter",
    "feuerwehr_angehoeriger": "Feuerwehr-Angehöriger",
    "feuerwehr_fuehrungskraft": "Feuerwehr-Führungskraft",
    "kats_koordinator": "KatS-Koordinator",
    "kats_leiterin": "KatS-Leiterin",
    "bevoelkerungsschutz_beauftragte": "Bevölkerungsschutzbeauftragte(r)",
}
class AlertSeverity:
    """String constants for the severity of a civil-defense alert.

    The constants are plain ``str`` values, listed here from least to most
    severe by name; the class is used as a namespace and is not instantiated
    by the code in this module.
    """

    INFORMATION = "information"
    WARNING = "warning"
    ALERT = "alert"
    EMERGENCY = "emergency"
@dataclass
class RoleCertificate:
    """A role certificate issued by an authority for a community member.

    Declared as a dataclass so that it can be built with keyword arguments
    and so that the ``field(default_factory=...)`` default below is honoured.
    """

    cert_id: str
    role_key: str  # key from NRW_ROLES
    role_label: str  # fallback label used when role_key is not in NRW_ROLES
    holder_node_id: str
    issuer_node_id: str
    community_id: str
    region: str = "NRW"
    issued_at: float = field(default_factory=time.time)
    expires_at: float | None = None  # None means the certificate never expires
    issuer_signature: bytes = b""

    def is_expired(self, now: float | None = None) -> bool:
        """Return True when the certificate's expiry time has passed.

        Args:
            now: Reference timestamp in seconds since the epoch. When omitted
                (``None``) the current ``time.time()`` is used.

        Returns:
            ``False`` if the certificate has no expiry; otherwise whether
            ``now`` is strictly later than ``expires_at``.
        """
        if self.expires_at is None:
            return False
        # Compare against None explicitly: ``now or time.time()`` would
        # discard a legitimate reference time of 0.0.
        reference = time.time() if now is None else now
        return reference > self.expires_at

    def role_name(self) -> str:
        """Return the NRW_ROLES label for role_key, else the stored role_label."""
        return NRW_ROLES.get(self.role_key, self.role_label)
@dataclass
class Alert:
    """A civil-defense alert with full provenance.

    Declared as a dataclass so that it can be built with keyword arguments
    (as CivilDefenseService.issue_alert does) and so that the
    ``field(default_factory=...)`` default below is honoured.
    """

    alert_id: str
    severity: str  # AlertSeverity constant
    title: str
    body: str
    area_description: str  # e.g. "Issum, Kreis Kleve, NRW"
    issuer_node_id: str
    issuer_role_cert_id: str | None  # required argument; None when no role cert is attached
    community_id: str
    event_log_id: str | None = None  # optional backlink to event log entry
    issued_at: float = field(default_factory=time.time)
    expires_at: float | None = None  # None means the alert never expires
    issuer_signature: bytes = b""
class AuditChain:
    """Tamper-evident append-only audit log for civil-defense operations.

    Each entry is a JSON-serialisable dict that records the hash of its
    predecessor (``prev_hash``) and its own SHA-256 digest (``hash``), so a
    modified or reordered entry is detected by :meth:`verify_integrity`.

    For production use, entries should be stored in the event log (X02) with
    Ed25519 signatures to satisfy legal audit retention requirements.
    """

    # Value used as ``prev_hash`` of the very first entry.
    _GENESIS_HASH = "0" * 64

    def __init__(self) -> None:
        """Create an empty chain whose head is the genesis hash."""
        self._entries: list[dict] = []
        self._head_hash: str = self._GENESIS_HASH

    def _hash_entry(self, entry: dict) -> str:
        """Return the SHA-256 hex digest of *entry* in canonical JSON form.

        Keys are sorted and non-ASCII characters are escaped so that the
        serialisation, and therefore the digest, is deterministic.
        """
        serialised = json.dumps(entry, sort_keys=True, ensure_ascii=True)
        return hashlib.sha256(serialised.encode()).hexdigest()

    def append(self, entry_type: str, actor_node_id: str, payload: dict[str, Any]) -> str:
        """Append a new entry linked to the current head and return its hash.

        Args:
            entry_type: Free-form event type, e.g. ``"alert.issued"``.
            actor_node_id: Node id of the actor responsible for the event.
            payload: Event details; must be serialisable by ``json.dumps``.

        Returns:
            The hex digest of the new entry, which becomes the chain head.

        Raises:
            TypeError: Propagated from ``json.dumps`` when the payload is not
                JSON-serialisable; the chain is left unchanged in that case.
        """
        entry = {
            "entry_id": str(uuid.uuid4()),
            "entry_type": entry_type,
            "actor": actor_node_id,
            # Snapshot the payload: if the caller mutated its dict after the
            # hash was computed, the stored entry would no longer verify.
            "payload": copy.deepcopy(payload),
            "timestamp": time.time(),
            "prev_hash": self._head_hash,
        }
        # The digest covers every field except "hash" itself.
        entry_hash = self._hash_entry(entry)
        entry["hash"] = entry_hash
        self._entries.append(entry)
        self._head_hash = entry_hash
        return entry_hash

    def verify_integrity(self) -> bool:
        """Walk the chain and verify all backlinks and entry digests.

        Returns:
            ``True`` when every entry points at its predecessor's hash and
            its recorded hash matches its recomputed digest (an empty chain
            is valid); ``False`` otherwise, including when an entry has no
            recorded hash.
        """
        prev = self._GENESIS_HASH
        for entry in self._entries:
            if entry.get("prev_hash") != prev:
                return False
            recorded = entry.get("hash")
            if recorded is None:
                return False
            unhashed = {k: v for k, v in entry.items() if k != "hash"}
            if self._hash_entry(unhashed) != recorded:
                return False
            prev = recorded
        return True

    def export(self) -> list[dict]:
        """Return an independent deep copy of all entries, oldest first.

        A deep copy is returned so that callers cannot alter the stored
        chain through the exported dicts.
        """
        return copy.deepcopy(self._entries)

    def length(self) -> int:
        """Return the number of entries in the chain."""
        return len(self._entries)
class CivilDefenseService:
    """Civil-defense pilot service for NRW.

    Capabilities this service is meant to back:
        civdef.alert.issue@1.0  — publish a signed alert
        civdef.alert.list@1.0   — list active alerts
        civdef.cert.verify@1.0  — verify a role certificate
        civdef.audit.export@1.0 — export tamper-evident audit chain

    Only active when config.research.civil_defense = True.

    Alerts and certificates are held in memory, keyed by their ids; alert
    issuance and certificate registration are recorded in an AuditChain.
    """

    def __init__(self, keypair=None, bus=None) -> None:
        """Create an empty service.

        Args:
            keypair: Optional object whose ``node_id_short`` attribute, if
                present, identifies this node as the issuer of alerts.
            bus: Optional message bus handle; stored but not used by the
                methods of this class.
        """
        self._keypair = keypair
        self._bus = bus
        self._alerts: dict[str, Alert] = {}
        self._certs: dict[str, RoleCertificate] = {}
        self._audit = AuditChain()

    def issue_alert(
        self,
        severity: str,
        title: str,
        body: str,
        area: str,
        role_cert_id: str | None = None,
        community_id: str = "",
        expires_in_hours: float | None = 24.0,
    ) -> Alert:
        """Create, store and audit-log a new alert.

        Args:
            severity: An AlertSeverity constant (not validated here).
            title: Short headline of the alert.
            body: Full alert text.
            area: Human-readable description of the affected area.
            role_cert_id: Id of the role certificate the issuer acts under.
            community_id: Community the alert is addressed to.
            expires_in_hours: Lifetime in hours. Any falsy value (``None``
                or ``0``) produces an alert without an expiry time.

        Returns:
            The newly created Alert, which is also stored in the service.
        """
        node_id = getattr(self._keypair, "node_id_short", "unknown")
        # Read the clock once so that expires_at is exactly issued_at plus
        # the requested lifetime.
        issued_at = time.time()
        if expires_in_hours:
            expires_at = issued_at + expires_in_hours * 3600
        else:
            expires_at = None
        alert = Alert(
            alert_id=str(uuid.uuid4()),
            severity=severity,
            title=title,
            body=body,
            area_description=area,
            issuer_node_id=node_id,
            issuer_role_cert_id=role_cert_id,
            community_id=community_id,
            issued_at=issued_at,
            expires_at=expires_at,
        )
        self._alerts[alert.alert_id] = alert
        self._audit.append(
            "alert.issued",
            node_id,
            {
                "alert_id": alert.alert_id,
                "severity": alert.severity,
                "title": alert.title,
            },
        )
        return alert

    def list_active_alerts(self, now: float | None = None) -> list[Alert]:
        """Return stored alerts that have not expired, in insertion order.

        Args:
            now: Reference timestamp in seconds since the epoch. When omitted
                (``None``) the current ``time.time()`` is used.

        Returns:
            Alerts with no expiry or whose ``expires_at`` is strictly later
            than the reference time.
        """
        # Compare against None explicitly: ``now or time.time()`` would
        # discard a legitimate reference time of 0.0.
        reference = time.time() if now is None else now
        return [
            alert
            for alert in self._alerts.values()
            if alert.expires_at is None or alert.expires_at > reference
        ]

    def register_cert(self, cert: RoleCertificate) -> None:
        """Store *cert* under its id and record the registration in the audit chain.

        A certificate already stored under the same id is replaced.
        """
        self._certs[cert.cert_id] = cert
        self._audit.append(
            "cert.registered",
            cert.issuer_node_id,
            {
                "cert_id": cert.cert_id,
                "role": cert.role_key,
                "holder": cert.holder_node_id,
            },
        )

    def verify_cert(self, cert_id: str) -> dict:
        """Check whether a registered certificate exists and is unexpired.

        Returns:
            ``{"valid": False, "reason": ...}`` when the certificate is
            unknown or expired (the expired case also carries ``cert_id``);
            otherwise a dict with ``valid=True`` and the certificate's role
            name, holder and expiry time.
        """
        cert = self._certs.get(cert_id)
        if cert is None:
            return {"valid": False, "reason": "cert_not_found"}
        if cert.is_expired():
            return {"valid": False, "reason": "cert_expired", "cert_id": cert_id}
        return {
            "valid": True,
            "role": cert.role_name(),
            "holder": cert.holder_node_id,
            "expires_at": cert.expires_at,
        }

    def export_audit(self) -> dict:
        """Return the audit entries together with their integrity status and count."""
        return {
            "entries": self._audit.export(),
            "chain_valid": self._audit.verify_integrity(),
            "length": self._audit.length(),
        }