ai-talent-finder-backend / scripts /retrain_feedback_model.py
ilyass yani
Deploiement backend dans HF Spaces
9df97a2
Raw
History Blame
14.2 kB
"""Phase 3: Model Retraining Pipeline - Learn from recruiter feedback."""
from __future__ import annotations

import argparse
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

try:
    import joblib
    import numpy as np
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
except ImportError:
    # Optional ML stack: bind every name so availability checks such as
    # ``np is None`` / ``joblib is None`` work instead of raising NameError.
    np = None
    pd = None
    joblib = None
    GradientBoostingClassifier = None
    train_test_split = None
    StandardScaler = None
class ModelRetrainingPipeline:
    """Retrain the candidate/job matching model from recruiter feedback.

    Each feedback record is a dict carrying ``cv_text``, ``job_description``,
    an optional prior model ``score`` and a ``label`` (1 = accepted by the
    recruiter; any other value is treated as rejected). Records are turned
    into seven keyword-heuristic features (see ``FEATURE_NAMES``), standardized
    and used to fit a ``GradientBoostingClassifier``. The model, scaler and
    feature names are persisted together as one joblib bundle.
    """

    # Column order of the vectors produced by ``_extract_features``; also
    # written into the saved bundle and used to label feature importances.
    FEATURE_NAMES: Tuple[str, ...] = (
        "skill_match_ratio",
        "experience_years",
        "education_level",
        "language_proficiency",
        "certifications",
        "company_tier",
        "model_score",
    )

    # Keyword lists for the substring heuristics in ``_extract_features``.
    # ``_parse_skills`` returns matches in the order of ``_SKILL_KEYWORDS``.
    _SKILL_KEYWORDS: Tuple[str, ...] = (
        "python",
        "javascript",
        "java",
        "c++",
        "docker",
        "kubernetes",
        "aws",
        "react",
        "angular",
        "sql",
        "nosql",
        "git",
        "jenkins",
        "ci/cd",
    )
    _LANGUAGE_MARKERS: Tuple[str, ...] = ("fluent", "native", "bilingual", "multilingual")
    _CERT_MARKERS: Tuple[str, ...] = (
        "aws",
        "gcp",
        "azure",
        "kubernetes",
        "docker",
        "certified",
    )
    _PRESTIGIOUS_COMPANIES: Tuple[str, ...] = (
        "google",
        "microsoft",
        "apple",
        "amazon",
        "facebook",
        "ibm",
        "netflix",
    )

    def __init__(self, model_path: Optional[Path] = None):
        """Initialize the pipeline and try to load an existing model bundle.

        Args:
            model_path: Joblib bundle to load now and, by default, to overwrite
                in ``save_model`` (``str`` is accepted too). When omitted, the
                path is ``<4th parent of this file>/models/final_match_model.joblib``.
        """
        if model_path:
            self.model_path = Path(model_path)
        else:
            # NOTE(review): ``parents[4]`` assumes this file sits five levels
            # below the directory that holds ``models/``. For a shallow
            # checkout (e.g. ``/app/scripts/``) this raises IndexError; confirm
            # the intended repository layout or pass ``model_path`` explicitly.
            self.model_path = (
                Path(__file__).resolve().parents[4] / "models" / "final_match_model.joblib"
            )
        self.model = None
        self.scaler = None
        self.feature_names = None
        self._load_model()

    def _load_model(self) -> None:
        """Load ``model``/``scaler``/``feature_names`` from ``self.model_path``.

        Does nothing when joblib is unavailable or the file does not exist;
        an unreadable bundle only prints a warning.
        """
        if joblib is None or not self.model_path.exists():
            return
        try:
            # NOTE: joblib.load unpickles the file -- only load trusted bundles.
            bundle = joblib.load(self.model_path)
            self.model = bundle.get("model")
            self.scaler = bundle.get("scaler")
            self.feature_names = bundle.get("feature_names", [])
            print(f"Loaded model from {self.model_path}")
        except Exception as e:
            print(f"Warning: Could not load model: {e}")

    def prepare_features(
        self, retraining_data: List[Dict]
    ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
        """Build the feature matrix and label vector from feedback records.

        Records whose features cannot be extracted are skipped with a warning.

        Args:
            retraining_data: Feedback record dicts (see class docstring).

        Returns:
            ``(X, y)``: ``X`` is a float array of shape
            ``(n_valid, len(FEATURE_NAMES))`` and ``y`` holds 1 (accepted) /
            0 (rejected). ``(None, None)`` when NumPy is missing or no record
            is usable.
        """
        if np is None:
            print("NumPy not available; cannot prepare features")
            return None, None

        rows: List[List[float]] = []
        labels: List[int] = []
        for record in retraining_data:
            try:
                row = self._extract_features(record)
                # Label: 1 = accepted by recruiter, 0 = rejected
                label = 1 if record.get("label") == 1 else 0
            except Exception as e:
                # Best-effort: one malformed record must not abort retraining.
                print(f"Warning: Could not process record: {e}")
                continue
            # Append both only once both succeeded, so X and y stay aligned.
            rows.append(row)
            labels.append(label)

        if not rows:
            print("No valid features extracted")
            return None, None
        return np.array(rows, dtype=float), np.array(labels)

    def _extract_features(self, record: Dict) -> List[float]:
        """Return the feature vector for one record, ordered as ``FEATURE_NAMES``.

        All text features are case-insensitive substring heuristics. A missing
        or ``None`` ``cv_text``/``job_description`` counts as empty text, and a
        missing or ``None`` ``score`` defaults to 0.5.

        Raises:
            AttributeError: if ``cv_text``/``job_description`` is not a string.
            TypeError, ValueError: if ``score`` cannot be converted to float.
        """
        cv_text = (record.get("cv_text") or "").lower()
        job_text = (record.get("job_description") or "").lower()

        # 1. Share of the job's known skills that also appear in the CV.
        job_skills = self._parse_skills(job_text)
        cv_skills = set(self._parse_skills(cv_text))
        skill_overlap = (
            len(cv_skills.intersection(job_skills)) / len(job_skills)
            if job_skills
            else 0.0
        )

        # 2. "Experience": number of "years of experience"/"yrs" mentions
        #    (a proxy, not an actual year count), saturating at 10 mentions.
        experience_mentions = cv_text.count("years of experience") + cv_text.count("yrs")

        # 3. Education level from the highest degree keyword found.
        if "phd" in cv_text or "doctorate" in cv_text:
            edu_score = 1.0
        elif "master" in cv_text or "m.s." in cv_text:
            edu_score = 0.8
        elif "bachelor" in cv_text or "b.s." in cv_text:
            edu_score = 0.6
        else:
            edu_score = 0.0

        # 4./5. Language-proficiency and certification markers, saturating at 3.
        lang_count = sum(1 for marker in self._LANGUAGE_MARKERS if marker in cv_text)
        cert_count = sum(1 for marker in self._CERT_MARKERS if marker in cv_text)

        # 6. Company tier: 1.0 if any listed company is mentioned, else 0.5.
        company_tier = (
            1.0 if any(company in cv_text for company in self._PRESTIGIOUS_COMPANIES) else 0.5
        )

        # 7. Prior model score; coerced to float so the matrix stays numeric.
        raw_score = record.get("score")
        model_score = 0.5 if raw_score is None else float(raw_score)

        return [
            skill_overlap,
            min(experience_mentions / 10, 1.0),
            edu_score,
            min(lang_count / 3, 1.0),
            min(cert_count / 3, 1.0),
            company_tier,
            model_score,
        ]

    def _parse_skills(self, text: str) -> List[str]:
        """Return the known skill keywords occurring in ``text``.

        Matching is a case-insensitive substring test, so e.g. "java" also
        matches inside "javascript" and "sql" inside "nosql"; the result
        follows the order of ``_SKILL_KEYWORDS``.
        """
        lowered = text.lower()
        return [skill for skill in self._SKILL_KEYWORDS if skill in lowered]

    def train(
        self,
        X: np.ndarray,
        y: np.ndarray,
        test_size: float = 0.2,
        n_estimators: int = 100,
    ) -> Dict[str, Any]:
        """Retrain the model on feedback data.

        A fresh ``StandardScaler`` and ``GradientBoostingClassifier`` are fit on
        the training split. ``self.scaler``/``self.model`` are replaced only
        after a successful fit, so a skipped or failed run keeps the previous,
        mutually consistent pair.

        Args:
            X: Feature matrix (columns ordered as ``FEATURE_NAMES``).
            y: Labels (1=accepted, 0=rejected).
            test_size: Fraction of samples held out for evaluation.
            n_estimators: Number of boosting rounds.

        Returns:
            Metrics dict. ``status`` is "success" (accuracies, sample counts,
            sorted feature importances, ``trained_at``); "skipped" (with
            ``reason`` and ``unique_labels``) when fewer than two classes are
            available; or "error" (with ``message``).
        """
        if X is None or y is None or np is None:
            return {"status": "error", "message": "Invalid data or NumPy not available"}

        # Validate label variety before training
        try:
            unique_labels = np.unique(y)
        except Exception:
            return {"status": "error", "message": "Could not determine unique labels"}
        if len(unique_labels) < 2:
            return {
                "status": "skipped",
                "reason": "Insufficient label variety: need at least 2 classes to train",
                "unique_labels": unique_labels.tolist(),
            }

        if train_test_split is None or StandardScaler is None or GradientBoostingClassifier is None:
            return {"status": "error", "message": "scikit-learn not available"}

        # Stratify so both splits keep the class balance of (often imbalanced)
        # recruiter feedback; fall back to a plain split when a class is too
        # small to stratify.
        try:
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=test_size, random_state=42, stratify=y
            )
        except ValueError:
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=test_size, random_state=42
            )

        # Validate training split has multiple classes
        train_unique = np.unique(y_train)
        if len(train_unique) < 2:
            return {
                "status": "skipped",
                "reason": f"Training set resulted in single class after split. Train classes: {train_unique.tolist()}",
                "unique_labels": train_unique.tolist(),
            }

        # Always refit the scaler: the model is trained from scratch, and a
        # scaler loaded from an older bundle may have been fit on other data or
        # on a different feature layout.
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        print(f"Training model on {len(X_train_scaled)} samples...")
        model = GradientBoostingClassifier(
            n_estimators=n_estimators,
            learning_rate=0.05,
            max_depth=5,
            random_state=42,
        )
        model.fit(X_train_scaled, y_train)

        train_score = model.score(X_train_scaled, y_train)
        test_score = model.score(X_test_scaled, y_test)

        feature_importance = sorted(
            (
                (name, float(weight))
                for name, weight in zip(self.FEATURE_NAMES, model.feature_importances_)
            ),
            key=lambda item: item[1],
            reverse=True,
        )

        metrics = {
            "status": "success",
            "train_accuracy": float(train_score),
            "test_accuracy": float(test_score),
            "samples": len(X),
            "train_samples": len(X_train_scaled),
            "test_samples": len(X_test_scaled),
            "feature_importance": feature_importance,
            "trained_at": datetime.now(timezone.utc).isoformat(),
        }

        # Commit model and scaler together only after a successful fit.
        self.model = model
        self.scaler = scaler
        self.feature_names = list(self.FEATURE_NAMES)
        return metrics

    def save_model(self, output_path: Optional[Path] = None) -> str:
        """Persist model, scaler and feature names as a joblib bundle.

        The bundle is first written to a ``.tmp`` sibling and then moved into
        place with ``os.replace``. A crash mid-write therefore never leaves a
        truncated file at the destination.

        Args:
            output_path: Destination (``Path`` or ``str``); defaults to
                ``self.model_path``.

        Returns:
            A status message: "Model saved to <path>" on success, otherwise a
            description of why nothing was saved.
        """
        if self.model is None:
            return "No model to save"
        if joblib is None:
            return "Joblib not available"

        target = Path(output_path) if output_path else self.model_path
        target.parent.mkdir(parents=True, exist_ok=True)
        bundle = {
            "model": self.model,
            "scaler": self.scaler,
            "feature_names": list(self.FEATURE_NAMES),
            "trained_at": datetime.now(timezone.utc).isoformat(),
            "version": "phase3_feedback_v1",
        }
        tmp_path = target.with_name(target.name + ".tmp")
        try:
            joblib.dump(bundle, tmp_path)
            os.replace(tmp_path, target)
        except Exception as e:
            try:
                tmp_path.unlink(missing_ok=True)
            except OSError:
                pass  # best-effort cleanup; the original error is reported below
            return f"Error saving model: {e}"
        return f"Model saved to {target}"

    def evaluate_on_feedback(
        self, feedback_records: List[Dict]
    ) -> Dict[str, Any]:
        """Compare the recorded model decisions with the current model's predictions.

        For each record, "before" is its ``model_predicted_decision``. "After"
        is this pipeline's model: probability of class 1 >= 0.5 means
        "accepted". Both are scored against ``recruiter_decision``, and missing
        decisions default to "rejected". Both accuracies share one denominator:
        the records whose features could be extracted and scored.

        Returns:
            ``status`` "success" with ``accuracy_before``, ``accuracy_after``,
            ``improvement`` (rounded to 3 decimals), ``samples_evaluated`` and
            ``samples_skipped``; or ``status`` "error" with a ``message``.
        """
        if self.model is None:
            return {"status": "error", "message": "No model available to evaluate"}
        if not feedback_records:
            return {"status": "error", "message": "No feedback records to evaluate"}

        evaluated = correct_before = correct_after = skipped = 0
        for record in feedback_records:
            X, _ = self.prepare_features([record])
            if X is None:
                skipped += 1
                continue
            try:
                X_scaled = self.scaler.transform(X) if self.scaler is not None else X
                new_pred_prob = self.model.predict_proba(X_scaled)[0][1]
            except Exception as e:
                print(f"Warning: Could not score record: {e}")
                skipped += 1
                continue

            new_pred = "accepted" if new_pred_prob >= 0.5 else "rejected"
            recruiter_decision = record.get("recruiter_decision", "rejected")
            evaluated += 1
            if record.get("model_predicted_decision", "rejected") == recruiter_decision:
                correct_before += 1
            if new_pred == recruiter_decision:
                correct_after += 1

        if evaluated == 0:
            return {
                "status": "error",
                "message": "No feedback records could be evaluated",
                "samples_skipped": skipped,
            }

        accuracy_before = correct_before / evaluated
        accuracy_after = correct_after / evaluated
        return {
            "status": "success",
            "accuracy_before": round(accuracy_before, 3),
            "accuracy_after": round(accuracy_after, 3),
            "improvement": round(accuracy_after - accuracy_before, 3),
            "samples_evaluated": evaluated,
            "samples_skipped": skipped,
        }

    def export_retraining_report(self, metrics: Dict, output_dir: Path) -> str:
        """Write ``metrics`` as an indented JSON report into ``output_dir``.

        The file is named ``retrain_report_<UTC %Y%m%d_%H%M%S>.json``, and
        values that JSON cannot encode are stringified.

        Args:
            metrics: Report payload (typically the dict returned by ``train``).
            output_dir: Target directory (``Path`` or ``str``); created if needed.

        Returns:
            The report path, or "Error exporting report: ..." on failure.
        """
        try:
            report_dir = Path(output_dir)
            report_dir.mkdir(parents=True, exist_ok=True)
            stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
            report_path = report_dir / f"retrain_report_{stamp}.json"
            with open(report_path, "w", encoding="utf-8") as f:
                json.dump(metrics, f, indent=2, default=str)
        except (OSError, TypeError, ValueError) as e:
            return f"Error exporting report: {e}"
        return str(report_path)
def load_feedback_records(path: Path) -> List[Dict[str, Any]]:
    """Read recruiter feedback records from a JSONL or JSON-array file.

    If the first non-whitespace character is ``[``, the file is parsed as a
    single JSON array. Otherwise it is treated as JSON Lines (one object per
    line), and blank lines are ignored.

    Args:
        path: File to read, decoded as UTF-8 (a leading BOM is tolerated).

    Returns:
        The decoded records, in file order.

    Raises:
        OSError: (including FileNotFoundError) if the file cannot be read.
        ValueError: on malformed JSON; for JSONL input the message names the
            offending line number.
    """
    text = Path(path).read_text(encoding="utf-8-sig")
    if text.lstrip().startswith("["):
        return json.loads(text)

    records: List[Dict[str, Any]] = []
    # Split on "\n" only (text mode already normalized line endings):
    # str.splitlines would also break on U+2028, which is legal inside JSON
    # strings.
    for lineno, line in enumerate(text.split("\n"), start=1):
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as exc:
            raise ValueError(f"line {lineno}: {exc}") from exc
    return records


def main() -> int:
    """CLI for model retraining.

    Loads feedback from ``--feedback-data``. With ``--evaluate-only`` it
    compares the current model against the recorded decisions. Otherwise it
    retrains, saves the model bundle (only when training succeeded) and
    writes a JSON metrics report into ``--output-dir``.

    Returns:
        Process exit code: 0 on success (including a training run skipped for
        lack of label variety), 1 when input cannot be loaded, no usable
        features exist, training/evaluation errors, or saving fails.
    """
    parser = argparse.ArgumentParser(
        description="Retrain matching model using recruiter feedback"
    )
    parser.add_argument(
        "--feedback-data",
        type=str,
        required=True,
        help="Path to feedback data JSON/JSONL",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default="./models",
        help="Output directory for the retraining report",
    )
    parser.add_argument(
        "--model-path",
        type=str,
        default=None,
        help="Model bundle to load and overwrite (default: the pipeline's built-in path)",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=100,
        help="Number of boosting rounds",
    )
    parser.add_argument(
        "--evaluate-only",
        action="store_true",
        help="Evaluate only, don't retrain",
    )
    args = parser.parse_args()
    if args.epochs < 1:
        parser.error("--epochs must be a positive integer")

    # Load feedback data
    try:
        feedback_data = load_feedback_records(Path(args.feedback_data))
    except FileNotFoundError:
        print(f"Error: {args.feedback_data} not found")
        return 1
    except (OSError, ValueError) as exc:
        print(f"Error: could not read {args.feedback_data}: {exc}")
        return 1
    print(f"Loaded {len(feedback_data)} feedback records")

    model_path = Path(args.model_path) if args.model_path else None
    pipeline = ModelRetrainingPipeline(model_path)

    if args.evaluate_only:
        # Evaluate without retraining
        eval_metrics = pipeline.evaluate_on_feedback(feedback_data)
        print("\n=== Evaluation Results ===")
        print(json.dumps(eval_metrics, indent=2))
        return 0 if eval_metrics.get("status") == "success" else 1

    # Prepare and retrain
    X, y = pipeline.prepare_features(feedback_data)
    if X is None:
        # prepare_features has already printed why.
        return 1

    metrics = pipeline.train(X, y, n_estimators=args.epochs)
    print("\n=== Training Metrics ===")
    print(json.dumps(metrics, indent=2, default=str))

    status = metrics.get("status")
    saved = False
    if status == "success":
        save_msg = pipeline.save_model()
        saved = save_msg.startswith("Model saved to")
    else:
        # A skipped/failed run produced no new model; re-saving the loaded one
        # would stamp it with a fresh trained_at/version it never earned.
        save_msg = f"Model not saved (training status: {status})"
    print(f"\n{save_msg}")

    # Export the report for every outcome so skipped/failed runs are recorded.
    report_path = pipeline.export_retraining_report(metrics, Path(args.output_dir))
    print(f"Report: {report_path}")

    if status == "error" or (status == "success" and not saved):
        return 1
    return 0


if __name__ == "__main__":
    # Propagate main()'s exit code to the shell (None would mean 0).
    raise SystemExit(main())