| """Phase 3: Bias Detection System - Monitor fairness in recruiter decisions.""" |
|
|
| from dataclasses import dataclass |
| from typing import Dict, List, Optional |
| from datetime import datetime, timedelta |
| import re |
|
|
|
|
@dataclass
class BiasAlert:
    """One potential-bias finding produced by the detector.

    Instances are plain value objects: the detector fills every field when it
    raises the alert and never mutates them afterwards.
    """

    # Category of the finding; the detector emits "acceptance_rate_disparity",
    # "score_gap" and "pattern_anomaly".
    alert_type: str
    # Urgency label; the detector uses "low", "medium" and "high".
    severity: str
    # Human-readable description of what was observed.
    message: str
    # Identifier of the group (or recruiter / score bucket) the alert concerns.
    affected_group: str
    # Moment the alert was created.
    detected_at: datetime
    # Suggested follow-up action for whoever reviews the alert.
    recommendation: str
|
|
|
|
class BiasDetector:
    """
    Monitor hiring patterns for potential bias:
    - Acceptance rate disparities by inferred demographics
    - Score distribution anomalies
    - Pattern-based red flags

    Demographic groups are inferred with crude name-fragment heuristics, so
    every finding is a prompt for human review, not statistical proof.
    """

    # Fragments searched for (as substrings) in the lower-cased candidate name.
    _SENIORITY_KEYWORDS = ("senior", "lead", "principal")
    _EAST_AFRICAN_NAMES = (
        "njeri", "kipkemboi", "mutua", "koech", "kinyua", "muyeni",
    )
    _WEST_AFRICAN_NAMES = (
        "okonkwo", "adeyemi", "otchere", "mensah", "diallo", "faye",
    )
    _SOUTH_ASIAN_NAMES = (
        "sharma", "patel", "singh", "gupta", "banerjee", "krishnan",
    )

    # Every record lands in one seniority group AND one region group, so the
    # two dimensions overlap. Acceptance rates are only compared between
    # groups of the same dimension.
    _GROUP_DIMENSIONS = {
        "senior": "seniority",
        "junior": "seniority",
        "east_africa": "region",
        "west_africa": "region",
        "south_asia": "region",
        "anglo": "region",
    }

    _MIN_GROUP_SIZE = 10          # groups smaller than this are not rated
    _MIN_SCORE_BUCKET_SIZE = 5    # score buckets smaller than this are skipped
    _MIN_RECRUITER_DECISIONS = 5  # recruiters with fewer decisions are not flagged
    _DISPARITY_THRESHOLD = 1.25   # highest/lowest ratio that raises an alert
    _HIGH_DISPARITY_THRESHOLD = 1.5

    def __init__(self, db=None):
        """Initialize detector.

        Args:
            db: Optional handle kept on the instance; this class only stores it.
        """
        self.db = db
        # Alerts raised by the most recent completed analysis.
        self.alerts: List["BiasAlert"] = []

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Replaces the deprecated ``datetime.utcnow()`` while keeping its naive
        result, so stored timestamps and ``isoformat()`` output are unchanged.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    def analyze_recruiter_decisions(
        self,
        feedback_records: List[Dict],
        min_samples: int = 30,
    ) -> Dict:
        """
        Analyze recruiter decisions for bias indicators.

        Args:
            feedback_records: List of recruiter feedback dictionaries.
                ``None`` is treated as an empty list.
            min_samples: Minimum total number of records required before any
                analysis runs (per-group minimums are applied separately).

        Returns:
            ``{"status": "insufficient_data", "sample_count": n}`` when there
            are too few records. Otherwise a report with ``analysis_date``,
            ``total_records``, ``alerts``, ``disparities`` and
            ``recommendations``. The alerts of a completed analysis are also
            stored on ``self.alerts`` for ``get_alerts_summary``.
        """
        records = list(feedback_records or [])
        if not records or len(records) < min_samples:
            return {"status": "insufficient_data", "sample_count": len(records)}

        analysis_date = self._utc_now().isoformat()

        alerts: List["BiasAlert"] = []
        disparities = self._check_acceptance_disparities(records) or {}
        alerts.extend(entry["alert"] for entry in disparities.values())
        alerts.extend(self._check_score_anomalies(records) or [])
        alerts.extend(self._check_pattern_anomalies(records) or [])

        # Keep the findings so get_alerts_summary() reflects this run.
        self.alerts = list(alerts)

        return {
            "analysis_date": analysis_date,
            "total_records": len(records),
            "alerts": alerts,
            "disparities": disparities,
            "recommendations": (
                self._generate_recommendations(alerts) if alerts else []
            ),
        }

    def _check_acceptance_disparities(self, records: List[Dict]) -> Dict:
        """Check for acceptance rate disparities (the main bias concern).

        Acceptance rates are computed for every inferred group with at least
        ``_MIN_GROUP_SIZE`` records. Within each dimension (seniority, region)
        the highest and lowest rates are compared; a ratio above
        ``_DISPARITY_THRESHOLD`` produces one entry keyed
        ``"<lowest>_vs_<highest>"`` holding the alert, the ratio and all
        computed rates.
        """
        groups = self._group_by_demographics(records)
        if len(groups) < 2:
            return {}

        rates = {}
        for group_name, candidates in groups.items():
            if len(candidates) < self._MIN_GROUP_SIZE:
                continue
            accepted = sum(
                1 for c in candidates if c.get("recruiter_decision") == "accepted"
            )
            rates[group_name] = {
                "rate": accepted / len(candidates),
                "count": len(candidates),
                "accepted": accepted,
            }

        # Split the rated groups by dimension so overlapping groups (e.g.
        # "junior" and "anglo") are never compared with each other.
        by_dimension: Dict[str, Dict] = {}
        for group_name, stats in rates.items():
            dimension = self._GROUP_DIMENSIONS.get(group_name, "other")
            by_dimension.setdefault(dimension, {})[group_name] = stats

        disparities = {}
        for dimension_rates in by_dimension.values():
            if len(dimension_rates) < 2:
                continue

            ranked = sorted(dimension_rates.items(), key=lambda item: item[1]["rate"])
            low_name, low = ranked[0]
            high_name, high = ranked[-1]

            if low["rate"] > 0:
                disparity_ratio = high["rate"] / low["rate"]
            elif high["rate"] > 0:
                disparity_ratio = float("inf")
            else:
                # Nobody was accepted in any group: equal rates, no disparity.
                disparity_ratio = 1.0

            if disparity_ratio <= self._DISPARITY_THRESHOLD:
                continue

            alert_msg = (
                f"Acceptance rate disparity detected: {high_name} "
                f"{high['rate']:.1%} vs {low_name} {low['rate']:.1%}"
            )
            is_high = disparity_ratio > self._HIGH_DISPARITY_THRESHOLD
            disparities[f"{low_name}_vs_{high_name}"] = {
                "alert": BiasAlert(
                    alert_type="acceptance_rate_disparity",
                    severity="high" if is_high else "medium",
                    message=alert_msg,
                    affected_group=low_name,
                    detected_at=self._utc_now(),
                    recommendation=(
                        f"Review scoring/decisions for {low_name}. "
                        f"Conduct blind review process."
                    ),
                ),
                "disparity_ratio": disparity_ratio,
                "rates": rates,
            }

        return disparities

    def _check_score_anomalies(self, records: List[Dict]) -> List["BiasAlert"]:
        """Check if score distributions are anomalous.

        Records are bucketed by their rounded ``model_predicted_score``. A
        bucket with at least ``_MIN_SCORE_BUCKET_SIZE`` records whose
        acceptance ratio is above 85% or below 15% raises a low-severity
        alert. Records with a missing or non-numeric score are skipped so
        they do not pile up in bucket 0.
        """
        buckets: Dict[int, List[Dict]] = {}
        for record in records:
            raw_score = record.get("model_predicted_score")
            if raw_score is None:
                continue
            try:
                bucket = round(raw_score)
            except (TypeError, ValueError, OverflowError):
                # Non-numeric, NaN or infinite score: cannot be bucketed.
                continue
            buckets.setdefault(bucket, []).append(record)

        alerts = []
        for score, recs in buckets.items():
            if len(recs) < self._MIN_SCORE_BUCKET_SIZE:
                continue

            accepted = sum(1 for r in recs if r.get("recruiter_decision") == "accepted")
            accepted_ratio = accepted / len(recs)
            if 0.15 <= accepted_ratio <= 0.85:
                continue

            alerts.append(
                BiasAlert(
                    alert_type="score_gap",
                    severity="low",
                    message=(
                        f"Unusual decision ratio at score {score}: "
                        f"{accepted_ratio:.0%} acceptance"
                    ),
                    affected_group=f"candidates_at_score_{score}",
                    detected_at=self._utc_now(),
                    recommendation="Review scoring function calibration.",
                )
            )

        return alerts

    def _check_pattern_anomalies(self, records: List[Dict]) -> List["BiasAlert"]:
        """Detect suspicious patterns in decisions.

        Raises one medium-severity alert for every recruiter that
        ``_analyze_recruiter_patterns`` flags.
        """
        alerts = []
        for recruiter_id, pattern in self._analyze_recruiter_patterns(records).items():
            if not pattern.get("has_bias_flag"):
                continue
            alerts.append(
                BiasAlert(
                    alert_type="pattern_anomaly",
                    severity="medium",
                    message=f"Recruiter {recruiter_id} shows unusual decision pattern",
                    affected_group=f"recruiter_{recruiter_id}",
                    detected_at=self._utc_now(),
                    recommendation=(
                        "Audit this recruiter's decisions; "
                        "consider blind review or structured interviews."
                    ),
                )
            )
        return alerts

    def _group_by_demographics(self, records: List[Dict]) -> Dict[str, List]:
        """Infer demographic groups from candidate names for bias analysis.

        Every record is placed in exactly one seniority group ("senior" or
        "junior") and exactly one region group. "anglo" is the catch-all for
        any name that matches no other region list. Empty groups are omitted
        from the result. A missing or ``None`` name counts as an empty name.
        """
        groups = {
            "senior": [],
            "junior": [],
            "east_africa": [],
            "west_africa": [],
            "south_asia": [],
            "anglo": [],
        }

        for record in records:
            candidate_name = str(record.get("candidate_name") or "").lower()

            if self._name_matches(candidate_name, self._SENIORITY_KEYWORDS):
                groups["senior"].append(record)
            else:
                groups["junior"].append(record)

            if self._is_east_african_name(candidate_name):
                groups["east_africa"].append(record)
            elif self._is_west_african_name(candidate_name):
                groups["west_africa"].append(record)
            elif self._is_south_asian_name(candidate_name):
                groups["south_asia"].append(record)
            else:
                groups["anglo"].append(record)

        return {name: members for name, members in groups.items() if members}

    @staticmethod
    def _name_matches(name: str, fragments) -> bool:
        """Return True if any fragment occurs in ``name`` (case-insensitive)."""
        lowered = (name or "").lower()
        return any(fragment in lowered for fragment in fragments)

    def _is_east_african_name(self, name: str) -> bool:
        """Heuristic: detect East African names."""
        return self._name_matches(name, self._EAST_AFRICAN_NAMES)

    def _is_west_african_name(self, name: str) -> bool:
        """Heuristic: detect West African names."""
        return self._name_matches(name, self._WEST_AFRICAN_NAMES)

    def _is_south_asian_name(self, name: str) -> bool:
        """Heuristic: detect South Asian names."""
        return self._name_matches(name, self._SOUTH_ASIAN_NAMES)

    def _analyze_recruiter_patterns(self, records: List[Dict]) -> Dict:
        """Analyze each recruiter's decision patterns.

        Returns:
            Mapping of recruiter id (``"unknown"`` when absent) to a dict with
            ``total_decisions``, ``acceptance_rate`` (always a fraction in
            [0, 1]) and ``has_bias_flag``. The flag is set only for recruiters
            with at least ``_MIN_RECRUITER_DECISIONS`` decisions whose
            acceptance rate is below 10% or above 90%.
        """
        totals: Dict = {}
        accepted: Dict = {}
        for record in records:
            recruiter_id = record.get("recruiter_id", "unknown")
            totals[recruiter_id] = totals.get(recruiter_id, 0) + 1
            if record.get("recruiter_decision") == "accepted":
                accepted[recruiter_id] = accepted.get(recruiter_id, 0) + 1

        patterns = {}
        for recruiter_id, total in totals.items():
            rate = accepted.get(recruiter_id, 0) / total
            enough_data = total >= self._MIN_RECRUITER_DECISIONS
            patterns[recruiter_id] = {
                "total_decisions": total,
                "acceptance_rate": rate,
                "has_bias_flag": enough_data and (rate < 0.1 or rate > 0.9),
            }
        return patterns

    def _generate_recommendations(self, alerts: List["BiasAlert"]) -> List[str]:
        """Generate actionable recommendations based on detected biases.

        Returns the standard checklist, preceded by a high-priority item when
        at least one alert has severity "high".
        """
        recommendations = [
            "✓ Implement blind resume review (remove names/photos)",
            "✓ Use structured interviews with standardized questions",
            "✓ Train recruiters on unconscious bias",
            "✓ Regular bias audits (monthly minimum)",
            "✓ Document decision rationale for all hires",
        ]

        if any(alert.severity == "high" for alert in alerts):
            recommendations.insert(
                0, "⚠️ HIGH PRIORITY: Halt hiring review for affected groups"
            )

        return recommendations

    def get_alerts_summary(self) -> Dict:
        """Get summary of all detected alerts.

        Summarizes ``self.alerts``: overall risk level (the highest severity
        present), a count per severity, and details of the first ten alerts.
        """
        if not self.alerts:
            return {"status": "no_alerts", "bias_risk": "low"}

        severity_counts: Dict[str, int] = {}
        for alert in self.alerts:
            severity_counts[alert.severity] = severity_counts.get(alert.severity, 0) + 1

        if severity_counts.get("high", 0) > 0:
            risk_level = "high"
        elif severity_counts.get("medium", 0) > 0:
            risk_level = "medium"
        else:
            risk_level = "low"

        return {
            "status": "alerts_detected",
            "bias_risk": risk_level,
            "alert_counts": severity_counts,
            "alerts": [
                {
                    "type": a.alert_type,
                    "severity": a.severity,
                    "message": a.message,
                    "group": a.affected_group,
                }
                for a in self.alerts[:10]
            ],
        }
|
|