| """ |
| Phase 3 β Clinical Logic & Verification Engine |
| =============================================== |
| |
| Implements deterministic clinical metric extraction from segmentation masks. |
| No ML here β pure geometry and ophthalmology math. |
| |
| Mask convention (from Phase 2 training): |
| 0 = Background |
| 1 = Optic Disc |
| 2 = Optic Cup |
| """ |
|
|
| import numpy as np |
| import cv2 |
| from dataclasses import dataclass, field |
| from typing import Tuple, Dict, Optional |
| from enum import Enum |
|
|
|
|
| class RiskLevel(Enum): |
| HEALTHY = "Healthy" |
| SUSPECT = "Glaucoma Suspect" |
| HIGH = "High Risk" |
|
|
|
|
| class SanityError(Exception): |
| """Raised when mask geometry violates anatomical constraints.""" |
| pass |
|
|
|
|
| @dataclass |
| class ISNTResult: |
| inferior: float = 0.0 |
| superior: float = 0.0 |
| nasal: float = 0.0 |
| temporal: float = 0.0 |
| rule_satisfied: bool = False |
|
|
| def to_dict(self) -> Dict[str, float]: |
| return { |
| 'inferior': round(self.inferior, 4), |
| 'superior': round(self.superior, 4), |
| 'nasal': round(self.nasal, 4), |
| 'temporal': round(self.temporal, 4), |
| 'rule_satisfied': self.rule_satisfied, |
| } |
|
|
|
|
| @dataclass |
| class ClinicalResult: |
| vcdr: float = 0.0 |
| isnt: ISNTResult = field(default_factory=ISNTResult) |
| disc_area_px: int = 0 |
| cup_area_px: int = 0 |
| disc_center: Tuple[int, int] = (0, 0) |
| cup_center: Tuple[int, int] = (0, 0) |
| uncertainty: float = 0.0 |
| high_uncertainty: bool = False |
| risk_level: RiskLevel = RiskLevel.HEALTHY |
| sanity_passed: bool = False |
| warnings: list = field(default_factory=list) |
|
|
| def to_dict(self) -> dict: |
| return { |
| 'vcdr': round(self.vcdr, 4), |
| 'isnt': self.isnt.to_dict(), |
| 'disc_area_px': self.disc_area_px, |
| 'cup_area_px': self.cup_area_px, |
| 'disc_center': self.disc_center, |
| 'cup_center': self.cup_center, |
| 'uncertainty': round(self.uncertainty, 6), |
| 'high_uncertainty': self.high_uncertainty, |
| 'risk_level': self.risk_level.value, |
| 'sanity_passed': self.sanity_passed, |
| 'warnings': self.warnings, |
| } |
|
|
|
|
| |
| |
| |
|
|
| def run_sanity_checks(disc_mask: np.ndarray, cup_mask: np.ndarray) -> None: |
| """ |
| Enforce anatomical constraints. Raises SanityError on any violation. |
| |
| Checks: |
| 1. Masks are binary (0/1 values only). |
| 2. Disc region is non-empty. |
| 3. Cup is 100 % contained inside the Disc. |
| 4. Neither mask has disconnected 'islands' outside the main region. |
| """ |
| |
| for name, mask in [('disc', disc_mask), ('cup', cup_mask)]: |
| unique_vals = np.unique(mask) |
| if not set(unique_vals).issubset({0, 1}): |
| raise SanityError(f"{name} mask contains non-binary values: {unique_vals}") |
|
|
| |
| disc_area = int(disc_mask.sum()) |
| if disc_area == 0: |
| raise SanityError("Optic Disc mask is empty β segmentation failure.") |
|
|
| |
| cup_outside_disc = np.logical_and(cup_mask == 1, disc_mask == 0) |
| if cup_outside_disc.any(): |
| n_violation = int(cup_outside_disc.sum()) |
| raise SanityError( |
| f"Cup extends outside Disc boundary ({n_violation} pixels). " |
| "Anatomically impossible β reject segmentation." |
| ) |
|
|
| |
| for name, mask in [('disc', disc_mask), ('cup', cup_mask)]: |
| if mask.sum() == 0: |
| continue |
| n_labels, _ = cv2.connectedComponents(mask.astype(np.uint8)) |
| if n_labels > 2: |
| raise SanityError( |
| f"{name} mask has {n_labels - 1} disconnected regions. " |
| "Expected a single contiguous structure." |
| ) |
|
|
|
|
| |
| |
| |
|
|
| def calculate_vcdr( |
| disc_mask: np.ndarray, |
| cup_mask: np.ndarray |
| ) -> Tuple[float, dict]: |
| """ |
| Calculate vertical Cup-to-Disc Ratio using vertical extrema method. |
| |
| Clinical basis: |
| Horizontal disc expansion is less indicative of early glaucoma. |
| The vertical ratio (vCDR) is the primary screening metric. |
| |
| Returns: |
| vcdr (float): ratio in [0, 1] |
| details (dict): raw pixel measurements |
| """ |
| disc_rows = np.where(disc_mask.any(axis=1))[0] |
| cup_rows = np.where(cup_mask.any(axis=1))[0] |
|
|
| if disc_rows.size == 0: |
| return 0.0, {} |
|
|
| disc_v_diam = int(disc_rows.max() - disc_rows.min() + 1) |
| cup_v_diam = int(cup_rows.max() - cup_rows.min() + 1) if cup_rows.size > 0 else 0 |
|
|
| vcdr = cup_v_diam / disc_v_diam if disc_v_diam > 0 else 0.0 |
|
|
| details = { |
| 'disc_top_px': int(disc_rows.min()), |
| 'disc_bottom_px': int(disc_rows.max()), |
| 'disc_v_diam_px': disc_v_diam, |
| 'cup_top_px': int(cup_rows.min()) if cup_rows.size > 0 else None, |
| 'cup_bottom_px': int(cup_rows.max()) if cup_rows.size > 0 else None, |
| 'cup_v_diam_px': cup_v_diam, |
| } |
| return round(vcdr, 4), details |
|
|
|
|
| |
| |
| |
|
|
| def _disc_centroid(disc_mask: np.ndarray) -> Tuple[int, int]: |
| M = cv2.moments(disc_mask.astype(np.uint8)) |
| if M['m00'] == 0: |
| h, w = disc_mask.shape |
| return h // 2, w // 2 |
| cy = int(M['m01'] / M['m00']) |
| cx = int(M['m10'] / M['m00']) |
| return cy, cx |
|
|
|
|
| def _rim_thickness_in_quadrant( |
| quadrant_mask: np.ndarray, |
| disc_mask: np.ndarray, |
| cup_mask: np.ndarray |
| ) -> float: |
| """ |
| Mean Euclidean distance from cup boundary to disc boundary, |
| measured inside a specific quadrant. |
| |
| Uses distance transform on the inverse disc mask so that each |
| pixel inside the disc gets its distance to the disc edge. |
| Then we sample only rim pixels (disc=1, cup=0) in this quadrant. |
| """ |
| rim = np.logical_and(disc_mask == 1, cup_mask == 0).astype(np.uint8) |
| rim_in_quad = np.logical_and(rim, quadrant_mask).astype(np.uint8) |
|
|
| if rim_in_quad.sum() == 0: |
| return 0.0 |
|
|
| |
| dist_to_disc_edge = cv2.distanceTransform( |
| disc_mask.astype(np.uint8), cv2.DIST_L2, 5 |
| ) |
|
|
| thicknesses = dist_to_disc_edge[rim_in_quad == 1] |
| return float(np.mean(thicknesses)) |
|
|
|
|
| def calculate_isnt( |
| disc_mask: np.ndarray, |
| cup_mask: np.ndarray |
| ) -> ISNTResult: |
| """ |
| Calculate neuro-retinal rim thickness in the four ISNT quadrants. |
| |
| Quadrant definition (image coordinates): |
| Superior β top half (rows < cy) |
| Inferior β bottom half (rows >= cy) |
| Nasal β right half (cols >= cx) [standard right eye convention] |
| Temporal β left half (cols < cx) |
| |
| ISNT rule is satisfied when: |
| Inferior > Superior > Nasal > Temporal |
| Violation is a known early indicator of glaucomatous damage. |
| """ |
| h, w = disc_mask.shape |
| cy, cx = _disc_centroid(disc_mask) |
|
|
| superior_q = np.zeros((h, w), dtype=bool) |
| inferior_q = np.zeros((h, w), dtype=bool) |
| nasal_q = np.zeros((h, w), dtype=bool) |
| temporal_q = np.zeros((h, w), dtype=bool) |
|
|
| superior_q[:cy, :] = True |
| inferior_q[cy:, :] = True |
| nasal_q[:, cx:] = True |
| temporal_q[:, :cx] = True |
|
|
| I = _rim_thickness_in_quadrant(inferior_q, disc_mask, cup_mask) |
| S = _rim_thickness_in_quadrant(superior_q, disc_mask, cup_mask) |
| N = _rim_thickness_in_quadrant(nasal_q, disc_mask, cup_mask) |
| T = _rim_thickness_in_quadrant(temporal_q, disc_mask, cup_mask) |
|
|
| rule_ok = (I > S > N > T) |
|
|
| return ISNTResult(inferior=I, superior=S, nasal=N, temporal=T, |
| rule_satisfied=rule_ok) |
|
|
|
|
| |
| |
| |
|
|
| def classify_risk( |
| vcdr: float, |
| isnt: ISNTResult, |
| uncertainty: float, |
| uncertainty_threshold: float = 0.05 |
| ) -> Tuple[RiskLevel, list]: |
| """ |
| Rule-based risk stratification. |
| |
| Thresholds derived from clinical literature: |
| vCDR < 0.65 β Healthy |
| vCDR 0.65β0.80 + ISNT violation β Suspect |
| vCDR > 0.80 β High Risk |
| High uncertainty overrides to Suspect. |
| """ |
| warnings = [] |
|
|
| if uncertainty > uncertainty_threshold: |
| warnings.append( |
| f"High model uncertainty ({uncertainty:.4f}) β result may be unreliable." |
| ) |
| return RiskLevel.SUSPECT, warnings |
|
|
| if vcdr > 0.80: |
| risk = RiskLevel.HIGH |
| warnings.append(f"vCDR {vcdr:.2f} exceeds 0.80 β urgent referral recommended.") |
| elif vcdr > 0.65: |
| risk = RiskLevel.SUSPECT |
| warnings.append(f"vCDR {vcdr:.2f} in borderline range (0.65β0.80).") |
| else: |
| risk = RiskLevel.HEALTHY |
|
|
| if not isnt.rule_satisfied: |
| warnings.append( |
| "ISNT rule violated β neuro-retinal rim thinning detected." |
| ) |
| if risk == RiskLevel.HEALTHY: |
| risk = RiskLevel.SUSPECT |
|
|
| return risk, warnings |
|
|
|
|
| |
| |
| |
|
|
| def run_clinical_pipeline( |
| disc_mask: np.ndarray, |
| cup_mask: np.ndarray, |
| uncertainty: float = 0.0, |
| uncertainty_threshold: float = 0.05 |
| ) -> ClinicalResult: |
| """ |
| Execute complete Phase 3 pipeline on binary masks. |
| |
| Args: |
| disc_mask: uint8 binary array (1 = disc, 0 = background) |
| cup_mask: uint8 binary array (1 = cup, 0 = background) |
| uncertainty: scalar from Phase 2 MC-Dropout |
| uncertainty_threshold: flag above this value as high uncertainty |
| |
| Returns: |
| ClinicalResult dataclass |
| """ |
| result = ClinicalResult() |
| result.uncertainty = float(uncertainty) |
| result.high_uncertainty = uncertainty > uncertainty_threshold |
|
|
| disc_mask = (disc_mask > 0).astype(np.uint8) |
| cup_mask = (cup_mask > 0).astype(np.uint8) |
|
|
| |
| try: |
| run_sanity_checks(disc_mask, cup_mask) |
| result.sanity_passed = True |
| except SanityError as e: |
| result.warnings.append(f"SANITY FAIL: {e}") |
| result.sanity_passed = False |
| if disc_mask.sum() == 0: |
| result.risk_level = RiskLevel.SUSPECT |
| return result |
|
|
| |
| result.disc_area_px = int(disc_mask.sum()) |
| result.cup_area_px = int(cup_mask.sum()) |
|
|
| dy, dx = _disc_centroid(disc_mask) |
| result.disc_center = (int(dx), int(dy)) |
|
|
| if cup_mask.sum() > 0: |
| M = cv2.moments(cup_mask) |
| if M['m00'] > 0: |
| result.cup_center = ( |
| int(M['m10'] / M['m00']), |
| int(M['m01'] / M['m00']) |
| ) |
|
|
| |
| result.vcdr, _ = calculate_vcdr(disc_mask, cup_mask) |
|
|
| |
| result.isnt = calculate_isnt(disc_mask, cup_mask) |
|
|
| |
| result.risk_level, warnings = classify_risk( |
| result.vcdr, result.isnt, uncertainty, uncertainty_threshold |
| ) |
| result.warnings.extend(warnings) |
|
|
| return result |
|
|