File size: 12,476 Bytes
"""Phase 3: Bias Detection System - Monitor fairness in recruiter decisions."""
from dataclasses import dataclass
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import re
@dataclass
class BiasAlert:
    """A single potential-bias finding produced by the detector.

    Attributes:
        alert_type: Category of the finding, one of
            "acceptance_rate_disparity", "score_gap" or "pattern_anomaly".
        severity: "low", "medium" or "high".
        message: Human-readable description of what was observed.
        affected_group: Label of the group the finding concerns, e.g.
            "junior_developers" or "candidates_from_east_africa".
        detected_at: When the finding was generated.
        recommendation: Suggested follow-up action.
    """

    # What was detected.
    alert_type: str
    severity: str
    message: str

    # Who it concerns and when it was raised.
    affected_group: str
    detected_at: datetime

    # What to do about it.
    recommendation: str
class BiasDetector:
"""
Monitor hiring patterns for potential bias:
- Acceptance rate disparities by inferred demographics
- Score distribution anomalies
- Pattern-based red flags
"""
def __init__(self, db=None):
"""Initialize detector."""
self.db = db
self.alerts: List[BiasAlert] = []
def analyze_recruiter_decisions(
self,
feedback_records: List[Dict],
min_samples: int = 30,
) -> Dict:
"""
Analyze recruiter decisions for bias indicators.
Args:
feedback_records: List of recruiter feedback dictionaries
min_samples: Minimum samples per group for analysis
Returns:
Report with detected biases and recommendations
"""
if not feedback_records or len(feedback_records) < min_samples:
return {"status": "insufficient_data", "sample_count": len(feedback_records)}
report = {
"analysis_date": datetime.utcnow().isoformat(),
"total_records": len(feedback_records),
"alerts": [],
"disparities": {},
"recommendations": [],
}
# Check acceptance rate disparities
disparities = self._check_acceptance_disparities(feedback_records)
if disparities:
report["disparities"] = disparities
report["alerts"].extend([d["alert"] for d in disparities.values()])
# Check score distribution anomalies
score_check = self._check_score_anomalies(feedback_records)
if score_check:
report["alerts"].extend(score_check)
# Check for pattern anomalies
patterns = self._check_pattern_anomalies(feedback_records)
if patterns:
report["alerts"].extend(patterns)
# Generate recommendations
if report["alerts"]:
report["recommendations"] = self._generate_recommendations(report["alerts"])
return report
def _check_acceptance_disparities(self, records: List[Dict]) -> Dict:
"""Check for acceptance rate disparities (the main bias concern)."""
disparities = {}
# Group by inferred characteristics from candidate names/emails
groups = self._group_by_demographics(records)
if len(groups) < 2:
return {}
# Calculate acceptance rates per group
rates = {}
for group_name, candidates in groups.items():
if len(candidates) < 10: # Skip groups with too few samples
continue
accepted = sum(1 for c in candidates if c.get("recruiter_decision") == "accepted")
rate = accepted / len(candidates)
rates[group_name] = {
"rate": rate,
"count": len(candidates),
"accepted": accepted,
}
# Find disparities (highest and lowest acceptance rates)
if len(rates) >= 2:
sorted_rates = sorted(rates.items(), key=lambda x: x[1]["rate"])
lowest = sorted_rates[0]
highest = sorted_rates[-1]
disparity_ratio = (
highest[1]["rate"] / lowest[1]["rate"]
if lowest[1]["rate"] > 0
else float("inf")
)
# Flag if disparity > 1.25 (25% difference)
if disparity_ratio > 1.25:
alert_msg = (
f"Acceptance rate disparity detected: {highest[0]} "
f"{highest[1]['rate']:.1%} vs {lowest[0]} {lowest[1]['rate']:.1%}"
)
disparities[f"{lowest[0]}_vs_{highest[0]}"] = {
"alert": BiasAlert(
alert_type="acceptance_rate_disparity",
severity="high" if disparity_ratio > 1.5 else "medium",
message=alert_msg,
affected_group=lowest[0],
detected_at=datetime.utcnow(),
recommendation=(
f"Review scoring/decisions for {lowest[0]}. "
f"Conduct blind review process."
),
),
"disparity_ratio": disparity_ratio,
"rates": rates,
}
return disparities
def _check_score_anomalies(self, records: List[Dict]) -> List[BiasAlert]:
"""Check if score distributions are anomalous."""
alerts = []
# Check if same scores are given despite different candidate profiles
score_consistency = {}
for record in records:
score = round(record.get("model_predicted_score", 0))
if score not in score_consistency:
score_consistency[score] = []
score_consistency[score].append(record)
# Find score buckets with >80% same decision when scores vary
for score, recs in score_consistency.items():
if len(recs) < 5:
continue
decisions = [r.get("recruiter_decision") for r in recs]
accepted_ratio = sum(1 for d in decisions if d == "accepted") / len(decisions)
if accepted_ratio > 0.85 or accepted_ratio < 0.15:
alerts.append(
BiasAlert(
alert_type="score_gap",
severity="low",
message=(
f"Unusual decision ratio at score {score}: "
f"{accepted_ratio:.0%} acceptance"
),
affected_group=f"candidates_at_score_{score}",
detected_at=datetime.utcnow(),
recommendation="Review scoring function calibration.",
)
)
return alerts
def _check_pattern_anomalies(self, records: List[Dict]) -> List[BiasAlert]:
"""Detect suspicious patterns in decisions."""
alerts = []
# Pattern 1: Certain recruiters consistently reject certain demographics
recruiter_patterns = self._analyze_recruiter_patterns(records)
for recruiter_id, pattern in recruiter_patterns.items():
if pattern.get("has_bias_flag"):
alerts.append(
BiasAlert(
alert_type="pattern_anomaly",
severity="medium",
message=f"Recruiter {recruiter_id} shows unusual decision pattern",
affected_group=f"recruiter_{recruiter_id}",
detected_at=datetime.utcnow(),
recommendation=(
"Audit this recruiter's decisions; "
"consider blind review or structured interviews."
),
)
)
return alerts
def _group_by_demographics(self, records: List[Dict]) -> Dict[str, List]:
"""Infer demographics from candidate names/emails for bias analysis."""
groups = {
"senior": [],
"junior": [],
"east_africa": [],
"west_africa": [],
"south_asia": [],
"anglo": [],
}
for record in records:
candidate_name = record.get("candidate_name", "").lower()
email = record.get("email", "").lower()
# Infer experience level (heuristic: title/name mentions)
if any(x in candidate_name for x in ["senior", "lead", "principal"]):
groups["senior"].append(record)
else:
groups["junior"].append(record)
# Infer geographic/cultural background (name-based, not 100% accurate)
if self._is_east_african_name(candidate_name):
groups["east_africa"].append(record)
elif self._is_west_african_name(candidate_name):
groups["west_africa"].append(record)
elif self._is_south_asian_name(candidate_name):
groups["south_asia"].append(record)
else:
groups["anglo"].append(record)
# Keep only groups with data
return {k: v for k, v in groups.items() if v}
def _is_east_african_name(self, name: str) -> bool:
"""Heuristic: detect East African names."""
patterns = ["njeri", "kipkemboi", "mutua", "koech", "kinyua", "muyeni"]
return any(p in name for p in patterns)
def _is_west_african_name(self, name: str) -> bool:
"""Heuristic: detect West African names."""
patterns = ["okonkwo", "adeyemi", "otchere", "mensah", "diallo", "faye"]
return any(p in name for p in patterns)
def _is_south_asian_name(self, name: str) -> bool:
"""Heuristic: detect South Asian names."""
patterns = ["sharma", "patel", "singh", "gupta", "banerjee", "krishnan"]
return any(p in name for p in patterns)
def _analyze_recruiter_patterns(self, records: List[Dict]) -> Dict:
"""Analyze each recruiter's decision patterns."""
patterns = {}
for record in records:
recruiter_id = record.get("recruiter_id", "unknown")
if recruiter_id not in patterns:
patterns[recruiter_id] = {
"total_decisions": 0,
"acceptance_rate": 0.0,
"has_bias_flag": False,
}
patterns[recruiter_id]["total_decisions"] += 1
if record.get("recruiter_decision") == "accepted":
patterns[recruiter_id]["acceptance_rate"] += 1
# Normalize rates and flag outliers
for recruiter_id, data in patterns.items():
if data["total_decisions"] >= 5:
rate = data["acceptance_rate"] / data["total_decisions"]
# Flag if acceptance rate < 10% or > 90% (unusually extreme)
if rate < 0.1 or rate > 0.9:
data["has_bias_flag"] = True
data["acceptance_rate"] = rate
return patterns
def _generate_recommendations(self, alerts: List[BiasAlert]) -> List[str]:
"""Generate actionable recommendations based on detected biases."""
recommendations = [
"✓ Implement blind resume review (remove names/photos)",
"✓ Use structured interviews with standardized questions",
"✓ Train recruiters on unconscious bias",
"✓ Regular bias audits (monthly minimum)",
"✓ Document decision rationale for all hires",
]
# Add severity-based recommendations
high_severity = [a for a in alerts if a.severity == "high"]
if high_severity:
recommendations.insert(
0, "⚠️ HIGH PRIORITY: Halt hiring review for affected groups"
)
return recommendations
def get_alerts_summary(self) -> Dict:
"""Get summary of all detected alerts."""
if not self.alerts:
return {"status": "no_alerts", "bias_risk": "low"}
severity_counts = {}
for alert in self.alerts:
severity_counts[alert.severity] = severity_counts.get(alert.severity, 0) + 1
risk_level = "high" if severity_counts.get("high", 0) > 0 else (
"medium" if severity_counts.get("medium", 0) > 0 else "low"
)
return {
"status": "alerts_detected",
"bias_risk": risk_level,
"alert_counts": severity_counts,
"alerts": [
{
"type": a.alert_type,
"severity": a.severity,
"message": a.message,
"group": a.affected_group,
}
for a in self.alerts[:10]
],
}
|