| from fastapi import APIRouter, Depends, HTTPException, status, Header |
| from sqlalchemy.orm import Session |
| from pydantic import BaseModel, Field |
| from typing import List, Optional, Dict, Tuple, cast |
| from datetime import datetime |
| from enum import Enum |
| import os |
|
|
| from app.core.dependencies import get_db, get_current_user |
| from app.models.models import ( |
| JobCriteria, |
| MatchResult, |
| Candidate, |
| User, |
| CriteriaSkill, |
| Skill, |
| CandidateSkill, |
| ) |
|
|
|
|
| router = APIRouter( |
| prefix="/api/matching", |
| tags=["matching"], |
| dependencies=[Depends(get_current_user)], |
| ) |
|
|
|
|
| class EnrichedExplanationRequest(BaseModel): |
| """Request for enriched match explanation using AI analysis.""" |
| candidate_id: int |
| job_criteria_id: int |
|
|
|
|
| class EnrichedExplanationResponse(BaseModel): |
|
|
|
|
| class SkillMetric(BaseModel): |
| """Single skill metric.""" |
| skill_name: str |
| usage_count: int |
| coverage_contribution: float |
| category: str = "tech" |
|
|
|
|
| class SkillQualityResponse(BaseModel): |
| """Overall skill quality metrics for the candidate pool.""" |
| quality_score: float |
| total_skills: int |
| unique_skills: int |
| average_usage: float |
| coverage_percentage: float |
| unused_skills: list[str] |
| trending_missing: list[str] |
| health_status: str |
| recommendations: list[str] |
| pareto_analysis: dict |
|
|
|
|
| @router.get("/admin/skills-quality", response_model=SkillQualityResponse) |
| def get_skills_quality_metrics( |
| db: Session = Depends(get_db), |
| current_user: User = Depends(get_current_user), |
| ): |
| """ |
| Get skill quality metrics for the candidate pool. |
| Provides insights on skill distribution, gaps, and recommendations. |
| Requires admin access. |
| """ |
| try: |
| from ai_module.matching.skill_quality import SkillQualityAnalyzer |
|
|
| analyzer = SkillQualityAnalyzer() |
| metrics = analyzer.compute_metrics(db) |
| |
| return SkillQualityResponse( |
| quality_score=metrics.get("quality_score", 0), |
| total_skills=metrics.get("total_skills", 0), |
| unique_skills=metrics.get("unique_skills", 0), |
| average_usage=metrics.get("average_usage", 0), |
| coverage_percentage=metrics.get("coverage_percentage", 0), |
| unused_skills=metrics.get("unused_skills", []), |
| trending_missing=metrics.get("trending_missing", []), |
| health_status=metrics.get("health_status", "unknown"), |
| recommendations=metrics.get("recommendations", []), |
| pareto_analysis=metrics.get("pareto_analysis", {}), |
| ) |
| except Exception as e: |
| raise HTTPException( |
| status_code=500, |
| detail=f"Error computing skill quality metrics: {str(e)}" |
| ) |
| """Enriched explanation with detailed AI analysis.""" |
| score: float |
| coverage: float |
| matched_skills: list[str] |
| missing_skills: list[str] |
| summary: str |
| strengths: list[str] = Field(default_factory=list) |
| gaps: list[str] = Field(default_factory=list) |
| recommendation: dict = Field(default_factory=dict) |
| confidence: float = 0.0 |
|
|
|
|
| @router.post("/enriched-explanation", response_model=EnrichedExplanationResponse) |
| def get_enriched_match_explanation( |
| request: EnrichedExplanationRequest, |
| db: Session = Depends(get_db), |
| current_user: User = Depends(get_current_user), |
| ): |
| """ |
| Generate enriched explanation using AI-powered analysis. |
| Provides detailed breakdown of match with strengths, gaps, and recommendations. |
| """ |
| |
| candidate = db.query(Candidate).filter(Candidate.id == request.candidate_id).first() |
| if not candidate: |
| raise HTTPException(status_code=404, detail="Candidate not found") |
| |
| |
| criteria = db.query(JobCriteria).filter(JobCriteria.id == request.job_criteria_id).first() |
| if not criteria: |
| raise HTTPException(status_code=404, detail="Job criteria not found") |
| |
| try: |
| |
| criteria_skills = _load_criteria_skills(criteria.id, db) |
| skill_universe = build_skill_universe(db) |
| score, details = score_candidate_against_criteria(candidate, criteria_skills, skill_universe) |
| |
| |
| enriched = generate_enriched_explanation(candidate, score, details, criteria_skills) |
| |
| return EnrichedExplanationResponse( |
| score=enriched.get("score", score), |
| coverage=enriched.get("coverage", float(details.get("coverage", 0))), |
| matched_skills=enriched.get("matched_skills", []), |
| missing_skills=enriched.get("missing_skills", []), |
| summary=enriched.get("summary", ""), |
| strengths=enriched.get("strengths", []), |
| gaps=enriched.get("gaps", []), |
| recommendation=enriched.get("recommendation", {}), |
| confidence=enriched.get("confidence", 0.0), |
| ) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Error generating enriched explanation: {str(e)}") |
| """ |
| Matching API routes - Recruteur workflow |
| MODES: |
| 1️⃣ Mode recherche: Chercher dans candidats existants |
| 2️⃣ Mode génération profil idéal: Décrire le besoin, l'IA génère le profil |
| """ |
| from fastapi import APIRouter, Depends, HTTPException, status, Header |
| from sqlalchemy.orm import Session |
| from typing import List, Optional, Dict, Tuple, cast |
| from pydantic import BaseModel, Field |
| from datetime import datetime |
| from enum import Enum |
| import os |
|
|
| from app.core.dependencies import get_db, get_current_user |
| from app.models.models import ( |
| JobCriteria, |
| MatchResult, |
| Candidate, |
| User, |
| CriteriaSkill, |
| Skill, |
| CandidateSkill |
| ) |
| from app.services.matching_engine import build_skill_universe, build_explanation_payload, score_candidate_against_criteria |
| from app.services.matching_engine import build_skill_universe, build_explanation_payload, score_candidate_against_criteria, generate_enriched_explanation |
| from app.services.feature_engineering import build_pair_features, PairFeatureMeta |
| from app.services.normalization import normalize_skill_name, normalize_text |
| from app.services.explainability_engine import generate_explanation, generate_shortlist_summary |
|
|
| |
| try: |
| from ai_module.nlp.profile_generator import ProfileGenerator |
| PROFILE_GENERATOR_AVAILABLE = True |
| except Exception as e: |
| print(f"⚠️ ProfileGenerator not available: {e}") |
| PROFILE_GENERATOR_AVAILABLE = False |
|
|
| try: |
| from ai_module.matching.semantic_matcher import SemanticSkillMatcher |
| SEMANTIC_MATCHER_AVAILABLE = True |
| except Exception as e: |
| print(f"⚠️ SemanticSkillMatcher not available: {e}") |
| SEMANTIC_MATCHER_AVAILABLE = False |
|
|
| try: |
| from ai_module.nlp.skill_extractor import SkillExtractor |
| SKILL_EXTRACTOR_AVAILABLE = True |
| except Exception as e: |
| print(f"⚠️ SkillExtractor not available: {e}") |
| SKILL_EXTRACTOR_AVAILABLE = False |
|
|
|
|
| router = APIRouter( |
| prefix="/api/matching", |
| tags=["matching"], |
| dependencies=[Depends(get_current_user)] |
| ) |
|
|
| import json |
| import joblib |
| from pathlib import Path |
| import numpy as np |
|
|
| |
| _BASELINE_MODEL: dict | None = None |
| _SIAMESE_MODEL = None |
| _SIAMESE_MODEL_PATH: str | None = None |
| _MATCH_THRESHOLDS: dict[str, float] | None = None |
|
|
|
|
| def _load_baseline_model() -> dict | None: |
| global _BASELINE_MODEL |
| if _BASELINE_MODEL is not None: |
| return _BASELINE_MODEL |
|
|
| model_root = Path(__file__).resolve().parents[3] / "models" |
| candidates = [ |
| model_root / "final_match_model.joblib", |
| model_root / "baseline_model.joblib", |
| ] |
| model_path = next((path for path in candidates if path.exists()), None) |
| if model_path is None: |
| return None |
|
|
| try: |
| _BASELINE_MODEL = joblib.load(model_path) |
| thresholds = _BASELINE_MODEL.get("thresholds") if isinstance(_BASELINE_MODEL, dict) else None |
| if isinstance(thresholds, dict): |
| global _MATCH_THRESHOLDS |
| _MATCH_THRESHOLDS = { |
| "accept_pct": float(thresholds.get("accept_pct", 94.78)), |
| "review_pct": float(thresholds.get("review_pct", 89.78)), |
| } |
| return _BASELINE_MODEL |
| except Exception: |
| return None |
|
|
|
|
| def _load_siamese_model() -> tuple[object, str] | tuple[None, None]: |
| global _SIAMESE_MODEL, _SIAMESE_MODEL_PATH |
|
|
| if _SIAMESE_MODEL is not None and _SIAMESE_MODEL_PATH is not None: |
| return _SIAMESE_MODEL, _SIAMESE_MODEL_PATH |
|
|
| model_root = Path(__file__).resolve().parents[3] / "models" |
| candidates = [ |
| model_root / "siamese_model_phase2_full", |
| model_root / "siamese_model_phase2", |
| model_root / "siamese_model", |
| ] |
|
|
| existing = [path for path in candidates if path.exists()] |
| if not existing: |
| return None, None |
|
|
| try: |
| from sentence_transformers import SentenceTransformer |
| except Exception: |
| return None, None |
|
|
| for model_path in existing: |
| try: |
| _SIAMESE_MODEL = SentenceTransformer(str(model_path)) |
| _SIAMESE_MODEL_PATH = str(model_path) |
| return _SIAMESE_MODEL, _SIAMESE_MODEL_PATH |
| except Exception: |
| continue |
|
|
| return None, None |
|
|
|
|
| def _score_with_siamese(model: object, candidate_text: str, job_text: str) -> float: |
| embeddings = model.encode( |
| [candidate_text, job_text], |
| convert_to_numpy=True, |
| normalize_embeddings=True, |
| show_progress_bar=False, |
| ) |
| similarity = float(np.dot(embeddings[0], embeddings[1])) |
| return float(np.clip(similarity, 0.0, 1.0) * 100.0) |
|
|
|
|
| def _decision_from_score(score_pct: float) -> str: |
| global _MATCH_THRESHOLDS |
| if _MATCH_THRESHOLDS is None: |
| _load_baseline_model() |
|
|
| accept_threshold = float( |
| os.getenv( |
| "MATCH_ACCEPT_THRESHOLD", |
| str((_MATCH_THRESHOLDS or {}).get("accept_pct", 94.78)), |
| ) |
| ) |
| review_threshold = float( |
| os.getenv( |
| "MATCH_REVIEW_THRESHOLD", |
| str((_MATCH_THRESHOLDS or {}).get("review_pct", 89.78)), |
| ) |
| ) |
|
|
| |
| if review_threshold > accept_threshold: |
| review_threshold = accept_threshold |
|
|
| if score_pct >= accept_threshold: |
| return "accepted" |
| if score_pct >= review_threshold: |
| return "review" |
| return "rejected" |
|
|
|
|
| def _build_pair_features_single(candidate_text: str, job_text: str, meta: dict) -> np.ndarray: |
| if isinstance(meta, PairFeatureMeta): |
| feature_meta = meta |
| else: |
| feature_meta = PairFeatureMeta(tfidf=meta.get('tf'), svd=meta.get('svd')) |
|
|
| return build_pair_features(candidate_text, job_text, feature_meta) |
|
|
|
|
|
|
| |
| |
| |
|
|
| class MatchingMode(str, Enum): |
| search = "search" |
| generate = "generate" |
|
|
|
|
| class RequiredSkillInput(BaseModel): |
| name: str |
| weight: int = 50 |
|
|
|
|
| class JobCriteriaCreate(BaseModel): |
| """Create job criteria for matching""" |
| title: str |
| description: str |
| mode: MatchingMode = MatchingMode.search |
| required_skills: List[RequiredSkillInput] = Field(default_factory=list) |
|
|
|
|
| class JobCriteriaUpdate(BaseModel): |
| """Update an existing criteria""" |
| title: Optional[str] = None |
| description: Optional[str] = None |
| required_skills: Optional[List[RequiredSkillInput]] = None |
|
|
|
|
| class MatchResultResponse(BaseModel): |
| """Match result between criteria and candidate""" |
| id: int |
| criteria_id: int |
| candidate_id: int |
| score: float |
| explanation: Optional[str] = None |
| created_at: datetime |
|
|
| class Config: |
| from_attributes = True |
|
|
|
|
| class JobCriteriaResponse(BaseModel): |
| """Job criteria response""" |
| id: int |
| recruiter_id: int |
| title: str |
| description: str |
| created_at: datetime |
| required_skills: List[dict] = Field(default_factory=list) |
|
|
| class Config: |
| from_attributes = True |
|
|
|
|
| class CandidateMatchResponse(BaseModel): |
| """Candidate with match score""" |
| candidate_id: int |
| full_name: str |
| email: str |
| match_score: float |
| explanation: Optional[str] = None |
|
|
|
|
| class GenerateProfileRequest(BaseModel): |
| job_title: str |
| description: str |
|
|
|
|
| class SkillBreakdownResponse(BaseModel): |
| skill: str |
| weight: int |
| present: bool |
| score: float |
| contribution: float |
|
|
|
|
| class CriteriaMatchResultResponse(BaseModel): |
| match_result_id: int |
| criteria_id: int |
| candidate_id: int |
| candidate_name: str |
| candidate_email: str |
| score: float |
| coverage: float |
| matched_skills: List[str] |
| missing_skills: List[str] |
| skill_breakdown: List[SkillBreakdownResponse] |
| summary: str |
| created_at: datetime |
|
|
|
|
| class PredictSkillBreakdownResponse(BaseModel): |
| skill: str |
| present: bool |
| weight: int |
| matched: bool |
|
|
|
|
| class PredictCandidateResponse(BaseModel): |
| candidate_id: int |
| full_name: str |
| email: str |
| predicted_score: float |
| decision: str |
| coverage: float |
| matched_skills: List[str] |
| missing_skills: List[str] |
| skill_breakdown: List[PredictSkillBreakdownResponse] |
| summary: str |
|
|
|
|
| class PredictCriteriaResponse(BaseModel): |
| criteria_id: int |
| model: str |
| top_k: int |
| results: List[PredictCandidateResponse] |
|
|
|
|
| |
| |
| |
|
|
| def _normalize_weight(weight: int) -> int: |
| return max(0, min(100, int(weight))) |
|
|
|
|
| def _get_or_create_skill(db: Session, skill_name: str) -> Skill: |
| normalized_name = normalize_skill_name(skill_name) |
| if not normalized_name: |
| raise HTTPException(status_code=400, detail="Skill name cannot be empty") |
|
|
| existing_skill = db.query(Skill).filter(Skill.name.ilike(normalized_name)).first() |
| if existing_skill: |
| return existing_skill |
|
|
| created_skill = Skill(name=normalized_name, category="tech") |
| db.add(created_skill) |
| db.flush() |
| return created_skill |
|
|
|
|
| def _replace_criteria_skills(db: Session, criteria_id: int, required_skills: List[RequiredSkillInput]) -> None: |
| db.query(CriteriaSkill).filter(CriteriaSkill.criteria_id == criteria_id).delete() |
|
|
| for req_skill in required_skills: |
| if not normalize_skill_name(req_skill.name): |
| continue |
| skill = _get_or_create_skill(db, req_skill.name) |
| db.add(CriteriaSkill( |
| criteria_id=criteria_id, |
| skill_id=skill.id, |
| weight=_normalize_weight(req_skill.weight) |
| )) |
|
|
|
|
| def _build_criteria_response(criteria: JobCriteria, db: Session) -> JobCriteriaResponse: |
| criteria_skills = db.query(CriteriaSkill).filter(CriteriaSkill.criteria_id == criteria.id).all() |
| required_skills = [ |
| {"name": cs.skill.name, "weight": cs.weight} |
| for cs in criteria_skills |
| ] |
|
|
| return JobCriteriaResponse( |
| id=criteria.id, |
| recruiter_id=criteria.recruiter_id, |
| title=normalize_text(criteria.title), |
| description=normalize_text(criteria.description), |
| created_at=criteria.created_at, |
| required_skills=required_skills |
| ) |
|
|
|
|
| def _load_criteria_skills(criteria_id: int, db: Session) -> List[Dict[str, int]]: |
| rows = db.query(CriteriaSkill).filter(CriteriaSkill.criteria_id == criteria_id).order_by(CriteriaSkill.weight.desc(), CriteriaSkill.id.asc()).all() |
| return [{"name": row.skill.name, "weight": row.weight} for row in rows] |
|
|
|
|
| def _serialize_match_result(candidate: Candidate, criteria_id: int, score: float, details: Dict[str, object], stored_id: int = 0, job_title: Optional[str] = None) -> CriteriaMatchResultResponse: |
| explanation_payload = build_explanation_payload(score, details, job_title) |
| return CriteriaMatchResultResponse( |
| match_result_id=stored_id, |
| criteria_id=criteria_id, |
| candidate_id=cast(int, candidate.id), |
| candidate_name=cast(str, candidate.full_name), |
| candidate_email=cast(str, candidate.email), |
| score=score, |
| coverage=float(details.get("coverage", 0)), |
| matched_skills=list(details.get("matched_skills", [])), |
| missing_skills=list(details.get("missing_skills", [])), |
| skill_breakdown=[SkillBreakdownResponse(**item) for item in details.get("skill_breakdown", [])], |
| summary=str(explanation_payload.get("summary", "")), |
| created_at=datetime.utcnow(), |
| ) |
|
|
|
|
| def _score_all_candidates(criteria: JobCriteria, db: Session) -> List[CriteriaMatchResultResponse]: |
| criteria_skills = _load_criteria_skills(criteria.id, db) |
| skill_universe = build_skill_universe(db) |
| |
| candidates = db.query(Candidate).filter( |
| ((Candidate.is_fully_extracted == True) | (Candidate.extraction_quality_score >= 80)), |
| Candidate.full_name.isnot(None), |
| Candidate.full_name != "Unknown", |
| Candidate.full_name != "", |
| Candidate.raw_text.isnot(None) |
| ).order_by(Candidate.created_at.desc()).all() |
|
|
| results: List[CriteriaMatchResultResponse] = [] |
| for candidate in candidates: |
| score, details = score_candidate_against_criteria(candidate, criteria_skills, skill_universe) |
| results.append(_serialize_match_result(candidate, criteria.id, score, details, job_title=criteria.title)) |
|
|
| results.sort(key=lambda item: item.score, reverse=True) |
| return results |
|
|
|
|
| def _persist_match_results(db: Session, criteria_id: int, results: List[CriteriaMatchResultResponse]) -> List[MatchResult]: |
| db.query(MatchResult).filter(MatchResult.criteria_id == criteria_id).delete() |
| db.flush() |
|
|
| stored_results: List[MatchResult] = [] |
| for result in results: |
| stored = MatchResult( |
| criteria_id=criteria_id, |
| candidate_id=result.candidate_id, |
| score=result.score, |
| explanation=json.dumps({ |
| "summary": result.summary, |
| "coverage": result.coverage, |
| "matched_skills": result.matched_skills, |
| "missing_skills": result.missing_skills, |
| "skill_breakdown": [item.model_dump() for item in result.skill_breakdown], |
| }, ensure_ascii=False), |
| ) |
| db.add(stored) |
| stored_results.append(stored) |
|
|
| db.commit() |
| for stored in stored_results: |
| db.refresh(stored) |
| return stored_results |
|
|
|
|
| def _format_stored_result(result: MatchResult) -> CriteriaMatchResultResponse: |
| explanation: Dict[str, object] = {} |
| if result.explanation: |
| try: |
| explanation = json.loads(result.explanation) |
| except Exception: |
| explanation = {"summary": result.explanation} |
|
|
| candidate = result.candidate |
| matched_skills = explanation.get("matched_skills", []) if isinstance(explanation, dict) else [] |
| missing_skills = explanation.get("missing_skills", []) if isinstance(explanation, dict) else [] |
| skill_breakdown_data = explanation.get("skill_breakdown", []) if isinstance(explanation, dict) else [] |
|
|
| return CriteriaMatchResultResponse( |
| match_result_id=result.id, |
| criteria_id=result.criteria_id, |
| candidate_id=result.candidate_id, |
| candidate_name=candidate.full_name if candidate else "Unknown", |
| candidate_email=candidate.email if candidate else "", |
| score=result.score, |
| coverage=float(explanation.get("coverage", 0) if isinstance(explanation, dict) else 0), |
| matched_skills=[str(item) for item in matched_skills], |
| missing_skills=[str(item) for item in missing_skills], |
| skill_breakdown=[SkillBreakdownResponse(**item) for item in skill_breakdown_data if isinstance(item, dict)], |
| summary=str(explanation.get("summary", "")) if isinstance(explanation, dict) else "", |
| created_at=result.created_at, |
| ) |
|
|
|
|
| def _build_prediction_explainability(candidate: Candidate, criteria: JobCriteria, criteria_skills: List[Dict[str, int]]) -> Dict[str, object]: |
| candidate_skill_names: List[str] = [] |
| for candidate_skill in getattr(candidate, "candidate_skills", []) or []: |
| skill = getattr(candidate_skill, "skill", None) |
| if skill and getattr(skill, "name", None): |
| candidate_skill_names.append(str(skill.name)) |
|
|
| criteria_skill_names = [str(skill.get("name", "")) for skill in criteria_skills if skill.get("name")] |
| candidate_skill_lookup = {item.lower() for item in candidate_skill_names} |
| matched_skills = [skill for skill in criteria_skill_names if skill.lower() in candidate_skill_lookup] |
| missing_skills = [skill for skill in criteria_skill_names if skill.lower() not in candidate_skill_lookup] |
|
|
| total = len(criteria_skill_names) or 1 |
| coverage = round((len(matched_skills) / total) * 100, 1) |
|
|
| skill_breakdown: List[Dict[str, object]] = [] |
| for skill in criteria_skills: |
| skill_name = str(skill.get("name", "")) |
| is_present = skill_name.lower() in candidate_skill_lookup |
| skill_breakdown.append({ |
| "skill": skill_name, |
| "present": is_present, |
| "weight": int(skill.get("weight", 50)), |
| "matched": is_present, |
| }) |
|
|
| if matched_skills: |
| summary = f"{candidate.full_name} couvre {len(matched_skills)}/{len(criteria_skill_names)} compétences clés ({coverage:.0f}%)." |
| else: |
| summary = f"{candidate.full_name} ne couvre pas encore les compétences prioritaires du poste." |
|
|
| return { |
| "coverage": coverage, |
| "matched_skills": matched_skills, |
| "missing_skills": missing_skills, |
| "skill_breakdown": skill_breakdown, |
| "summary": summary, |
| } |
|
|
|
|
| def _compute_candidate_matches(criteria: JobCriteria, db: Session) -> List[CandidateMatchResponse]: |
| from ai_module.matching import CosineScorer |
|
|
| |
| criteria_skills_db = db.query(CriteriaSkill).filter(CriteriaSkill.criteria_id == criteria.id).all() |
| criteria_skills_dict = {cs.skill.name: _normalize_weight(cs.weight) for cs in criteria_skills_db} |
|
|
| |
| all_skills_objs = db.query(Skill).all() |
| all_skills = [s.name for s in all_skills_objs] |
|
|
| results: List[CandidateMatchResponse] = [] |
|
|
| |
| candidates = db.query(Candidate).filter( |
| ((Candidate.is_fully_extracted == True) | (Candidate.extraction_quality_score >= 80)), |
| Candidate.full_name.isnot(None), |
| Candidate.full_name != "Unknown", |
| Candidate.full_name != "", |
| Candidate.raw_text.isnot(None) |
| ).all() |
| for cand in candidates: |
| cand_skills = [cs.skill.name for cs in getattr(cand, "candidate_skills", [])] |
| details = CosineScorer.calculate_match_score(cand_skills, criteria_skills_dict, all_skills) |
| score = details.get("score", 0.0) |
|
|
| results.append(CandidateMatchResponse( |
| candidate_id=cast(int, cand.id), |
| full_name=cast(str, cand.full_name), |
| email=cast(str, cand.email), |
| match_score=score, |
| explanation=str(details.get("skill_breakdown", {})) |
| )) |
|
|
| |
| results.sort(key=lambda r: r.match_score, reverse=True) |
| return results |
|
|
|
|
| def calculate_match_score(candidate: Candidate, criteria_skills: List[dict] | Dict[str, int], criteria_job_title: str = "", criteria_companies: Optional[List[str]] = None) -> Tuple[float, Dict]: |
| """Wrapper that adapts candidate/criteria structures to the internal scorer.""" |
| from ai_module.matching import CosineScorer |
|
|
| |
| criteria_dict: Dict[str, int] = {} |
| if isinstance(criteria_skills, dict): |
| criteria_dict = {k: _normalize_weight(v) for k, v in criteria_skills.items()} |
| else: |
| for item in (criteria_skills or []): |
| if isinstance(item, dict): |
| name = item.get("name") or item.get("skill") |
| weight = item.get("weight", 50) |
| if name: |
| criteria_dict[name] = _normalize_weight(weight) |
|
|
| |
| candidate_skills = [] |
| try: |
| candidate_skills = [cs.skill.name for cs in getattr(candidate, "candidate_skills", [])] |
| except Exception: |
| if isinstance(candidate, dict): |
| candidate_skills = candidate.get("skills", []) or [] |
|
|
| |
| all_skills = list({*candidate_skills, *list(criteria_dict.keys())}) |
|
|
| details = CosineScorer.calculate_match_score(candidate_skills, criteria_dict, all_skills) |
| return details.get("score", 0.0), details |
|
|
|
|
| def _generate_profile_payload(request: GenerateProfileRequest) -> dict: |
| """Generate the ideal profile payload shared by both IA routes.""" |
| generated_profile: dict = {} |
|
|
| if PROFILE_GENERATOR_AVAILABLE: |
| try: |
| generated_profile = ProfileGenerator.generate_from_text(request.description) |
| except Exception: |
| generated_profile = {} |
|
|
| if not isinstance(generated_profile, dict): |
| generated_profile = {} |
|
|
| generated_skills = generated_profile.get("ideal_skills") or [] |
| if not generated_skills and SKILL_EXTRACTOR_AVAILABLE: |
| extractor = SkillExtractor() |
| extracted = extractor.extract_skills(request.description, threshold=85) |
| generated_skills = [{"name": item["name"], "weight": 90, "level": "Advanced"} for item in extracted[:8]] |
|
|
| if not generated_skills: |
| generated_skills = [ |
| {"name": "Communication", "weight": 80, "level": "Advanced"}, |
| {"name": "Problem Solving", "weight": 80, "level": "Advanced"}, |
| {"name": "Team Work", "weight": 70, "level": "Intermediate"}, |
| ] |
|
|
| return { |
| "title": request.job_title, |
| "description": request.description, |
| "ideal_skills": generated_skills, |
| "ideal_experience_years": generated_profile.get("ideal_experience_years", 5), |
| "ideal_education": generated_profile.get("ideal_education", "Bachelor's degree or equivalent"), |
| "ideal_languages": generated_profile.get("ideal_languages", []), |
| "industries": generated_profile.get("industries", []), |
| } |
|
|
|
|
|
|
| |
| |
| |
|
|
| @router.post("/criteria", response_model=JobCriteriaResponse) |
| async def create_job_criteria( |
| criteria: JobCriteriaCreate, |
| current_user: User = Depends(get_current_user), |
| db: Session = Depends(get_db) |
| ): |
| """ |
| Create job criteria for matching |
| |
| 🅰️ MODE 1 (Search): Décrire les besoins, le système cherche dans les candidats |
| """ |
| |
| |
| recruiter_id = current_user.id |
| |
| |
| db_criteria = JobCriteria( |
| recruiter_id=recruiter_id, |
| title=criteria.title, |
| description=criteria.description |
| ) |
| db.add(db_criteria) |
| db.flush() |
| |
| |
| _replace_criteria_skills(db, db_criteria.id, criteria.required_skills) |
| |
| db.commit() |
| db.refresh(db_criteria) |
| |
| return JobCriteriaResponse( |
| id=cast(int, db_criteria.id), |
| recruiter_id=cast(int, db_criteria.recruiter_id), |
| title=cast(str, db_criteria.title), |
| description=cast(str, db_criteria.description), |
| created_at=cast(datetime, db_criteria.created_at), |
| required_skills=criteria.required_skills |
| ) |
|
|
| |
|
|
|
|
| @router.post("/search/{criteria_id}") |
| async def search_candidates( |
| criteria_id: int, |
| current_user: User = Depends(get_current_user), |
| db: Session = Depends(get_db) |
| ) -> List[CandidateMatchResponse]: |
| """ |
| 🅰️ MODE 1 - Search candidates matching criteria |
| |
| Utilise semantic matching pour matcher intelligemment les compétences |
| même si les noms ne correspondent pas exactement. |
| |
| Algorithme: |
| 1. Récupère tous les candidats |
| 2. Calcule score de match pour chacun avec embeddings sémantiques |
| 3. Retourne triés par score (DESC) |
| """ |
| |
| criteria = db.query(JobCriteria).filter(JobCriteria.id == criteria_id).first() |
| if not criteria: |
| raise HTTPException(status_code=404, detail="Criteria not found") |
| |
| return _compute_candidate_matches(criteria, db) |
|
|
|
|
|
|
|
|
|
|
| @router.get("/candidate/{candidate_id}/analysis") |
| async def get_candidate_match_analysis( |
| candidate_id: int, |
| criteria_id: int = None, |
| db: Session = Depends(get_db) |
| ) -> dict: |
| """ |
| 🔄 ÉTAPE 7 - Get detailed match analysis for candidate |
| |
| Shows: |
| - NER-extracted data (companies, job titles, skills) |
| - Component scores (skills, experience, companies) |
| - Data quality metrics |
| - Matching recommendations |
| """ |
| candidate = db.query(Candidate).filter(Candidate.id == candidate_id).first() |
| if not candidate: |
| raise HTTPException(status_code=404, detail="Candidate not found") |
| |
| |
| criteria_skills_dict = [] |
| criteria_title = "" |
| |
| if criteria_id: |
| criteria_skills = db.query(CriteriaSkill).filter( |
| CriteriaSkill.criteria_id == criteria_id |
| ).all() |
| criteria_skills_dict = [ |
| {"name": cs.skill.name, "weight": cs.weight} |
| for cs in criteria_skills |
| ] |
| |
| |
| score, details = calculate_match_score( |
| candidate, |
| criteria_skills_dict, |
| criteria_job_title=criteria_title |
| ) |
| |
| |
| import json |
| return { |
| "candidate": { |
| "id": candidate.id, |
| "name": candidate.full_name, |
| "email": candidate.email, |
| }, |
| "extraction_quality": { |
| "overall_score": candidate.extraction_quality_score or 0, |
| "fully_extracted": candidate.is_fully_extracted, |
| "data_completeness": f"{(candidate.extraction_quality_score or 0):.0f}%" |
| }, |
| "ner_extracted_data": { |
| "name": candidate.extracted_name, |
| "emails": json.loads(candidate.extracted_emails or "[]"), |
| "phones": json.loads(candidate.extracted_phones or "[]"), |
| "job_titles": json.loads(candidate.extracted_job_titles or "[]"), |
| "companies": json.loads(candidate.extracted_companies or "[]"), |
| "education": json.loads(candidate.extracted_education or "[]") |
| }, |
| "matching_analysis": { |
| "overall_score": min(100, max(0, score)), |
| "component_scores": details.get("component_scores", {}), |
| "method": details.get("method", "standard"), |
| "data_sources": details.get("data_sources", {}), |
| "matched_skills_count": details.get("matched_skills", 0), |
| "total_criteria_skills": details.get("total_skills", 0) |
| }, |
| "recommendations": { |
| "strengths": _get_strengths(candidate, details), |
| "gaps": _get_gaps(candidate, details), |
| "priority_match": score >= 75 |
| } |
| } |
|
|
|
|
| def _get_strengths(candidate: Candidate, details: Dict) -> List[str]: |
| """Extract match strengths""" |
| strengths = [] |
| |
| if details.get("component_scores", {}).get("skills", 0) >= 70: |
| strengths.append("Strong skill match") |
| |
| if details.get("component_scores", {}).get("experience_level", 0) >= 80: |
| strengths.append("High experience level") |
| |
| if candidate.extraction_quality_score and candidate.extraction_quality_score >= 70: |
| strengths.append("Complete data extraction") |
| |
| if len(candidate.candidate_skills) >= 15: |
| strengths.append("Diverse skill portfolio") |
| |
| return strengths or ["Potential candidate"] |
|
|
|
|
| def _get_gaps(candidate: Candidate, details: Dict) -> List[str]: |
| """Extract match gaps""" |
| gaps = [] |
| |
| if details.get("component_scores", {}).get("skills", 0) < 50: |
| gaps.append("Key skills missing - consider training") |
| |
| if details.get("component_scores", {}).get("experience_level", 0) < 50: |
| gaps.append("Less experience than required") |
| |
| if not candidate.extracted_job_titles: |
| gaps.append("Job title extraction unavailable") |
| |
| if not candidate.extracted_companies: |
| gaps.append("Company background extraction unavailable") |
| |
| return gaps |
|
|
|
|
| @router.post("/calculate/{candidate_id}/{criteria_id}", response_model=MatchResultResponse) |
| async def calculate_match( |
| candidate_id: int, |
| criteria_id: int, |
| db: Session = Depends(get_db) |
| ): |
| """Calculate match score for one candidate and one criteria.""" |
| candidate = db.query(Candidate).filter(Candidate.id == candidate_id).first() |
| if not candidate: |
| raise HTTPException(status_code=404, detail="Candidate not found") |
|
|
| criteria = db.query(JobCriteria).filter(JobCriteria.id == criteria_id).first() |
| if not criteria: |
| raise HTTPException(status_code=404, detail="Criteria not found") |
|
|
| criteria_skills = db.query(CriteriaSkill).filter( |
| CriteriaSkill.criteria_id == criteria_id |
| ).all() |
|
|
| criteria_skills_dict = [ |
| {"name": cs.skill.name, "weight": cs.weight} |
| for cs in criteria_skills |
| ] |
|
|
| score, details = calculate_match_score(candidate, criteria_skills_dict) |
| explanation = details.get("details") or ( |
| f"Matched {len([s for s in criteria_skills_dict if s['name'] in [cs.skill.name for cs in candidate.candidate_skills]])} required skills" |
| if criteria_skills_dict else "No skills defined for criteria" |
| ) |
|
|
| match_result = MatchResult( |
| criteria_id=criteria_id, |
| candidate_id=candidate_id, |
| score=score, |
| explanation=explanation |
| ) |
| db.add(match_result) |
| db.commit() |
| db.refresh(match_result) |
|
|
| return match_result |
|
|
|
|
|
|
| |
| |
| |
|
|
| @router.post("/generate-profile") |
| async def generate_ideal_profile( |
| request: GenerateProfileRequest, |
| current_user: User = Depends(get_current_user), |
| db: Session = Depends(get_db) |
| ) -> dict: |
| """ |
| 🅱️ MODE 2 - Generate ideal candidate profile from job description |
| |
| Utilise un générateur de profil local basé sur des règles simples. |
| """ |
| return _generate_profile_payload(request) |
|
|
|
|
| class GenerateAndMatchRequest(BaseModel): |
| """Request body for generate and match endpoint""" |
| job_title: str |
| description: str |
|
|
|
|
| @router.post("/generate-and-match") |
| async def generate_and_match( |
| request: GenerateAndMatchRequest, |
| current_user: User = Depends(get_current_user), |
| db: Session = Depends(get_db) |
| ) -> dict: |
| """ |
| 🅱️ MODE 2 - Complete workflow: |
| 1. Generate ideal profile from description |
| 2. Match against all candidates with semantic matching |
| 3. Return ranked results |
| """ |
| |
| generated_profile = _generate_profile_payload( |
| GenerateProfileRequest(job_title=request.job_title, description=request.description) |
| ) |
|
|
| ideal_skills = generated_profile.get("ideal_skills", []) |
| candidates = db.query(Candidate).filter( |
| ((Candidate.is_fully_extracted == True) | (Candidate.extraction_quality_score >= 80)), |
| Candidate.full_name.isnot(None), |
| Candidate.full_name != "Unknown", |
| Candidate.full_name != "", |
| Candidate.raw_text.isnot(None) |
| ).all() |
|
|
| |
| matches: List[CandidateMatchResponse] = [] |
| for candidate in candidates: |
| score, details = calculate_match_score( |
| candidate, |
| ideal_skills, |
| criteria_job_title=request.job_title, |
| criteria_companies=[] |
| ) |
| matches.append(CandidateMatchResponse( |
| candidate_id=candidate.id, |
| full_name=candidate.full_name, |
| email=candidate.email, |
| match_score=score, |
| explanation=details.get("details", "") |
| )) |
|
|
| matches.sort(key=lambda m: m.match_score, reverse=True) |
|
|
| return { |
| "ideal_profile": generated_profile, |
| "matches": [match.model_dump() for match in matches] |
| } |
|
|
|
|
| class RankAllResult(BaseModel): |
| """Ranked candidate entry for rank-all endpoint.""" |
| rank: int |
| candidate_id: int |
| full_name: str |
| email: str |
| score: float |
| coverage: float |
| matched_skills: List[str] |
| missing_skills: List[str] |
|
|
| class Config: |
| from_attributes = True |
|
|
|
|
| @router.get("/{criteria_id}/rank-all", response_model=List[RankAllResult]) |
| def rank_all_candidates( |
| criteria_id: int, |
| current_user: User = Depends(get_current_user), |
| db: Session = Depends(get_db), |
| ): |
| """Return all recruiter candidates scored and ranked by match score (best first). |
| |
| Only candidates belonging to the current recruiter (recruiter_id or user_id) are |
| included. The score is computed on-the-fly using CosineScorer so that even |
| candidates without prior MatchResult records are ranked. |
| """ |
| from sqlalchemy import or_ as sa_or |
| criteria = db.query(JobCriteria).filter(JobCriteria.id == criteria_id).first() |
| if not criteria: |
| raise HTTPException(status_code=404, detail="Criteria not found") |
|
|
| criteria_skills = _load_criteria_skills(criteria_id, db) |
| skill_universe = build_skill_universe(db) |
|
|
| candidates = ( |
| db.query(Candidate) |
| .filter( |
| sa_or( |
| Candidate.recruiter_id == current_user.id, |
| Candidate.user_id == current_user.id, |
| ) |
| ) |
| .all() |
| ) |
|
|
| ranked: List[Dict] = [] |
| for cand in candidates: |
| score, details = score_candidate_against_criteria(cand, criteria_skills, skill_universe) |
| ranked.append({ |
| "candidate_id": cast(int, cand.id), |
| "full_name": cast(str, cand.full_name or ""), |
| "email": cast(str, cand.email or ""), |
| "score": score, |
| "coverage": float(details.get("coverage", 0)), |
| "matched_skills": list(details.get("matched_skills", [])), |
| "missing_skills": list(details.get("missing_skills", [])), |
| }) |
|
|
| ranked.sort(key=lambda item: item["score"], reverse=True) |
| return [RankAllResult(rank=idx + 1, **entry) for idx, entry in enumerate(ranked)] |
|
|
|
|
| @router.post("/{criteria_id:int}/results", response_model=List[CriteriaMatchResultResponse]) |
| async def launch_matching_for_criteria( |
| criteria_id: int, |
| db: Session = Depends(get_db), |
| ): |
| criteria = db.query(JobCriteria).filter(JobCriteria.id == criteria_id).first() |
| if not criteria: |
| raise HTTPException(status_code=404, detail="Criteria not found") |
|
|
| results = _score_all_candidates(criteria, db) |
| stored_results = _persist_match_results(db, criteria_id, results) |
|
|
| return [_format_stored_result(stored_result) for stored_result in sorted(stored_results, key=lambda item: item.score, reverse=True)] |
|
|
|
|
| @router.post("/{criteria_id}/results/", response_model=List[CriteriaMatchResultResponse]) |
| async def launch_matching_for_criteria_with_trailing_slash( |
| criteria_id: int, |
| db: Session = Depends(get_db), |
| ): |
| return await launch_matching_for_criteria(criteria_id, db) |
|
|
|
|
| @router.get("/{criteria_id:int}/results", response_model=List[CriteriaMatchResultResponse]) |
| async def get_matching_results_for_criteria( |
| criteria_id: int, |
| db: Session = Depends(get_db), |
| ): |
| criteria = db.query(JobCriteria).filter(JobCriteria.id == criteria_id).first() |
| if not criteria: |
| raise HTTPException(status_code=404, detail="Criteria not found") |
|
|
| stored_results = db.query(MatchResult).filter(MatchResult.criteria_id == criteria_id).order_by(MatchResult.score.desc(), MatchResult.id.asc()).all() |
| if not stored_results: |
| stored_results = _persist_match_results(db, criteria_id, _score_all_candidates(criteria, db)) |
|
|
| return [_format_stored_result(result) for result in stored_results] |
|
|
|
|
| @router.get("/{criteria_id}/results/", response_model=List[CriteriaMatchResultResponse]) |
| async def get_matching_results_for_criteria_with_trailing_slash( |
| criteria_id: int, |
| db: Session = Depends(get_db), |
| ): |
| return await get_matching_results_for_criteria(criteria_id, db) |
|
|
|
|
| |
| |
| |
|
|
| @router.get("/results", response_model=List[MatchResultResponse]) |
| async def get_match_results( |
| criteria_id: Optional[int] = None, |
| candidate_id: Optional[int] = None, |
| skip: int = 0, |
| limit: int = 100, |
| db: Session = Depends(get_db) |
| ): |
| """Get all match results""" |
| query = db.query(MatchResult) |
| |
| if criteria_id is not None: |
| query = query.filter(MatchResult.criteria_id == criteria_id) |
| if candidate_id is not None: |
| query = query.filter(MatchResult.candidate_id == candidate_id) |
| |
| results = query.offset(skip).limit(limit).all() |
| return results |
|
|
|
|
| @router.get("/criteria/{criteria_id}", response_model=JobCriteriaResponse) |
| async def get_criteria( |
| criteria_id: int, |
| db: Session = Depends(get_db) |
| ): |
| """Get criteria details""" |
| criteria = db.query(JobCriteria).filter(JobCriteria.id == criteria_id).first() |
| if not criteria: |
| raise HTTPException(status_code=404, detail="Criteria not found") |
| |
| return JobCriteriaResponse( |
| id=cast(int, criteria.id), |
| recruiter_id=cast(int, criteria.recruiter_id), |
| title=cast(str, criteria.title), |
| description=cast(str, criteria.description), |
| created_at=cast(datetime, criteria.created_at) |
| ) |
|
|
|
|
| def _predict_fallback(criteria_id: int, top_k: int, db: Session) -> dict: |
| """Return predict-shaped results using CosineScorer when no ML model file is available. |
| |
| Returning 404 when the .joblib file is absent misleads clients into thinking |
| the route does not exist. This fallback keeps the endpoint alive with the |
| same response schema, using the same rule-based scorer as /rank-all. |
| """ |
| criteria = db.query(JobCriteria).filter(JobCriteria.id == criteria_id).first() |
| if not criteria: |
| raise HTTPException(status_code=404, detail="Criteria not found") |
| results = _score_all_candidates(criteria, db)[:top_k] |
| return { |
| "criteria_id": criteria_id, |
| "model": "cosine_fallback", |
| "top_k": top_k, |
| "results": [ |
| { |
| "candidate_id": r.candidate_id, |
| "full_name": r.candidate_name, |
| "email": r.candidate_email, |
| "predicted_score": r.score, |
| "decision": _decision_from_score(r.score), |
| "coverage": r.coverage, |
| "matched_skills": r.matched_skills, |
| "missing_skills": r.missing_skills, |
| "skill_breakdown": [ |
| { |
| "skill": item.skill, |
| "present": item.present, |
| "weight": item.weight, |
| "matched": item.present, |
| } |
| for item in r.skill_breakdown |
| ], |
| "summary": r.summary, |
| } |
| for r in results |
| ], |
| } |
|
|
|
|
| @router.post("/{criteria_id:int}/predict", response_model=PredictCriteriaResponse) |
| async def predict_for_criteria( |
| criteria_id: int, |
| top_k: int = 20, |
| model_type: str = "baseline", |
| db: Session = Depends(get_db) |
| ): |
| """ |
| Predict match probabilities for all candidates for a given criteria. |
| model_type can be 'baseline' or 'siamese'. |
| Returns top_k candidates with predicted score (0-100). |
| Falls back to CosineScorer when no ML model file is available. |
| """ |
| selected_model_type = model_type.strip().lower() |
| if selected_model_type not in {"baseline", "siamese"}: |
| raise HTTPException(status_code=400, detail="model_type must be 'baseline' or 'siamese'") |
|
|
| model = None |
| meta: dict = {} |
|
|
| if selected_model_type == "baseline": |
| model_bundle = _load_baseline_model() |
| if not model_bundle: |
| return _predict_fallback(criteria_id, top_k, db) |
| model = model_bundle.get('model') |
| meta = model_bundle.get('meta') or {} |
| else: |
| model, model_path = _load_siamese_model() |
| if model is None: |
| return _predict_fallback(criteria_id, top_k, db) |
|
|
| criteria = db.query(JobCriteria).filter(JobCriteria.id == criteria_id).first() |
| if not criteria: |
| raise HTTPException(status_code=404, detail="Criteria not found") |
|
|
| |
| criteria_skills = db.query(CriteriaSkill).filter(CriteriaSkill.criteria_id == criteria.id).all() |
| skill_names = [cs.skill.name for cs in criteria_skills] |
| job_text = f"{criteria.title} \n {criteria.description} \n Skills: {'; '.join(skill_names)}" |
|
|
| candidates = db.query(Candidate).filter( |
| ((Candidate.is_fully_extracted == True) | (Candidate.extraction_quality_score >= 80)), |
| Candidate.full_name.isnot(None), |
| Candidate.full_name != "Unknown", |
| Candidate.full_name != "", |
| Candidate.raw_text.isnot(None) |
| ).all() |
| scored: List[Tuple[Candidate, float, Dict[str, object]]] = [] |
|
|
| for cand in candidates: |
| |
| try: |
| skills = [cs.skill.name for cs in getattr(cand, 'candidate_skills', [])] |
| except Exception: |
| skills = [] |
|
|
| extracted = [] |
| try: |
| extracted.extend(json.loads(cand.extracted_job_titles or '[]')) |
| except Exception: |
| pass |
| try: |
| extracted.extend(json.loads(cand.extracted_companies or '[]')) |
| except Exception: |
| pass |
|
|
| candidate_text = f"{cand.full_name or ''} \n {'; '.join(skills)} \n {'; '.join(extracted)}" |
|
|
| explainability = _build_prediction_explainability(cand, criteria, criteria_skills) |
|
|
| try: |
| if selected_model_type == "baseline": |
| X = _build_pair_features_single(candidate_text, job_text, meta) |
| prob = None |
| try: |
| prob = model.predict_proba(X)[:,1][0] |
| except Exception: |
| try: |
| prob = model.decision_function(X)[0] |
| prob = 1 / (1 + np.exp(-prob)) |
| except Exception: |
| prob = float(model.predict(X)[0]) |
| score_pct = float(np.clip(prob * 100, 0, 100)) |
| else: |
| score_pct = _score_with_siamese(model, candidate_text, job_text) |
| except Exception: |
| score_pct = 0.0 |
|
|
| scored.append((cand, score_pct, explainability)) |
|
|
| scored.sort(key=lambda t: t[1], reverse=True) |
|
|
| results = [] |
| for cand, score, explainability in scored[:top_k]: |
| results.append({ |
| 'candidate_id': cand.id, |
| 'full_name': cand.full_name, |
| 'email': cand.email, |
| 'predicted_score': score, |
| 'decision': _decision_from_score(score), |
| 'coverage': explainability.get('coverage', 0), |
| 'matched_skills': explainability.get('matched_skills', []), |
| 'missing_skills': explainability.get('missing_skills', []), |
| 'skill_breakdown': explainability.get('skill_breakdown', []), |
| 'summary': explainability.get('summary', ''), |
| }) |
|
|
| return {'criteria_id': criteria_id, 'model': selected_model_type, 'top_k': top_k, 'results': results} |
|
|
|
|
| |
|
|
| class ExplainabilityRequest(BaseModel): |
| """Request for match explanation.""" |
| candidate_id: int |
| job_criteria_id: int |
|
|
|
|
| class ExplainabilityResponse(BaseModel): |
| """Response with human-readable match explanation.""" |
| candidate_name: str |
| job_title: str |
| overall_score: float |
| interpretation: str |
| matching_skills: list[str] |
| missing_skills: list[str] |
| experience_alignment: str |
| key_reason: str |
| recommendations: list[str] |
|
|
|
|
| @router.post("/match-explanation", response_model=ExplainabilityResponse) |
| def get_match_explanation( |
| request: ExplainabilityRequest, |
| db: Session = Depends(get_db), |
| current_user: User = Depends(get_current_user), |
| ): |
| """ |
| Generate human-readable explanation for why a candidate matches (or doesn't match) a job. |
| |
| Phase 2 Feature: LLM-style explicability for recruiter decision-making. |
| """ |
| |
| candidate = db.query(Candidate).filter(Candidate.id == request.candidate_id).first() |
| if not candidate: |
| raise HTTPException(status_code=404, detail="Candidate not found") |
| |
| |
| criteria = db.query(JobCriteria).filter(JobCriteria.id == request.job_criteria_id).first() |
| if not criteria: |
| raise HTTPException(status_code=404, detail="Job criteria not found") |
| |
| try: |
| criteria_skills = _load_criteria_skills(criteria.id, db) |
| skill_universe = build_skill_universe(db) |
| score, details = score_candidate_against_criteria(candidate, criteria_skills, skill_universe) |
|
|
| match_score = { |
| "match_score": float(score) / 100.0, |
| "text_similarity": float(details.get("similarity", 0.0)), |
| "skills_match": float(details.get("coverage", 0.0)) / 100.0, |
| } |
|
|
| matching_skills = [str(item) for item in details.get("matched_skills", [])] |
| missing_skills = [str(item) for item in details.get("missing_skills", [])] |
|
|
| |
| |
| explanation = generate_explanation( |
| candidate_name=candidate.full_name, |
| job_title=criteria.title, |
| match_score=match_score, |
| matching_skills=matching_skills, |
| missing_skills=missing_skills, |
| candidate_years_exp=float(getattr(candidate, "years_of_experience", 0) or 0), |
| required_years_exp=float(getattr(criteria, "years_of_experience_required", 0) or 0), |
| ) |
| except Exception as exc: |
| |
| skill_names = [] |
| for candidate_skill in getattr(candidate, "candidate_skills", []) or []: |
| skill = getattr(candidate_skill, "skill", None) |
| skill_name = getattr(skill, "name", None) |
| if skill_name: |
| skill_names.append(str(skill_name)) |
|
|
| explanation = generate_explanation( |
| candidate_name=candidate.full_name, |
| job_title=criteria.title, |
| match_score={"match_score": 0.0, "text_similarity": 0.0, "skills_match": 0.0}, |
| matching_skills=skill_names[:5], |
| missing_skills=[skill.get("name", "") for skill in _load_criteria_skills(criteria.id, db)[:5]], |
| candidate_years_exp=float(getattr(candidate, "years_of_experience", 0) or 0), |
| required_years_exp=float(getattr(criteria, "years_of_experience_required", 0) or 0), |
| ) |
| |
| return ExplainabilityResponse( |
| candidate_name=explanation.candidate_name, |
| job_title=explanation.job_title, |
| overall_score=explanation.overall_score, |
| interpretation=explanation.interpretation, |
| matching_skills=explanation.matching_skills, |
| missing_skills=explanation.missing_skills, |
| experience_alignment=explanation.experience_alignment, |
| key_reason=explanation.key_reason, |
| recommendations=explanation.recommendations, |
| ) |
|
|
|
|
| class ShortlistSummaryRequest(BaseModel): |
| """Request for shortlist summary.""" |
| job_criteria_id: int |
|
|
|
|
| class ShortlistSummaryResponse(BaseModel): |
| """Summary of candidate shortlist.""" |
| total_candidates_screened: int |
| strong_matches: int |
| moderate_matches: int |
| top_skills_in_pool: list[str] |
| recommendations: list[str] |
|
|
|
|
| @router.post("/shortlist-summary", response_model=ShortlistSummaryResponse) |
| def get_shortlist_summary( |
| request: ShortlistSummaryRequest, |
| db: Session = Depends(get_db), |
| current_user: User = Depends(get_current_user), |
| ): |
| """ |
| Generate summary insights for a job's candidate shortlist. |
| |
| Phase 2 Feature: Strategic recommendations for recruitment workflow. |
| """ |
| |
| criteria = db.query(JobCriteria).filter(JobCriteria.id == request.job_criteria_id).first() |
| if not criteria: |
| raise HTTPException(status_code=404, detail="Job criteria not found") |
| |
| |
| match_results = db.query(MatchResult).filter(MatchResult.criteria_id == request.job_criteria_id).all() |
| |
| |
| matches = [] |
| for result in match_results: |
| candidate = db.query(Candidate).filter(Candidate.id == result.candidate_id).first() |
| if candidate: |
| matches.append({ |
| "candidate_id": candidate.id, |
| "full_name": candidate.full_name, |
| "score": result.score / 100.0, |
| "matching_skills": result.matched_skills or [], |
| }) |
| |
| |
| summary = generate_shortlist_summary(matches, criteria.title, top_n=5) |
| |
| return ShortlistSummaryResponse( |
| total_candidates_screened=summary["total_candidates_screened"], |
| strong_matches=summary["strong_matches"], |
| moderate_matches=summary["moderate_matches"], |
| top_skills_in_pool=summary["top_skills_in_pool"], |
| recommendations=summary["recommendations"], |
| ) |
| from app.services.matching_engine import build_skill_universe, build_explanation_payload, score_candidate_against_criteria, generate_enriched_explanation |
|
|