Deployment Ready: Fixed scam detection low confidence, added production audit report, optimized throttles
1838600 | # app/agents/scam_detector.py - Scam detection agent | |
| """Hybrid LLM + keyword scam detection with SOC-grade regex and heuristics.""" | |
| import re | |
| import json | |
| from typing import Dict, Any, List, Optional | |
| from collections import Counter | |
| from app.core.llm_client import LLMClient, ModelRole | |
| from app.core.prompts import SCAM_DETECTION_PROMPT | |
| from app.config import settings | |
| from app.utils.logger import AgentLogger | |
| from app.intelligence.emotional_analyzer import emotional_analyzer | |
| from app.utils.json_utils import robust_json_loads | |
# 1. Expanded Scam Taxonomy (SOC-Grade)
#
# Maps a scam-type slug to the metadata ScamDetector uses for heuristic
# detection. Fields of each entry, as consumed by the class in this file:
#   keywords        - plain phrases; single-token keywords longer than three
#                     characters are escaped and added to the regex set.
#   regex_patterns  - optional fuzzy (leetspeak-tolerant) patterns; they are
#                     OR-joined and compiled case-insensitively.
#   threat_level / category / description / persona
#                   - copied verbatim into the heuristic detection result.
#   risk_indicators - optional; the heuristic falls back to ["Pattern Match"]
#                     when an entry does not define it.
SCAM_DATABASE = {
    "lottery_scam": {
        "keywords": ["won", "winner", "lottery", "prize", "lucky draw",
                     "jackpot", "crore", "lakh", "claim", "congratulations",
                     "selected", "reward", "cash prize", "bumper", "draw"],
        "regex_patterns": [
            r"w[o0]n", r"pr[i1]ze", r"l[u\s]*cky", r"j[a@]ckp[o0]t",
            r"c[o0]ngrat[us]", r"cr[o0]re", r"l[a@]kh"
        ],
        "threat_level": "high",
        "category": "Financial Fraud",
        "persona": "elderly_excited",
        "description": "Fake lottery/prize winning notification",
        "risk_indicators": ["Unsolicited prize", "Advance fee request"]
    },
    "job_scam": {
        "keywords": ["work from home", "earn money", "job offer", "hiring",
                     "data entry", "part time", "typing job", "vacancy",
                     "salary", "income", "registration fee", "joining fee"],
        "regex_patterns": [
            r"w[o0]rk\s+fr[o0]m\s+h[o0]me", r"e[a@]rn\s+m[o0]ney",
            r"j[o0]b\s+off?er", r"p[a@]rt\s+t[i1]me", r"d[a@]ta\s+entry"
        ],
        "threat_level": "high",
        "category": "Employment Fraud",
        "persona": "desperate_jobseeker",
        "description": "Fake job offers requiring payment"
    },
    "banking_scam": {
        "keywords": ["kyc", "account blocked", "verify", "bank", "otp",
                     "update details", "suspend", "deactivate", "pan card",
                     "aadhar link", "account closed", "urgent verification"],
        "regex_patterns": [
            r"k\.?y\.?c", r"a\/?c\s+bl[o0]ck", r"v[e3]r[i1]fy", r"o\.?t\.?p",
            r"p[a@]n\s+c[a@]rd", r"a{2}dh[a@]r"
        ],
        "threat_level": "critical",
        "category": "Banking Fraud",
        "persona": "worried_customer",
        "description": "Fake bank/KYC verification requests"
    },
    "phishing_scam": {
        "keywords": ["click here", "link", "update account", "security alert",
                     "login", "official", "customer support", "verify identity"],
        "regex_patterns": [
            r"cl[i1]ck", r"l[i1]nk", r"l[o0]g[i1]n", r"v[e3]r[i1]fy"
        ],
        "threat_level": "high",
        "category": "Credential Theft",
        "persona": "confused_user",
        "description": "Fake login/link phishing attempts"
    },
    "investment_scam": {
        "keywords": ["invest", "guaranteed returns", "double money", "bitcoin",
                     "trading", "profit", "forex", "stock tips", "mutual fund",
                     "high returns", "100% profit", "no risk"],
        "regex_patterns": [
            r"inv[e3]st", r"gu[a@]r[a@]nt[e3]{2}", r"d[o0]uble", r"b[i1]tc[o0]in",
            r"pr[o0]f[i1]t"
        ],
        "threat_level": "high",
        "category": "Investment Fraud",
        "persona": "curious_investor",
        "description": "Fraudulent investment schemes"
    },
    "loan_scam": {
        "keywords": ["instant loan", "no documents", "low interest", "approved",
                     "processing fee", "pre-approved", "personal loan"],
        "regex_patterns": [
            r"inst[a@]nt\s+l[o0][a@]n", r"pr[e3][\s-]?appr[o0]ved", r"pr[o0]cess[i1]ng\s+f[e3]{2}"
        ],
        "threat_level": "high",
        "category": "Loan Fraud",
        "persona": "needy_borrower",
        "description": "Fake instant loan offers"
    },
    "government_scam": {
        # NOTE: "ed" and "cbi" are three characters or shorter, so they are
        # never auto-added as regex keywords ("cbi" is still covered by an
        # explicit pattern below).
        "keywords": ["tax refund", "legal notice", "arrest warrant", "police",
                     "court", "fine", "income tax", "cbi", "enforcement",
                     "legal action", "ed", "narcotics"],
        "regex_patterns": [
            r"p[o0]l[i1]ce", r"[a@]rrest", r"w[a@]rr[a@]nt", r"t[a@]x",
            r"c\.?b\.?i", r"n[a@]rc[o0]t[i1]cs"
        ],
        "threat_level": "critical",
        "category": "Government Impersonation",
        "persona": "scared_citizen",
        "description": "Fake government/legal notices"
    },
    "delivery_scam": {
        "keywords": ["package", "delivery failed", "customs", "courier",
                     "stuck", "pay fee", "undelivered", "reshipping"],
        "regex_patterns": [
            r"p[a@]ck[a@]ge", r"d[e3]l[i1]very", r"cust[o0]ms", r"c[o0]ur[i1]er"
        ],
        "threat_level": "medium",
        "category": "Delivery Fraud",
        "persona": "expecting_customer",
        "description": "Fake delivery/customs fee requests"
    },
    "tech_support_scam": {
        "keywords": ["virus", "hacked", "security alert", "microsoft",
                     "computer problem", "remote access", "tech support"],
        "regex_patterns": [
            r"v[i1]rus", r"h[a@]ck[e3]d", r"m[i1]cr[o0]s[o0]ft", r"rem[o0]te"
        ],
        "threat_level": "medium",
        "category": "Tech Support Fraud",
        "persona": "confused_elderly",
        "description": "Fake tech support queries"
    },
    # The next two entries define no "regex_patterns"; only their
    # single-token keywords feed the matcher.
    "romance_scam": {
        "keywords": ["love you", "relationship", "lonely", "marriage",
                     "stuck abroad", "need money", "emergency", "gift"],
        "threat_level": "high",
        "category": "Romance Fraud",
        "persona": "lonely_victim",
        "description": "Fake romantic interest for money"
    },
    "crypto_scam": {
        "keywords": ["crypto", "ethereum", "wallet", "airdrop",
                     "free coins", "blockchain", "trading bot"],
        "threat_level": "high",
        "category": "Crypto Fraud",
        "persona": "crypto_curious",
        "description": "Cryptocurrency fraud"
    },
    # 🆕 NEW VECTORS (SOC Recommendation)
    "sim_swap_scam": {
        "keywords": ["sim card", "upgrade 4g", "5g upgrade", "sim block",
                     "network issue", "port number", "esim activation"],
        "regex_patterns": [r"s[i1]m\s*c[a@]rd", r"5g\s*upgr[a@]de", r"e-?s[i1]m"],
        "threat_level": "critical",
        "category": "Telecom Fraud",
        "persona": "tech_illiterate",
        "description": "SIM swap/eSIM activation fraud"
    },
    "qr_code_scam": {
        "keywords": ["scan code", "qr code", "receive payment", "scan to pay",
                     "gpay qr", "phonepe qr", "paytm qr"],
        "regex_patterns": [r"qr\s*c[o0]de", r"sc[a@]n", r"rec[ei]ve\s*p[a@]yment"],
        "threat_level": "high",
        "category": "Payment Fraud",
        "persona": "shopkeeper",
        "description": "QR Code payment reversal scam"
    },
    "refund_scam": {
        "keywords": ["refund", "wrong transaction", "money sent by mistake",
                     "return money", "cashback", "refund processed"],
        "regex_patterns": [r"ref[u]?nd", r"c[a@]shb[a@]ck", r"wr[o0]ng\s*tr[a@]ns"],
        "threat_level": "medium",
        "category": "Refund Fraud",
        "persona": "honest_person",
        "description": "Fake accidental transfer refund"
    },
    "fake_support": {
        "keywords": ["customer care", "helpline", "support number",
                     "complaint", "toll free", "service center"],
        "regex_patterns": [r"cust[o0]mer\s*c[a@]re", r"h[e3]lp\s*l[i1]ne"],
        "threat_level": "high",
        "category": "Impersonation",
        "persona": "angry_customer",
        "description": "Fake customer support numbers"
    },
    "deepfake_scam": {
        "keywords": ["voice", "audio", "video call", "urgent need",
                     "accident", "jail", "kidnapped"],
        "regex_patterns": [r"v[o0][i1]ce", r"v[i1]de[o0]", r"k[i1]dn[a@]p"],
        "threat_level": "critical",
        "category": "Deepfake/AI Fraud",
        "persona": "scared_relative",
        "description": "AI-generated voice/video impersonation"
    }
}
class ScamDetector:
    """Scam detection agent using a hybrid approach.

    1. SOC-grade fuzzy regex pre-filtering (``_keyword_detection``).
    2. LLM-based classification (``_llm_detection``), skipped when the regex
       stage is already confident enough or no LLM client is available.
    3. Weighted confidence ensemble of both results (``_combine_results``).
    """

    def __init__(self, llm_client: Optional["LLMClient"] = None):
        """Store the optional LLM client and pre-compile the regex taxonomy.

        Args:
            llm_client: Client used for LLM classification; when ``None`` only
                the regex heuristic is used.
        """
        self.llm_client = llm_client
        self.logger = AgentLogger("scam_detector")
        self._compile_regexes()

    @staticmethod
    def _build_pattern(data: Dict[str, Any]) -> Optional["re.Pattern"]:
        """Compile one taxonomy entry into a single case-insensitive regex.

        The entry's ``regex_patterns`` are combined with its single-token
        keywords longer than three characters (escaped). The entry itself is
        never modified.

        Args:
            data: One ``SCAM_DATABASE`` value.

        Returns:
            The compiled alternation, or ``None`` when the entry yields no
            pattern at all.
        """
        # Copy first: appending to the list stored in the taxonomy would make
        # the shared module-level data grow on every ScamDetector() call.
        patterns = list(data.get("regex_patterns", []))
        # SOC FIX: Only auto-add single-token keywords (precision)
        patterns.extend(
            re.escape(kw)
            for kw in data.get("keywords", [])
            if " " not in kw and len(kw) > 3
        )
        if not patterns:
            return None
        return re.compile("|".join(patterns), re.IGNORECASE)

    @staticmethod
    def _unique_matches(pattern: "re.Pattern", message: str) -> List[str]:
        """Return the distinct matches of *pattern* in *message*.

        Matches are lower-cased before de-duplication so that case variants of
        the same hit ("WON", "won") count once, and sorted so the result is
        deterministic.
        """
        # finditer + group(0) always yields the whole match, even if a pattern
        # with capture groups is added to the taxonomy later.
        return sorted({m.group(0).lower() for m in pattern.finditer(message)})

    def _compile_regexes(self):
        """Pre-compile one regex per scam type for performance optimization."""
        self.compiled_patterns = {}
        for scam_type, data in SCAM_DATABASE.items():
            compiled = self._build_pattern(data)
            if compiled is not None:
                self.compiled_patterns[scam_type] = compiled

    def detect_heuristic(self, message: str) -> Dict[str, Any]:
        """Public alias for SOC-grade regex detection (Zero Latency)."""
        return self._keyword_detection(message)

    async def detect(self, message: str, context: Optional[Any] = None, turn_count: int = 1) -> Dict[str, Any]:
        """Run the hybrid detection pipeline on one message.

        Args:
            message: Text to classify.
            context: Optional session object; when it reports
                ``scam_decided`` the LLM call is skipped.
            turn_count: Conversation turn number; selects the fast-path
                confidence threshold.

        Returns:
            Detection result dict including an ``emotional_profile`` entry.
        """
        self.logger.debug("Detecting scam", message_length=len(message))

        # Step 1: SOC-Grade Regex Parsing
        keyword_result = self._keyword_detection(message)

        # LATENCY OPTIMIZATION: heuristic fast-path.
        # Turn 1 requires very high confidence (>= 0.85) to avoid false
        # positives; later turns accept 0.70 because session context exists.
        threshold = 0.85 if turn_count <= 1 else 0.70
        if keyword_result.get("confidence", 0) >= threshold:
            self.logger.info(
                f"⚡ FAST-PATH TRIGGERED (Turn {turn_count}): Skipping LLM Detection",
                scam_type=keyword_result["scam_type"],
                confidence=keyword_result["confidence"]
            )
            # SOC FIX: Populate intent for Fast-Path compatibility
            keyword_result["intent"] = "money_theft"
            keyword_result["agent_notes"] = (
                keyword_result.get("agent_notes", "") + f" [FAST-PATH: REGEX (T{turn_count})]"
            )
            # Emotional analysis still runs on the fast path.
            emotional_profile = emotional_analyzer.analyze(message)
            keyword_result["emotional_profile"] = emotional_profile.to_dict()
            return keyword_result.copy()

        # Step 2: LLM detection (only if the regex stage was unsure)
        llm_result = None
        if settings.ENABLE_LLM_DETECTION and self.llm_client and self.llm_client.is_available:
            # Check context to prevent redundant calls if already decided elsewhere
            if context and context.scam_decided:
                self.logger.info("Decision already made in context, skipping LLM detection.")
            else:
                llm_result = await self._llm_detection(message, context=context)

        # Step 3: Ensemble logic
        if llm_result:
            final_result = self._combine_results(keyword_result, llm_result)
        else:
            final_result = keyword_result

        # Step 4: Behavioral & emotional analysis
        emotional_profile = emotional_analyzer.analyze(message)
        final_result["emotional_profile"] = emotional_profile.to_dict()

        # Boost confidence if high emotional manipulation is detected
        if emotional_profile.overall_manipulation > 0.6:
            final_result["confidence"] = round(min(1.0, final_result["confidence"] + 0.1), 2)
            if final_result["confidence"] > 0.9:
                final_result["threat_level"] = "critical"

        # Log decision with agent notes
        self.logger.info(
            "Scam detected with emotional profile",
            scam_type=final_result["scam_type"],
            confidence=final_result["confidence"],
            tactic=emotional_profile.primary_tactic
        )
        return final_result

    def _keyword_detection(self, message: str) -> Dict[str, Any]:
        """Fuzzy regex-based detection with weighted confidence.

        The scam type with the most distinct (case-insensitive) pattern hits
        wins; ties keep the first type encountered.

        Returns:
            A result dict; ``is_scam`` is ``False`` with confidence ``0.0``
            when nothing matched.
        """
        best_match = None
        max_matches = 0
        matched_keywords: List[str] = []

        for scam_type, pattern in self.compiled_patterns.items():
            unique_matches = self._unique_matches(pattern, message)
            if len(unique_matches) > max_matches:
                max_matches = len(unique_matches)
                best_match = scam_type
                matched_keywords = unique_matches

        if max_matches == 0:
            return {
                "is_scam": False,
                "scam_type": "not_scam",
                "confidence": 0.0,  # Explicit 0.0 for non-scam
                "threat_level": "none",
                "category": "Safe",
                "matched_keywords": [],
                "risk_indicators": [],
                "description": "No known scam pattern detected"
            }

        # SOC FIX: Clamp low evidence matches to prevent accidental escalation
        if max_matches == 1:
            confidence = 0.35
        elif max_matches == 2:
            confidence = 0.55
        else:
            confidence = min(0.95, 0.4 + (max_matches / 3.0) * 0.4)

        scam_data = SCAM_DATABASE[best_match]
        return {
            "is_scam": True,
            "scam_type": best_match,
            "confidence": round(confidence, 2),
            "threat_level": scam_data["threat_level"],
            "category": scam_data["category"],
            "matched_keywords": matched_keywords,
            # Copy so callers cannot mutate the shared taxonomy list.
            "risk_indicators": list(scam_data.get("risk_indicators", ["Pattern Match"])),
            "description": scam_data["description"],
            "persona": scam_data["persona"],
            "agent_notes": f"High-confidence pattern match: {', '.join(matched_keywords)}"
        }

    def _coerce_llm_payload(self, content: Any) -> Optional[Dict[str, Any]]:
        """Turn raw LLM output into a result dict, or ``None`` if unusable.

        A JSON object is returned as parsed. A bare slug (plain text or a JSON
        string of at most 50 characters) becomes a synthetic result. Anything
        else (long free text, JSON arrays/numbers/null) is rejected so the
        caller falls back to the regex heuristic.
        """
        try:
            parsed = json.loads(content)
        except (ValueError, TypeError):
            slug = str(content).strip()
        else:
            if isinstance(parsed, dict):
                return parsed
            if not isinstance(parsed, str):
                self.logger.warning("LLM returned JSON that is neither an object nor a scam_type slug")
                return None
            slug = parsed.strip()

        # SAFETY GUARD: long content is likely a conversational reply, not a type slug
        if len(slug) > 50:
            self.logger.warning("LLM returned conversational text instead of scam_type", content_excerpt=slug[:50])
            return None

        # SOC FIX: Use allowlist for non-scam detection
        non_scam_tokens = {"non_scam", "safe", "legit", "not_scam"}
        return {
            "is_scam": slug.lower() not in non_scam_tokens,
            "scam_type": slug,
            "confidence": 0.9,
            "threat_level": "medium",
            "intent": "unknown",
            "reasoning": "Direct slug extraction fallback",
            "risk_indicators": ["String-only LLM output"]
        }

    async def _llm_detection(self, message: str, context: Optional[Any] = None) -> Optional[Dict[str, Any]]:
        """LLM-based detection with strict schema sync.

        Returns:
            The normalized LLM verdict, or ``None`` when the LLM gave no usable
            answer or the call failed (the error is logged, not raised).
        """
        try:
            # Dynamic enum sync so the schema always matches the taxonomy.
            scam_enum = list(SCAM_DATABASE.keys()) + ["unknown", "novel_scam"]
            schema = {
                "type": "object",
                "properties": {
                    "is_scam": {"type": "boolean"},
                    "scam_type": {
                        "type": "string",
                        "enum": scam_enum
                    },
                    "confidence": {"type": "number"},
                    "threat_level": {"type": "string", "enum": ["low", "medium", "high", "critical"]},
                    "intent": {"type": "string"},
                    "risk_indicators": {"type": "array", "items": {"type": "string"}},
                    "reasoning": {"type": "string"}
                },
                "required": ["is_scam", "scam_type", "confidence", "threat_level", "reasoning"]
            }

            # Static system part (everything before "### INPUT") is kept
            # separate from the per-call user message.
            system_part = SCAM_DETECTION_PROMPT.split("### INPUT")[0].strip()
            messages = [
                {"role": "system", "content": system_part},
                {"role": "user", "content": f"MESSAGE TO ANALYZE:\n\"\"\"{message}\"\"\""}
            ]
            response = await self.llm_client.generate_with_cot(
                prompt="",  # Required positional arg; messages are used instead
                messages=messages,
                schema=schema
            )
            if not response or not response.content:
                return None

            res = self._coerce_llm_payload(response.content)
            if res is None:
                return None  # Fallback to regex/heuristic

            # SOC normalization (schema compliance)
            final_res = self._validate_json(res)
            # Capture reasoning into the dict for downstream compatibility
            if response.reasoning:
                final_res["agent_notes"] = response.reasoning
            return final_res
        except Exception as e:
            # Top-level boundary: any LLM failure degrades to the heuristic.
            self.logger.error(f"LLM detection failed: {e}")
            return None

    def _parse_llm_response(self, response: str) -> Optional[Dict[str, Any]]:
        """Parse a raw LLM string into a normalized result, or ``None``."""
        data = robust_json_loads(response)
        # Only a non-empty JSON object can be normalized.
        if data and isinstance(data, dict):
            return self._validate_json(data)
        return None

    def _validate_json(self, data: Dict) -> Dict:
        """Validate and normalize LLM JSON output.

        Missing or malformed fields fall back to defaults; ``confidence`` is
        coerced to a float in ``[0.0, 1.0]`` (default ``0.5``).
        """
        try:
            confidence = float(data.get("confidence", 0.5))
        except (TypeError, ValueError):
            confidence = 0.5
        if confidence != confidence:  # NaN is the only value unequal to itself
            confidence = 0.5
        confidence = max(0.0, min(1.0, confidence))

        return {
            "is_scam": data.get("is_scam", False),
            "scam_type": data.get("scam_type", "unknown"),
            "confidence": confidence,
            "threat_level": data.get("threat_level", "medium"),
            # Keep intent so LLM results carry the same key the fast path sets.
            "intent": data.get("intent") or "unknown",
            "risk_indicators": data.get("risk_indicators") or [],
            "agent_notes": data.get("reasoning") or data.get("description") or ""
        }

    def _combine_results(
        self,
        keyword_result: Dict,
        llm_result: Dict
    ) -> Dict[str, Any]:
        """Ensemble voting logic (SOC standard).

        Neither input dict is modified; the returned dict is a copy of
        whichever source wins, with the merged confidence.
        """
        # SOC FIX: Prevent mutation of original inputs
        keyword_result = keyword_result.copy()
        llm_result = llm_result.copy()
        kw_conf = keyword_result.get("confidence", 0)
        llm_conf = llm_result.get("confidence", 0)

        # Rule 1: High-confidence keyword > low-confidence LLM
        # (regex is deterministic, LLMs hallucinate)
        if kw_conf > 0.8:
            final = keyword_result
            final["agent_notes"] = final.get("agent_notes", "") + " (Confirmed by verified regex pattern)"
            # Boost confidence slightly if LLM agrees
            if llm_result.get("is_scam"):
                final["confidence"] = round(min(0.99, kw_conf + 0.05), 2)
            # Merge indicators, de-duplicated in first-seen order.
            merged = (final.get("risk_indicators") or []) + (llm_result.get("risk_indicators") or [])
            final["risk_indicators"] = list(dict.fromkeys(merged))
            return final

        # Rule 2: High-confidence LLM > weak keyword
        if llm_conf > 0.7 and kw_conf < 0.4:
            result = llm_result
            result["matched_keywords"] = keyword_result.get("matched_keywords", [])
            return result

        # Rule 3: Agreement = high confidence
        if keyword_result.get("is_scam") and llm_result.get("is_scam"):
            avg_conf = (kw_conf + llm_conf) / 2
            # Boost logic: agreement warrants higher trust
            boosted_conf = min(0.98, avg_conf + 0.15)
            result = llm_result  # Prefer LLM's classification specificity
            result["confidence"] = round(boosted_conf, 2)
            result["matched_keywords"] = keyword_result.get("matched_keywords", [])
            current_notes = result.get("agent_notes", "")
            result["agent_notes"] = f"{current_notes} | Regex detected: {result.get('matched_keywords', [])}"
            return result

        # Default: keep the more confident source, average the confidence
        final = keyword_result if kw_conf > llm_conf else llm_result
        final["confidence"] = round((kw_conf + llm_conf) / 2, 2)
        return final

    def get_persona_for_scam(self, scam_type: str) -> str:
        """Get recommended persona for scam type (default ``elderly_excited``)."""
        if scam_type in SCAM_DATABASE:
            return SCAM_DATABASE[scam_type].get("persona", "elderly_excited")
        return "elderly_excited"

    def get_scam_info(self, scam_type: str) -> Dict[str, Any]:
        """Get the taxonomy entry for a scam type, or ``{}`` if unknown."""
        return SCAM_DATABASE.get(scam_type, {})
# Public API of this module: the detector class and its scam taxonomy.
__all__ = ["ScamDetector", "SCAM_DATABASE"]