from __future__ import annotations # ═══════════════════════════════════════════════════════════════════════════════ # File: app/agents/intelligence_extractor.py # Description: Intelligence extraction agent # ═══════════════════════════════════════════════════════════════════════════════ """Intelligence Extraction Agent for scam data gathering.""" from typing import Dict, List, Any, Optional, TYPE_CHECKING import json import asyncio from app.utils.extractors import extract_all, aggregate_intelligence, has_payment_info, has_contact_info, is_valid_phone, is_valid_upi if TYPE_CHECKING: from app.core.llm_client import LLMClient, ModelRole from app.core.prompts import INTELLIGENCE_EXTRACTION_PROMPT, MATH_FORENSIC_PROMPT from app.utils.logger import AgentLogger from app.utils.json_utils import robust_json_loads from app.config import settings import re from app.core.session_cache import session_cache class IntelligenceExtractor: """ Agent for extracting actionable intelligence from scam messages. Extracts: - Phone numbers (Indian format) - UPI IDs (all major providers) - Bank account numbers - IFSC codes - Emails and URLs - PAN and Aadhar numbers - Cryptocurrency addresses """ def __init__(self, llm_client: Optional['LLMClient'] = None): self.logger = AgentLogger("intelligence_extractor") self.llm_client = llm_client async def extract(self, message: str, context: Optional[Any] = None, turn_count: int = 1, last_confidence: float = 0.0, current_confidence: float = 0.0, behavior_changed: bool = False) -> Dict[str, Any]: """ Hybrid extraction pipeline using Regex and LLM. """ # Step 1: Run Regex pass (Fast & Reliable) intelligence = extract_all(message) # SOC FIX: Always compute baseline risk score from regex results intelligence["risk_score"] = self._calculate_risk_score(intelligence) # 🔥 THROTTLED EXTRACTION (LLM CALL OPTIMIZATION) # Logic: # 1. Always extract on Turn 1. # 2. Extract every 3rd turn (Turn 4, 7, 10...). # 3. Extract if Regex found *new* PII not seen before (handled by aggregate logic in orchestrator, but here we check message content highlights). # 4. OVERRIDE: Extract if confidence jumped significantly (>0.2) or behavior changed. # 🔥 ULTRA-THROTTLED: Only use LLM on turn 1 and every 5th turn # Regex is sufficient for intel extraction on most turns should_llm_extract = False if turn_count == 1: should_llm_extract = True elif turn_count % 5 == 0: # Every 5th turn instead of 3rd should_llm_extract = True # REMOVED: confidence jump trigger (causes false positives) # REMOVED: behavior_changed trigger (redundant) # REMOVED: payment/contact info trigger (regex handles this) # Step 2: Run LLM semantic pass (Context-aware) if should_llm_extract and self.llm_client and self.llm_client.is_available: try: llm_intel = await self.llm_extract(message, context=context) # Merge results (Deduplicate & Validate) from app.utils.extractors import is_valid_phone, is_valid_upi for key, values in llm_intel.items(): validated_values = [] for v in values: v_str = str(v).strip() # SOC-GRADE VALIDATION for specific types if len(v_str) <= 3: continue if key == "phone_numbers": # If LLM extracted something, strictly check if it looks like a phone number # Use regex validator or lenient length/digit check import re if re.search(r'\d{10}', v_str): validated_values.append(v_str) elif key == "upi_ids": if "@" in v_str and not " " in v_str: validated_values.append(v_str) else: validated_values.append(v_str) if key in intelligence and isinstance(intelligence[key], list): intelligence[key] = list(set(intelligence[key] + validated_values)) elif key not in intelligence and validated_values: intelligence[key] = validated_values # 🔥 AUGMENT RISK SCORE (Reactive to LLM findings) intelligence["risk_score"] = self._calculate_risk_score(intelligence) except Exception as e: self.logger.error(f"LLM Extraction failed: {e}. Falling back to Pure Regex.") # 🧮 MATH FORENSICS (Forensic Clinic Upgrade) if settings.ENABLE_MATH_FORENSICS: try: math_intel = await self._run_math_forensics(message) if math_intel: intelligence["forensic_analysis"] = math_intel if math_intel.get("forensic_flag") == "RED_FLAG": intelligence["risk_score"] = min(100, intelligence["risk_score"] + 30) except Exception as e: self.logger.warning(f"Math forensics failed: {e}") # Calculate derived metrics intelligence["scam_confidence"] = self._calculate_confidence(intelligence) intelligence["risk_level"] = self._get_risk_level(intelligence["risk_score"]) # Log findings masked_intel = self.mask_intelligence(intelligence) found = {k: v for k, v in masked_intel.items() if v and k not in ["risk_score", "scam_confidence", "risk_level"]} if found: self.logger.info("Intelligence extracted (Hybrid)", types=list(found.keys())) return intelligence async def llm_extract(self, message: str, context: Optional[Any] = None) -> Dict[str, List[str]]: """Perform semantic extraction using the LLM.""" try: # --- SESSION CACHE CHECK (Fast-win) --- session_id = None try: if context is not None: # TurnContext has `.session` or `.session_id` session = getattr(context, "session", None) if isinstance(session, dict): session_id = session.get("id") or session.get("conversation_id") or session.get("session_id") if not session_id: session_id = getattr(context, "session_id", None) except Exception: session_id = None cached = session_cache.get(session_id, message) if cached: self.logger.info("LLM extraction cache HIT", session_id=session_id) return cached # Define Strict Schema for Intelligence schema = { "type": "object", "properties": { "phone_numbers": {"type": "array", "items": {"type": "string"}}, "upi_ids": {"type": "array", "items": {"type": "string"}}, "bank_accounts": {"type": "array", "items": {"type": "string"}}, "urls": {"type": "array", "items": {"type": "string"}}, "crypto_addresses": {"type": "array", "items": {"type": "string"}}, "emails": {"type": "array", "items": {"type": "string"}}, "ifsc_codes": {"type": "array", "items": {"type": "string"}}, "names": {"type": "array", "items": {"type": "string"}}, "pan_cards": {"type": "array", "items": {"type": "string"}}, "aadhar_numbers": {"type": "array", "items": {"type": "string"}}, "credit_cards": {"type": "array", "items": {"type": "string"}}, "otps": {"type": "array", "items": {"type": "string"}}, "rat_apps": {"type": "array", "items": {"type": "string"}} }, "required": [ "phone_numbers", "upi_ids", "bank_accounts", "urls", "crypto_addresses", "emails", "ifsc_codes", "names", "pan_cards", "aadhar_numbers", "credit_cards", "otps", "rat_apps" ], "additionalProperties": False } # --- CACHE-OPTIMIZED MODULAR PROMPT --- # Use the global INTELLIGENCE_EXTRACTION_PROMPT already imported at top level system_part = INTELLIGENCE_EXTRACTION_PROMPT.split("### INPUT")[0].strip() messages = [ {"role": "system", "content": system_part}, {"role": "user", "content": f"TEXT TO ANALYZE:\n\"\"\"{message}\"\"\""} ] # 🔥 OPTIMIZED: Use simple structured output instead of 4-phase CoVe # CoVe used 4 API calls - this uses only 1 response = await self.llm_client.generate_structured( prompt="", schema=schema, messages=messages, context=context # Pass TurnContext for budget enforcement ) # ⚡ Extraction from LLMResponse if not response or not response.content: return {} data = robust_json_loads(response.content) if not data: return {} # Helper to clean lists # SOC FIX: Filter junk strings < 3 chars from LLM output def clean_list(lst): return [str(v).strip() for v in lst if v and len(str(v).strip()) > 3] cleaned = {k: clean_list(v) for k, v in data.items() if isinstance(v, list)} # Store in session cache for subsequent turns try: if session_id: session_cache.set(session_id, message, cleaned) except Exception: pass return cleaned except Exception as e: self.logger.error("LLM Extraction failed", error=str(e)) return {} async def _run_math_forensics(self, message: str) -> Optional[Dict[str, Any]]: """ Runs mathematical verification on detected financial claims. Triggered when ROI, interest rates, or large amounts are detected. """ # Trigger Detection (Lightweight) trigger_patterns = [ r'\b(?:\d+%)', r'\b(?:ROI|interest|return|profit|investment)\b', r'\b(?:\d+\s*(?:lakh|crore|thousand|INR|Rs))\b' ] if not any(re.search(p, message, re.IGNORECASE) for p in trigger_patterns): return None try: self.logger.info("Forensic Clinic: Running Math Verification...") from app.core.prompts import MATH_FORENSIC_PROMPT system_part = MATH_FORENSIC_PROMPT.split("### INPUT")[0].strip() messages = [ {"role": "system", "content": system_part}, {"role": "user", "content": f"CLAIM TO VERIFY:\n\"\"\"{message}\"\"\""} ] # Use Compound-Mini (Optimized for Logic/Math) response = await self.llm_client.generate_smart( prompt="", messages=messages, model="groq/compound-mini", enabled_tools=["code_interpreter"] ) # ⚡ Extraction from LLMResponse if not response or not response.content: return None data = response.content # If the response is a string (legacy/fallback), try to parse it if isinstance(data, str): data = robust_json_loads(data) return data except Exception as e: self.logger.error("Math Forensics failed", error=str(e)) return None async def extract_from_conversation( self, messages: List[Dict] ) -> Dict[str, Any]: """ Aggregate intelligence from entire conversation. Args: messages: List of message dictionaries containing 'text' Returns: Aggregated intelligence """ # SOC FIX: Add Dedup Cache to prevent redundant LLM calls seen_texts = set() intel_messages = [] timeline = [] for msg in messages: text = msg.get("text", "") or msg.get("message", "") sender = msg.get("sender", "unknown") if text: # Skip duplicate messages in history if text in seen_texts: continue seen_texts.add(text) intel = await self.extract(str(text)) intel_messages.append({"intelligence": intel}) # Build timeline # SOC FIX: Granular PII masking for details list sensitive_list = intel["upi_ids"] + intel["bank_accounts"] + intel["urls"] if sensitive_list: timeline.append({ "timestamp": msg.get("timestamp", "N/A"), "sender": sender, "event": "sensitive_data_shared", "details": [self.mask_pii(v) for v in sensitive_list] }) aggregated = aggregate_intelligence(intel_messages) aggregated["timeline"] = timeline aggregated["scam_confidence"] = self._calculate_confidence(aggregated) aggregated["risk_level"] = self._get_risk_level(aggregated["risk_score"]) return aggregated def mask_intelligence(self, intelligence: Dict[str, Any]) -> Dict[str, Any]: """Create a masked copy of intelligence for safe logging/reporting.""" masked = intelligence.copy() sensitive_keys = [ "phone_numbers", "upi_ids", "bank_accounts", "pan_cards", "aadhar_numbers", "emails", "credit_cards", "otps" ] for key in sensitive_keys: if key in masked and isinstance(masked[key], list): masked[key] = [self.mask_pii(item) for item in masked[key]] return masked @staticmethod def mask_pii(value: str, keep: int = 2) -> str: """Mask sensitive string values.""" if len(value) <= keep * 2: return "XXXX" return f"{value[:keep]}XXXX{value[-keep:]}" def assess_risk(self, intelligence: Dict) -> Dict[str, Any]: """Assess risk based on extracted intelligence.""" score = self._calculate_risk_score(intelligence) return { "score": score, "level": self._get_risk_level(score), "confidence": self._calculate_confidence(intelligence) } def _calculate_risk_score(self, intel: Dict) -> int: """Calculate forensic risk score based on finding density.""" score = 0 if intel.get("upi_ids"): score += 20 * len(intel["upi_ids"]) if intel.get("bank_accounts"): score += 30 * len(intel["bank_accounts"]) if intel.get("credit_cards"): score += 100 # Critical if intel.get("rat_apps"): score += 90 # Critical if intel.get("otps"): score += 40 if intel.get("phone_numbers"): score += 10 if intel.get("urls"): score += 10 * len(intel["urls"]) if intel.get("pan_cards") or intel.get("aadhar_numbers"): score += 50 return min(100, score) def _calculate_confidence(self, intelligence: Dict) -> float: """Calculate scam confidence (0.0 - 1.0) based on intel density.""" score = intelligence.get("risk_score", 0) # Simple heuristic: higher risk = higher confidence of scam INTENT confidence = min(1.0, score / 80.0) # Boost if specific high-value targets found # SOC FIX: Gate confidence boost by risk_score > 40 to prevent premature jumps if (intelligence.get("upi_ids") or intelligence.get("bank_accounts") or intelligence.get("otps")) and intelligence.get("risk_score", 0) > 40: confidence = max(confidence, 0.85) return round(confidence, 2) def _get_risk_level(self, score: int) -> str: """Map score to risk level.""" if score >= 75: return "HIGH" if score >= 40: return "MEDIUM" return "LOW" def has_payment_info(self, intelligence: Dict) -> bool: """Check if payment information was extracted.""" return has_payment_info(intelligence) def has_contact_info(self, intelligence: Dict) -> bool: """Check if contact information was extracted.""" return has_contact_info(intelligence) def get_priority_intel(self, intelligence: Dict) -> Dict[str, List[str]]: """ Get high-priority intelligence for law enforcement. Returns only actionable items: UPI, phone, bank accounts, URLs """ return { "upi_ids": intelligence.get("upi_ids", []), "phone_numbers": intelligence.get("phone_numbers", []), "bank_accounts": intelligence.get("bank_accounts", []), "urls": intelligence.get("urls", []), "credit_cards": intelligence.get("credit_cards", []) } def get_intelligence_summary(self, intelligence: Dict, mask: bool = False) -> str: """Get human-readable summary of intelligence.""" intel = self.mask_intelligence(intelligence) if mask else intelligence parts = [] if intel.get("phone_numbers"): parts.append(f"📞 Phones: {', '.join(intel['phone_numbers'])}") if intel.get("upi_ids"): parts.append(f"💳 UPIs: {', '.join(intel['upi_ids'])}") if intel.get("bank_accounts"): parts.append(f"🏦 Accounts: {', '.join(intel['bank_accounts'])}") if intel.get("credit_cards"): parts.append(f"💳 Cards: {', '.join(intel['credit_cards'])}") if intel.get("otps"): parts.append(f"🔐 OTPs: {', '.join(intel['otps'])}") if intel.get("rat_apps"): parts.append(f"⚠️ RAT: {', '.join(intel['rat_apps'])}") if intel.get("urls"): parts.append(f"🔗 URLs: {', '.join(intel['urls'][:3])}") return "\n".join(parts) if parts else "No intelligence extracted yet" __all__ = ["IntelligenceExtractor"]