| |
| |
| |
| |
| |
|
|
| """Database models for honeypot data persistence.""" |
|
|
| from datetime import datetime |
| from typing import Optional, List |
| from sqlalchemy import ( |
| String, Integer, Float, Text, DateTime, Boolean, |
| ForeignKey, JSON, UniqueConstraint |
| ) |
| from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship |
|
|
|
|
| class Base(DeclarativeBase): |
| """Base class for all models.""" |
| pass |
|
|
|
|
| class Conversation(Base): |
| """Stores conversation sessions with scammers.""" |
| |
| __tablename__ = "conversations" |
| |
| id: Mapped[str] = mapped_column(String(50), primary_key=True) |
| sender_id: Mapped[Optional[str]] = mapped_column(String(100), nullable=True) |
| scam_type: Mapped[Optional[str]] = mapped_column(String(50), nullable=True) |
| persona: Mapped[Optional[str]] = mapped_column(String(50), nullable=True) |
| phase: Mapped[str] = mapped_column(String(20), default="hook") |
| trust_score: Mapped[float] = mapped_column(Float, default=0.0) |
| risk_score: Mapped[float] = mapped_column(Float, default=0.0) |
| message_count: Mapped[int] = mapped_column(Integer, default=0) |
| created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) |
| updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) |
| |
| |
| messages: Mapped[List["Message"]] = relationship(back_populates="conversation", cascade="all, delete-orphan") |
| intelligence_items: Mapped[List["Intelligence"]] = relationship(back_populates="conversation", cascade="all, delete-orphan") |
| |
| def to_dict(self) -> dict: |
| """Convert to dictionary for API responses.""" |
| return { |
| "id": self.id, |
| "sender_id": self.sender_id, |
| "scam_type": self.scam_type, |
| "persona": self.persona, |
| "phase": self.phase, |
| "trust_score": self.trust_score, |
| "risk_score": self.risk_score, |
| "message_count": self.message_count, |
| "created_at": self.created_at.isoformat() if self.created_at else None, |
| "updated_at": self.updated_at.isoformat() if self.updated_at else None, |
| "history": [m.to_dict() for m in self.messages], |
| "aggregated_intelligence": self._aggregate_intel() |
| } |
| |
| def _aggregate_intel(self) -> dict: |
| """Aggregate intelligence by type.""" |
| result = { |
| "phone_numbers": [], |
| "upi_ids": [], |
| "bank_accounts": [], |
| "ifsc_codes": [], |
| "emails": [], |
| "urls": [], |
| "credit_cards": [], |
| "otps": [], |
| "rat_apps": [], |
| "pan_cards": [], |
| "aadhar_numbers": [] |
| } |
| for item in self.intelligence_items: |
| key = item.entity_type |
| |
| if key not in result: |
| result[key] = [] |
| |
| if item.entity_value not in result[key]: |
| result[key].append(item.entity_value) |
| return result |
|
|
|
|
| class Message(Base): |
| """Stores individual message exchanges.""" |
| |
| __tablename__ = "messages" |
| |
| id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) |
| conversation_id: Mapped[str] = mapped_column(String(50), ForeignKey("conversations.id")) |
| turn: Mapped[int] = mapped_column(Integer) |
| scammer_message: Mapped[str] = mapped_column(Text) |
| honeypot_response: Mapped[str] = mapped_column(Text) |
| phase: Mapped[str] = mapped_column(String(20)) |
| timestamp: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) |
| |
| |
| conversation: Mapped["Conversation"] = relationship(back_populates="messages") |
| |
| def to_dict(self) -> dict: |
| return { |
| "turn": self.turn, |
| "scammer_message": self.scammer_message, |
| "honeypot_response": self.honeypot_response, |
| "phase": self.phase, |
| "timestamp": self.timestamp.isoformat() if self.timestamp else None |
| } |
|
|
|
|
| class Intelligence(Base): |
| """Stores extracted intelligence (IOCs).""" |
| |
| __tablename__ = "intelligence" |
| |
| id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) |
| conversation_id: Mapped[str] = mapped_column(String(50), ForeignKey("conversations.id")) |
| entity_type: Mapped[str] = mapped_column(String(30)) |
| entity_value: Mapped[str] = mapped_column(String(500)) |
| created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) |
| |
| |
| conversation: Mapped["Conversation"] = relationship(back_populates="intelligence_items") |
| |
| __table_args__ = ( |
| UniqueConstraint('conversation_id', 'entity_type', 'entity_value', name='uq_intel'), |
| ) |
|
|
|
|
| class Campaign(Base): |
| """Stores scam campaign clusters.""" |
| |
| __tablename__ = "campaigns" |
| |
| id: Mapped[str] = mapped_column(String(50), primary_key=True) |
| scam_type: Mapped[str] = mapped_column(String(50)) |
| message_count: Mapped[int] = mapped_column(Integer, default=0) |
| entity_count: Mapped[int] = mapped_column(Integer, default=0) |
| first_seen: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) |
| last_seen: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) |
| |
| def to_dict(self) -> dict: |
| return { |
| "id": self.id, |
| "scam_type": self.scam_type, |
| "message_count": self.message_count, |
| "entity_count": self.entity_count, |
| "first_seen": self.first_seen.isoformat() if self.first_seen else None, |
| "last_seen": self.last_seen.isoformat() if self.last_seen else None |
| } |
|
|
|
|
| class Report(Base): |
| """Stores law enforcement reports.""" |
| |
| __tablename__ = "reports" |
| |
| id: Mapped[str] = mapped_column(String(50), primary_key=True) |
| conversation_id: Mapped[Optional[str]] = mapped_column(String(50), nullable=True) |
| report_type: Mapped[str] = mapped_column(String(30)) |
| priority: Mapped[str] = mapped_column(String(20)) |
| status: Mapped[str] = mapped_column(String(20), default="pending") |
| data: Mapped[Optional[dict]] = mapped_column(JSON, nullable=True) |
| created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) |
| |
| def to_dict(self) -> dict: |
| return { |
| "id": self.id, |
| "conversation_id": self.conversation_id, |
| "report_type": self.report_type, |
| "priority": self.priority, |
| "status": self.status, |
| "created_at": self.created_at.isoformat() if self.created_at else None |
| } |
|
|
|
|
| __all__ = ["Base", "Conversation", "Message", "Intelligence", "Campaign", "Report"] |
|
|