File size: 8,400 Bytes
9df97a2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
"""Semantic skill matching using modern open-source embeddings + optional FAISS.

Default model:
- BAAI/bge-small-en

Key capabilities:
- text embedding cache
- cosine similarity
- optional FAISS inner-product index for fast nearest-neighbor lookup
"""

from __future__ import annotations

import os
from typing import Dict, List, Optional, Tuple

import numpy as np

try:
    from sentence_transformers import SentenceTransformer

    SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
    SENTENCE_TRANSFORMERS_AVAILABLE = False
    print("Warning: sentence-transformers not installed. Run: pip install sentence-transformers")

try:
    import faiss  # type: ignore

    FAISS_AVAILABLE = True
except ImportError:
    FAISS_AVAILABLE = False


class SemanticSkillMatcher:
    """Match candidate skills with weighted criteria using semantic embeddings."""

    MODEL_NAME = os.getenv("SEMANTIC_EMBEDDING_MODEL", "BAAI/bge-small-en")
    DEFAULT_THRESHOLD = float(os.getenv("SEMANTIC_MATCH_THRESHOLD", "0.60"))

    _model = None
    _embedding_cache: Dict[str, np.ndarray] = {}
    
    @classmethod
    def _load_model(cls) -> Optional["SentenceTransformer"]:
        """Load and cache the sentence-transformers model once."""
        if cls._model is not None:
            return cls._model

        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            print("sentence-transformers not available")
            return None

        try:
            print(f"Loading {cls.MODEL_NAME}...")
            cls._model = SentenceTransformer(cls.MODEL_NAME)
            print(f"✓ Model loaded successfully. Embedding dimension: {cls._model.get_sentence_embedding_dimension()}")
            return cls._model
        except Exception as e:
            print(f"Error loading model: {e}")
            return None
    
    @staticmethod
    def _normalize(vecs: np.ndarray) -> np.ndarray:
        """L2 normalize vectors for cosine similarity via dot product."""
        norms = np.linalg.norm(vecs, axis=1, keepdims=True)
        norms = np.where(norms == 0, 1.0, norms)
        return vecs / norms

    @classmethod
    def get_embedding(cls, text: str) -> Optional[np.ndarray]:
        """Get one normalized embedding and cache it."""
        key = text.strip().lower()
        if key in cls._embedding_cache:
            return cls._embedding_cache[key]

        model = cls._load_model()
        if model is None:
            return None

        try:
            embedding = model.encode([text], convert_to_numpy=True).astype(np.float32)
            embedding = cls._normalize(embedding)[0]
            cls._embedding_cache[key] = embedding
            return embedding
        except Exception as e:
            print(f"Error embedding text '{text}': {e}")
            return None
    
    @classmethod
    def get_embeddings_batch(cls, texts: List[str]) -> Optional[np.ndarray]:
        """Get normalized embeddings for multiple texts and cache each item."""
        model = cls._load_model()
        if model is None:
            return None

        try:
            embeddings = model.encode(texts, convert_to_numpy=True).astype(np.float32)
            embeddings = cls._normalize(embeddings)

            for text, embedding in zip(texts, embeddings):
                cls._embedding_cache[text.strip().lower()] = embedding

            return embeddings
        except Exception as e:
            print(f"Error getting batch embeddings: {e}")
            return None
    
    @classmethod
    def semantic_similarity(cls, text1: str, text2: str) -> float:
        """Cosine similarity in [0, 1] between two texts."""
        embed1 = cls.get_embedding(text1)
        embed2 = cls.get_embedding(text2)

        if embed1 is None or embed2 is None:
            return 0.0

        similarity = float(np.dot(embed1, embed2))
        return float(np.clip(similarity, 0.0, 1.0))

    @classmethod
    def build_faiss_index(cls, corpus: List[str]) -> Optional[Tuple["faiss.IndexFlatIP", List[str]]]:
        """Build a FAISS inner-product index for a corpus of phrases."""
        if not FAISS_AVAILABLE:
            return None

        cleaned = [item.strip() for item in corpus if item and item.strip()]
        if not cleaned:
            return None

        embeddings = cls.get_embeddings_batch(cleaned)
        if embeddings is None:
            return None

        index = faiss.IndexFlatIP(embeddings.shape[1])
        index.add(embeddings.astype(np.float32))
        return index, cleaned

    @classmethod
    def search_similar(cls, query: str, corpus: List[str], top_k: int = 5) -> List[Tuple[str, float]]:
        """Return top-k most similar corpus entries for query."""
        if not corpus:
            return []

        top_k = max(1, min(top_k, len(corpus)))
        q = cls.get_embedding(query)
        if q is None:
            return []

        # Use FAISS when available.
        index_bundle = cls.build_faiss_index(corpus)
        if index_bundle is not None:
            index, cleaned = index_bundle
            scores, idxs = index.search(np.expand_dims(q.astype(np.float32), axis=0), top_k)
            return [
                (cleaned[int(i)], float(np.clip(scores[0][rank], 0.0, 1.0)))
                for rank, i in enumerate(idxs[0])
                if int(i) >= 0
            ]

        # Fallback brute force.
        similarities = [(item, cls.semantic_similarity(query, item)) for item in corpus]
        similarities.sort(key=lambda pair: pair[1], reverse=True)
        return similarities[:top_k]
    
    @classmethod
    def match_candidate_skills(
        cls,
        candidate_skills: List[str],
        criteria_skills: List[Dict[str, object]],
        threshold: float = DEFAULT_THRESHOLD,
    ) -> Dict[str, object]:
        """Match candidate skills to weighted criteria with semantic nearest-neighbor."""
        if not candidate_skills or not criteria_skills:
            return {
                "matched_skills": [],
                "score": 0.0,
                "details": "No skills to match",
            }

        candidate_skills_clean = [s.strip() for s in candidate_skills if s and s.strip()]
        if not candidate_skills_clean:
            return {
                "matched_skills": [],
                "score": 0.0,
                "details": "No candidate skills available",
            }

        matched_skills: List[Dict[str, object]] = []
        total_weight = 0
        total_matched_weight = 0

        for criteria in criteria_skills:
            criteria_name = str(criteria.get("name", "")).strip()
            criteria_weight = int(criteria.get("weight", 50) or 50)

            if not criteria_name:
                continue

            total_weight += criteria_weight

            nearest = cls.search_similar(criteria_name, candidate_skills_clean, top_k=1)
            if not nearest:
                continue

            best_match, best_similarity = nearest[0]
            if best_similarity >= threshold:
                total_matched_weight += criteria_weight
                matched_skills.append({
                    "criteria_skill": criteria_name.lower(),
                    "matched_skill": best_match,
                    "similarity": float(best_similarity),
                    "weight": criteria_weight,
                })

        overall_score = (total_matched_weight / total_weight * 100) if total_weight > 0 else 0.0

        return {
            "matched_skills": matched_skills,
            "score": float(np.clip(overall_score, 0.0, 100.0)),
            "total_matches": len(matched_skills),
            "total_criteria": len(criteria_skills),
            "details": f"Matched {len(matched_skills)}/{len(criteria_skills)} criteria skills",
        }
    
    @classmethod
    def clear_cache(cls):
        """Clear embedding cache."""
        cls._embedding_cache.clear()
    
    @classmethod
    def get_cache_size(cls) -> int:
        """Return number of cached embeddings."""
        return len(cls._embedding_cache)


# Utility function for simple similarity check
def semantic_skill_match(skill1: str, skill2: str, threshold: float = 0.6) -> Tuple[bool, float]:
    """Simple helper that returns boolean semantic match + similarity."""
    similarity = SemanticSkillMatcher.semantic_similarity(skill1, skill2)
    is_match = similarity >= threshold
    return is_match, similarity