# ═══════════════════════════════════════════════════════════════════════════════
# File: app/agents/conversation_manager.py
# Description: Conversation state and phase management agent
# ═══════════════════════════════════════════════════════════════════════════════
"""Conversation Manager Agent for multi-turn engagement."""

from __future__ import annotations

import asyncio
import inspect
import time
from typing import Dict, List, Any, Optional, TYPE_CHECKING

from app.utils.logger import AgentLogger

if TYPE_CHECKING:
    from app.core.memory import ConversationMemory
    from app.database.memory_db import DatabaseMemoryStore


class ConversationManager:
    """
    Agent for managing conversation state and phases.

    Handles:
    - Multi-turn conversation tracking
    - Phase progression (hook → engage → extract → stall)
    - Intelligence aggregation
    - Statistics tracking

    Store-backed calls go through a small circuit breaker: the database
    store is tried first and the in-memory store is used whenever the
    database call raises. After ``DB_RECOVERY_SECONDS`` the database is
    tried again.
    """

    # Phase definitions
    PHASES = {
        "hook": {
            "message_range": (1, 2),
            "goal": "Show initial interest, appear as easy target",
            "next": "engage",
        },
        "engage": {
            "message_range": (3, 5),
            "goal": "Build rapport, ask for proof or documents",
            "next": "extract",
        },
        "extract": {
            "message_range": (6, 8),
            "goal": "Get scammer to reveal payment details",
            "next": "stall",
        },
        "stall": {
            "message_range": (9, 50),
            "goal": "Keep conversation going with delays",
            "next": "stall",
        },
    }

    # Seconds the breaker stays open before the database is retried.
    DB_RECOVERY_SECONDS = 60

    def __init__(self, use_database: bool = True):
        """
        Initialize conversation manager.

        Args:
            use_database: If True, use database-backed storage (persistent).
                If False, use in-memory storage (faster but volatile).
        """
        self.use_database = use_database
        self._db_memory = None
        self._ram_memory = None
        self.logger = AgentLogger("conversation_manager")
        self._db_healthy = True  # Circuit breaker state (True = closed)
        # Monotonic timestamp of the last database failure; None until the
        # breaker has tripped at least once.
        self._last_db_fail: Optional[float] = None

    @property
    def ram_memory(self) -> ConversationMemory:
        """Always available fallback memory (imported lazily on first use)."""
        if self._ram_memory is None:
            from app.core.memory import memory_store

            self._ram_memory = memory_store
        return self._ram_memory

    @property
    def memory(self) -> ConversationMemory | DatabaseMemoryStore:
        """Primary memory store (DB if enabled and healthy, else RAM)."""
        if not (self.use_database and self._db_healthy):
            return self.ram_memory

        if self._db_memory is None:
            try:
                from app.database.memory_db import db_memory_store

                self._db_memory = db_memory_store
            except Exception as e:
                self.logger.error(f"Failed to load DB module: {e}")
                self._db_healthy = False
                # Record the failure time so the recovery check in
                # _execute_with_fallback can retry the import later.
                self._last_db_fail = time.monotonic()
                return self.ram_memory
        return self._db_memory

    def _trip_breaker(self, method_name: str, error: Exception) -> None:
        """Log a database failure and open the circuit breaker."""
        self.logger.error(
            f"DATABASE FAILURE ({method_name}): {error}. Switching to RAM Fallback."
        )
        self._db_healthy = False
        # Monotonic clock: the cool-down must not be affected by wall-clock
        # adjustments. Stamped when the failure is observed.
        self._last_db_fail = time.monotonic()

    @staticmethod
    async def _invoke(store: Any, method_name: str, args: tuple, kwargs: dict) -> Any:
        """Call ``store.<method_name>`` and await the result if it is awaitable."""
        result = getattr(store, method_name)(*args, **kwargs)
        # Handles both async and sync methods in the underlying store.
        if inspect.isawaitable(result):
            result = await result
        return result

    async def _execute_with_fallback(self, method_name: str, *args, **kwargs):
        """
        Circuit Breaker Wrapper: Tries DB first, falls back to RAM on error.

        Args:
            method_name: Name of the method to call on the memory store.
            *args: Positional arguments forwarded to that method.
            **kwargs: Keyword arguments forwarded to that method.

        Returns:
            Whatever the store method returns (from the database store when
            it succeeds, otherwise from the in-memory store).

        Raises:
            Exception: Anything raised by the in-memory store; only database
                errors are absorbed.
        """
        # 1. Check if we should attempt recovery
        last_fail = getattr(self, "_last_db_fail", None)
        if (
            not self._db_healthy
            and last_fail is not None
            and time.monotonic() - last_fail > self.DB_RECOVERY_SECONDS
        ):
            self.logger.info("Circuit Breaker: Attempting Database Recovery...")
            self._db_healthy = True

        # 2. Try Primary (DB)
        if self.use_database and self._db_healthy:
            try:
                return await self._invoke(self.memory, method_name, args, kwargs)
            except Exception as e:
                self._trip_breaker(method_name, e)

        # 3. Fallback (RAM). Only warn when a database is configured; with
        # use_database=False the in-memory store is the primary, not a fallback.
        if self.use_database:
            self.logger.warning(f"Using In-Memory Fallback for {method_name}")
        return await self._invoke(self.ram_memory, method_name, args, kwargs)

    async def get_or_create(
        self,
        conversation_id: Optional[str] = None,
        sender_id: Optional[str] = None,
    ) -> Dict:
        """
        Get existing conversation or create new one.

        Args:
            conversation_id: Optional existing ID
            sender_id: Optional sender identifier

        Returns:
            Conversation dictionary
        """
        return await self._execute_with_fallback(
            "get_or_create", conversation_id, sender_id
        )

    async def get(self, conversation_id: str) -> Optional[Dict]:
        """Get conversation by ID."""
        return await self._execute_with_fallback("get", conversation_id)

    async def update(
        self,
        conversation_id: str,
        scammer_message: str,
        honeypot_response: str,
        intelligence: Dict,
        phase: str,
        scam_type: Optional[str] = None,
        persona: Optional[str] = None,
        risk_score: float = 0.0,
        trust_score: float = 0.0,
    ) -> Dict:
        """
        Update conversation with new message exchange.

        All arguments are forwarded by keyword to the store's ``update``.

        Returns:
            Updated conversation.
        """
        return await self._execute_with_fallback(
            "update",
            conversation_id=conversation_id,
            scammer_message=scammer_message,
            honeypot_response=honeypot_response,
            intelligence=intelligence,
            phase=phase,
            scam_type=scam_type,
            persona=persona,
            risk_score=risk_score,
            trust_score=trust_score,
        )

    async def update_intelligence(
        self, conversation_id: str, intelligence: Dict[str, Any]
    ) -> Dict:
        """Explicitly update intelligence fields."""
        return await self._execute_with_fallback(
            "update_intelligence", conversation_id, intelligence
        )

    async def determine_phase(
        self, message_count: int, intelligence: Optional[Dict] = None
    ) -> str:
        """
        Determine conversation phase based on message count and intelligence.

        Args:
            message_count: Number of messages exchanged so far.
            intelligence: Optional aggregated intelligence dictionary.

        Returns:
            One of "hook", "engage", "extract" or "stall".
        """
        if intelligence:
            # Force stall as soon as critical threats are detected
            # (RAT apps / OTPs / credit cards), regardless of message count.
            if (
                intelligence.get("rat_apps")
                or intelligence.get("otps")
                or intelligence.get("credit_cards")
            ):
                return "stall"

            # Payment details already captured: stall once past message 6
            # instead of spending more turns in "extract".
            has_payment_intel = intelligence.get("upi_ids") or intelligence.get(
                "bank_accounts"
            )
            if has_payment_intel and message_count > 6:
                return "stall"

        if message_count <= 2:
            return "hook"
        if message_count <= 5:
            return "engage"
        if message_count <= 8:
            return "extract"
        return "stall"

    def get_phase_info(self, phase: str) -> Dict[str, Any]:
        """Get information about a phase (unknown names map to "hook")."""
        return self.PHASES.get(phase, self.PHASES["hook"])

    async def get_strategy(
        self,
        conversation: Dict,
        detection_result: Dict,
    ) -> Dict[str, Any]:
        """
        Determine conversation strategy based on current state.

        Args:
            conversation: Conversation dictionary; "history" and
                "aggregated_intelligence" are read from it.
            detection_result: Accepted for interface compatibility; not
                consulted by the current logic.

        Returns:
            Dictionary with "current_phase", "next_goal",
            "messages_exchanged" and "trust_level".
        """
        # Message count is the history length as-is (no +1 offset).
        # `or` guards against keys that are present but set to None.
        message_count = len(conversation.get("history") or [])
        intel = conversation.get("aggregated_intelligence") or {}

        phase = await self.determine_phase(message_count, intel)
        phase_info = self.get_phase_info(phase)

        # Determine trust level
        if message_count <= 2:
            trust_level = "initial"
        elif message_count <= 5:
            trust_level = "building"
        elif message_count <= 10:
            trust_level = "established"
        else:
            trust_level = "high"

        # Determine next goal
        # NOTE: next_goal is internal-only.
        # Must never be exposed to LLM or logs visible to judges.
        if phase == "extract":
            if not intel.get("upi_ids"):
                next_goal = "get_scammer_upi_id"
            elif not intel.get("bank_accounts"):
                next_goal = "get_scammer_account"
            else:
                next_goal = "keep_extracting_intel"
        else:
            next_goal = phase_info["goal"]

        return {
            "current_phase": phase,
            "next_goal": next_goal,
            "messages_exchanged": message_count,
            "trust_level": trust_level,
        }

    def get_history_text(
        self,
        conversation_id: str,
        max_turns: int = 10,
    ) -> str:
        """
        Get formatted conversation history.

        Reads from the primary store; if that is the database store and the
        call raises, the breaker is tripped and the in-memory store is used,
        matching the other store-backed methods.

        Args:
            conversation_id: Conversation to format.
            max_turns: Forwarded to the store's ``get_history_text``.
        """
        store = self.memory
        try:
            return store.get_history_text(conversation_id, max_turns)
        except Exception as e:
            if store is self.ram_memory:
                # Already on the in-memory store: nothing to fall back to.
                raise
            self._trip_breaker("get_history_text", e)
            return self.ram_memory.get_history_text(conversation_id, max_turns)

    async def count_active(self) -> int:
        """Count active conversations."""
        return await self._execute_with_fallback("count_active")

    async def get_statistics(self) -> Dict[str, Any]:
        """Get global statistics."""
        return await self._execute_with_fallback("get_statistics")


__all__ = ["ConversationManager"]