GitHub Actions
feat: Phase 2 (M14-M25, X05-X07) + Phase 3 experimental (M26-M31) + E2E tests + docs
4cd8837
Raw
History Blame
6.91 kB
"""M31 — Civil Defense (NRW Bevölkerungsschutz pilot, experimental Phase 3).
Bridges HearthNet with THW/DRK/Feuerwehr/KatS role structures.
Produces tamper-evident audit trails for incident coordination.
Gated by config.research.civil_defense = True.
"""
from __future__ import annotations
import hashlib
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Any
# NRW role taxonomy
NRW_ROLES = {
"thw_helferin": "THW Helferin/Helfer",
"thw_gruppenfuehrer": "THW Gruppenführer",
"drk_ersthelfer": "DRK Ersthelfer",
"drk_sanitaeter": "DRK Sanitäter",
"feuerwehr_angehoeriger": "Feuerwehr-Angehöriger",
"feuerwehr_fuehrungskraft": "Feuerwehr-Führungskraft",
"kats_koordinator": "KatS-Koordinator",
"kats_leiterin": "KatS-Leiterin",
"bevoelkerungsschutz_beauftragte": "Bevölkerungsschutzbeauftragte(r)",
}
@dataclass(frozen=True)
class AlertSeverity:
INFORMATION = "information"
WARNING = "warning"
ALERT = "alert"
EMERGENCY = "emergency"
@dataclass(frozen=True)
class RoleCertificate:
"""A role certificate issued by an authority for a community member."""
cert_id: str
role_key: str # key from NRW_ROLES
role_label: str
holder_node_id: str
issuer_node_id: str
community_id: str
region: str = "NRW"
issued_at: float = field(default_factory=time.time)
expires_at: float | None = None
issuer_signature: bytes = b""
def is_expired(self, now: float | None = None) -> bool:
if self.expires_at is None:
return False
return (now or time.time()) > self.expires_at
def role_name(self) -> str:
return NRW_ROLES.get(self.role_key, self.role_label)
@dataclass(frozen=True)
class Alert:
"""A civil-defense alert with full provenance."""
alert_id: str
severity: str # AlertSeverity constant
title: str
body: str
area_description: str # e.g. "Issum, Kreis Kleve, NRW"
issuer_node_id: str
issuer_role_cert_id: str | None
community_id: str
event_log_id: str | None = None # optional backlink to event log entry
issued_at: float = field(default_factory=time.time)
expires_at: float | None = None
issuer_signature: bytes = b""
class AuditChain:
"""Tamper-evident append-only audit log for civil-defense operations.
Each entry is a JSON-serialised dict with a backlink hash for chain integrity.
For production use, entries should be stored in the event log (X02) with
Ed25519 signatures to satisfy legal audit retention requirements.
"""
def __init__(self) -> None:
self._entries: list[dict] = []
self._head_hash: str = "0" * 64
def _hash_entry(self, entry: dict) -> str:
serialised = json.dumps(entry, sort_keys=True, ensure_ascii=True)
return hashlib.sha256(serialised.encode()).hexdigest()
def append(self, entry_type: str, actor_node_id: str, payload: dict[str, Any]) -> str:
entry = {
"entry_id": str(uuid.uuid4()),
"entry_type": entry_type,
"actor": actor_node_id,
"payload": payload,
"timestamp": time.time(),
"prev_hash": self._head_hash,
}
entry_hash = self._hash_entry(entry)
entry["hash"] = entry_hash
self._entries.append(entry)
self._head_hash = entry_hash
return entry_hash
def verify_integrity(self) -> bool:
"""Walk the chain and verify all backlinks."""
prev = "0" * 64
for entry in self._entries:
if entry.get("prev_hash") != prev:
return False
expected_hash = entry["hash"]
entry_copy = {k: v for k, v in entry.items() if k != "hash"}
if self._hash_entry(entry_copy) != expected_hash:
return False
prev = expected_hash
return True
def export(self) -> list[dict]:
return list(self._entries)
def length(self) -> int:
return len(self._entries)
class CivilDefenseService:
"""Civil-defense pilot service for NRW.
Registers capabilities:
civdef.alert.issue@1.0 — publish a signed alert
civdef.alert.list@1.0 — list active alerts
civdef.cert.verify@1.0 — verify a role certificate
civdef.audit.export@1.0 — export tamper-evident audit chain
Only active when config.research.civil_defense = True.
"""
def __init__(self, keypair=None, bus=None) -> None:
self._keypair = keypair
self._bus = bus
self._alerts: dict[str, Alert] = {}
self._certs: dict[str, RoleCertificate] = {}
self._audit = AuditChain()
def issue_alert(
self,
severity: str,
title: str,
body: str,
area: str,
role_cert_id: str | None = None,
community_id: str = "",
expires_in_hours: float | None = 24.0,
) -> Alert:
node_id = getattr(self._keypair, "node_id_short", "unknown")
alert = Alert(
alert_id=str(uuid.uuid4()),
severity=severity,
title=title,
body=body,
area_description=area,
issuer_node_id=node_id,
issuer_role_cert_id=role_cert_id,
community_id=community_id,
expires_at=time.time() + expires_in_hours * 3600 if expires_in_hours else None,
)
self._alerts[alert.alert_id] = alert
self._audit.append("alert.issued", node_id, {
"alert_id": alert.alert_id,
"severity": alert.severity,
"title": alert.title,
})
return alert
def list_active_alerts(self, now: float | None = None) -> list[Alert]:
now = now or time.time()
return [
a for a in self._alerts.values()
if a.expires_at is None or a.expires_at > now
]
def register_cert(self, cert: RoleCertificate) -> None:
self._certs[cert.cert_id] = cert
self._audit.append("cert.registered", cert.issuer_node_id, {
"cert_id": cert.cert_id, "role": cert.role_key, "holder": cert.holder_node_id
})
def verify_cert(self, cert_id: str) -> dict:
cert = self._certs.get(cert_id)
if cert is None:
return {"valid": False, "reason": "cert_not_found"}
if cert.is_expired():
return {"valid": False, "reason": "cert_expired", "cert_id": cert_id}
return {
"valid": True,
"role": cert.role_name(),
"holder": cert.holder_node_id,
"expires_at": cert.expires_at,
}
def export_audit(self) -> dict:
return {
"entries": self._audit.export(),
"chain_valid": self._audit.verify_integrity(),
"length": self._audit.length(),
}