avinash-rai's picture
feat: Platinum Grade Hardening & Zero-Trust Compliance Fixes (Audit v4.2)
e6999f9
# ═══════════════════════════════════════════════════════════════════════════════
# File: app/database/models.py
# Description: SQLAlchemy models for persistent storage
# Supports: SQLite (default) + PostgreSQL/Supabase (production)
# ═══════════════════════════════════════════════════════════════════════════════
"""Database models for honeypot data persistence."""
from datetime import datetime
from typing import Optional, List
from sqlalchemy import (
String, Integer, Float, Text, DateTime, Boolean,
ForeignKey, JSON, UniqueConstraint
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
"""Base class for all models."""
pass
class Conversation(Base):
"""Stores conversation sessions with scammers."""
__tablename__ = "conversations"
id: Mapped[str] = mapped_column(String(50), primary_key=True)
sender_id: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
scam_type: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
persona: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
phase: Mapped[str] = mapped_column(String(20), default="hook")
trust_score: Mapped[float] = mapped_column(Float, default=0.0)
risk_score: Mapped[float] = mapped_column(Float, default=0.0)
message_count: Mapped[int] = mapped_column(Integer, default=0)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
messages: Mapped[List["Message"]] = relationship(back_populates="conversation", cascade="all, delete-orphan")
intelligence_items: Mapped[List["Intelligence"]] = relationship(back_populates="conversation", cascade="all, delete-orphan")
def to_dict(self) -> dict:
"""Convert to dictionary for API responses."""
return {
"id": self.id,
"sender_id": self.sender_id,
"scam_type": self.scam_type,
"persona": self.persona,
"phase": self.phase,
"trust_score": self.trust_score,
"risk_score": self.risk_score,
"message_count": self.message_count,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
"history": [m.to_dict() for m in self.messages],
"aggregated_intelligence": self._aggregate_intel()
}
def _aggregate_intel(self) -> dict:
"""Aggregate intelligence by type."""
result = {
"phone_numbers": [],
"upi_ids": [],
"bank_accounts": [],
"ifsc_codes": [],
"emails": [],
"urls": [],
"credit_cards": [],
"otps": [],
"rat_apps": [],
"pan_cards": [],
"aadhar_numbers": []
}
for item in self.intelligence_items:
key = item.entity_type
# Handle dynamic keys or pre-defined ones
if key not in result:
result[key] = []
if item.entity_value not in result[key]:
result[key].append(item.entity_value)
return result
class Message(Base):
"""Stores individual message exchanges."""
__tablename__ = "messages"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
conversation_id: Mapped[str] = mapped_column(String(50), ForeignKey("conversations.id"))
turn: Mapped[int] = mapped_column(Integer)
scammer_message: Mapped[str] = mapped_column(Text)
honeypot_response: Mapped[str] = mapped_column(Text)
phase: Mapped[str] = mapped_column(String(20))
timestamp: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
# Relationship
conversation: Mapped["Conversation"] = relationship(back_populates="messages")
def to_dict(self) -> dict:
return {
"turn": self.turn,
"scammer_message": self.scammer_message,
"honeypot_response": self.honeypot_response,
"phase": self.phase,
"timestamp": self.timestamp.isoformat() if self.timestamp else None
}
class Intelligence(Base):
"""Stores extracted intelligence (IOCs)."""
__tablename__ = "intelligence"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
conversation_id: Mapped[str] = mapped_column(String(50), ForeignKey("conversations.id"))
entity_type: Mapped[str] = mapped_column(String(30)) # phone, upi, bank, url, etc.
entity_value: Mapped[str] = mapped_column(String(500))
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
# Relationship
conversation: Mapped["Conversation"] = relationship(back_populates="intelligence_items")
__table_args__ = (
UniqueConstraint('conversation_id', 'entity_type', 'entity_value', name='uq_intel'),
)
class Campaign(Base):
"""Stores scam campaign clusters."""
__tablename__ = "campaigns"
id: Mapped[str] = mapped_column(String(50), primary_key=True)
scam_type: Mapped[str] = mapped_column(String(50))
message_count: Mapped[int] = mapped_column(Integer, default=0)
entity_count: Mapped[int] = mapped_column(Integer, default=0)
first_seen: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
last_seen: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
def to_dict(self) -> dict:
return {
"id": self.id,
"scam_type": self.scam_type,
"message_count": self.message_count,
"entity_count": self.entity_count,
"first_seen": self.first_seen.isoformat() if self.first_seen else None,
"last_seen": self.last_seen.isoformat() if self.last_seen else None
}
class Report(Base):
"""Stores law enforcement reports."""
__tablename__ = "reports"
id: Mapped[str] = mapped_column(String(50), primary_key=True)
conversation_id: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
report_type: Mapped[str] = mapped_column(String(30)) # police, upi_freeze
priority: Mapped[str] = mapped_column(String(20))
status: Mapped[str] = mapped_column(String(20), default="pending")
data: Mapped[Optional[dict]] = mapped_column(JSON, nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
def to_dict(self) -> dict:
return {
"id": self.id,
"conversation_id": self.conversation_id,
"report_type": self.report_type,
"priority": self.priority,
"status": self.status,
"created_at": self.created_at.isoformat() if self.created_at else None
}
__all__ = ["Base", "Conversation", "Message", "Intelligence", "Campaign", "Report"]