# ═══════════════════════════════════════════════════════════════════════════════
# File: app/database/db.py
# Description: Database connection manager with SQLite/PostgreSQL support
#              Switch via DATABASE_URL environment variable
# ═══════════════════════════════════════════════════════════════════════════════

"""
Database connection manager supporting:
- SQLite (default, for development/hackathon)
- PostgreSQL/Supabase (production, via DATABASE_URL)
"""

import os
from typing import Optional, AsyncGenerator
from contextlib import asynccontextmanager

from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker,
    AsyncEngine
)
from sqlalchemy.pool import StaticPool

from app.database.models import Base


# URL used when neither the constructor argument nor DATABASE_URL provides one.
_DEFAULT_DATABASE_URL = "sqlite+aiosqlite:///./data/honeypot.db"


class DatabaseManager:
    """
    Manages database connections with support for multiple backends.

    Usage:
        db = DatabaseManager()
        await db.init()

        async with db.session() as session:
            # Use session

        await db.close()
    """

    def __init__(self, database_url: Optional[str] = None):
        """
        Initialize database manager (no connection is opened here).

        Args:
            database_url: Database URL. If None or empty, uses the
                DATABASE_URL environment variable; if that is unset or
                empty too, falls back to the local SQLite default.

        Supported URLs:
            - sqlite+aiosqlite:///./data/honeypot.db (default)
            - postgresql+asyncpg://user:pass@host:5432/db (Supabase/Neon)

        ``postgres://`` and ``postgresql://`` URLs are rewritten to
        ``postgresql+asyncpg://`` so the async driver is selected.
        """
        # `or` chain (rather than a getenv default) so that an empty
        # DATABASE_URL is treated the same as an unset one.
        self.database_url = (
            database_url
            or os.getenv("DATABASE_URL")
            or _DEFAULT_DATABASE_URL
        )

        # Convert postgres:// to postgresql+asyncpg:// for Supabase compatibility
        if isinstance(self.database_url, str):
            for plain_prefix in ("postgres://", "postgresql://"):
                if self.database_url.startswith(plain_prefix):
                    self.database_url = self.database_url.replace(
                        plain_prefix, "postgresql+asyncpg://", 1
                    )
                    break

        self.engine: Optional[AsyncEngine] = None
        self.session_factory: Optional[async_sessionmaker[AsyncSession]] = None
        self._initialized = False

    @property
    def _scheme(self) -> str:
        """Lower-cased scheme part of the URL (everything before ``://``)."""
        return self.database_url.partition("://")[0].lower()

    @property
    def is_sqlite(self) -> bool:
        """Check if using SQLite."""
        # Look at the scheme only: credentials, host names or database
        # names may legitimately contain the word "sqlite".
        return self._scheme.startswith("sqlite")

    @property
    def is_postgres(self) -> bool:
        """Check if using PostgreSQL."""
        return self._scheme.startswith("postgresql")

    def _ensure_sqlite_directory(self) -> None:
        """Create the parent directory of the SQLite database file if needed."""
        _, separator, file_path = self.database_url.partition(":///")
        if not separator:
            # No file path in the URL (e.g. "sqlite+aiosqlite://").
            return

        # Drop any "?key=value" URL options; they are not part of the path.
        file_path = file_path.partition("?")[0]

        # dirname is empty for bare names such as ":memory:" or "app.db".
        parent_dir = os.path.dirname(file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    async def init(self) -> None:
        """
        Initialize database connection and create tables.

        Does nothing if the manager is already initialized. The engine and
        session factory are only stored on the instance once table creation
        has succeeded.

        Raises:
            Exception: Whatever engine creation, directory creation or
                ``Base.metadata.create_all`` raises. An engine created
                before the failure is disposed first.
        """
        if self._initialized:
            return

        if self.is_sqlite:
            # Create data directory for SQLite
            self._ensure_sqlite_directory()

            # SQLite-specific settings
            engine = create_async_engine(
                self.database_url,
                echo=False,
                connect_args={"check_same_thread": False},
                poolclass=StaticPool  # Better for async SQLite
            )
        else:
            # PostgreSQL settings
            engine = create_async_engine(
                self.database_url,
                echo=False,
                pool_size=5,
                max_overflow=10,
                pool_pre_ping=True
            )

        # Create tables
        try:
            async with engine.begin() as conn:
                await conn.run_sync(Base.metadata.create_all)
        except Exception:
            # Do not leak the engine when the database is unreachable or
            # table creation fails; callers may replace this manager.
            await engine.dispose()
            raise

        # Publish state only after everything succeeded.
        self.engine = engine
        self.session_factory = async_sessionmaker(
            bind=engine,
            class_=AsyncSession,
            expire_on_commit=False
        )
        self._initialized = True
        print(f"Database initialized: {'SQLite' if self.is_sqlite else 'PostgreSQL'}")

    @asynccontextmanager
    async def session(self) -> AsyncGenerator[AsyncSession, None]:
        """
        Get a database session.

        Initializes the manager on first use. The session is committed when
        the ``async with`` body finishes normally; if the body (or the
        commit) raises an ``Exception``, the session is rolled back and the
        exception is re-raised.
        """
        if not self._initialized:
            await self.init()

        async with self.session_factory() as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise

    async def close(self) -> None:
        """Close database connection and reset the manager to its uninitialized state."""
        if self.engine is not None:
            await self.engine.dispose()

        # Drop references to the disposed engine so a later init() starts clean.
        self.engine = None
        self.session_factory = None
        self._initialized = False
        print("Database connection closed")


# Global database manager instance
_db_manager: Optional[DatabaseManager] = None


def get_db_manager() -> DatabaseManager:
    """Get or create the global database manager."""
    global _db_manager
    if _db_manager is None:
        _db_manager = DatabaseManager()
    return _db_manager


async def init_db() -> DatabaseManager:
    """
    Initialize the global database with Automatic Crash Resilience.

    If the configured database is not SQLite and fails to initialize, the
    global manager is replaced by one pointing at a local SQLite file and
    initialization is retried once.

    Returns:
        The global manager. It is returned even when every initialization
        attempt failed; in that case it is NOT initialized, and the next
        ``session()`` call will attempt ``init()`` again and raise if the
        database is still unavailable.
    """
    global _db_manager

    db = get_db_manager()

    try:
        await db.init()
    except Exception as e:
        print(f"\n[!!!] CRITICAL: Primary Database Initialization Failed: {e}")
    else:
        return db

    if db.is_sqlite:
        # Already on local SQLite: there is nothing left to fall back to.
        print("[❌] FATAL: Local SQLite failed (Permissions?). System starting in NON-PERSISTENT mode.")
        return db

    # FAILOVER LOGIC: If Cloud DB fails, switch to Emergency Local SQLite
    print("[♻️] RESILIENCE PROTOCOL: Switching to Local SQLite Fallback...")

    # CHECK FOR EXISTING LOCAL DB
    fallback_path = "./data/honeypot.db"
    if os.path.exists(fallback_path):
        print(f"[ℹ️] Found existing local database: {fallback_path}")
        fallback_url = f"sqlite+aiosqlite:///{fallback_path}"
    else:
        print("[ℹ️] Creating new emergency database...")
        fallback_url = "sqlite+aiosqlite:///./data/honeypot_emergency.db"

    _db_manager = DatabaseManager(database_url=fallback_url)
    db = _db_manager

    # Retry initialization
    try:
        await db.init()
        print("[✅] RESILIENCE SUCCESS: System operational on Emergency DB.")
    except Exception as e2:
        print(f"[❌] FATAL: Emergency Fallback Failed: {e2}")
        # We do not raise here to allow API to start even in 'Zombie' mode (no persistence)
        # But typically this won't happen for SQLite.

    return db


async def close_db() -> None:
    """Close the global database connection and discard the global manager."""
    global _db_manager
    if _db_manager:
        await _db_manager.close()
        _db_manager = None


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Dependency for FastAPI routes.

    Yields a session from the global manager's ``session()`` context
    manager, so commit/rollback handling is the same as there.
    """
    db = get_db_manager()
    async with db.session() as session:
        yield session


__all__ = ["DatabaseManager", "get_db_manager", "init_db", "close_db", "get_db"]