Deployment Ready: Fixed scam detection low confidence, added production audit report, optimized throttles
1838600 | # app/agents/adaptive_strategy.py | |
| # Module: Adaptive Behavioral Logic Layer | |
| # Description: Implements dynamic counter-intelligence strategies based on threat actor psychology. | |
| # Adjusts engagement tactics (Urgency, Confusion, Compliance) in real-time. | |
| """ | |
| Adaptive Strategy Agent. | |
| This module is responsible for the dynamic analysis of threat actor inputs and the | |
| real-time adjustment of honeypot behavior. It uses behavioral heuristics to | |
| counteract specific social engineering tactics (e.g., urgency, reassurance). | |
| """ | |
| from typing import Dict, Any, List, Optional | |
| from app.utils.logger import AgentLogger | |
class AdaptiveStrategyAgent:
    """
    Adaptive Strategy Agent that modifies honeypot behavior
    based on the scammer's responses and conversation dynamics.

    The agent classifies the scammer's latest message into a behavior
    (impatient, suspicious, aggressive, ...) with keyword heuristics, picks
    the next piece of intelligence worth asking for, rewrites the honeypot
    reply accordingly, and recommends to the orchestrator whether the
    conversation can be concluded.
    """

    # Scammer behavior patterns to detect.
    # Each entry maps a behavior name to the keywords that reveal it, the
    # strategy identifier consumed by ``adapt_response`` and a human-readable
    # hint describing how the reply should be modified.
    BEHAVIOR_PATTERNS = {
        "impatient": {
            "keywords": ["jaldi", "fast", "hurry", "now", "abhi", "urgent", "immediately"],
            "strategy": "speed_up_payment_offer",
            "response_modifier": "Show more urgency, claim you're about to pay"
        },
        "suspicious": {
            "keywords": ["fake", "fraud", "scam", "real", "genuine", "proof", "verify"],
            "strategy": "add_confusion_delay",
            # Downgraded from 'ask for proof' to avoid direct confrontation
            "response_modifier": "Act confused, say you don't understand technical terms"
        },
        "aggressive": {
            "keywords": ["police", "complaint", "action", "block", "cancel", "angry"],
            "strategy": "show_fear_compliance",
            "response_modifier": "Act scared, promise to comply quickly"
        },
        "pushing_payment": {
            "keywords": ["send", "transfer", "pay", "amount", "fee", "deposit"],
            "strategy": "request_their_details",
            "response_modifier": "Ask for their payment info to 'verify'"
        },
        "reassuring": {
            "keywords": ["trust", "safe", "guaranteed", "promise", "sure", "100%"],
            "strategy": "show_interest_extract",
            "response_modifier": "Show trust, ask for more details to proceed"
        }
    }

    # Intelligence gaps that need filling, in priority order:
    # (intelligence key, label used inside replies, ready-made prompt).
    INTELLIGENCE_PRIORITIES = [
        ("upi_ids", "UPI ID", "UPI ID bhejo verify karne ke liye"),
        ("phone_numbers", "phone", "Callback number do apna"),
        ("bank_accounts", "bank account", "Account number batao transfer ke liye"),
        ("urls", "website", "Website link bhejo dekh lun"),
    ]

    # Variations for common adaptive strategies to avoid repetition
    STRATEGY_VARIATIONS = {
        "speed_up": [
            "Main abhi kar raha hoon, bas 2 minute!",
            "Ruko ruko, abhi karta hoon transfer.",
            "Haan, processing fee ready hai, bas ek second.",
            "Bas abhi phone se kar raha hoon, line pe raho."
        ],
        "confused": [
            "Beta samajh nahi aaya, thoda aur explain karo? Main confuse ho gaya.",
            "Arre, ye kya keh rahe ho? Mujhe samajh nahi aa raha.",
            "Ek baar aur batao, thoda slow. Main thoda confused hoon.",
            "Mujhe clear nahi hua, fir se samjhaoge please?"
        ],
        "scared": [
            "Haan haan sir! Mat karo complaint! Main abhi karta hoon! Batao kya karun!",
            "Sir please police ko mat bulana! Main abhi payment kar raha hoon!",
            "Main darr gaya hoon, please wait. Main abhi sab details bhejunga.",
            "Theek hai theek hai! Jo keh rahe ho wahi karunga, bas complaint mat karna."
        ]
    }

    def __init__(self):
        self.logger = AgentLogger("adaptive_strategy")

    @staticmethod
    def _keyword_in_text(text: str, keyword: str) -> bool:
        """Return True if *keyword* occurs in *text* starting at a word start.

        A plain substring test fires on unrelated words ("know" -> "now",
        "transaction" -> "action", "unsafe" -> "safe"). Requiring that the
        character before the match is not alphanumeric removes those false
        positives while still accepting inflected forms such as "payment"
        for the keyword "pay".
        """
        position = text.find(keyword)
        while position != -1:
            if position == 0 or not text[position - 1].isalnum():
                return True
            position = text.find(keyword, position + 1)
        return False

    @staticmethod
    def _append_sentence(base: str, addition: str) -> str:
        """Join *addition* after *base*, avoiding a leading space for an empty base."""
        if not base.strip():
            return addition
        return f"{base} {addition}"

    async def analyze_scammer_behavior(self, message: str) -> Dict[str, Any]:
        """
        Analyze scammer's message for behavioral patterns.

        Args:
            message: Raw text sent by the scammer. ``None`` or an empty
                string is treated as a message without any signal.

        Returns:
            The detected behavior with the most matched keywords as a dict
            with the keys ``behavior``, ``matched_keywords``, ``strategy``
            and ``modifier``. When nothing matches, a neutral result with
            ``strategy == "continue_normal"`` and ``modifier`` set to None.
        """
        message_lower = (message or "").lower()
        detected_behaviors = []

        # 1. Check hardcoded patterns (fast)
        for behavior, config in self.BEHAVIOR_PATTERNS.items():
            matches = [
                kw for kw in config["keywords"]
                if self._keyword_in_text(message_lower, kw)
            ]
            if matches:
                detected_behaviors.append({
                    "behavior": behavior,
                    "matched_keywords": matches,
                    "strategy": config["strategy"],
                    "modifier": config["response_modifier"]
                })

        # 2. Return primary behavior (most matches). max() keeps the first
        # entry on ties, so the declaration order of BEHAVIOR_PATTERNS
        # decides between equally strong behaviors.
        if detected_behaviors:
            primary = max(detected_behaviors, key=lambda x: len(x["matched_keywords"]))
            self.logger.info(
                "Scammer behavior detected",
                behavior=primary["behavior"],
                strategy=primary["strategy"]
            )
            return primary

        return {"behavior": "neutral", "strategy": "continue_normal", "modifier": None}

    def get_intelligence_gap(self, intelligence: Dict, phase: str = "engage") -> Optional[Dict[str, str]]:
        """
        Identify what intelligence is still missing.

        Args:
            intelligence: Currently extracted intelligence, keyed by the
                names used in ``INTELLIGENCE_PRIORITIES``. ``None`` is
                treated as "nothing collected yet".
            phase: Current conversation phase. Bank accounts are only
                requested while the phase is ``"extract"``.

        Returns:
            Gap info (``type``, ``label``, ``prompt``) for the highest
            priority missing item, or None if all collected.
        """
        intelligence = intelligence or {}
        for key, label, prompt in self.INTELLIGENCE_PRIORITIES:
            # SOC FIX: Prioritize intel by phase (Don't ask for banks in hook phase)
            if key == "bank_accounts" and phase != "extract":
                continue
            if not intelligence.get(key):
                return {
                    "type": key,
                    "label": label,
                    "prompt": prompt
                }
        return None

    def adapt_response(
        self,
        base_response: str,
        scammer_behavior: Dict,
        intelligence_gap: Optional[Dict],
        phase: str
    ) -> str:
        """
        Adapt the base response based on strategy analysis.

        Args:
            base_response: Reply produced upstream; ``None`` is treated as
                an empty string.
            scammer_behavior: Result of ``analyze_scammer_behavior``; only
                its ``strategy`` key is read.
            intelligence_gap: Result of ``get_intelligence_gap`` or None.
            phase: Current conversation phase (e.g. ``"extract"``, ``"stall"``).

        Returns:
            The reply to send: the base response unchanged, the base
            response with a strategy-specific suffix, or a replacement line.
        """
        import random

        base_response = base_response or ""
        base_lower = base_response.lower()
        strategy = (scammer_behavior or {}).get("strategy", "continue_normal")

        # In extract phase with missing intel, prioritize getting it
        if phase == "extract" and intelligence_gap:
            # If base response already asks for intel, don't overwrite it
            if intelligence_gap["label"].lower() in base_lower:
                return base_response
            return intelligence_gap["prompt"]

        # SOC FIX: Length Guard (Prevent long responses): a reply that is
        # already longer than 12 words is never extended or replaced.
        if len(base_response.split()) > 12:
            return base_response

        # Apply strategy-specific adaptations with variety
        if strategy == "speed_up_payment_offer":
            suffix = random.choice(self.STRATEGY_VARIATIONS["speed_up"])
            return self._append_sentence(base_response, suffix)

        if strategy == "add_confusion_delay":
            # If base response contains confusion keywords, just use it
            confusion_terms = ["confuse", "samajh", "batao", "explain", "uncle", "wait"]
            if any(term in base_lower for term in confusion_terms):
                return base_response
            return random.choice(self.STRATEGY_VARIATIONS["confused"])

        if strategy == "show_fear_compliance":
            # SOC FIX: Gate Fear Compliance by Phase (Avoid self-burn in early phases)
            if phase not in ["extract", "stall"]:
                return random.choice(self.STRATEGY_VARIATIONS["confused"])  # Downgrade to confusion
            if "police" in base_lower or "sir" in base_lower:
                return base_response
            return random.choice(self.STRATEGY_VARIATIONS["scared"])

        if strategy == "request_their_details":
            if intelligence_gap:
                if intelligence_gap["label"].lower() in base_lower:
                    return base_response
                return f"Main ready hoon! Pehle apna {intelligence_gap['label']} bhejo verify karne ke liye."
            # SOC FIX: Soften Extraction Phrasing (Verification Framing)
            return base_response if base_response else "Pehle aapka UPI ID bhejo, verify karke hi payment hota hai na?"

        if strategy == "show_interest_extract":
            if any(term in base_lower for term in ["detail", "bhejo", "acha"]):
                return base_response
            return self._append_sentence(
                base_response,
                "Acha lagta hai! Sab details bhejo, main abhi start karta hoon!"
            )

        return base_response

    def get_escalation_recommendation(
        self,
        conversation: Dict,
        intelligence: Dict
    ) -> Dict[str, Any]:
        """
        Recommend whether to escalate or continue engagement.

        The rules are evaluated in order and the first match wins:
        captured URL, urgency keywords combined with a hard identifier,
        RAT/OTP/card indicators, enough intel after a long conversation,
        short conversation, no intel at all, and finally the default.

        Args:
            conversation: Conversation state; only the length of its
                ``history`` list is used. A missing or None history counts
                as zero messages.
            intelligence: Extracted intelligence; ``None`` is treated as empty.

        Returns:
            Recommendation for the orchestrator with the keys ``action``
            (``can_conclude``, ``continue_engagement`` or
            ``escalate_extraction``), ``reason`` and ``intel_score``.
        """
        conversation = conversation or {}
        intelligence = intelligence or {}

        message_count = len(conversation.get("history") or [])
        has_upi = bool(intelligence.get("upi_ids"))
        has_phone = bool(intelligence.get("phone_numbers"))
        has_account = bool(intelligence.get("bank_accounts"))

        # Calculate value of continuing: one point per hard identifier type.
        intel_score = sum([has_upi, has_phone, has_account])

        # 1. Critical Intelligence Threshold: a captured URL / phishing link
        # is enough to conclude immediately.
        has_url = bool(intelligence.get("urls")) or bool(intelligence.get("phishingLinks"))
        if has_url:
            return {
                "action": "can_conclude",
                "reason": "Critical Intelligence (Phishing Link) captured. Threshold exceeded.",
                "intel_score": intel_score + 5
            }

        # 2. Urgent Threat Block: detect immediate financial coercion patterns
        # in the extracted keywords. The keyword container is stringified so
        # that a list, a single string or a missing value are all handled.
        keywords = intelligence.get("keywords", [])
        urgency_terms = ["urgent", "immediately", "block", "suspend", "24 hours", "otp", "compromised"]
        is_urgent = any(term in str(keywords).lower() for term in urgency_terms)

        # SOC FIX: Hard Intel Gate (Require at least ONE hard indicator + urgency to conclude).
        # URLs need no check here: a captured URL has already returned above.
        if is_urgent and (has_phone or has_upi or has_account):
            return {
                "action": "can_conclude",
                "reason": "Critical Banking Threat (Block/Suspend Coercion) detected with Identifiers.",
                "intel_score": intel_score + 4
            }

        # 3. Technical Threat Vector: Remote Access Tools (RATs), OTP requests
        # or card data signify a high-risk session requiring conclusion.
        has_rat = bool(intelligence.get("rat_apps"))
        has_otp = bool(intelligence.get("otps"))
        has_card = bool(intelligence.get("credit_cards"))
        if has_rat or has_otp or has_card:
            return {
                "action": "can_conclude",
                "reason": "High-Risk Threat Vector (RAT/OTP/Financial Info) detected.",
                "intel_score": intel_score + 10  # Massive Bonus
            }

        # If we already have good intel and many messages, can consider wrapping up
        if intel_score >= 2 and message_count > 10:
            return {
                "action": "can_conclude",
                "reason": "Sufficient intelligence collected",
                "intel_score": intel_score
            }

        # If few messages, keep going regardless
        if message_count < 5:
            return {
                "action": "continue_engagement",
                "reason": "Building rapport phase",
                "intel_score": intel_score
            }

        # If no intel yet, push harder
        if intel_score == 0:
            return {
                "action": "escalate_extraction",
                "reason": "No intelligence collected yet",
                "intel_score": intel_score
            }

        return {
            "action": "continue_engagement",
            "reason": "More intelligence possible",
            "intel_score": intel_score
        }
# Public API of this module: a wildcard import exposes only the agent class.
__all__ = ["AdaptiveStrategyAgent"]