| """Phase 3: Model Retraining Pipeline - Learn from recruiter feedback.""" |
|
|
| import json |
| import os |
| from pathlib import Path |
| from typing import Dict, List, Optional, Tuple, Any |
| from datetime import datetime |
| import argparse |
|
|
| try: |
| import numpy as np |
| import pandas as pd |
| from sklearn.model_selection import train_test_split |
| from sklearn.ensemble import GradientBoostingClassifier |
| from sklearn.preprocessing import StandardScaler |
| import joblib |
| except ImportError: |
| np = None |
| pd = None |
|
|
|
|
class ModelRetrainingPipeline:
    """
    Retrain the candidate/job matching model using recruiter feedback.

    Incorporates:
    - Recruiter overrides as training signals
    - Corrected labels from hiring outcomes
    - Feedback-based feature importance

    Feature vectors produced by ``_extract_features`` always contain the
    columns named in ``FEATURE_NAMES``, in that order.
    """

    # Column order produced by _extract_features. Shared by train() (feature
    # importance labels) and save_model() (bundle metadata) so the two can
    # never drift apart.
    FEATURE_NAMES = (
        "skill_match_ratio",
        "experience_years",
        "education_level",
        "language_proficiency",
        "certifications",
        "company_tier",
        "model_score",
    )

    # Keyword vocabularies for the heuristic feature extractor. Order matters
    # for SKILL_KEYWORDS because _parse_skills returns matches in this order.
    SKILL_KEYWORDS = (
        "python",
        "javascript",
        "java",
        "c++",
        "docker",
        "kubernetes",
        "aws",
        "react",
        "angular",
        "sql",
        "nosql",
        "git",
        "jenkins",
        "ci/cd",
    )
    LANGUAGE_KEYWORDS = ("fluent", "native", "bilingual", "multilingual")
    CERT_KEYWORDS = ("aws", "gcp", "azure", "kubernetes", "docker", "certified")
    PRESTIGIOUS_COMPANIES = (
        "google",
        "microsoft",
        "apple",
        "amazon",
        "facebook",
        "ibm",
        "netflix",
    )

    def __init__(self, model_path: Optional[Path] = None):
        """Initialize the pipeline and try to load an existing model bundle.

        Args:
            model_path: Location of the joblib bundle (``str`` or ``Path``).
                Defaults to ``models/final_match_model.joblib`` under the
                directory four levels above this file.
        """
        if model_path is None:
            model_path = (
                Path(__file__).resolve().parents[4] / "models" / "final_match_model.joblib"
            )
        self.model_path = Path(model_path)
        self.model = None
        self.scaler = None
        self.feature_names = None
        self._load_model()

    @staticmethod
    def _get_joblib():
        """Return the ``joblib`` module, or ``None`` if it is not installed.

        The module-level fallback for failed imports only rebinds ``np`` and
        ``pd``, so the global name ``joblib`` may not exist at all; importing
        here avoids a ``NameError`` in that situation.
        """
        try:
            import joblib as joblib_module
        except ImportError:
            return None
        return joblib_module

    def _load_model(self) -> None:
        """Load an existing model bundle from ``self.model_path`` if possible.

        The bundle is expected to be a dict with ``model``, ``scaler`` and
        ``feature_names`` keys (the format ``save_model`` writes). Any failure
        is printed as a warning and the pipeline starts without a model.
        """
        joblib_module = self._get_joblib()
        if joblib_module is None or not self.model_path.exists():
            return
        try:
            # NOTE: joblib.load unpickles the file; only load trusted bundles.
            bundle = joblib_module.load(self.model_path)
            self.model = bundle.get("model")
            self.scaler = bundle.get("scaler")
            self.feature_names = bundle.get("feature_names", [])
            print(f"Loaded model from {self.model_path}")
        except Exception as e:
            print(f"Warning: Could not load model: {e}")

    def prepare_features(
        self, retraining_data: List[Dict]
    ) -> "Tuple[Optional[np.ndarray], Optional[np.ndarray]]":
        """
        Prepare features and labels from feedback data.

        Records whose features or label cannot be computed are skipped with a
        warning; features and labels are appended together so they stay
        aligned row-for-row.

        Returns:
            X (features, one row per usable record), y (0/1 labels), or
            ``(None, None)`` when NumPy is missing or no record was usable.
        """
        if np is None:
            print("NumPy not available; cannot prepare features")
            return None, None

        features_list = []
        labels = []
        for record in retraining_data:
            try:
                feature_vector = self._extract_features(record)
                label = self._label_for(record)
            except Exception as e:
                print(f"Warning: Could not process record: {e}")
                continue
            features_list.append(feature_vector)
            labels.append(label)

        if not features_list:
            print("No valid features extracted")
            return None, None

        return np.array(features_list), np.array(labels)

    @staticmethod
    def _label_for(record: Dict) -> int:
        """Return the binary training label for one feedback record.

        An explicit ``label`` field wins (``1`` is positive, anything else is
        0). When it is absent, the label falls back to
        ``recruiter_decision == "accepted"`` -- the same field
        ``evaluate_on_feedback`` scores against.
        """
        label = record.get("label")
        if label is None:
            return 1 if record.get("recruiter_decision") == "accepted" else 0
        return 1 if label == 1 else 0

    def _extract_features(self, record: Dict) -> List[float]:
        """Extract the heuristic feature vector (see FEATURE_NAMES) from a record.

        Reads ``cv_text``, ``job_description`` and ``score``; missing or
        ``None`` text is treated as empty and a missing score as 0.5.
        """
        cv_text = (record.get("cv_text") or "").lower()
        job_text = (record.get("job_description") or "").lower()

        features = []

        # Fraction of the job's keyword skills that also appear in the CV.
        job_skills = self._parse_skills(job_text)
        cv_skills = set(self._parse_skills(cv_text))
        skill_overlap = (
            len(cv_skills & set(job_skills)) / len(job_skills) if job_skills else 0
        )
        features.append(skill_overlap)

        # NOTE: this counts occurrences of the phrases, not actual years.
        experiences = cv_text.count("years of experience") + cv_text.count("yrs")
        features.append(min(experiences / 10, 1.0))

        # Highest degree keyword found, checked from highest to lowest.
        edu_score = 0
        if "phd" in cv_text or "doctorate" in cv_text:
            edu_score = 1.0
        elif "master" in cv_text or "m.s." in cv_text:
            edu_score = 0.8
        elif "bachelor" in cv_text or "b.s." in cv_text:
            edu_score = 0.6
        features.append(edu_score)

        lang_count = sum(1 for word in self.LANGUAGE_KEYWORDS if word in cv_text)
        features.append(min(lang_count / 3, 1.0))

        certs = sum(1 for cert in self.CERT_KEYWORDS if cert in cv_text)
        features.append(min(certs / 3, 1.0))

        company_tier = (
            1.0
            if any(company in cv_text for company in self.PRESTIGIOUS_COMPANIES)
            else 0.5
        )
        features.append(company_tier)

        # Prior model score; coerced to float so the matrix stays numeric.
        # A non-numeric value raises ValueError and the record is skipped.
        score = record.get("score")
        features.append(0.5 if score is None else float(score))

        return features

    def _parse_skills(self, text: str) -> List[str]:
        """Return the SKILL_KEYWORDS that occur in ``text``, in keyword order.

        NOTE: matching is plain substring search, so e.g. "javascript" also
        matches "java" and "nosql" also matches "sql". This is kept as-is
        because existing models were trained on these features.
        """
        lowered = (text or "").lower()
        return [skill for skill in self.SKILL_KEYWORDS if skill in lowered]

    def train(
        self,
        X: "np.ndarray",
        y: "np.ndarray",
        test_size: float = 0.2,
        n_estimators: int = 100,
    ) -> Dict[str, Any]:
        """
        Retrain the model on feedback data.

        Fits a new GradientBoostingClassifier on a random train split and
        scores it on the held-out split. On success the new model replaces
        ``self.model`` in memory; call ``save_model`` to persist it.

        Args:
            X: Feature matrix, one row per record (columns per FEATURE_NAMES)
            y: Labels (1=accepted, 0=rejected)
            test_size: Fraction of samples held out for testing
            n_estimators: Number of boosting rounds

        Returns:
            Metrics dictionary whose ``status`` is ``"success"``, ``"skipped"``
            (fewer than two classes overall or in the train split) or
            ``"error"``.
        """
        if X is None or y is None or np is None:
            return {"status": "error", "message": "Invalid data or NumPy not available"}

        X = np.asarray(X)
        try:
            y = np.asarray(y)
            unique_labels = np.unique(y)
        except Exception:
            return {"status": "error", "message": "Could not determine unique labels"}

        if len(unique_labels) < 2:
            return {
                "status": "skipped",
                "reason": "Insufficient label variety: need at least 2 classes to train",
                "unique_labels": unique_labels.tolist(),
            }

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42
        )

        # A random split of a small or imbalanced set can leave one class only.
        train_unique = np.unique(y_train)
        if len(train_unique) < 2:
            return {
                "status": "skipped",
                "reason": f"Training set resulted in single class after split. Train classes: {train_unique.tolist()}",
                "unique_labels": train_unique.tolist(),
            }

        # Reuse a loaded scaler only if it was fitted on the same number of
        # columns; a scaler from an older bundle with a different feature set
        # would make transform() raise.
        n_features = X_train.shape[1]
        if (
            self.scaler is None
            or getattr(self.scaler, "n_features_in_", n_features) != n_features
        ):
            self.scaler = StandardScaler()
            X_train = self.scaler.fit_transform(X_train)
        else:
            X_train = self.scaler.transform(X_train)
        X_test = self.scaler.transform(X_test)

        print(f"Training model on {len(X_train)} samples...")
        model = GradientBoostingClassifier(
            n_estimators=n_estimators,
            learning_rate=0.05,
            max_depth=5,
            random_state=42,
        )
        model.fit(X_train, y_train)

        train_score = model.score(X_train, y_train)
        test_score = model.score(X_test, y_test)

        # Plain floats so the metrics serialise as JSON numbers.
        feature_importance = sorted(
            (
                (name, float(importance))
                for name, importance in zip(self.FEATURE_NAMES, model.feature_importances_)
            ),
            key=lambda item: item[1],
            reverse=True,
        )

        metrics = {
            "status": "success",
            "train_accuracy": float(train_score),
            "test_accuracy": float(test_score),
            "samples": len(X),
            "feature_importance": feature_importance,
            "trained_at": datetime.utcnow().isoformat(),
        }

        self.model = model
        self.feature_names = list(self.FEATURE_NAMES)
        return metrics

    def save_model(self, output_path: Optional[Path] = None) -> str:
        """Save the current model, scaler and feature names as a joblib bundle.

        Args:
            output_path: Destination file (``str`` or ``Path``); defaults to
                ``self.model_path``. Parent directories are created.

        Returns:
            A human-readable status message (success or the error).
        """
        if self.model is None:
            return "No model to save"

        joblib_module = self._get_joblib()
        if joblib_module is None:
            return "Joblib not available"

        output_path = Path(output_path) if output_path is not None else self.model_path
        try:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            bundle = {
                "model": self.model,
                "scaler": self.scaler,
                # Names of the features this model was actually trained on:
                # set by train(), or carried over from a loaded bundle.
                "feature_names": list(self.feature_names or self.FEATURE_NAMES),
                "trained_at": datetime.utcnow().isoformat(),
                "version": "phase3_feedback_v1",
            }
            joblib_module.dump(bundle, output_path)
            return f"Model saved to {output_path}"
        except Exception as e:
            return f"Error saving model: {e}"

    def evaluate_on_feedback(
        self, feedback_records: List[Dict]
    ) -> Dict[str, Any]:
        """
        Evaluate model improvements from retraining.

        For each record, compares the stored ``model_predicted_decision``
        (before) and the current model's prediction (after, threshold 0.5 on
        the class-1 probability) against ``recruiter_decision``. Missing
        decisions default to ``"rejected"``.

        Records whose features cannot be built or that the model fails to
        score are skipped entirely, so both accuracies use the same
        denominator.

        Returns:
            Accuracy/improvement dict with ``status == "success"``, or a dict
            with ``status == "error"`` when there is no model, no records, or
            no record could be evaluated.
        """
        if self.model is None or not feedback_records:
            return {"status": "error"}

        correct_before = 0
        correct_after = 0
        evaluated = 0
        skipped = 0

        for record in feedback_records:
            X, _ = self.prepare_features([record])
            if X is None:
                skipped += 1
                continue

            try:
                X_scaled = self.scaler.transform(X) if self.scaler is not None else X
                new_pred_prob = self.model.predict_proba(X_scaled)[0][1]
            except Exception as e:
                print(f"Warning: Could not score record: {e}")
                skipped += 1
                continue

            model_pred = record.get("model_predicted_decision", "rejected")
            recruiter_decision = record.get("recruiter_decision", "rejected")
            new_pred = "accepted" if new_pred_prob >= 0.5 else "rejected"

            evaluated += 1
            if model_pred == recruiter_decision:
                correct_before += 1
            if new_pred == recruiter_decision:
                correct_after += 1

        if evaluated == 0:
            return {
                "status": "error",
                "message": "No records could be evaluated",
                "samples_skipped": skipped,
            }

        accuracy_before = correct_before / evaluated
        accuracy_after = correct_after / evaluated
        improvement = accuracy_after - accuracy_before

        return {
            "status": "success",
            "accuracy_before": round(accuracy_before, 3),
            "accuracy_after": round(accuracy_after, 3),
            "improvement": round(improvement, 3),
            "samples_evaluated": evaluated,
            "samples_skipped": skipped,
        }

    def export_retraining_report(self, metrics: Dict, output_dir: Path) -> str:
        """Write ``metrics`` as a timestamped JSON report into ``output_dir``.

        Non-JSON values are stringified via ``default=str``.

        Returns:
            The report path, or an error message if writing failed.
        """
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
        report_path = output_dir / f"retrain_report_{timestamp}.json"

        try:
            with open(report_path, "w", encoding="utf-8") as f:
                json.dump(metrics, f, indent=2, default=str)
            return str(report_path)
        except Exception as e:
            return f"Error exporting report: {e}"
|
|
|
|
def load_feedback_records(path: str) -> List[Dict]:
    """Read feedback records from a JSON or JSONL file.

    If the whole file parses as JSON, a list is returned as-is and any other
    value is wrapped in a one-element list. Otherwise the file is parsed as
    JSONL: one JSON value per non-blank line.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
        json.JSONDecodeError: if a JSONL line is not valid JSON.
    """
    with open(path, encoding="utf-8") as f:
        text = f.read()
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # Multi-line JSONL (or an empty file) is not one JSON document.
        return [json.loads(line) for line in text.splitlines() if line.strip()]
    return data if isinstance(data, list) else [data]


def main():
    """CLI for model retraining.

    Either evaluates the current model against feedback (``--evaluate-only``)
    or retrains it. A retrained model is saved into ``--output-dir`` only
    when training succeeded. A metrics report is written to the same
    directory whenever training was attempted.
    """
    parser = argparse.ArgumentParser(
        description="Retrain matching model using recruiter feedback"
    )
    parser.add_argument(
        "--feedback-data",
        type=str,
        required=True,
        help="Path to feedback data JSON/JSONL",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default="./models",
        help="Output directory for retrained model",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=100,
        help="Number of boosting rounds",
    )
    parser.add_argument(
        "--evaluate-only",
        action="store_true",
        help="Evaluate only, don't retrain",
    )
    args = parser.parse_args()

    try:
        feedback_data = load_feedback_records(args.feedback_data)
    except FileNotFoundError:
        print(f"Error: {args.feedback_data} not found")
        return
    except json.JSONDecodeError as e:
        print(f"Error: could not parse {args.feedback_data}: {e}")
        return

    print(f"Loaded {len(feedback_data)} feedback records")

    pipeline = ModelRetrainingPipeline()

    if args.evaluate_only:
        eval_metrics = pipeline.evaluate_on_feedback(feedback_data)
        print("\n=== Evaluation Results ===")
        print(json.dumps(eval_metrics, indent=2))
        return

    X, y = pipeline.prepare_features(feedback_data)
    if X is None:
        print("No training data could be prepared; nothing to retrain")
        return

    metrics = pipeline.train(X, y, n_estimators=args.epochs)
    print("\n=== Training Metrics ===")
    print(json.dumps(metrics, indent=2, default=str))

    output_dir = Path(args.output_dir)

    # Saving after a skipped/failed run would re-save the previously loaded
    # model under new metadata, so only persist a freshly trained model.
    if metrics.get("status") == "success":
        save_msg = pipeline.save_model(output_dir / pipeline.model_path.name)
        print(f"\n{save_msg}")
    else:
        print("\nModel not saved: training did not complete successfully")

    report_path = pipeline.export_retraining_report(metrics, output_dir)
    print(f"Report: {report_path}")


if __name__ == "__main__":
    main()
|
|