avinash-rai committed on
Commit
d045bbd
·
1 Parent(s): 467e1fc

Fix: Stop rebuild_intelligence_baseline on every turn, skip LLM detection when sticky

Browse files
app/agents/intelligence_extractor.py CHANGED
@@ -17,6 +17,7 @@ from app.utils.logger import AgentLogger
17
  from app.utils.json_utils import robust_json_loads
18
  from app.config import settings
19
  import re
 
20
 
21
 
22
  class IntelligenceExtractor:
@@ -126,6 +127,24 @@ class IntelligenceExtractor:
126
  async def llm_extract(self, message: str, context: Optional[Any] = None) -> Dict[str, List[str]]:
127
  """Perform semantic extraction using the LLM."""
128
  try:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
129
  # Define Strict Schema for Intelligence
130
  schema = {
131
  "type": "object",
@@ -184,7 +203,16 @@ class IntelligenceExtractor:
184
  def clean_list(lst):
185
  return [str(v).strip() for v in lst if v and len(str(v).strip()) > 3]
186
 
187
- return {k: clean_list(v) for k, v in data.items() if isinstance(v, list)}
 
 
 
 
 
 
 
 
 
188
 
189
  except Exception as e:
190
  self.logger.error("LLM Extraction failed", error=str(e))
 
17
  from app.utils.json_utils import robust_json_loads
18
  from app.config import settings
19
  import re
20
+ from app.core.session_cache import session_cache
21
 
22
 
23
  class IntelligenceExtractor:
 
127
  async def llm_extract(self, message: str, context: Optional[Any] = None) -> Dict[str, List[str]]:
128
  """Perform semantic extraction using the LLM."""
129
  try:
130
+ # --- SESSION CACHE CHECK (Fast-win) ---
131
+ session_id = None
132
+ try:
133
+ if context is not None:
134
+ # TurnContext has `.session` or `.session_id`
135
+ session = getattr(context, "session", None)
136
+ if isinstance(session, dict):
137
+ session_id = session.get("id") or session.get("conversation_id") or session.get("session_id")
138
+ if not session_id:
139
+ session_id = getattr(context, "session_id", None)
140
+ except Exception:
141
+ session_id = None
142
+
143
+ cached = session_cache.get(session_id, message)
144
+ if cached:
145
+ self.logger.info("LLM extraction cache HIT", session_id=session_id)
146
+ return cached
147
+
148
  # Define Strict Schema for Intelligence
149
  schema = {
150
  "type": "object",
 
203
  def clean_list(lst):
204
  return [str(v).strip() for v in lst if v and len(str(v).strip()) > 3]
205
 
206
+ cleaned = {k: clean_list(v) for k, v in data.items() if isinstance(v, list)}
207
+
208
+ # Store in session cache for subsequent turns
209
+ try:
210
+ if session_id:
211
+ session_cache.set(session_id, message, cleaned)
212
+ except Exception:
213
+ pass
214
+
215
+ return cleaned
216
 
217
  except Exception as e:
218
  self.logger.error("LLM Extraction failed", error=str(e))
app/agents/orchestrator.py CHANGED
@@ -305,15 +305,18 @@ class HoneypotOrchestrator:
305
  # but we can check if the last turn was 'conclude' or 'escalate'
306
 
307
  if existing_scam: # If sticky, we can run extraction immediately with high confidence
308
- detection_task = asyncio.Future()
309
- detection_task.set_result({
310
  "scam_type": existing_scam,
311
  "confidence": conversation.get("scam_confidence", 0.9),
312
  "is_scam": True,
313
  "reasoning": "Sticky detection from session memory",
314
- "source": "memory"
315
- })
316
- extraction_task = self.intel_extractor.extract(
 
 
 
317
  message,
318
  context=ctx,
319
  turn_count=message_count,
@@ -321,7 +324,6 @@ class HoneypotOrchestrator:
321
  current_confidence=conversation.get("scam_confidence", 0.9),
322
  behavior_changed=behavior_changed
323
  )
324
- detection, intelligence = await asyncio.gather(detection_task, extraction_task)
325
  else:
326
  # If not sticky, we MUST run detection first to get 'current_confidence' for extraction novelty
327
  try:
 
305
  # but we can check if the last turn was 'conclude' or 'escalate'
306
 
307
  if existing_scam: # If sticky, we can run extraction immediately with high confidence
308
+ # 🔥 SKIP LLM DETECTION entirely - use cached result
309
+ detection = {
310
  "scam_type": existing_scam,
311
  "confidence": conversation.get("scam_confidence", 0.9),
312
  "is_scam": True,
313
  "reasoning": "Sticky detection from session memory",
314
+ "source": "memory",
315
+ "matched_keywords": conversation.get("aggregated_intelligence", {}).get("keywords", [])
316
+ }
317
+ # 🔥 SKIP LLM EXTRACTION on most turns - regex is sufficient
318
+ # Only do LLM extraction on turn 1 and every 5th turn (handled inside extract())
319
+ intelligence = await self.intel_extractor.extract(
320
  message,
321
  context=ctx,
322
  turn_count=message_count,
 
324
  current_confidence=conversation.get("scam_confidence", 0.9),
325
  behavior_changed=behavior_changed
326
  )
 
327
  else:
328
  # If not sticky, we MUST run detection first to get 'current_confidence' for extraction novelty
329
  try:
app/core/session_cache.py ADDED
@@ -0,0 +1,61 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import time
2
+ import hashlib
3
+ from collections import OrderedDict
4
+ from typing import Any, Dict, Optional
5
+
6
+
7
class SessionCache:
    """Lightweight in-memory per-session cache with TTL and LRU eviction.

    Layout: ``store[session_id][sha1(text)] = (timestamp, value)``.

    * Each session bucket holds at most ``max_per_session`` entries; the
      least recently used entry is evicted first.
    * At most ``max_sessions`` session buckets are kept; the least recently
      used session is evicted first. Without this bound the store would grow
      for every session id ever seen, because expired entries are only
      removed when the same key is read again.
    * Entries older than ``default_ttl`` seconds are treated as missing and
      dropped on access.

    Timestamps come from ``time.monotonic()`` so that wall-clock adjustments
    cannot expire entries early or keep them alive; they are only meaningful
    as differences within the current process.
    """

    def __init__(
        self,
        max_per_session: int = 200,
        default_ttl: int = 300,
        max_sessions: Optional[int] = 1000,
    ):
        """Create an empty cache.

        Args:
            max_per_session: Maximum number of entries kept per session.
            default_ttl: Lifetime of an entry in seconds.
            max_sessions: Maximum number of sessions kept at once, or
                ``None`` to leave the number of sessions unbounded.
        """
        # OrderedDict (still a Dict) so sessions can be tracked in LRU order.
        self.store: Dict[str, OrderedDict] = OrderedDict()
        self.max_per_session = max_per_session
        self.default_ttl = default_ttl
        self.max_sessions = max_sessions

    @staticmethod
    def _make_key(text: str) -> str:
        """Return a fixed-size hex digest used as the per-session entry key."""
        # 'surrogatepass' keeps lone surrogates in the digest input instead of
        # silently dropping them, so distinct texts do not share a key.
        # SHA-1 is used purely as a fingerprint here, not for security.
        return hashlib.sha1(
            text.encode('utf-8', errors='surrogatepass')
        ).hexdigest()

    def get(self, session_id: Optional[str], text: str) -> Optional[Any]:
        """Return the cached value for ``text`` in ``session_id``.

        Returns ``None`` when ``session_id`` is falsy, when nothing is cached
        for the pair, or when the cached entry has outlived ``default_ttl``
        (in which case the entry is removed).
        """
        if not session_id:
            return None
        key = self._make_key(text)
        bucket = self.store.get(session_id)
        if not bucket:
            return None
        item = bucket.get(key)
        if item is None:
            return None
        ts, value = item
        if time.monotonic() - ts > self.default_ttl:
            # Expired: drop the entry, and the bucket too once it is empty so
            # idle sessions do not linger in the store.
            del bucket[key]
            if not bucket:
                del self.store[session_id]
            return None
        # Mark both the entry and its session as most recently used.
        bucket.move_to_end(key)
        self.store.move_to_end(session_id)
        return value

    def set(self, session_id: Optional[str], text: str, value: Any) -> None:
        """Cache ``value`` for ``text`` in ``session_id``.

        Does nothing when ``session_id`` is falsy. May evict the least
        recently used entries of the session and the least recently used
        sessions to respect the configured limits.
        """
        if not session_id:
            return
        key = self._make_key(text)
        bucket = self.store.get(session_id)
        if bucket is None:
            bucket = OrderedDict()
            self.store[session_id] = bucket
        bucket[key] = (time.monotonic(), value)
        bucket.move_to_end(key)
        self.store.move_to_end(session_id)
        # Evict oldest entries if this session is over its limit.
        while len(bucket) > self.max_per_session:
            bucket.popitem(last=False)
        if not bucket:
            # Only reachable when max_per_session <= 0; keep no empty bucket.
            del self.store[session_id]
        # Evict least recently used sessions if over the session limit.
        if self.max_sessions is not None:
            while len(self.store) > self.max_sessions:
                self.store.popitem(last=False)

    def invalidate_session(self, session_id: str) -> None:
        """Drop every cached entry for ``session_id``; unknown ids are ignored."""
        self.store.pop(session_id, None)
59
+
60
+ # Create a default global cache instance
61
+ session_cache = SessionCache()
app/utils/guvi_handler.py CHANGED
@@ -169,20 +169,26 @@ class GUVIHandler:
169
  )
170
 
171
  # Inject history
172
- if request.conversationHistory:
 
 
 
 
 
173
  try:
174
- # conv already fetched above
175
- # [SCORING] Replay FULL history from request to ensure state consistency
176
- # This prevents Turn 1 resets if database is purged or session ID shifts
177
  full_history = request.conversationHistory
178
- if len(conv.get("history", [])) < len(full_history):
179
- # Clear existing history and replay to ensure perfect sync
180
- # (Only if history is provided by platform)
 
 
 
181
  if hasattr(orchestrator.conversation_manager.memory, "clear"):
182
  await orchestrator.conversation_manager.memory.clear(session_id)
183
 
184
  for i, msg in enumerate(full_history):
185
- # Robust extraction from Any type msg
186
  h_text = ""
187
  h_sender = "scammer"
188
 
@@ -198,8 +204,6 @@ class GUVIHandler:
198
 
199
  if h_text:
200
  is_scammer = h_sender == "scammer"
201
- # [OPTIMIZATION] Use Regex extraction for history to avoid "Latency Bomb"
202
- # We assume history was already processed for logic in previous runs
203
  hist_intel = extract_all(h_text)
204
  await orchestrator.conversation_manager.update(
205
  conversation_id=session_id,
@@ -210,13 +214,17 @@ class GUVIHandler:
210
  scam_type=None, persona=None
211
  )
212
 
213
- # [SCORING] Finalize baseline rebuild (Guarded)
214
  if hasattr(orchestrator, "rebuild_intelligence_baseline"):
215
  await orchestrator.rebuild_intelligence_baseline(session_id)
 
 
 
 
 
216
  except Exception as hist_e:
217
  safe_error = str(hist_e).encode('utf-8', 'replace').decode('utf-8')
218
  logger.warning(f"Error parsing history: {safe_error}")
219
- # Continue anyway, history is secondary
220
 
221
  # [LATENCY] Turbo Mode: Only run expensive forensics (XAI) on the concluding turn.
222
  # We predict if this is the end using the unified lifecycle rules.
 
169
  )
170
 
171
  # Inject history
172
+ # 🔥 OPTIMIZATION: Only replay history ONCE per session (cold start recovery)
173
+ # Track with 'sys_history_synced' flag to avoid expensive rebuild every turn
174
+ agg_intel = conv.get("aggregated_intelligence", {})
175
+ already_synced = agg_intel.get("sys_history_synced", False)
176
+
177
+ if request.conversationHistory and not already_synced:
178
  try:
179
+ # Only sync if our DB is significantly behind (cold start scenario)
180
+ # A gap of 1-2 messages is normal turn progression, not cold start
 
181
  full_history = request.conversationHistory
182
+ db_len = len(conv.get("history", []))
183
+ history_gap = len(full_history) - db_len
184
+
185
+ # Only do full replay if gap >= 3 messages (actual cold start)
186
+ if history_gap >= 3:
187
+ logger.info(f"Cold start detected (gap={history_gap}). Replaying history...")
188
  if hasattr(orchestrator.conversation_manager.memory, "clear"):
189
  await orchestrator.conversation_manager.memory.clear(session_id)
190
 
191
  for i, msg in enumerate(full_history):
 
192
  h_text = ""
193
  h_sender = "scammer"
194
 
 
204
 
205
  if h_text:
206
  is_scammer = h_sender == "scammer"
 
 
207
  hist_intel = extract_all(h_text)
208
  await orchestrator.conversation_manager.update(
209
  conversation_id=session_id,
 
214
  scam_type=None, persona=None
215
  )
216
 
217
+ # Rebuild baseline ONCE
218
  if hasattr(orchestrator, "rebuild_intelligence_baseline"):
219
  await orchestrator.rebuild_intelligence_baseline(session_id)
220
+
221
+ # Mark synced to prevent future rebuilds this session
222
+ await orchestrator.conversation_manager.update_intelligence(
223
+ session_id, {"sys_history_synced": True}
224
+ )
225
  except Exception as hist_e:
226
  safe_error = str(hist_e).encode('utf-8', 'replace').decode('utf-8')
227
  logger.warning(f"Error parsing history: {safe_error}")
 
228
 
229
  # [LATENCY] Turbo Mode: Only run expensive forensics (XAI) on the concluding turn.
230
  # We predict if this is the end using the unified lifecycle rules.