# app/agents/adaptive_strategy.py # Module: Adaptive Behavioral Logic Layer # Description: Implements dynamic counter-intelligence strategies based on threat actor psychology. # Adjusts engagement tactics (Urgency, Confusion, Compliance) in real-time. """ Adaptive Strategy Agent. This module is responsible for the dynamic analysis of threat actor inputs and the real-time adjustment of honeypot behavior. It uses behavioral heuristics to counteract specific social engineering tactics (e.g., urgency, reassurance). """ from typing import Dict, Any, List, Optional from app.utils.logger import AgentLogger class AdaptiveStrategyAgent: """ Adaptive Strategy Agent that modifies honeypot behavior based on scammer's responses and conversation dynamics. This makes the honeypot appear more human and extract more intelligence by adapting to scammer tactics. """ # Scammer behavior patterns to detect BEHAVIOR_PATTERNS = { "impatient": { "keywords": ["jaldi", "fast", "hurry", "now", "abhi", "urgent", "immediately"], "strategy": "speed_up_payment_offer", "response_modifier": "Show more urgency, claim you're about to pay" }, "suspicious": { "keywords": ["fake", "fraud", "scam", "real", "genuine", "proof", "verify"], "strategy": "add_confusion_delay", "response_modifier": "Act confused, say you don't understand technical terms" # Downgraded from 'ask for proof' to avoid direct confrontation }, "aggressive": { "keywords": ["police", "complaint", "action", "block", "cancel", "angry"], "strategy": "show_fear_compliance", "response_modifier": "Act scared, promise to comply quickly" }, "pushing_payment": { "keywords": ["send", "transfer", "pay", "amount", "fee", "deposit"], "strategy": "request_their_details", "response_modifier": "Ask for their payment info to 'verify'" }, "reassuring": { "keywords": ["trust", "safe", "guaranteed", "promise", "sure", "100%"], "strategy": "show_interest_extract", "response_modifier": "Show trust, ask for more details to proceed" } } # Intelligence gaps that need filling INTELLIGENCE_PRIORITIES = [ ("upi_ids", "UPI ID", "UPI ID bhejo verify karne ke liye"), ("phone_numbers", "phone", "Callback number do apna"), ("bank_accounts", "bank account", "Account number batao transfer ke liye"), ("urls", "website", "Website link bhejo dekh lun"), ] def __init__(self): self.logger = AgentLogger("adaptive_strategy") async def analyze_scammer_behavior(self, message: str) -> Dict[str, Any]: """ Analyze scammer's message for behavioral patterns. """ message_lower = message.lower() detected_behaviors = [] # 1. Check Hardcoded Patterns (Fast) for behavior, config in self.BEHAVIOR_PATTERNS.items(): matches = [kw for kw in config["keywords"] if kw in message_lower] if matches: detected_behaviors.append({ "behavior": behavior, "matched_keywords": matches, "strategy": config["strategy"], "modifier": config["response_modifier"] }) # 2. Return primary behavior (most matches) if detected_behaviors: primary = max(detected_behaviors, key=lambda x: len(x["matched_keywords"])) self.logger.info( "Scammer behavior detected", behavior=primary["behavior"], strategy=primary["strategy"] ) return primary return {"behavior": "neutral", "strategy": "continue_normal", "modifier": None} def get_intelligence_gap(self, intelligence: Dict, phase: str = "engage") -> Optional[Dict[str, str]]: """ Identify what intelligence is still missing. Args: intelligence: Currently extracted intelligence Returns: Gap info or None if all collected """ for key, label, prompt in self.INTELLIGENCE_PRIORITIES: # SOC FIX: Prioritize intel by phase (Don't ask for banks in hook phase) if key == "bank_accounts" and phase != "extract": continue if not intelligence.get(key): return { "type": key, "label": label, "prompt": prompt } return None # Variations for common adaptive strategies to avoid repetition STRATEGY_VARIATIONS = { "speed_up": [ "Main abhi kar raha hoon, bas 2 minute!", "Ruko ruko, abhi karta hoon transfer.", "Haan, processing fee ready hai, bas ek second.", "Bas abhi phone se kar raha hoon, line pe raho." ], "confused": [ "Beta samajh nahi aaya, thoda aur explain karo? Main confuse ho gaya.", "Arre, ye kya keh rahe ho? Mujhe samajh nahi aa raha.", "Ek baar aur batao, thoda slow. Main thoda confused hoon.", "Mujhe clear nahi hua, fir se samjhaoge please?" ], "scared": [ "Haan haan sir! Mat karo complaint! Main abhi karta hoon! Batao kya karun!", "Sir please police ko mat bulana! Main abhi payment kar raha hoon!", "Main darr gaya hoon, please wait. Main abhi sab details bhejunga.", "Theek hai theek hai! Jo keh rahe ho wahi karunga, bas complaint mat karna." ] } def adapt_response( self, base_response: str, scammer_behavior: Dict, intelligence_gap: Optional[Dict], phase: str ) -> str: """ Adapt the base response based on strategy analysis. """ import random strategy = scammer_behavior.get("strategy", "continue_normal") # In extract phase with missing intel, prioritize getting it if phase == "extract" and intelligence_gap: # If base response already asks for intel, don't overwrite it if intelligence_gap["label"].lower() in base_response.lower(): return base_response return intelligence_gap["prompt"] # SOC FIX: Length Guard (Prevent long responses) if len(base_response.split()) > 12: return base_response # Apply strategy-specific adaptations with variety if strategy == "speed_up_payment_offer": suffix = random.choice(self.STRATEGY_VARIATIONS["speed_up"]) return f"{base_response} {suffix}" elif strategy == "add_confusion_delay": # If base response contains confusion keywords, just use it confusion_terms = ["confuse", "samajh", "batao", "explain", "uncle", "wait"] if any(term in base_response.lower() for term in confusion_terms): return base_response return random.choice(self.STRATEGY_VARIATIONS["confused"]) elif strategy == "show_fear_compliance": # SOC FIX: Gate Fear Compliance by Phase (Avoid self-burn in early phases) if phase not in ["extract", "stall"]: return random.choice(self.STRATEGY_VARIATIONS["confused"]) # Downgrade to confusion if "police" in base_response.lower() or "sir" in base_response.lower(): return base_response return random.choice(self.STRATEGY_VARIATIONS["scared"]) elif strategy == "request_their_details": if intelligence_gap: if intelligence_gap['label'].lower() in base_response.lower(): return base_response return f"Main ready hoon! Pehle apna {intelligence_gap['label']} bhejo verify karne ke liye." # SOC FIX: Soften Extraction Phrasing (Verification Framing) return base_response if base_response else "Pehle aapka UPI ID bhejo, verify karke hi payment hota hai na?" elif strategy == "show_interest_extract": if any(term in base_response.lower() for term in ["detail", "bhejo", "acha"]): return base_response return f"{base_response} Acha lagta hai! Sab details bhejo, main abhi start karta hoon!" return base_response def get_escalation_recommendation( self, conversation: Dict, intelligence: Dict ) -> Dict[str, Any]: """ Recommend whether to escalate or continue engagement. Returns recommendation for the orchestrator. """ message_count = len(conversation.get("history", [])) has_upi = bool(intelligence.get("upi_ids")) has_phone = bool(intelligence.get("phone_numbers")) has_account = bool(intelligence.get("bank_accounts")) # Calculate value of continuing intel_score = sum([has_upi, has_phone, has_account]) # 1. Critical Intelligence Threshold: Detect high-confidence phishing indicators. # This triggers immediate reporting protocols when actionable intelligence (URLs) is verified. has_url = bool(intelligence.get("urls")) or bool(intelligence.get("phishingLinks")) if has_url: return { "action": "can_conclude", "reason": "Critical Intelligence (Phishing Link) captured. Threshold exceeded.", "intel_score": intel_score + 5 } # 2. Urgent Threat Block: Detect immediate financial coercion patterns. # If text contains "urgent", "block", "immediately", prioritize intervention. keywords = intelligence.get("keywords", []) urgency_terms = ["urgent", "immediately", "block", "suspend", "24 hours", "otp", "compromised"] is_urgent = any(term in str(keywords).lower() for term in urgency_terms) # If we have keywords OR the message text itself implies urgency (hack: check intel context) # Note: We rely on keywords extracted by ScamDetector for this context. # SOC FIX: Hard Intel Gate (Require at least ONE hard indicator + urgency to conclude) if is_urgent and (has_phone or has_upi or has_account or has_url): return { "action": "can_conclude", "reason": "Critical Banking Threat (Block/Suspend Coercion) detected with Identifiers.", "intel_score": intel_score + 4 } # 3. Technical Threat Vector: Detect Remote Access Tools (RATs) and OTP requests. # These indicators signify an active, high-risk session requiring immediate conclusion. has_rat = bool(intelligence.get("rat_apps")) has_otp = bool(intelligence.get("otps")) has_card = bool(intelligence.get("credit_cards")) if (has_rat or has_otp or has_card): return { "action": "can_conclude", "reason": "High-Risk Threat Vector (RAT/OTP/Financial Info) detected.", "intel_score": intel_score + 10 # Massive Bonus } # If we already have good intel and many messages, can consider wrapping up if intel_score >= 2 and message_count > 10: return { "action": "can_conclude", "reason": "Sufficient intelligence collected", "intel_score": intel_score } # If few messages, keep going regardless if message_count < 5: return { "action": "continue_engagement", "reason": "Building rapport phase", "intel_score": intel_score } # If no intel yet, push harder if intel_score == 0: return { "action": "escalate_extraction", "reason": "No intelligence collected yet", "intel_score": intel_score } return { "action": "continue_engagement", "reason": "More intelligence possible", "intel_score": intel_score } __all__ = ["AdaptiveStrategyAgent"]