| """Build the final matching artifacts for the project. |
| |
| This script creates a reproducible pipeline that: |
| - exports a labeled dataset mixing real DB records and synthetic augmentation |
| - trains a supervised baseline model with train/test separation |
| - benchmarks it against the lightweight semantic matcher |
| - writes a final model bundle and a JSON report for the defense/demo |
| |
| Usage: |
| /Users/elhadjibassirousy/Desktop/AI-Talent-Finder/.venv/bin/python \ |
| backend/scripts/build_final_matching_artifacts.py \ |
| --db backend/ai_talent_finder.db |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import random |
| import sqlite3 |
| import sys |
| from dataclasses import asdict, dataclass |
| from pathlib import Path |
| from typing import Any |
|
|
| import joblib |
| import numpy as np |
| import pandas as pd |
| from sklearn.linear_model import LogisticRegression |
| from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score |
| from sklearn.model_selection import train_test_split |
|
|
| script_dir = Path(__file__).resolve().parent |
| repo_root = script_dir.parent.parent |
| if str(repo_root / "backend") not in sys.path: |
| sys.path.insert(0, str(repo_root / "backend")) |
|
|
| from app.services.data_normalization import parse_experience_years |
| from app.services.feature_engineering import build_pair_features, fit_pair_vectorizer |
| from app.services.lightweight_siamese import get_siamese_matcher |
| from app.services.normalization import normalize_skill_name, normalize_text |
| from app.services.scoring import compute_match_score |
| from app.services.synthetic_data import SKILLS_POOL, generate_synthetic_candidate |
|
|
|
|
| @dataclass |
| class SplitMetrics: |
| accuracy: float |
| precision: float |
| recall: float |
| f1: float |
| roc_auc: float | None |
| threshold: float |
|
|
|
|
| def _ensure_parent(path: Path) -> None: |
| path.parent.mkdir(parents=True, exist_ok=True) |
|
|
|
|
| def _normalize_skill_list(skills: list[str] | None) -> list[str]: |
| return [normalize_skill_name(skill) for skill in (skills or []) if normalize_skill_name(skill)] |
|
|
|
|
| def _candidate_text(record: dict[str, Any]) -> str: |
| parts: list[str] = [record.get("full_name") or "", record.get("raw_text") or ""] |
| parts.extend(record.get("skills", [])) |
| parts.extend(record.get("companies", [])) |
| parts.extend(record.get("job_titles", [])) |
| parts.extend(record.get("education", [])) |
| parts.extend(record.get("languages", [])) |
| return normalize_text(" \n ".join(part for part in parts if part)) |
|
|
|
|
| def _job_text(record: dict[str, Any]) -> str: |
| parts: list[str] = [record.get("title") or "", record.get("description") or ""] |
| parts.extend(record.get("required_skills", [])) |
| parts.extend(record.get("languages_required", [])) |
| parts.append(str(record.get("required_years") or "")) |
| return normalize_text(" \n ".join(part for part in parts if part)) |
|
|
|
|
| def _load_real_data(db_path: Path) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: |
| conn = sqlite3.connect(str(db_path)) |
| conn.row_factory = sqlite3.Row |
| cur = conn.cursor() |
|
|
| candidates: list[dict[str, Any]] = [] |
| for row in cur.execute( |
| """ |
| SELECT id, full_name, email, raw_text, extraction_quality_score, ner_extraction_data, |
| extracted_job_titles, extracted_companies, extracted_education, is_fully_extracted |
| FROM candidates |
| ORDER BY created_at DESC |
| """ |
| ).fetchall(): |
| payload: dict[str, Any] = {} |
| if row["ner_extraction_data"]: |
| try: |
| payload = json.loads(row["ner_extraction_data"]) |
| except Exception: |
| payload = {} |
|
|
| skills = _normalize_skill_list(payload.get("skills") or []) |
| companies = payload.get("companies") or [] |
| job_titles = payload.get("job_titles") or [] |
| education = payload.get("education") or [] |
| languages = payload.get("languages") or [] |
| experience_years = parse_experience_years(row["raw_text"] or "") |
|
|
| candidates.append( |
| { |
| "id": int(row["id"]), |
| "source": "real_db", |
| "full_name": row["full_name"] or "", |
| "email": row["email"] or "", |
| "raw_text": row["raw_text"] or "", |
| "skills": skills, |
| "companies": companies, |
| "job_titles": job_titles, |
| "education": education, |
| "languages": languages, |
| "experience_years": experience_years, |
| "quality_score": float(row["extraction_quality_score"] or 0.0), |
| "is_fully_extracted": bool(row["is_fully_extracted"]), |
| } |
| ) |
|
|
| jobs: list[dict[str, Any]] = [] |
| for row in cur.execute( |
| """ |
| SELECT jc.id, jc.title, jc.description, jc.created_at |
| FROM job_criteria jc |
| ORDER BY jc.created_at DESC |
| """ |
| ).fetchall(): |
| skill_rows = cur.execute( |
| """ |
| SELECT s.name |
| FROM criteria_skills cs |
| JOIN skills s ON s.id = cs.skill_id |
| WHERE cs.criteria_id = ? |
| ORDER BY cs.id ASC |
| """, |
| (row["id"],), |
| ).fetchall() |
| required_skills = _normalize_skill_list([skill_row["name"] for skill_row in skill_rows]) |
| jobs.append( |
| { |
| "id": int(row["id"]), |
| "source": "real_db", |
| "title": row["title"] or "", |
| "description": row["description"] or "", |
| "required_skills": required_skills, |
| "required_years": parse_experience_years((row["description"] or "") + " " + (row["title"] or "")), |
| "languages_required": [], |
| } |
| ) |
|
|
| conn.close() |
| return candidates, jobs |
|
|
|
|
| def _heuristic_label(candidate: dict[str, Any], job: dict[str, Any]) -> tuple[int, float]: |
| candidate_skills = _normalize_skill_list(candidate.get("skills", [])) |
| job_skills = _normalize_skill_list(job.get("required_skills", [])) |
|
|
| candidate_years = int(candidate.get("experience_years") or parse_experience_years(candidate.get("raw_text", "")) or 0) |
| job_years = int(job.get("required_years") or parse_experience_years(job.get("description", "")) or 0) |
|
|
| intersection = set(candidate_skills) & set(job_skills) |
| union = set(candidate_skills) | set(job_skills) |
| semantic_similarity = len(intersection) / max(1, len(union)) |
|
|
| score = compute_match_score( |
| cv_skills=candidate_skills, |
| job_skills=job_skills, |
| cv_years=candidate_years, |
| job_years=job_years, |
| cv_edu_level=2, |
| job_edu_level=2, |
| similarity_score=semantic_similarity, |
| ) |
| label = 1 if score >= 0.60 else 0 |
| return label, float(score) |
|
|
|
|
| def _rows_from_pairs(candidates: list[dict[str, Any]], jobs: list[dict[str, Any]], source: str) -> list[dict[str, Any]]: |
| rows: list[dict[str, Any]] = [] |
| for candidate in candidates: |
| candidate_text = _candidate_text(candidate) |
| for job in jobs: |
| job_text = _job_text(job) |
| label, score = _heuristic_label(candidate, job) |
| rows.append( |
| { |
| "source": source, |
| "cv_id": candidate.get("id"), |
| "job_id": job.get("id"), |
| "candidate_name": candidate.get("full_name", ""), |
| "job_title": job.get("title", ""), |
| "cv_text": candidate_text, |
| "job_text": job_text, |
| "label": label, |
| "heuristic_score": round(score, 4), |
| } |
| ) |
| return rows |
|
|
|
|
| def _synthetic_job_from_candidate(candidate: dict[str, Any], positive: bool, rng: random.Random, job_id: int) -> dict[str, Any]: |
| candidate_skills = _normalize_skill_list(candidate.get("skills", [])) |
| if positive: |
| required_skills = candidate_skills[:] |
| if len(required_skills) > 4: |
| required_skills = rng.sample(required_skills, 4) |
| if not required_skills: |
| required_skills = [rng.choice(SKILLS_POOL)] |
| title = f"Senior {' '.join(required_skills[:2]).replace(' ', '')} Engineer" |
| description = ( |
| f"Looking for a developer with strong skills in {', '.join(required_skills)} " |
| f"and {max(0, int(candidate.get('experience_years') or 0) - 1)}+ years of experience." |
| ) |
| required_years = max(0, int(candidate.get("experience_years") or 0) - 1) |
| else: |
| disjoint_pool = [skill for skill in SKILLS_POOL if skill not in candidate_skills] |
| if len(disjoint_pool) < 3: |
| disjoint_pool = SKILLS_POOL[:] |
| required_skills = rng.sample(disjoint_pool, min(4, len(disjoint_pool))) |
| title = "Unrelated Engineer" |
| description = f"Looking for a profile with expertise in {', '.join(required_skills)}." |
| required_years = int(candidate.get("experience_years") or 0) + 3 |
|
|
| return { |
| "id": job_id, |
| "source": "synthetic", |
| "title": title, |
| "description": description, |
| "required_skills": required_skills, |
| "required_years": required_years, |
| "languages_required": ["English"], |
| } |
|
|
|
|
| def _build_dataset(db_path: Path, synthetic_candidates: int, synthetic_jobs: int, seed: int) -> pd.DataFrame: |
| real_candidates, real_jobs = _load_real_data(db_path) |
| rows: list[dict[str, Any]] = [] |
|
|
| if real_candidates and real_jobs: |
| rows.extend(_rows_from_pairs(real_candidates, real_jobs, source="real_db")) |
|
|
| rng = random.Random(seed) |
| synthetic_candidates_rows = [] |
| for index in range(synthetic_candidates): |
| item = generate_synthetic_candidate(user_id=10_000 + index) |
| synthetic_candidates_rows.append( |
| { |
| **item, |
| "source": "synthetic", |
| "full_name": item.get("full_name", ""), |
| "email": item.get("email", ""), |
| "raw_text": " ".join( |
| [ |
| item.get("full_name", ""), |
| " ".join(item.get("normalized_skills", [])), |
| str(item.get("experience_years", 0)), |
| item.get("education", ""), |
| " ".join(item.get("languages", [])), |
| ] |
| ), |
| "skills": item.get("normalized_skills", []), |
| "companies": [], |
| "job_titles": [], |
| "education": [item.get("education", "")], |
| "languages": item.get("languages", []), |
| } |
| ) |
|
|
| synthetic_pairs: list[dict[str, Any]] = [] |
| next_job_id = 20_000 |
| for candidate in synthetic_candidates_rows: |
| positive_job = _synthetic_job_from_candidate(candidate, positive=True, rng=rng, job_id=next_job_id) |
| next_job_id += 1 |
| negative_job = _synthetic_job_from_candidate(candidate, positive=False, rng=rng, job_id=next_job_id) |
| next_job_id += 1 |
|
|
| for job, label in ((positive_job, 1), (negative_job, 0)): |
| job_text = _job_text(job) |
| candidate_text = _candidate_text(candidate) |
| _, score = _heuristic_label(candidate, job) |
| synthetic_pairs.append( |
| { |
| "source": "synthetic", |
| "cv_id": candidate.get("id"), |
| "job_id": job.get("id"), |
| "candidate_name": candidate.get("full_name", ""), |
| "job_title": job.get("title", ""), |
| "cv_text": candidate_text, |
| "job_text": job_text, |
| "label": label, |
| "heuristic_score": round(score, 4), |
| } |
| ) |
|
|
| rows.extend(synthetic_pairs) |
|
|
| df = pd.DataFrame(rows) |
| if df.empty: |
| raise RuntimeError("No training rows could be built from the database or synthetic augmentation.") |
|
|
| df = df.drop_duplicates(subset=["cv_text", "job_text", "label"]).reset_index(drop=True) |
| return df |
|
|
|
|
| def _build_matrix(df: pd.DataFrame, meta) -> np.ndarray: |
| return np.vstack([ |
| build_pair_features(str(row.cv_text), str(row.job_text), meta) |
| for row in df.itertuples(index=False) |
| ]) |
|
|
|
|
| def _train_model(X_train: np.ndarray, y_train: np.ndarray): |
| try: |
| from xgboost import XGBClassifier |
|
|
| model = XGBClassifier( |
| n_estimators=250, |
| max_depth=6, |
| learning_rate=0.05, |
| subsample=0.9, |
| colsample_bytree=0.85, |
| eval_metric="logloss", |
| random_state=42, |
| ) |
| model.fit(X_train, y_train) |
| model_name = "xgboost" |
| except Exception: |
| model = LogisticRegression(max_iter=2000, class_weight="balanced") |
| model.fit(X_train, y_train) |
| model_name = "logistic_regression" |
|
|
| return model, model_name |
|
|
|
|
| def _predict_scores(model, X: np.ndarray) -> np.ndarray: |
| try: |
| scores = model.predict_proba(X)[:, 1] |
| except Exception: |
| try: |
| raw_scores = model.decision_function(X) |
| scores = 1.0 / (1.0 + np.exp(-raw_scores)) |
| except Exception: |
| scores = model.predict(X).astype(float) |
| return np.clip(scores.astype(float), 0.0, 1.0) |
|
|
|
|
| def _metrics(y_true: np.ndarray, scores: np.ndarray, threshold: float = 0.5) -> SplitMetrics: |
| preds = (scores >= threshold).astype(int) |
| roc_auc = None |
| try: |
| roc_auc = float(roc_auc_score(y_true, scores)) |
| except Exception: |
| roc_auc = None |
|
|
| return SplitMetrics( |
| accuracy=float(accuracy_score(y_true, preds)), |
| precision=float(precision_score(y_true, preds, zero_division=0)), |
| recall=float(recall_score(y_true, preds, zero_division=0)), |
| f1=float(f1_score(y_true, preds, zero_division=0)), |
| roc_auc=roc_auc, |
| threshold=float(threshold), |
| ) |
|
|
|
|
| def _best_f1_threshold(y_true: np.ndarray, scores: np.ndarray) -> float: |
| best_threshold = 0.5 |
| best_f1 = -1.0 |
| for threshold in np.linspace(0.0, 1.0, 101): |
| f1 = f1_score(y_true, (scores >= threshold).astype(int), zero_division=0) |
| if f1 > best_f1: |
| best_f1 = float(f1) |
| best_threshold = float(threshold) |
| return best_threshold |
|
|
|
|
| def _threshold_for_precision(y_true: np.ndarray, scores: np.ndarray, target_precision: float, fallback: float) -> float: |
| candidates = np.linspace(0.0, 1.0, 101) |
| best = fallback |
| for threshold in candidates: |
| predictions = (scores >= threshold).astype(int) |
| precision = precision_score(y_true, predictions, zero_division=0) |
| if precision >= target_precision: |
| best = float(threshold) |
| break |
| return float(best) |
|
|
|
|
| def build_and_train(db_path: Path, synthetic_candidates: int, synthetic_jobs: int, seed: int) -> dict[str, Any]: |
| df = _build_dataset(db_path, synthetic_candidates, synthetic_jobs, seed) |
|
|
| dataset_path = repo_root / "data" / "final_training_pairs.csv" |
| review_sample_path = repo_root / "data" / "final_training_review_sample.csv" |
| report_path = repo_root / "reports" / "advanced_matching_report.json" |
| model_path = repo_root / "models" / "final_match_model.joblib" |
| fallback_model_path = repo_root / "models" / "baseline_model.joblib" |
|
|
| _ensure_parent(dataset_path) |
| _ensure_parent(review_sample_path) |
| _ensure_parent(report_path) |
| _ensure_parent(model_path) |
|
|
| df.to_csv(dataset_path, index=False) |
| df.sample(min(200, len(df)), random_state=seed).to_csv(review_sample_path, index=False) |
|
|
| train_df, test_df = train_test_split( |
| df, |
| test_size=0.20, |
| random_state=seed, |
| stratify=df["label"], |
| ) |
|
|
| meta = fit_pair_vectorizer(train_df["cv_text"].tolist(), train_df["job_text"].tolist()) |
| X_train = _build_matrix(train_df, meta) |
| X_test = _build_matrix(test_df, meta) |
| y_train = train_df["label"].to_numpy() |
| y_test = test_df["label"].to_numpy() |
|
|
| model, model_name = _train_model(X_train, y_train) |
|
|
| train_scores = _predict_scores(model, X_train) |
| test_scores = _predict_scores(model, X_test) |
|
|
| train_metrics = _metrics(y_train, train_scores, threshold=0.5) |
| test_metrics = _metrics(y_test, test_scores, threshold=0.5) |
| best_threshold = _best_f1_threshold(y_test, test_scores) |
| best_metrics = _metrics(y_test, test_scores, threshold=best_threshold) |
|
|
| siamese = get_siamese_matcher() |
| semantic_scores = np.array([ |
| siamese.compute_pair_similarity(row.cv_text, row.job_text) for row in test_df.itertuples(index=False) |
| ], dtype=float) |
| semantic_metrics = _metrics(y_test, semantic_scores, threshold=0.5) |
|
|
| accept_threshold = max( |
| 0.80, |
| _threshold_for_precision(y_test, test_scores, target_precision=0.90, fallback=0.80), |
| ) |
| review_threshold = max( |
| 0.50, |
| _threshold_for_precision(y_test, test_scores, target_precision=0.70, fallback=0.50), |
| ) |
| review_threshold = float(min(review_threshold, max(0.0, accept_threshold - 0.05))) |
|
|
| positive_rate = float(df["label"].mean()) |
| dataset_summary = { |
| "rows_total": int(len(df)), |
| "rows_train": int(len(train_df)), |
| "rows_test": int(len(test_df)), |
| "source_counts": df["source"].value_counts().to_dict(), |
| "positive_rate": positive_rate, |
| "seed": int(seed), |
| } |
|
|
| thresholds = { |
| "accept_pct": round(accept_threshold * 100.0, 2), |
| "review_pct": round(review_threshold * 100.0, 2), |
| } |
|
|
| bundle = { |
| "model": model, |
| "meta": meta, |
| "model_name": model_name, |
| "thresholds": thresholds, |
| "dataset_summary": dataset_summary, |
| "training_metrics": { |
| "train": asdict(train_metrics), |
| "test": asdict(test_metrics), |
| "test_best_f1": asdict(best_metrics), |
| "lightweight_semantic": asdict(semantic_metrics), |
| }, |
| } |
|
|
| joblib.dump(bundle, model_path) |
| joblib.dump(bundle, fallback_model_path) |
|
|
| report = { |
| "dataset": dataset_summary, |
| "model": { |
| "name": model_name, |
| "path": str(model_path.resolve()), |
| "fallback_path": str(fallback_model_path.resolve()), |
| }, |
| "metrics": bundle["training_metrics"], |
| "production_recommendation": { |
| "accept_threshold_score_pct": thresholds["accept_pct"], |
| "review_threshold_score_pct": thresholds["review_pct"], |
| "env": { |
| "MATCH_ACCEPT_THRESHOLD": str(thresholds["accept_pct"]), |
| "MATCH_REVIEW_THRESHOLD": str(thresholds["review_pct"]), |
| }, |
| }, |
| } |
| report_path.write_text(json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8") |
|
|
| return { |
| "dataset_path": str(dataset_path), |
| "review_sample_path": str(review_sample_path), |
| "model_path": str(model_path), |
| "fallback_model_path": str(fallback_model_path), |
| "report_path": str(report_path), |
| "report": report, |
| } |
|
|
|
|
| def main() -> None: |
| parser = argparse.ArgumentParser() |
| parser.add_argument("--db", default=str(repo_root / "backend" / "ai_talent_finder.db"), help="SQLite DB path") |
| parser.add_argument("--synthetic-candidates", type=int, default=40) |
| parser.add_argument("--synthetic-jobs", type=int, default=10) |
| parser.add_argument("--seed", type=int, default=42) |
| args = parser.parse_args() |
|
|
| result = build_and_train( |
| Path(args.db), |
| synthetic_candidates=args.synthetic_candidates, |
| synthetic_jobs=args.synthetic_jobs, |
| seed=args.seed, |
| ) |
|
|
| print("=== Final matching artifacts built ===") |
| print(json.dumps(result["report"], indent=2, ensure_ascii=False)) |
| print(f"Dataset: {result['dataset_path']}") |
| print(f"Review sample: {result['review_sample_path']}") |
| print(f"Model bundle: {result['model_path']}") |
| print(f"Fallback bundle: {result['fallback_model_path']}") |
| print(f"Report: {result['report_path']}") |
|
|
|
|
| if __name__ == "__main__": |
| main() |