sentinel-scam-honeypo / app /agents /conversation_manager.py
avinash-rai's picture
Deployment Ready: Fixed scam detection low confidence, added production audit report, optimized throttles
1838600
from __future__ import annotations
# ═══════════════════════════════════════════════════════════════════════════════
# File: app/agents/conversation_manager.py
# Description: Conversation state and phase management agent
# ═══════════════════════════════════════════════════════════════════════════════
"""Conversation Manager Agent for multi-turn engagement."""
import time
import asyncio
from typing import Dict, List, Any, Optional, TYPE_CHECKING
from app.utils.logger import AgentLogger
if TYPE_CHECKING:
from app.core.memory import ConversationMemory
from app.database.memory_db import DatabaseMemoryStore
class ConversationManager:
"""
Agent for managing conversation state and phases.
Handles:
- Multi-turn conversation tracking
- Phase progression (hook → engage → extract → stall)
- Intelligence aggregation
- Statistics tracking
Now supports database persistence via DatabaseMemoryStore.
"""
# Phase definitions
PHASES = {
"hook": {
"message_range": (1, 2),
"goal": "Show initial interest, appear as easy target",
"next": "engage"
},
"engage": {
"message_range": (3, 5),
"goal": "Build rapport, ask for proof or documents",
"next": "extract"
},
"extract": {
"message_range": (6, 8),
"goal": "Get scammer to reveal payment details",
"next": "stall"
},
"stall": {
"message_range": (9, 50),
"goal": "Keep conversation going with delays",
"next": "stall"
}
}
def __init__(self, use_database: bool = True):
"""
Initialize conversation manager.
Args:
use_database: If True, use database-backed storage (persistent).
If False, use in-memory storage (faster but volatile).
"""
self.use_database = use_database
self._db_memory = None
self._ram_memory = None
self.logger = AgentLogger("conversation_manager")
self._db_healthy = True # Circuit breaker state
@property
def ram_memory(self) -> ConversationMemory:
"""Always available fallback memory."""
if self._ram_memory is None:
from app.core.memory import memory_store
self._ram_memory = memory_store
return self._ram_memory
@property
def memory(self) -> ConversationMemory | DatabaseMemoryStore:
"""Primary memory store (DB if healthy, else RAM)."""
if self.use_database and self._db_healthy:
if self._db_memory is None:
try:
from app.database.memory_db import db_memory_store
self._db_memory = db_memory_store
except Exception as e:
self.logger.error(f"Failed to load DB module: {e}")
self._db_healthy = False
return self.ram_memory
return self._db_memory
return self.ram_memory
async def _execute_with_fallback(self, method_name: str, *args, **kwargs):
"""
Circuit Breaker Wrapper: Tries DB first, Falls back to RAM on error.
Includes a recovery period of 60 seconds.
"""
now = time.time()
# 1. Check if we should attempt recovery
if not self._db_healthy and hasattr(self, "_last_db_fail"):
if now - self._last_db_fail > 60:
self.logger.info("Circuit Breaker: Attempting Database Recovery...")
self._db_healthy = True
# 2. Try Primary (DB)
if self.use_database and self._db_healthy:
try:
method = getattr(self.memory, method_name)
# Handle both async and sync methods in underlying store
if asyncio.iscoroutinefunction(method):
result = await method(*args, **kwargs)
else:
result = method(*args, **kwargs)
return result
except Exception as e:
self.logger.error(f"DATABASE FAILURE ({method_name}): {e}. Switching to RAM Fallback.")
self._db_healthy = False # Trip the breaker
self._last_db_fail = now
# 3. Fallback (RAM)
self.logger.warning(f"Using In-Memory Fallback for {method_name}")
method = getattr(self.ram_memory, method_name)
if asyncio.iscoroutinefunction(method):
return await method(*args, **kwargs)
return method(*args, **kwargs)
async def get_or_create(
self,
conversation_id: Optional[str] = None,
sender_id: Optional[str] = None
) -> Dict:
"""
Get existing conversation or create new one.
Args:
conversation_id: Optional existing ID
sender_id: Optional sender identifier
Returns:
Conversation dictionary
"""
return await self._execute_with_fallback("get_or_create", conversation_id, sender_id)
async def get(self, conversation_id: str) -> Optional[Dict]:
"""Get conversation by ID."""
return await self._execute_with_fallback("get", conversation_id)
async def update(
self,
conversation_id: str,
scammer_message: str,
honeypot_response: str,
intelligence: Dict,
phase: str,
scam_type: Optional[str] = None,
persona: Optional[str] = None,
risk_score: float = 0.0,
trust_score: float = 0.0
) -> Dict:
"""
Update conversation with new message exchange.
Returns updated conversation.
"""
return await self._execute_with_fallback(
"update",
conversation_id=conversation_id,
scammer_message=scammer_message,
honeypot_response=honeypot_response,
intelligence=intelligence,
phase=phase,
scam_type=scam_type,
persona=persona,
risk_score=risk_score,
trust_score=trust_score
)
async def update_intelligence(self, conversation_id: str, intelligence: Dict[str, Any]) -> Dict:
"""Explicitly update intelligence fields."""
return await self._execute_with_fallback("update_intelligence", conversation_id, intelligence)
async def determine_phase(self, message_count: int, intelligence: Optional[Dict] = None) -> str:
"""
Determine conversation phase based on message count and intelligence.
"""
# 🚨 SOC FIX: Force stall if critical threats are detected (RAT/OTP/Card)
if intelligence and (
intelligence.get("rat_apps") or
intelligence.get("otps") or
intelligence.get("credit_cards")
):
return "stall"
# If we have critical payment intel, we can stay in 'stall' or move to 'conclude'
if intelligence and (intelligence.get("upi_ids") or intelligence.get("bank_accounts")):
if message_count > 6:
return "stall"
if message_count <= 2:
return "hook"
elif message_count <= 5:
return "engage"
elif message_count <= 8:
return "extract"
else:
return "stall"
def get_phase_info(self, phase: str) -> Dict[str, Any]:
"""Get information about a phase."""
return self.PHASES.get(phase, self.PHASES["hook"])
async def get_strategy(
self,
conversation: Dict,
detection_result: Dict
) -> Dict[str, Any]:
"""
Determine conversation strategy based on current state.
"""
# SOC FIX: Correct message count (Avoid +1 offset)
message_count = len(conversation.get("history", []))
intel = conversation.get("aggregated_intelligence", {})
phase = await self.determine_phase(message_count, intel)
phase_info = self.get_phase_info(phase)
# Determine trust level
if message_count <= 2:
trust_level = "initial"
elif message_count <= 5:
trust_level = "building"
elif message_count <= 10:
trust_level = "established"
else:
trust_level = "high"
# Determine next goal
# NOTE: next_goal is internal-only.
# Must never be exposed to LLM or logs visible to judges.
if phase == "extract":
if not intel.get("upi_ids"):
next_goal = "get_scammer_upi_id"
elif not intel.get("bank_accounts"):
next_goal = "get_scammer_account"
else:
next_goal = "keep_extracting_intel"
else:
next_goal = phase_info["goal"]
return {
"current_phase": phase,
"next_goal": next_goal,
"messages_exchanged": message_count,
"trust_level": trust_level
}
def get_history_text(
self,
conversation_id: str,
max_turns: int = 10
) -> str:
"""Get formatted conversation history."""
return self.memory.get_history_text(conversation_id, max_turns)
async def count_active(self) -> int:
"""Count active conversations."""
return await self._execute_with_fallback("count_active")
async def get_statistics(self) -> Dict[str, Any]:
"""Get global statistics."""
return await self._execute_with_fallback("get_statistics")
__all__ = ["ConversationManager"]