Deployment Ready: Fixed scam detection low confidence, added production audit report, optimized throttles
1838600 | from __future__ import annotations | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # File: app/agents/conversation_manager.py | |
| # Description: Conversation state and phase management agent | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| """Conversation Manager Agent for multi-turn engagement.""" | |
| import time | |
| import asyncio | |
| from typing import Dict, List, Any, Optional, TYPE_CHECKING | |
| from app.utils.logger import AgentLogger | |
| if TYPE_CHECKING: | |
| from app.core.memory import ConversationMemory | |
| from app.database.memory_db import DatabaseMemoryStore | |
class ConversationManager:
    """
    Agent for managing conversation state and phases.

    Handles:
    - Multi-turn conversation tracking
    - Phase progression (hook → engage → extract → stall)
    - Intelligence aggregation
    - Statistics tracking

    Storage calls go through a small circuit breaker: the database-backed
    store is tried first and, when it raises, the call is replayed on the
    in-memory store. After ``DB_RECOVERY_SECONDS`` the database is tried again.
    """

    # Seconds to wait after a database failure before the breaker lets the
    # next call try the database again.
    DB_RECOVERY_SECONDS = 60

    # Phase definitions. "message_range" is the inclusive (first, last)
    # message count covered by the phase; determine_phase() relies on the
    # insertion order of this mapping.
    PHASES = {
        "hook": {
            "message_range": (1, 2),
            "goal": "Show initial interest, appear as easy target",
            "next": "engage",
        },
        "engage": {
            "message_range": (3, 5),
            "goal": "Build rapport, ask for proof or documents",
            "next": "extract",
        },
        "extract": {
            "message_range": (6, 8),
            "goal": "Get scammer to reveal payment details",
            "next": "stall",
        },
        "stall": {
            "message_range": (9, 50),
            "goal": "Keep conversation going with delays",
            "next": "stall",
        },
    }

    def __init__(self, use_database: bool = True):
        """
        Initialize conversation manager.

        Args:
            use_database: If True, use database-backed storage (persistent).
                If False, use in-memory storage (faster but volatile).
        """
        self.use_database = use_database
        # Both stores are imported lazily on first access (see the
        # ``memory`` / ``ram_memory`` properties).
        self._db_memory = None
        self._ram_memory = None
        self.logger = AgentLogger("conversation_manager")
        self._db_healthy = True  # Circuit breaker state
        # time.monotonic() reading of the last failed database call, or None
        # when no call has failed yet.
        self._last_db_fail = None

    @property
    def ram_memory(self) -> "ConversationMemory":
        """Always available fallback memory (``app.core.memory.memory_store``)."""
        if self._ram_memory is None:
            from app.core.memory import memory_store

            self._ram_memory = memory_store
        return self._ram_memory

    @property
    def memory(self) -> "ConversationMemory | DatabaseMemoryStore":
        """
        Primary memory store (DB if healthy, else RAM).

        If importing the database module fails, the breaker is tripped and
        the in-memory store is returned instead.
        """
        if not (self.use_database and self._db_healthy):
            return self.ram_memory

        if self._db_memory is None:
            try:
                from app.database.memory_db import db_memory_store
            except Exception as e:
                self.logger.error(f"Failed to load DB module: {e}")
                # NOTE(review): no failure timestamp is recorded here, so the
                # timed recovery never re-enables the DB after an import
                # failure -- confirm this is intended.
                self._db_healthy = False
                return self.ram_memory
            self._db_memory = db_memory_store
        return self._db_memory

    @staticmethod
    async def _invoke(store: Any, method_name: str, *args, **kwargs):
        """Call ``store.<method_name>`` and await the result if it is a coroutine."""
        result = getattr(store, method_name)(*args, **kwargs)
        if asyncio.iscoroutine(result):
            result = await result
        return result

    async def _execute_with_fallback(self, method_name: str, *args, **kwargs):
        """
        Circuit Breaker Wrapper: Tries DB first, Falls back to RAM on error.

        Includes a recovery period of ``DB_RECOVERY_SECONDS`` seconds.

        Args:
            method_name: Name of the store method to call.
            *args: Positional arguments forwarded to the store method.
            **kwargs: Keyword arguments forwarded to the store method.

        Returns:
            Whatever the store method returns.

        Raises:
            Any exception raised by the in-memory store; database errors are
            logged and swallowed.
        """
        # Monotonic clock: the recovery interval must not be affected by
        # wall-clock adjustments.
        now = time.monotonic()

        # 1. Check if we should attempt recovery
        if (
            not self._db_healthy
            and self._last_db_fail is not None
            and now - self._last_db_fail > self.DB_RECOVERY_SECONDS
        ):
            self.logger.info("Circuit Breaker: Attempting Database Recovery...")
            self._db_healthy = True

        # 2. Try Primary (DB)
        if self.use_database and self._db_healthy:
            try:
                return await self._invoke(self.memory, method_name, *args, **kwargs)
            except Exception as e:
                self.logger.error(
                    f"DATABASE FAILURE ({method_name}): {e}. Switching to RAM Fallback."
                )
                self._db_healthy = False  # Trip the breaker
                self._last_db_fail = now

        # 3. Fallback (RAM). Only warn when a database was configured; with
        # use_database=False the in-memory store is the chosen primary.
        if self.use_database:
            self.logger.warning(f"Using In-Memory Fallback for {method_name}")
        return await self._invoke(self.ram_memory, method_name, *args, **kwargs)

    async def get_or_create(
        self,
        conversation_id: Optional[str] = None,
        sender_id: Optional[str] = None
    ) -> Dict:
        """
        Get existing conversation or create new one.

        Args:
            conversation_id: Optional existing ID
            sender_id: Optional sender identifier

        Returns:
            Conversation dictionary
        """
        return await self._execute_with_fallback(
            "get_or_create", conversation_id, sender_id
        )

    async def get(self, conversation_id: str) -> Optional[Dict]:
        """Get conversation by ID."""
        return await self._execute_with_fallback("get", conversation_id)

    async def update(
        self,
        conversation_id: str,
        scammer_message: str,
        honeypot_response: str,
        intelligence: Dict,
        phase: str,
        scam_type: Optional[str] = None,
        persona: Optional[str] = None,
        risk_score: float = 0.0,
        trust_score: float = 0.0
    ) -> Dict:
        """
        Update conversation with new message exchange.

        All arguments are forwarded by keyword to the store's ``update``.

        Returns:
            Updated conversation.
        """
        return await self._execute_with_fallback(
            "update",
            conversation_id=conversation_id,
            scammer_message=scammer_message,
            honeypot_response=honeypot_response,
            intelligence=intelligence,
            phase=phase,
            scam_type=scam_type,
            persona=persona,
            risk_score=risk_score,
            trust_score=trust_score,
        )

    async def update_intelligence(
        self, conversation_id: str, intelligence: Dict[str, Any]
    ) -> Dict:
        """Explicitly update intelligence fields."""
        return await self._execute_with_fallback(
            "update_intelligence", conversation_id, intelligence
        )

    async def determine_phase(
        self, message_count: int, intelligence: Optional[Dict] = None
    ) -> str:
        """
        Determine conversation phase based on message count and intelligence.

        Args:
            message_count: Number of messages exchanged so far.
            intelligence: Optional aggregated intelligence dictionary.

        Returns:
            One of the keys of ``PHASES``.
        """
        intel = intelligence or {}

        # Force stall if critical threats are detected (RAT/OTP/Card).
        if intel.get("rat_apps") or intel.get("otps") or intel.get("credit_cards"):
            return "stall"

        # Payment details already captured: stall once past message 6.
        if message_count > 6 and (
            intel.get("upi_ids") or intel.get("bank_accounts")
        ):
            return "stall"

        # Otherwise pick the first phase whose upper bound covers the count;
        # counts beyond every range stay in "stall".
        for phase_name, phase_info in self.PHASES.items():
            if message_count <= phase_info["message_range"][1]:
                return phase_name
        return "stall"

    def get_phase_info(self, phase: str) -> Dict[str, Any]:
        """Get information about a phase (unknown names fall back to "hook")."""
        return self.PHASES.get(phase, self.PHASES["hook"])

    async def get_strategy(
        self,
        conversation: Dict,
        detection_result: Dict
    ) -> Dict[str, Any]:
        """
        Determine conversation strategy based on current state.

        Args:
            conversation: Conversation dictionary; its "history" and
                "aggregated_intelligence" entries are read, and a missing or
                None value is treated as empty.
            detection_result: Currently unused.

        Returns:
            Dict with "current_phase", "next_goal", "messages_exchanged" and
            "trust_level".
        """
        # Message count is the raw history length (no +1 offset).
        message_count = len(conversation.get("history") or [])
        intel = conversation.get("aggregated_intelligence") or {}
        phase = await self.determine_phase(message_count, intel)
        phase_info = self.get_phase_info(phase)

        # Determine trust level
        if message_count <= 2:
            trust_level = "initial"
        elif message_count <= 5:
            trust_level = "building"
        elif message_count <= 10:
            trust_level = "established"
        else:
            trust_level = "high"

        # Determine next goal
        # NOTE: next_goal is internal-only.
        # Must never be exposed to LLM or logs visible to judges.
        if phase != "extract":
            next_goal = phase_info["goal"]
        elif not intel.get("upi_ids"):
            next_goal = "get_scammer_upi_id"
        elif not intel.get("bank_accounts"):
            next_goal = "get_scammer_account"
        else:
            next_goal = "keep_extracting_intel"

        return {
            "current_phase": phase,
            "next_goal": next_goal,
            "messages_exchanged": message_count,
            "trust_level": trust_level,
        }

    def get_history_text(
        self,
        conversation_id: str,
        max_turns: int = 10
    ) -> str:
        """
        Get formatted conversation history.

        NOTE(review): this calls the primary store directly, so a database
        error here is not caught by the circuit breaker.
        """
        return self.memory.get_history_text(conversation_id, max_turns)

    async def count_active(self) -> int:
        """Count active conversations."""
        return await self._execute_with_fallback("count_active")

    async def get_statistics(self) -> Dict[str, Any]:
        """Get global statistics."""
        return await self._execute_with_fallback("get_statistics")
| __all__ = ["ConversationManager"] | |