# sentinel-scam-honeypo / app /agents /scam_detector.py
# avinash-rai's picture
# Deployment Ready: Fixed scam detection low confidence, added production audit report, optimized throttles
# 1838600
# app/agents/scam_detector.py - Scam detection agent
"""Hybrid LLM + keyword scam detection with SOC-grade regex and heuristics."""
import re
import json
from typing import Dict, Any, List, Optional
from collections import Counter
from app.core.llm_client import LLMClient, ModelRole
from app.core.prompts import SCAM_DETECTION_PROMPT
from app.config import settings
from app.utils.logger import AgentLogger
from app.intelligence.emotional_analyzer import emotional_analyzer
from app.utils.json_utils import robust_json_loads
# 1. Expanded Scam Taxonomy (SOC-Grade)
#
# Maps a scam-type slug to its detection data and response metadata:
#   keywords         plain-text indicator phrases
#   regex_patterns   (optional) fuzzy patterns tolerating common character
#                    swaps such as o/0, i/1, a/@ and e/3
#   threat_level     "medium" | "high" | "critical"
#   category         human-readable fraud category
#   persona          persona slug recommended for this scam type
#   description      one-line summary of the scam
#   risk_indicators  (optional) indicators reported with a pattern match
#
# NOTE: ScamDetector._compile_regexes only turns single-word keywords longer
# than three characters into patterns, so multi-word phrases and short tokens
# (e.g. "ed", "otp") are detected only when a regex pattern covers them.
# Patterns must not contain capturing groups; use (?:...) if grouping is needed.
SCAM_DATABASE = {
    "lottery_scam": {
        "keywords": ["won", "winner", "lottery", "prize", "lucky draw",
                     "jackpot", "crore", "lakh", "claim", "congratulations",
                     "selected", "reward", "cash prize", "bumper", "draw"],
        "regex_patterns": [
            r"w[o0]n", r"pr[i1]ze", r"l[u\s]*cky", r"j[a@]ckp[o0]t",
            r"c[o0]ngrat[us]", r"cr[o0]re", r"l[a@]kh"
        ],
        "threat_level": "high",
        "category": "Financial Fraud",
        "persona": "elderly_excited",
        "description": "Fake lottery/prize winning notification",
        "risk_indicators": ["Unsolicited prize", "Advance fee request"]
    },
    "job_scam": {
        "keywords": ["work from home", "earn money", "job offer", "hiring",
                     "data entry", "part time", "typing job", "vacancy",
                     "salary", "income", "registration fee", "joining fee"],
        "regex_patterns": [
            r"w[o0]rk\s+fr[o0]m\s+h[o0]me", r"e[a@]rn\s+m[o0]ney",
            r"j[o0]b\s+off?er", r"p[a@]rt\s+t[i1]me", r"d[a@]ta\s+entry"
        ],
        "threat_level": "high",
        "category": "Employment Fraud",
        "persona": "desperate_jobseeker",
        "description": "Fake job offers requiring payment"
    },
    "banking_scam": {
        "keywords": ["kyc", "account blocked", "verify", "bank", "otp",
                     "update details", "suspend", "deactivate", "pan card",
                     "aadhar link", "account closed", "urgent verification"],
        "regex_patterns": [
            r"k\.?y\.?c", r"a\/?c\s+bl[o0]ck", r"v[e3]r[i1]fy", r"o\.?t\.?p",
            # Accepts "aadhar", "adhar" and the official spelling "aadhaar".
            r"p[a@]n\s+c[a@]rd", r"a{1,2}dh[a@]{1,2}r"
        ],
        "threat_level": "critical",
        "category": "Banking Fraud",
        "persona": "worried_customer",
        "description": "Fake bank/KYC verification requests"
    },
    "phishing_scam": {
        "keywords": ["click here", "link", "update account", "security alert",
                     "login", "official", "customer support", "verify identity"],
        "regex_patterns": [
            r"cl[i1]ck", r"l[i1]nk", r"l[o0]g[i1]n", r"v[e3]r[i1]fy"
        ],
        "threat_level": "high",
        "category": "Credential Theft",
        "persona": "confused_user",
        "description": "Fake login/link phishing attempts"
    },
    "investment_scam": {
        "keywords": ["invest", "guaranteed returns", "double money", "bitcoin",
                     "trading", "profit", "forex", "stock tips", "mutual fund",
                     "high returns", "100% profit", "no risk"],
        "regex_patterns": [
            r"inv[e3]st", r"gu[a@]r[a@]nt[e3]{2}", r"d[o0]uble", r"b[i1]tc[o0]in",
            r"pr[o0]f[i1]t"
        ],
        "threat_level": "high",
        "category": "Investment Fraud",
        "persona": "curious_investor",
        "description": "Fraudulent investment schemes"
    },
    "loan_scam": {
        "keywords": ["instant loan", "no documents", "low interest", "approved",
                     "processing fee", "pre-approved", "personal loan"],
        "regex_patterns": [
            r"inst[a@]nt\s+l[o0][a@]n", r"pr[e3][\s-]?appr[o0]ved", r"pr[o0]cess[i1]ng\s+f[e3]{2}"
        ],
        "threat_level": "high",
        "category": "Loan Fraud",
        "persona": "needy_borrower",
        "description": "Fake instant loan offers"
    },
    "government_scam": {
        "keywords": ["tax refund", "legal notice", "arrest warrant", "police",
                     "court", "fine", "income tax", "cbi", "enforcement",
                     "legal action", "ed", "narcotics"],
        "regex_patterns": [
            r"p[o0]l[i1]ce", r"[a@]rrest", r"w[a@]rr[a@]nt", r"t[a@]x",
            r"c\.?b\.?i", r"n[a@]rc[o0]t[i1]cs"
        ],
        "threat_level": "critical",
        "category": "Government Impersonation",
        "persona": "scared_citizen",
        "description": "Fake government/legal notices"
    },
    "delivery_scam": {
        "keywords": ["package", "delivery failed", "customs", "courier",
                     "stuck", "pay fee", "undelivered", "reshipping"],
        "regex_patterns": [
            r"p[a@]ck[a@]ge", r"d[e3]l[i1]very", r"cust[o0]ms", r"c[o0]ur[i1]er"
        ],
        "threat_level": "medium",
        "category": "Delivery Fraud",
        "persona": "expecting_customer",
        "description": "Fake delivery/customs fee requests"
    },
    "tech_support_scam": {
        "keywords": ["virus", "hacked", "security alert", "microsoft",
                     "computer problem", "remote access", "tech support"],
        "regex_patterns": [
            r"v[i1]rus", r"h[a@]ck[e3]d", r"m[i1]cr[o0]s[o0]ft", r"rem[o0]te"
        ],
        "threat_level": "medium",
        "category": "Tech Support Fraud",
        "persona": "confused_elderly",
        "description": "Fake tech support queries"
    },
    # No regex_patterns: only the single-word keywords are compiled.
    "romance_scam": {
        "keywords": ["love you", "relationship", "lonely", "marriage",
                     "stuck abroad", "need money", "emergency", "gift"],
        "threat_level": "high",
        "category": "Romance Fraud",
        "persona": "lonely_victim",
        "description": "Fake romantic interest for money"
    },
    # No regex_patterns: only the single-word keywords are compiled.
    "crypto_scam": {
        "keywords": ["crypto", "ethereum", "wallet", "airdrop",
                     "free coins", "blockchain", "trading bot"],
        "threat_level": "high",
        "category": "Crypto Fraud",
        "persona": "crypto_curious",
        "description": "Cryptocurrency fraud"
    },
    # New vectors (SOC recommendation)
    "sim_swap_scam": {
        "keywords": ["sim card", "upgrade 4g", "5g upgrade", "sim block",
                     "network issue", "port number", "esim activation"],
        "regex_patterns": [r"s[i1]m\s*c[a@]rd", r"5g\s*upgr[a@]de", r"e-?s[i1]m"],
        "threat_level": "critical",
        "category": "Telecom Fraud",
        "persona": "tech_illiterate",
        "description": "SIM swap/eSIM activation fraud"
    },
    "qr_code_scam": {
        "keywords": ["scan code", "qr code", "receive payment", "scan to pay",
                     "gpay qr", "phonepe qr", "paytm qr"],
        # "rec[ei]{1,2}ve" accepts the correct spelling "receive" as well as
        # the misspellings "recieve", "recive" and "receve".
        "regex_patterns": [r"qr\s*c[o0]de", r"sc[a@]n", r"rec[ei]{1,2}ve\s*p[a@]yment"],
        "threat_level": "high",
        "category": "Payment Fraud",
        "persona": "shopkeeper",
        "description": "QR Code payment reversal scam"
    },
    "refund_scam": {
        "keywords": ["refund", "wrong transaction", "money sent by mistake",
                     "return money", "cashback", "refund processed"],
        "regex_patterns": [r"ref[u]?nd", r"c[a@]shb[a@]ck", r"wr[o0]ng\s*tr[a@]ns"],
        "threat_level": "medium",
        "category": "Refund Fraud",
        "persona": "honest_person",
        "description": "Fake accidental transfer refund"
    },
    "fake_support": {
        "keywords": ["customer care", "helpline", "support number",
                     "complaint", "toll free", "service center"],
        "regex_patterns": [r"cust[o0]mer\s*c[a@]re", r"h[e3]lp\s*l[i1]ne"],
        "threat_level": "high",
        "category": "Impersonation",
        "persona": "angry_customer",
        "description": "Fake customer support numbers"
    },
    "deepfake_scam": {
        "keywords": ["voice", "audio", "video call", "urgent need",
                     "accident", "jail", "kidnapped"],
        "regex_patterns": [r"v[o0][i1]ce", r"v[i1]de[o0]", r"k[i1]dn[a@]p"],
        "threat_level": "critical",
        "category": "Deepfake/AI Fraud",
        "persona": "scared_relative",
        "description": "AI-generated voice/video impersonation"
    }
}
class ScamDetector:
    """
    Scam Detection Agent using a hybrid approach:

    1. Fuzzy regex pre-filtering built from SCAM_DATABASE
    2. LLM-based classification (only when the regex result is inconclusive)
    3. Weighted confidence ensemble of both results
    """

    # Bare-string LLM replies that mean "this message is not a scam".
    _NON_SCAM_TOKENS = frozenset({"non_scam", "safe", "legit", "not_scam"})
    # Scam-type slugs the LLM may emit in addition to the SCAM_DATABASE keys.
    _EXTRA_SCAM_TYPES = ("unknown", "novel_scam")

    # The annotation is quoted so it is not evaluated when the class is defined.
    def __init__(self, llm_client: Optional["LLMClient"] = None):
        """Store the optional LLM client and pre-compile the regex table.

        Args:
            llm_client: Client used for LLM classification. When ``None``,
                detection relies on the regex heuristics only.
        """
        self.llm_client = llm_client
        self.logger = AgentLogger("scam_detector")
        self._compile_regexes()

    def _compile_regexes(self):
        """Build ``self.compiled_patterns``: one combined regex per scam type.

        Each scam type's explicit ``regex_patterns`` are joined with its
        single-word keywords longer than three characters (escaped), and
        compiled case-insensitively.
        """
        self.compiled_patterns = {}
        for scam_type, data in SCAM_DATABASE.items():
            # Work on a copy: appending to the list stored in SCAM_DATABASE
            # would grow the shared table every time a detector is created.
            patterns = list(data.get("regex_patterns", []))
            # Only single-token keywords are auto-added (precision).
            patterns.extend(
                re.escape(kw)
                for kw in data.get("keywords", [])
                if " " not in kw and len(kw) > 3
            )
            full_pattern = "|".join(patterns)
            if full_pattern:
                self.compiled_patterns[scam_type] = re.compile(full_pattern, re.IGNORECASE)

    def detect_heuristic(self, message: str) -> Dict[str, Any]:
        """Public alias for the regex-only detection (no LLM call)."""
        return self._keyword_detection(message)

    async def detect(self, message: str, context: Optional[Any] = None, turn_count: int = 1) -> Dict[str, Any]:
        """Run the hybrid detection pipeline on one message.

        Args:
            message: Text to classify. ``None`` is treated as an empty string.
            context: Optional session object. If it has a truthy
                ``scam_decided`` attribute the LLM call is skipped.
            turn_count: Conversation turn; the first turn uses a stricter
                fast-path threshold than later turns.

        Returns:
            The detection result dict, always including ``emotional_profile``.
        """
        message = message or ""
        self.logger.debug("Detecting scam", message_length=len(message))

        # Step 1: regex heuristics.
        keyword_result = self._keyword_detection(message)

        # Fast path: skip the LLM when the regex result is confident enough.
        # Turn 1 requires 0.85 to limit false positives; later turns use 0.70.
        threshold = 0.85 if turn_count <= 1 else 0.70
        if keyword_result.get("confidence", 0) >= threshold:
            self.logger.info(
                f"⚡ FAST-PATH TRIGGERED (Turn {turn_count}): Skipping LLM Detection",
                scam_type=keyword_result["scam_type"],
                confidence=keyword_result["confidence"]
            )
            # Populate the keys the LLM path would otherwise provide.
            keyword_result["intent"] = "money_theft"
            keyword_result["agent_notes"] = keyword_result.get("agent_notes", "") + f" [FAST-PATH: REGEX (T{turn_count})]"
            # Emotional analysis is still attached on the fast path.
            emotional_profile = emotional_analyzer.analyze(message)
            keyword_result["emotional_profile"] = emotional_profile.to_dict()
            return keyword_result.copy()

        # Step 2: LLM detection, only when enabled, available and not already decided.
        llm_result = None
        if settings.ENABLE_LLM_DETECTION and self.llm_client and self.llm_client.is_available:
            # getattr: a context object without the attribute must not crash detection.
            if context and getattr(context, "scam_decided", False):
                self.logger.info("Decision already made in context, skipping LLM detection.")
            else:
                llm_result = await self._llm_detection(message, context=context)

        # Step 3: ensemble, or fall back to the regex result alone.
        if llm_result:
            final_result = self._combine_results(keyword_result, llm_result)
        else:
            final_result = keyword_result

        # Step 4: attach the emotional profile and boost on strong manipulation.
        emotional_profile = emotional_analyzer.analyze(message)
        final_result["emotional_profile"] = emotional_profile.to_dict()
        if emotional_profile.overall_manipulation > 0.6:
            # Rounded so the +0.1 boost does not leave float noise (e.g. 0.6500000000000001).
            final_result["confidence"] = round(min(1.0, final_result["confidence"] + 0.1), 2)
            if final_result["confidence"] > 0.9:
                final_result["threat_level"] = "critical"

        self.logger.info(
            "Scam detected with emotional profile",
            scam_type=final_result["scam_type"],
            confidence=final_result["confidence"],
            tactic=emotional_profile.primary_tactic
        )
        return final_result

    def _keyword_detection(self, message: str) -> Dict[str, Any]:
        """Regex-based detection with a stepped confidence score.

        The scam type with the most distinct pattern hits wins (first one
        wins ties). Hits are compared case-insensitively, so "WON" and "won"
        count once. Confidence is 0.35 for one hit, 0.55 for two, and
        ``min(0.95, 0.4 + hits / 3 * 0.4)`` from three hits upwards.

        Returns:
            A result dict; ``matched_keywords`` holds the lower-cased hits in
            order of first appearance. Empty input yields the "not_scam" result.
        """
        best_match = None
        matched_keywords: List[str] = []

        if message:
            for scam_type, pattern in self.compiled_patterns.items():
                # dict.fromkeys de-duplicates while keeping a deterministic order;
                # lower-casing matches the IGNORECASE semantics of the patterns.
                hits = list(dict.fromkeys(
                    m.group(0).lower() for m in pattern.finditer(message)
                ))
                if len(hits) > len(matched_keywords):
                    best_match = scam_type
                    matched_keywords = hits

        if best_match is None:
            return {
                "is_scam": False,
                "scam_type": "not_scam",
                "confidence": 0.0,  # Explicit 0.0 for non-scam
                "threat_level": "none",
                "category": "Safe",
                "matched_keywords": [],
                "risk_indicators": [],
                "description": "No known scam pattern detected"
            }

        # Clamp low-evidence matches to prevent accidental escalation.
        max_matches = len(matched_keywords)
        if max_matches == 1:
            confidence = 0.35
        elif max_matches == 2:
            confidence = 0.55
        else:
            confidence = min(0.95, 0.4 + (max_matches / 3.0) * 0.4)

        scam_data = SCAM_DATABASE[best_match]
        return {
            "is_scam": True,
            "scam_type": best_match,
            "confidence": round(confidence, 2),
            "threat_level": scam_data["threat_level"],
            "category": scam_data["category"],
            "matched_keywords": matched_keywords,
            "risk_indicators": scam_data.get("risk_indicators", ["Pattern Match"]),
            "description": scam_data["description"],
            "persona": scam_data["persona"],
            "agent_notes": f"High-confidence pattern match: {', '.join(matched_keywords)}"
        }

    async def _llm_detection(self, message: str, context: Optional[Any] = None) -> Optional[Dict[str, Any]]:
        """Classify ``message`` with the LLM and normalize its answer.

        Args:
            message: Text to classify.
            context: Accepted for interface compatibility; not used here.

        Returns:
            The normalized result dict, or ``None`` when the LLM returned
            nothing usable or any error occurred (errors are logged).
        """
        try:
            # Keep the enum in sync with the taxonomy so strict-schema calls accept every type.
            scam_enum = list(SCAM_DATABASE.keys()) + list(self._EXTRA_SCAM_TYPES)
            schema = {
                "type": "object",
                "properties": {
                    "is_scam": {"type": "boolean"},
                    "scam_type": {
                        "type": "string",
                        "enum": scam_enum
                    },
                    "confidence": {"type": "number"},
                    "threat_level": {"type": "string", "enum": ["low", "medium", "high", "critical"]},
                    "intent": {"type": "string"},
                    "risk_indicators": {"type": "array", "items": {"type": "string"}},
                    "reasoning": {"type": "string"}
                },
                "required": ["is_scam", "scam_type", "confidence", "threat_level", "reasoning"]
            }

            # Static system part (everything before "### INPUT") and a dynamic
            # user part, so the long static prefix stays identical across calls.
            system_part = SCAM_DETECTION_PROMPT.split("### INPUT")[0].strip()
            messages = [
                {"role": "system", "content": system_part},
                {"role": "user", "content": f"MESSAGE TO ANALYZE:\n\"\"\"{message}\"\"\""}
            ]
            response = await self.llm_client.generate_with_cot(
                prompt="",  # Required argument; the messages list is what is used
                messages=messages,
                schema=schema
            )
            if not response or not response.content:
                return None

            try:
                res = json.loads(response.content)
            except (TypeError, ValueError):
                res = None

            if not isinstance(res, dict):
                # Not a JSON object: a JSON string is used as the slug itself,
                # anything else falls back to the raw text.
                raw = res if isinstance(res, str) else response.content
                res = self._result_from_slug(raw)
                if res is None:
                    return None  # Caller falls back to the regex result

            final_res = self._validate_json(res)
            # Prefer the model's reasoning trace for the notes when present.
            if response.reasoning:
                final_res["agent_notes"] = response.reasoning
            return final_res
        except Exception as e:
            self.logger.error(f"LLM detection failed: {e}")
            return None

    def _result_from_slug(self, raw: str) -> Optional[Dict[str, Any]]:
        """Build a result from a bare scam-type slug returned instead of JSON.

        Only known slugs are accepted (SCAM_DATABASE keys, the extra LLM
        types and the non-scam tokens); any other text, such as a refusal or
        a conversational reply, returns ``None`` instead of being reported
        as a scam type.
        """
        content = str(raw).strip()
        slug = content.strip("\"'`").strip().lower()
        is_safe = slug in self._NON_SCAM_TOKENS
        known = is_safe or slug in SCAM_DATABASE or slug in self._EXTRA_SCAM_TYPES
        if len(content) > 50 or not known:
            self.logger.warning("LLM returned conversational text instead of scam_type", content_excerpt=content[:50])
            return None
        return {
            "is_scam": not is_safe,
            "scam_type": slug,
            "confidence": 0.9,
            "threat_level": "medium",
            "intent": "unknown",
            "reasoning": "Direct slug extraction fallback",
            "risk_indicators": ["String-only LLM output"]
        }

    def _parse_llm_response(self, response: str) -> Optional[Dict[str, Any]]:
        """Parse raw LLM text via ``robust_json_loads`` and normalize it.

        Returns ``None`` when parsing yields nothing or a non-object value.
        """
        data = robust_json_loads(response)
        if data and isinstance(data, dict):
            return self._validate_json(data)
        return None

    @staticmethod
    def _coerce_confidence(value: Any, default: float = 0.5) -> float:
        """Convert ``value`` to a float in [0, 1]; use ``default`` if impossible."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return default
        if number != number:  # NaN never compares equal to itself
            return default
        return max(0.0, min(1.0, number))

    def _validate_json(self, data: Dict) -> Dict:
        """Validate and normalize LLM JSON output.

        Missing keys get defaults, ``confidence`` is clamped to [0, 1], and
        ``risk_indicators`` is always a list so callers can concatenate it.
        """
        indicators = data.get("risk_indicators")
        if isinstance(indicators, str):
            indicators = [indicators]
        elif isinstance(indicators, (list, tuple, set)):
            indicators = list(indicators)
        else:
            indicators = []
        return {
            "is_scam": bool(data.get("is_scam", False)),
            "scam_type": data.get("scam_type") or "unknown",
            "confidence": self._coerce_confidence(data.get("confidence", 0.5)),
            "threat_level": data.get("threat_level") or "medium",
            # Kept so LLM results carry the same key the fast path sets.
            "intent": data.get("intent") or "unknown",
            "risk_indicators": indicators,
            "agent_notes": data.get("reasoning") or data.get("description") or ""
        }

    def _combine_results(
        self,
        keyword_result: Dict,
        llm_result: Dict
    ) -> Dict[str, Any]:
        """Merge the regex and LLM results into one verdict.

        Rules, applied in order:
          1. Regex confidence > 0.8: keep the regex result (+0.05 if the LLM agrees).
          2. LLM confidence > 0.7 and regex confidence < 0.4: keep the LLM result.
          3. Both say scam: keep the LLM result with averaged confidence + 0.15.
          4. Otherwise: keep the more confident result with the averaged confidence.

        The inputs are not mutated.
        """
        keyword_result = keyword_result.copy()
        llm_result = llm_result.copy()
        kw_conf = keyword_result.get("confidence", 0)
        llm_conf = llm_result.get("confidence", 0)

        # Rule 1: high-confidence regex beats the LLM (regex is deterministic).
        # NOTE(review): _keyword_detection only exceeds 0.8 at 0.93+, which
        # detect() already handles on its fast path, so detect() never reaches
        # this rule; it applies to direct callers only.
        if kw_conf > 0.8:
            final = keyword_result
            final["agent_notes"] = final.get("agent_notes", "") + " (Confirmed by verified regex pattern)"
            if llm_result.get("is_scam"):
                final["confidence"] = round(min(0.99, kw_conf + 0.05), 2)
            # Ordered, de-duplicated merge keeps the output deterministic.
            merged = list(final.get("risk_indicators") or []) + list(llm_result.get("risk_indicators") or [])
            final["risk_indicators"] = list(dict.fromkeys(merged))
            return final

        # Rule 2: high-confidence LLM beats a weak regex match.
        if llm_conf > 0.7 and kw_conf < 0.4:
            result = llm_result
            result["matched_keywords"] = keyword_result.get("matched_keywords", [])
            return result

        # Rule 3: agreement warrants higher trust.
        if keyword_result.get("is_scam") and llm_result.get("is_scam"):
            avg_conf = (kw_conf + llm_conf) / 2
            boosted_conf = min(0.98, avg_conf + 0.15)
            result = llm_result  # Prefer the LLM's classification specificity
            result["confidence"] = round(boosted_conf, 2)
            result["matched_keywords"] = keyword_result.get("matched_keywords", [])
            current_notes = result.get("agent_notes", "")
            result["agent_notes"] = f"{current_notes} | Regex detected: {result.get('matched_keywords', [])}"
            return result

        # Default: the more confident result, with both confidences averaged.
        final = keyword_result if kw_conf > llm_conf else llm_result
        final["confidence"] = round((kw_conf + llm_conf) / 2, 2)
        return final

    def get_persona_for_scam(self, scam_type: str) -> str:
        """Return the persona for ``scam_type``, or "elderly_excited" if unknown."""
        if scam_type in SCAM_DATABASE:
            return SCAM_DATABASE[scam_type].get("persona", "elderly_excited")
        return "elderly_excited"

    def get_scam_info(self, scam_type: str) -> Dict[str, Any]:
        """Return the SCAM_DATABASE entry for ``scam_type``, or ``{}`` if unknown."""
        return SCAM_DATABASE.get(scam_type, {})
# Public API: the names exported by a wildcard import of this module.
__all__ = ["ScamDetector", "SCAM_DATABASE"]