File size: 6,914 Bytes
4cd8837
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
"""M31 — Civil Defense (NRW Bevölkerungsschutz pilot, experimental Phase 3).

Bridges HearthNet with THW/DRK/Feuerwehr/KatS role structures.
Produces tamper-evident audit trails for incident coordination.
Gated by config.research.civil_defense = True.
"""
from __future__ import annotations

import hashlib
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Any

# NRW role taxonomy
NRW_ROLES = {
    "thw_helferin": "THW Helferin/Helfer",
    "thw_gruppenfuehrer": "THW Gruppenführer",
    "drk_ersthelfer": "DRK Ersthelfer",
    "drk_sanitaeter": "DRK Sanitäter",
    "feuerwehr_angehoeriger": "Feuerwehr-Angehöriger",
    "feuerwehr_fuehrungskraft": "Feuerwehr-Führungskraft",
    "kats_koordinator": "KatS-Koordinator",
    "kats_leiterin": "KatS-Leiterin",
    "bevoelkerungsschutz_beauftragte": "Bevölkerungsschutzbeauftragte(r)",
}


@dataclass(frozen=True)
class AlertSeverity:
    INFORMATION = "information"
    WARNING     = "warning"
    ALERT       = "alert"
    EMERGENCY   = "emergency"


@dataclass(frozen=True)
class RoleCertificate:
    """A role certificate issued by an authority for a community member."""
    cert_id: str
    role_key: str               # key from NRW_ROLES
    role_label: str
    holder_node_id: str
    issuer_node_id: str
    community_id: str
    region: str = "NRW"
    issued_at: float = field(default_factory=time.time)
    expires_at: float | None = None
    issuer_signature: bytes = b""

    def is_expired(self, now: float | None = None) -> bool:
        if self.expires_at is None:
            return False
        return (now or time.time()) > self.expires_at

    def role_name(self) -> str:
        return NRW_ROLES.get(self.role_key, self.role_label)


@dataclass(frozen=True)
class Alert:
    """A civil-defense alert with full provenance."""
    alert_id: str
    severity: str                   # AlertSeverity constant
    title: str
    body: str
    area_description: str           # e.g. "Issum, Kreis Kleve, NRW"
    issuer_node_id: str
    issuer_role_cert_id: str | None
    community_id: str
    event_log_id: str | None = None # optional backlink to event log entry
    issued_at: float = field(default_factory=time.time)
    expires_at: float | None = None
    issuer_signature: bytes = b""


class AuditChain:
    """Tamper-evident append-only audit log for civil-defense operations.

    Each entry is a JSON-serialised dict with a backlink hash for chain integrity.
    For production use, entries should be stored in the event log (X02) with
    Ed25519 signatures to satisfy legal audit retention requirements.
    """

    def __init__(self) -> None:
        self._entries: list[dict] = []
        self._head_hash: str = "0" * 64

    def _hash_entry(self, entry: dict) -> str:
        serialised = json.dumps(entry, sort_keys=True, ensure_ascii=True)
        return hashlib.sha256(serialised.encode()).hexdigest()

    def append(self, entry_type: str, actor_node_id: str, payload: dict[str, Any]) -> str:
        entry = {
            "entry_id": str(uuid.uuid4()),
            "entry_type": entry_type,
            "actor": actor_node_id,
            "payload": payload,
            "timestamp": time.time(),
            "prev_hash": self._head_hash,
        }
        entry_hash = self._hash_entry(entry)
        entry["hash"] = entry_hash
        self._entries.append(entry)
        self._head_hash = entry_hash
        return entry_hash

    def verify_integrity(self) -> bool:
        """Walk the chain and verify all backlinks."""
        prev = "0" * 64
        for entry in self._entries:
            if entry.get("prev_hash") != prev:
                return False
            expected_hash = entry["hash"]
            entry_copy = {k: v for k, v in entry.items() if k != "hash"}
            if self._hash_entry(entry_copy) != expected_hash:
                return False
            prev = expected_hash
        return True

    def export(self) -> list[dict]:
        return list(self._entries)

    def length(self) -> int:
        return len(self._entries)


class CivilDefenseService:
    """Civil-defense pilot service for NRW.

    Registers capabilities:
      civdef.alert.issue@1.0   — publish a signed alert
      civdef.alert.list@1.0    — list active alerts
      civdef.cert.verify@1.0   — verify a role certificate
      civdef.audit.export@1.0  — export tamper-evident audit chain

    Only active when config.research.civil_defense = True.
    """

    def __init__(self, keypair=None, bus=None) -> None:
        self._keypair = keypair
        self._bus = bus
        self._alerts: dict[str, Alert] = {}
        self._certs: dict[str, RoleCertificate] = {}
        self._audit = AuditChain()

    def issue_alert(
        self,
        severity: str,
        title: str,
        body: str,
        area: str,
        role_cert_id: str | None = None,
        community_id: str = "",
        expires_in_hours: float | None = 24.0,
    ) -> Alert:
        node_id = getattr(self._keypair, "node_id_short", "unknown")
        alert = Alert(
            alert_id=str(uuid.uuid4()),
            severity=severity,
            title=title,
            body=body,
            area_description=area,
            issuer_node_id=node_id,
            issuer_role_cert_id=role_cert_id,
            community_id=community_id,
            expires_at=time.time() + expires_in_hours * 3600 if expires_in_hours else None,
        )
        self._alerts[alert.alert_id] = alert
        self._audit.append("alert.issued", node_id, {
            "alert_id": alert.alert_id,
            "severity": alert.severity,
            "title": alert.title,
        })
        return alert

    def list_active_alerts(self, now: float | None = None) -> list[Alert]:
        now = now or time.time()
        return [
            a for a in self._alerts.values()
            if a.expires_at is None or a.expires_at > now
        ]

    def register_cert(self, cert: RoleCertificate) -> None:
        self._certs[cert.cert_id] = cert
        self._audit.append("cert.registered", cert.issuer_node_id, {
            "cert_id": cert.cert_id, "role": cert.role_key, "holder": cert.holder_node_id
        })

    def verify_cert(self, cert_id: str) -> dict:
        cert = self._certs.get(cert_id)
        if cert is None:
            return {"valid": False, "reason": "cert_not_found"}
        if cert.is_expired():
            return {"valid": False, "reason": "cert_expired", "cert_id": cert_id}
        return {
            "valid": True,
            "role": cert.role_name(),
            "holder": cert.holder_node_id,
            "expires_at": cert.expires_at,
        }

    def export_audit(self) -> dict:
        return {
            "entries": self._audit.export(),
            "chain_valid": self._audit.verify_integrity(),
            "length": self._audit.length(),
        }