| """Semantic skill matching using modern open-source embeddings + optional FAISS. |
| |
| Default model: |
| - BAAI/bge-small-en |
| |
| Key capabilities: |
| - text embedding cache |
| - cosine similarity |
| - optional FAISS inner-product index for fast nearest-neighbor lookup |
| """ |
|
|
| from __future__ import annotations |
|
|
| import os |
| from typing import Dict, List, Optional, Tuple |
|
|
| import numpy as np |
|
|
| try: |
| from sentence_transformers import SentenceTransformer |
|
|
| SENTENCE_TRANSFORMERS_AVAILABLE = True |
| except ImportError: |
| SENTENCE_TRANSFORMERS_AVAILABLE = False |
| print("Warning: sentence-transformers not installed. Run: pip install sentence-transformers") |
|
|
| try: |
| import faiss |
|
|
| FAISS_AVAILABLE = True |
| except ImportError: |
| FAISS_AVAILABLE = False |
|
|
|
|
class SemanticSkillMatcher:
    """Match candidate skills against weighted criteria using sentence embeddings.

    All state lives on the class: the sentence-transformers model is loaded
    lazily on first use and shared, and every embedding is memoised in
    ``_embedding_cache`` under the stripped, lower-cased text. Embeddings are
    L2-normalised, so a dot product between two of them is their cosine
    similarity. Similarities reported to callers are clipped to ``[0, 1]``.
    """

    MODEL_NAME = os.getenv("SEMANTIC_EMBEDDING_MODEL", "BAAI/bge-small-en")
    DEFAULT_THRESHOLD = float(os.getenv("SEMANTIC_MATCH_THRESHOLD", "0.60"))
    # Weight applied to a criterion whose "weight" is missing, falsy or invalid.
    DEFAULT_WEIGHT = 50

    _model = None
    _embedding_cache: Dict[str, np.ndarray] = {}

    @classmethod
    def _load_model(cls) -> Optional["SentenceTransformer"]:
        """Load the sentence-transformers model once and reuse it afterwards.

        Returns:
            The shared model, or ``None`` when sentence-transformers is not
            installed or the model could not be loaded.
        """
        if cls._model is not None:
            return cls._model

        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            print("sentence-transformers not available")
            return None

        try:
            print(f"Loading {cls.MODEL_NAME}...")
            cls._model = SentenceTransformer(cls.MODEL_NAME)
            print(f"✓ Model loaded successfully. Embedding dimension: {cls._model.get_sentence_embedding_dimension()}")
            return cls._model
        except Exception as e:
            # Any loader failure degrades to "no model" instead of crashing callers.
            print(f"Error loading model: {e}")
            return None

    @staticmethod
    def _cache_key(text: str) -> str:
        """Return the cache key for *text*: stripped and lower-cased."""
        return text.strip().lower()

    @staticmethod
    def _normalize(vecs: np.ndarray) -> np.ndarray:
        """L2-normalise vectors so that a dot product equals cosine similarity.

        Works on a single 1-D vector as well as a 2-D batch (one vector per
        row). Zero vectors are returned unchanged rather than divided by zero.
        """
        norms = np.linalg.norm(vecs, axis=-1, keepdims=True)
        norms = np.where(norms == 0, 1.0, norms)
        return vecs / norms

    @staticmethod
    def _coerce_weight(raw: object, default: int = 50) -> int:
        """Convert a criterion weight to ``int``, falling back to *default*.

        Falsy values (``None``, ``0``, ``""``) map to *default*, as they always
        have; values that ``int()`` cannot convert now do too instead of raising.
        """
        if not raw:
            return default
        try:
            return int(raw)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _empty_result(details: str, total_criteria: int) -> Dict[str, object]:
        """Build the zero-score result, with the same keys as a full result."""
        return {
            "matched_skills": [],
            "score": 0.0,
            "total_matches": 0,
            "total_criteria": total_criteria,
            "details": details,
        }

    @classmethod
    def get_embedding(cls, text: str) -> Optional[np.ndarray]:
        """Return the normalised embedding for *text*, using the cache.

        Returns:
            A 1-D ``float32`` vector, or ``None`` when *text* is not a string,
            the model is unavailable, or encoding fails.
        """
        if not isinstance(text, str):
            return None

        key = cls._cache_key(text)
        cached = cls._embedding_cache.get(key)
        if cached is not None:
            return cached

        model = cls._load_model()
        if model is None:
            return None

        try:
            embedding = model.encode([text], convert_to_numpy=True).astype(np.float32)
            embedding = cls._normalize(embedding)[0]
            cls._embedding_cache[key] = embedding
            return embedding
        except Exception as e:
            print(f"Error embedding text '{text}': {e}")
            return None

    @classmethod
    def get_embeddings_batch(cls, texts: List[str]) -> Optional[np.ndarray]:
        """Return normalised embeddings for *texts*, one row per input text.

        Only texts that are not cached yet are sent to the model (in a single
        ``encode`` call); everything else is served from the cache. The model
        is therefore not loaded at all when every text is already cached.

        Returns:
            A ``(len(texts), dim)`` ``float32`` array, or ``None`` when *texts*
            is empty, the model is unavailable, or encoding fails.
        """
        if not texts:
            return None

        try:
            keys = [cls._cache_key(text) for text in texts]

            # First occurrence of every key that still needs encoding.
            pending: Dict[str, str] = {}
            for key, text in zip(keys, texts):
                if key not in cls._embedding_cache and key not in pending:
                    pending[key] = text

            if pending:
                model = cls._load_model()
                if model is None:
                    return None
                encoded = model.encode(list(pending.values()), convert_to_numpy=True).astype(np.float32)
                encoded = cls._normalize(encoded)
                for key, vector in zip(pending, encoded):
                    cls._embedding_cache[key] = vector

            rows = [cls._embedding_cache[key] for key in keys]
            return np.stack(rows).astype(np.float32, copy=False)
        except Exception as e:
            print(f"Error getting batch embeddings: {e}")
            return None

    @classmethod
    def semantic_similarity(cls, text1: str, text2: str) -> float:
        """Return the cosine similarity of two texts, clipped to ``[0, 1]``.

        Returns ``0.0`` when either text cannot be embedded.
        """
        embed1 = cls.get_embedding(text1)
        embed2 = cls.get_embedding(text2)
        if embed1 is None or embed2 is None:
            return 0.0

        # Both vectors are unit length, so the dot product is the cosine.
        return float(np.clip(float(np.dot(embed1, embed2)), 0.0, 1.0))

    @classmethod
    def build_faiss_index(cls, corpus: List[str]) -> Optional[Tuple["faiss.IndexFlatIP", List[str]]]:
        """Build a FAISS inner-product index over the non-blank corpus entries.

        Returns:
            ``(index, cleaned)`` where ``cleaned[i]`` is the stripped text of
            row ``i`` in the index, or ``None`` when FAISS is not installed,
            the corpus has no usable entries, or embedding fails.
        """
        if not FAISS_AVAILABLE:
            return None

        cleaned = [item.strip() for item in corpus if isinstance(item, str) and item.strip()]
        if not cleaned:
            return None

        embeddings = cls.get_embeddings_batch(cleaned)
        if embeddings is None:
            return None

        index = faiss.IndexFlatIP(embeddings.shape[1])
        index.add(np.ascontiguousarray(embeddings, dtype=np.float32))
        return index, cleaned

    @classmethod
    def search_similar(cls, query: str, corpus: List[str], top_k: int = 5) -> List[Tuple[str, float]]:
        """Return the *top_k* corpus entries most similar to *query*.

        Blank and non-string corpus entries are ignored, and the returned
        texts are stripped regardless of whether FAISS is installed.

        Returns:
            ``(text, similarity)`` pairs, best first, similarities clipped to
            ``[0, 1]``. Empty when nothing usable is in the corpus or the query
            cannot be embedded.
        """
        if not corpus:
            return []

        cleaned = [item.strip() for item in corpus if isinstance(item, str) and item.strip()]
        if not cleaned:
            return []

        top_k = max(1, min(top_k, len(cleaned)))
        q = cls.get_embedding(query)
        if q is None:
            return []

        # Preferred path: exact inner-product search through FAISS.
        index_bundle = cls.build_faiss_index(cleaned)
        if index_bundle is not None:
            index, indexed = index_bundle
            query_row = np.ascontiguousarray(q.reshape(1, -1), dtype=np.float32)
            scores, idxs = index.search(query_row, top_k)
            return [
                (indexed[int(i)], float(np.clip(scores[0][rank], 0.0, 1.0)))
                for rank, i in enumerate(idxs[0])
                if int(i) >= 0  # FAISS pads missing neighbours with -1
            ]

        # Fallback without FAISS: one batched embedding call, ranked in NumPy.
        matrix = cls.get_embeddings_batch(cleaned)
        if matrix is not None:
            scores = np.clip(matrix @ q, 0.0, 1.0)
            order = np.argsort(-scores, kind="stable")[:top_k]
            return [(cleaned[int(i)], float(scores[int(i)])) for i in order]

        # Last resort when the batch call failed: score the entries one by one.
        similarities = [(item, cls.semantic_similarity(query, item)) for item in cleaned]
        similarities.sort(key=lambda pair: pair[1], reverse=True)
        return similarities[:top_k]

    @classmethod
    def match_candidate_skills(
        cls,
        candidate_skills: List[str],
        criteria_skills: List[Dict[str, object]],
        threshold: float = DEFAULT_THRESHOLD,
    ) -> Dict[str, object]:
        """Score how well candidate skills cover a list of weighted criteria.

        Each criterion is a mapping with a ``"name"`` and an optional
        ``"weight"`` (default 50). A criterion counts as matched when its most
        similar candidate skill reaches *threshold*. The score is the matched
        share of the total weight, as a percentage.

        Candidate skills are embedded once for the whole call; each criterion
        is then compared against them with a single matrix-vector product.

        Returns:
            A dict with ``matched_skills`` (one entry per matched criterion),
            ``score`` in ``[0, 100]``, ``total_matches``, ``total_criteria``
            and a human-readable ``details`` string.
        """
        total_criteria = len(criteria_skills) if criteria_skills else 0
        if not candidate_skills or not criteria_skills:
            return cls._empty_result("No skills to match", total_criteria)

        candidates = [s.strip() for s in candidate_skills if s and s.strip()]
        if not candidates:
            return cls._empty_result("No candidate skills available", total_criteria)

        # Keep only criteria with a usable name; a missing name must not turn
        # into the literal skill "None".
        named_criteria: List[Tuple[str, int]] = []
        for criteria in criteria_skills:
            raw_name = criteria.get("name")
            name = "" if raw_name is None else str(raw_name).strip()
            if name:
                weight = cls._coerce_weight(criteria.get("weight"), cls.DEFAULT_WEIGHT)
                named_criteria.append((name, weight))

        total_weight = sum(weight for _, weight in named_criteria)
        total_matched_weight = 0
        matched_skills: List[Dict[str, object]] = []

        candidate_matrix = cls.get_embeddings_batch(candidates) if named_criteria else None
        if candidate_matrix is not None:
            for name, weight in named_criteria:
                query_vec = cls.get_embedding(name)
                if query_vec is None:
                    continue

                # Rows are unit vectors, so this yields cosine similarities.
                scores = candidate_matrix @ query_vec
                best = int(np.argmax(scores))
                similarity = float(np.clip(scores[best], 0.0, 1.0))
                if similarity >= threshold:
                    total_matched_weight += weight
                    matched_skills.append({
                        "criteria_skill": name.lower(),
                        "matched_skill": candidates[best],
                        "similarity": similarity,
                        "weight": weight,
                    })

        overall_score = (total_matched_weight / total_weight * 100) if total_weight > 0 else 0.0

        return {
            "matched_skills": matched_skills,
            "score": float(np.clip(overall_score, 0.0, 100.0)),
            "total_matches": len(matched_skills),
            "total_criteria": len(criteria_skills),
            "details": f"Matched {len(matched_skills)}/{len(criteria_skills)} criteria skills",
        }

    @classmethod
    def clear_cache(cls):
        """Drop every cached embedding (the loaded model is kept)."""
        cls._embedding_cache.clear()

    @classmethod
    def get_cache_size(cls) -> int:
        """Return the number of cached embeddings."""
        return len(cls._embedding_cache)
|
|
|
|
| |
def semantic_skill_match(skill1: str, skill2: str, threshold: float = 0.6) -> Tuple[bool, float]:
    """Compare two skills semantically.

    Returns:
        ``(is_match, similarity)`` where ``similarity`` is the value reported
        by ``SemanticSkillMatcher.semantic_similarity`` and ``is_match`` is
        true when it reaches *threshold*.
    """
    score = SemanticSkillMatcher.semantic_similarity(skill1, skill2)
    return score >= threshold, score
|
|