avinash-rai's picture
Deployment Ready: Fixed scam detection low confidence, added production audit report, optimized throttles
1838600
# ═══════════════════════════════════════════════════════════════════════════════
# File: app/database/db.py
# Description: Database connection manager with SQLite/PostgreSQL support
# Switch via DATABASE_URL environment variable
# ═══════════════════════════════════════════════════════════════════════════════
"""
Database connection manager supporting:
- SQLite (default, for development/hackathon)
- PostgreSQL/Supabase (production, via DATABASE_URL)
"""
import os
from typing import Optional, AsyncGenerator
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import (
create_async_engine,
AsyncSession,
async_sessionmaker,
AsyncEngine
)
from sqlalchemy.pool import StaticPool
from app.database.models import Base
class DatabaseManager:
"""
Manages database connections with support for multiple backends.
Usage:
db = DatabaseManager()
await db.init()
async with db.session() as session:
# Use session
await db.close()
"""
def __init__(self, database_url: Optional[str] = None):
"""
Initialize database manager.
Args:
database_url: Database URL. If None, uses DATABASE_URL env or SQLite default.
Supported URLs:
- sqlite+aiosqlite:///./data/honeypot.db (default)
- postgresql+asyncpg://user:pass@host:5432/db (Supabase/Neon)
"""
self.database_url = database_url or os.getenv(
"DATABASE_URL",
"sqlite+aiosqlite:///./data/honeypot.db"
)
# Convert postgres:// to postgresql+asyncpg:// for Supabase compatibility
if isinstance(self.database_url, str):
if self.database_url.startswith("postgres://"):
self.database_url = self.database_url.replace(
"postgres://", "postgresql+asyncpg://", 1
)
elif self.database_url.startswith("postgresql://") and "+asyncpg" not in self.database_url:
self.database_url = self.database_url.replace(
"postgresql://", "postgresql+asyncpg://", 1
)
self.engine: Optional[AsyncEngine] = None
self.session_factory: Optional[async_sessionmaker[AsyncSession]] = None
self._initialized = False
@property
def is_sqlite(self) -> bool:
"""Check if using SQLite."""
return "sqlite" in self.database_url.lower()
@property
def is_postgres(self) -> bool:
"""Check if using PostgreSQL."""
return "postgresql" in self.database_url.lower()
async def init(self) -> None:
"""Initialize database connection and create tables."""
if self._initialized:
return
# Create data directory for SQLite
if self.is_sqlite:
db_path = self.database_url.replace("sqlite+aiosqlite:///", "")
if db_path.startswith("./"):
db_path = db_path[2:]
db_dir = os.path.dirname(db_path)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir, exist_ok=True)
# SQLite-specific settings
self.engine = create_async_engine(
self.database_url,
echo=False,
connect_args={"check_same_thread": False},
poolclass=StaticPool # Better for async SQLite
)
else:
# PostgreSQL settings
self.engine = create_async_engine(
self.database_url,
echo=False,
pool_size=5,
max_overflow=10,
pool_pre_ping=True
)
# Create session factory
self.session_factory = async_sessionmaker(
bind=self.engine,
class_=AsyncSession,
expire_on_commit=False
)
# Create tables
async with self.engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
self._initialized = True
print(f"Database initialized: {'SQLite' if self.is_sqlite else 'PostgreSQL'}")
@asynccontextmanager
async def session(self) -> AsyncGenerator[AsyncSession, None]:
"""Get a database session."""
if not self._initialized:
await self.init()
async with self.session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
async def close(self) -> None:
"""Close database connection."""
if self.engine:
await self.engine.dispose()
self._initialized = False
print("Database connection closed")
# Global database manager instance
_db_manager: Optional[DatabaseManager] = None
def get_db_manager() -> DatabaseManager:
"""Get or create the global database manager."""
global _db_manager
if _db_manager is None:
_db_manager = DatabaseManager()
return _db_manager
async def init_db() -> DatabaseManager:
"""Initialize the global database with Automatic Crash Resilience."""
db = get_db_manager()
try:
await db.init()
except Exception as e:
print(f"\n[!!!] CRITICAL: Primary Database Initialization Failed: {e}")
# FAILOVER LOGIC: If Cloud DB fails, switch to Emergency Local SQLite
if not db.is_sqlite:
print("[♻️] RESILIENCE PROTOCOL: Switching to Local SQLite Fallback...")
global _db_manager
# CHECK FOR EXISTING LOCAL DB
fallback_path = "./data/honeypot.db"
if os.path.exists(fallback_path):
print(f"[ℹ️] Found existing local database: {fallback_path}")
fallback_url = f"sqlite+aiosqlite:///{fallback_path}"
else:
print(f"[ℹ️] Creating new emergency database...")
fallback_url = "sqlite+aiosqlite:///./data/honeypot_emergency.db"
_db_manager = DatabaseManager(database_url=fallback_url)
db = _db_manager
# Retry initialization
try:
await db.init()
print("[✅] RESILIENCE SUCCESS: System operational on Emergency DB.")
except Exception as e2:
print(f"[❌] FATAL: Emergency Fallback Failed: {e2}")
# We do not raise here to allow API to start even in 'Zombie' mode (no persistence)
# But typically this won't happen for SQLite.
else:
print("[❌] FATAL: Local SQLite failed (Permissions?). System starting in NON-PERSISTENT mode.")
return db
async def close_db() -> None:
"""Close the global database connection."""
global _db_manager
if _db_manager:
await _db_manager.close()
_db_manager = None
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""Dependency for FastAPI routes."""
db = get_db_manager()
async with db.session() as session:
yield session
__all__ = ["DatabaseManager", "get_db_manager", "init_db", "close_db", "get_db"]