# app/agents/scam_detector.py - Scam detection agent """Hybrid LLM + keyword scam detection with SOC-grade regex and heuristics.""" import re import json from typing import Dict, Any, List, Optional from collections import Counter from app.core.llm_client import LLMClient, ModelRole from app.core.prompts import SCAM_DETECTION_PROMPT from app.config import settings from app.utils.logger import AgentLogger from app.intelligence.emotional_analyzer import emotional_analyzer from app.utils.json_utils import robust_json_loads # 1. Expanded Scam Taxonomy (SOC-Grade) SCAM_DATABASE = { "lottery_scam": { "keywords": ["won", "winner", "lottery", "prize", "lucky draw", "jackpot", "crore", "lakh", "claim", "congratulations", "selected", "reward", "cash prize", "bumper", "draw"], "regex_patterns": [ r"w[o0]n", r"pr[i1]ze", r"l[u\s]*cky", r"j[a@]ckp[o0]t", r"c[o0]ngrat[us]", r"cr[o0]re", r"l[a@]kh" ], "threat_level": "high", "category": "Financial Fraud", "persona": "elderly_excited", "description": "Fake lottery/prize winning notification", "risk_indicators": ["Unsolicited prize", "Advance fee request"] }, "job_scam": { "keywords": ["work from home", "earn money", "job offer", "hiring", "data entry", "part time", "typing job", "vacancy", "salary", "income", "registration fee", "joining fee"], "regex_patterns": [ r"w[o0]rk\s+fr[o0]m\s+h[o0]me", r"e[a@]rn\s+m[o0]ney", r"j[o0]b\s+off?er", r"p[a@]rt\s+t[i1]me", r"d[a@]ta\s+entry" ], "threat_level": "high", "category": "Employment Fraud", "persona": "desperate_jobseeker", "description": "Fake job offers requiring payment" }, "banking_scam": { "keywords": ["kyc", "account blocked", "verify", "bank", "otp", "update details", "suspend", "deactivate", "pan card", "aadhar link", "account closed", "urgent verification"], "regex_patterns": [ r"k\.?y\.?c", r"a\/?c\s+bl[o0]ck", r"v[e3]r[i1]fy", r"o\.?t\.?p", r"p[a@]n\s+c[a@]rd", r"a{2}dh[a@]r" ], "threat_level": "critical", "category": "Banking Fraud", "persona": "worried_customer", "description": "Fake bank/KYC verification requests" }, "phishing_scam": { "keywords": ["click here", "link", "update account", "security alert", "login", "official", "customer support", "verify identity"], "regex_patterns": [ r"cl[i1]ck", r"l[i1]nk", r"l[o0]g[i1]n", r"v[e3]r[i1]fy" ], "threat_level": "high", "category": "Credential Theft", "persona": "confused_user", "description": "Fake login/link phishing attempts" }, "investment_scam": { "keywords": ["invest", "guaranteed returns", "double money", "bitcoin", "trading", "profit", "forex", "stock tips", "mutual fund", "high returns", "100% profit", "no risk"], "regex_patterns": [ r"inv[e3]st", r"gu[a@]r[a@]nt[e3]{2}", r"d[o0]uble", r"b[i1]tc[o0]in", r"pr[o0]f[i1]t" ], "threat_level": "high", "category": "Investment Fraud", "persona": "curious_investor", "description": "Fraudulent investment schemes" }, "loan_scam": { "keywords": ["instant loan", "no documents", "low interest", "approved", "processing fee", "pre-approved", "personal loan"], "regex_patterns": [ r"inst[a@]nt\s+l[o0][a@]n", r"pr[e3][\s-]?appr[o0]ved", r"pr[o0]cess[i1]ng\s+f[e3]{2}" ], "threat_level": "high", "category": "Loan Fraud", "persona": "needy_borrower", "description": "Fake instant loan offers" }, "government_scam": { "keywords": ["tax refund", "legal notice", "arrest warrant", "police", "court", "fine", "income tax", "cbi", "enforcement", "legal action", "ed", "narcotics"], "regex_patterns": [ r"p[o0]l[i1]ce", r"[a@]rrest", r"w[a@]rr[a@]nt", r"t[a@]x", r"c\.?b\.?i", r"n[a@]rc[o0]t[i1]cs" ], "threat_level": "critical", "category": "Government Impersonation", "persona": "scared_citizen", "description": "Fake government/legal notices" }, "delivery_scam": { "keywords": ["package", "delivery failed", "customs", "courier", "stuck", "pay fee", "undelivered", "reshipping"], "regex_patterns": [ r"p[a@]ck[a@]ge", r"d[e3]l[i1]very", r"cust[o0]ms", r"c[o0]ur[i1]er" ], "threat_level": "medium", "category": "Delivery Fraud", "persona": "expecting_customer", "description": "Fake delivery/customs fee requests" }, "tech_support_scam": { "keywords": ["virus", "hacked", "security alert", "microsoft", "computer problem", "remote access", "tech support"], "regex_patterns": [ r"v[i1]rus", r"h[a@]ck[e3]d", r"m[i1]cr[o0]s[o0]ft", r"rem[o0]te" ], "threat_level": "medium", "category": "Tech Support Fraud", "persona": "confused_elderly", "description": "Fake tech support queries" }, "romance_scam": { "keywords": ["love you", "relationship", "lonely", "marriage", "stuck abroad", "need money", "emergency", "gift"], "threat_level": "high", "category": "Romance Fraud", "persona": "lonely_victim", "description": "Fake romantic interest for money" }, "crypto_scam": { "keywords": ["crypto", "ethereum", "wallet", "airdrop", "free coins", "blockchain", "trading bot"], "threat_level": "high", "category": "Crypto Fraud", "persona": "crypto_curious", "description": "Cryptocurrency fraud" }, # 🆕 NEW VECTORS (SOC Recommendation) "sim_swap_scam": { "keywords": ["sim card", "upgrade 4g", "5g upgrade", "sim block", "network issue", "port number", "esim activation"], "regex_patterns": [r"s[i1]m\s*c[a@]rd", r"5g\s*upgr[a@]de", r"e-?s[i1]m"], "threat_level": "critical", "category": "Telecom Fraud", "persona": "tech_illiterate", "description": "SIM swap/eSIM activation fraud" }, "qr_code_scam": { "keywords": ["scan code", "qr code", "receive payment", "scan to pay", "gpay qr", "phonepe qr", "paytm qr"], "regex_patterns": [r"qr\s*c[o0]de", r"sc[a@]n", r"rec[ei]ve\s*p[a@]yment"], "threat_level": "high", "category": "Payment Fraud", "persona": "shopkeeper", "description": "QR Code payment reversal scam" }, "refund_scam": { "keywords": ["refund", "wrong transaction", "money sent by mistake", "return money", "cashback", "refund processed"], "regex_patterns": [r"ref[u]?nd", r"c[a@]shb[a@]ck", r"wr[o0]ng\s*tr[a@]ns"], "threat_level": "medium", "category": "Refund Fraud", "persona": "honest_person", "description": "Fake accidental transfer refund" }, "fake_support": { "keywords": ["customer care", "helpline", "support number", "complaint", "toll free", "service center"], "regex_patterns": [r"cust[o0]mer\s*c[a@]re", r"h[e3]lp\s*l[i1]ne"], "threat_level": "high", "category": "Impersonation", "persona": "angry_customer", "description": "Fake customer support numbers" }, "deepfake_scam": { "keywords": ["voice", "audio", "video call", "urgent need", "accident", "jail", "kidnapped"], "regex_patterns": [r"v[o0][i1]ce", r"v[i1]de[o0]", r"k[i1]dn[a@]p"], "threat_level": "critical", "category": "Deepfake/AI Fraud", "persona": "scared_relative", "description": "AI-generated voice/video impersonation" } } class ScamDetector: """ Scam Detection Agent using hybrid approach: 1. SOC-grade Fuzzy Regex pre-filtering 2. LLM-based accurate classification 3. Weighted Confidence Ensemble """ def __init__(self, llm_client: Optional[LLMClient] = None): self.llm_client = llm_client self.logger = AgentLogger("scam_detector") self._compile_regexes() def _compile_regexes(self): """Pre-compile regex patterns for performance optimization.""" self.compiled_patterns = {} for scam_type, data in SCAM_DATABASE.items(): patterns = data.get("regex_patterns", []) # Also escape and use raw keywords as fallback for kw in data["keywords"]: # SOC FIX: Only auto-add single-token keywords (precision) if " " not in kw and len(kw) > 3: patterns.append(re.escape(kw)) # Join into one massive Optimized Regex per scam type full_pattern = "|".join(patterns) if full_pattern: self.compiled_patterns[scam_type] = re.compile(full_pattern, re.IGNORECASE) def detect_heuristic(self, message: str) -> Dict[str, Any]: """Public alias for SOC-grade regex detection (Zero Latency).""" return self._keyword_detection(message) async def detect(self, message: str, context: Optional[Any] = None, turn_count: int = 1) -> Dict[str, Any]: """Hybrid detection pipeline.""" self.logger.debug("Detecting scam", message_length=len(message)) # Step 1: SOC-Grade Regex Parsing keyword_result = self._keyword_detection(message) # 🔥 LATENCY OPTIMIZATION: HEURISTIC FAST-PATH (Refined Thresholds) # Turn 1: Require extremely high confidence (> 0.85) to avoid false positives. # Turn 2+: Lower threshold to 0.70 since we already have session context. threshold = 0.85 if turn_count <= 1 else 0.70 if keyword_result.get("confidence", 0) >= threshold: self.logger.info( f"⚡ FAST-PATH TRIGGERED (Turn {turn_count}): Skipping LLM Detection", scam_type=keyword_result["scam_type"], confidence=keyword_result["confidence"] ) # Ensure essential keys exist # SOC FIX: Populate intent for Fast-Path compatibility keyword_result["intent"] = "money_theft" keyword_result["agent_notes"] = keyword_result.get("agent_notes", "") + f" [FAST-PATH: REGEX (T{turn_count})]" # Still perform emotional analysis (It's fast, regex-based) emotional_profile = emotional_analyzer.analyze(message) keyword_result["emotional_profile"] = emotional_profile.to_dict() return keyword_result.copy() # Step 2: LLM detection (Only if Regex was unsure) llm_result = None if settings.ENABLE_LLM_DETECTION and self.llm_client and self.llm_client.is_available: # Check context to prevent redundant calls if already decided elsewhere if context and context.scam_decided: self.logger.info("Decision already made in context, skipping LLM detection.") else: llm_result = await self._llm_detection(message, context=context) # Step 3: Ensemble Logic if llm_result: final_result = self._combine_results(keyword_result, llm_result) else: final_result = keyword_result # 🔥 Step 4: Behavioral & Emotional Analysis (NEW CONNECTION) # Adds research-backed behavioral scoring (Urgency/Fear/Greed) emotional_profile = emotional_analyzer.analyze(message) final_result["emotional_profile"] = emotional_profile.to_dict() # Boost confidence if high emotional manipulation is detected if emotional_profile.overall_manipulation > 0.6: final_result["confidence"] = min(1.0, final_result["confidence"] + 0.1) final_result["threat_level"] = "critical" if final_result["confidence"] > 0.9 else final_result["threat_level"] # Log decision with agent notes (HK Bonus) self.logger.info( "Scam detected with emotional profile", scam_type=final_result["scam_type"], confidence=final_result["confidence"], tactic=emotional_profile.primary_tactic ) return final_result def _keyword_detection(self, message: str) -> Dict[str, Any]: """Fuzzy regex-based detection with weighted confidence.""" best_match = None max_matches = 0 matched_keywords = [] # Actually matched patterns # O(N) Scan using pre-compiled regex for scam_type, pattern in self.compiled_patterns.items(): matches = pattern.findall(message) unique_matches = list(set(matches)) # Count unique hits higher if len(unique_matches) > max_matches: max_matches = len(unique_matches) best_match = scam_type matched_keywords = unique_matches if max_matches == 0: return { "is_scam": False, "scam_type": "not_scam", "confidence": 0.0, # Explicit 0.0 for non-scam "threat_level": "none", "category": "Safe", "matched_keywords": [], "risk_indicators": [], "description": "No known scam pattern detected" } # Weighted Confidence Formula (TF-IDF Inspired) # SOC FIX: Clamp low evidence matches to prevent accidental escalation if max_matches == 1: confidence = 0.35 elif max_matches == 2: confidence = 0.55 else: confidence = min(0.95, 0.4 + (max_matches / 3.0) * 0.4) scam_data = SCAM_DATABASE[best_match] return { "is_scam": True, "scam_type": best_match, "confidence": round(confidence, 2), "threat_level": scam_data["threat_level"], "category": scam_data["category"], "matched_keywords": matched_keywords, "risk_indicators": scam_data.get("risk_indicators", ["Pattern Match"]), "description": scam_data["description"], "persona": scam_data["persona"], "agent_notes": f"High-confidence pattern match: {', '.join(matched_keywords)}" } async def _llm_detection(self, message: str, context: Optional[Any] = None) -> Optional[Dict[str, Any]]: """LLM-based detection with Strict Schema Sync.""" try: # 1. Dynamic Enum Sync (Fixes Strict Mode 400 Errors) scam_enum = list(SCAM_DATABASE.keys()) + ["unknown", "novel_scam"] schema = { "type": "object", "properties": { "is_scam": {"type": "boolean"}, "scam_type": { "type": "string", "enum": scam_enum }, "confidence": {"type": "number"}, "threat_level": {"type": "string", "enum": ["low", "medium", "high", "critical"]}, "intent": {"type": "string"}, "risk_indicators": {"type": "array", "items": {"type": "string"}}, "reasoning": {"type": "string"} }, "required": ["is_scam", "scam_type", "confidence", "threat_level", "reasoning"] } # --- CACHE-OPTIMIZED MODULAR PROMPT --- # We split the prompt into Static System (Taxonomy + Rules) and Dynamic User (The Message) # This ensures the 1000+ token Taxonomy is cached globally for all detection calls. from app.core.prompts import SCAM_DETECTION_PROMPT # Extract system part (up to ### EXPECTED OUTPUT or ### INPUT) system_part = SCAM_DETECTION_PROMPT.split("### INPUT")[0].strip() messages = [ {"role": "system", "content": system_part}, {"role": "user", "content": f"MESSAGE TO ANALYZE:\n\"\"\"{message}\"\"\""} ] response = await self.llm_client.generate_with_cot( prompt="", # Required positional arg but we'll use messages instead messages=messages, schema=schema ) # ⚡ Extraction from LLMResponse if not response or not response.content: return None try: res = json.loads(response.content) except Exception: # ⚡ SELF-HEALING: If structured failed but returned a string slug content = response.content.strip() # SAFETY GUARD: If content is too long, it's likely a hallucinated reply, not a type slug if len(content) > 50: self.logger.warning("LLM returned conversational text instead of scam_type", content_excerpt=content[:50]) return None # Fallback to regex/heuristic # SOC FIX: Use allowlist for non-scam detection non_scam_tokens = {"non_scam", "safe", "legit", "not_scam"} res = { "is_scam": content.lower() not in non_scam_tokens, "scam_type": content, "confidence": 0.9, "threat_level": "medium", "intent": "unknown", "reasoning": "Direct slug extraction fallback", "risk_indicators": ["String-only LLM output"] } # 2. SOC Normalization (Self-Healing & Schema Compliance) final_res = self._validate_json(res) # Capture reasoning into the dict for downstream compatibility if response.reasoning: final_res["agent_notes"] = response.reasoning return final_res except Exception as e: self.logger.error(f"LLM detection failed: {e}") return None def _parse_llm_response(self, response: str) -> Optional[Dict[str, Any]]: """Robust JSON parsing with multiple fallbacks.""" data = robust_json_loads(response) if data: return self._validate_json(data) return None def _validate_json(self, data: Dict) -> Dict: """Validate and normalize LLM JSON output.""" return { "is_scam": data.get("is_scam", False), "scam_type": data.get("scam_type", "unknown"), "confidence": float(data.get("confidence", 0.5)), "threat_level": data.get("threat_level", "medium"), "risk_indicators": data.get("risk_indicators", []), "agent_notes": data.get("reasoning", "") or data.get("description", "") } def _combine_results( self, keyword_result: Dict, llm_result: Dict ) -> Dict[str, Any]: """Ensemble Voting Logic (SOC Standard).""" # SOC FIX: Prevent mutation of original inputs keyword_result = keyword_result.copy() llm_result = llm_result.copy() kw_conf = keyword_result.get("confidence", 0) llm_conf = llm_result.get("confidence", 0) # Rule 1: High-confidence Keyword > Low-confidence LLM # (Regex is deterministic, LLMs hallucinate) if kw_conf > 0.8: final = keyword_result final["agent_notes"] += f" (Confirmed by verified regex pattern)" # Boost confidence slightly if LLM agrees if llm_result.get("is_scam"): final["confidence"] = min(0.99, kw_conf + 0.05) # Ensure indicators are merged final["risk_indicators"] = list(set(final.get("risk_indicators", []) + llm_result.get("risk_indicators", []))) return final # Rule 2: High-confidence LLM > Weak Keyword if llm_conf > 0.7 and kw_conf < 0.4: result = llm_result result["matched_keywords"] = keyword_result.get("matched_keywords", []) return result # Rule 3: Agreement = High Confidence if keyword_result.get("is_scam") and llm_result.get("is_scam"): avg_conf = (kw_conf + llm_conf) / 2 # Boost logic: agreement warrants higher trust boosted_conf = min(0.98, avg_conf + 0.15) result = llm_result # Prefer LLM's classification specificity result["confidence"] = round(boosted_conf, 2) result["matched_keywords"] = keyword_result.get("matched_keywords", []) current_notes = result.get("agent_notes", "") result["agent_notes"] = f"{current_notes} | Regex detected: {result.get('matched_keywords', [])}" return result # Default: Average both final = keyword_result if kw_conf > llm_conf else llm_result final["confidence"] = round((kw_conf + llm_conf) / 2, 2) return final def get_persona_for_scam(self, scam_type: str) -> str: """Get recommended persona for scam type.""" if scam_type in SCAM_DATABASE: return SCAM_DATABASE[scam_type].get("persona", "elderly_excited") return "elderly_excited" def get_scam_info(self, scam_type: str) -> Dict[str, Any]: """Get information about a scam type.""" return SCAM_DATABASE.get(scam_type, {}) # Export for import __all__ = ["ScamDetector", "SCAM_DATABASE"]