import json
import os
from contextlib import contextmanager

import psycopg2
import psycopg2.extras


def _get_conn():
    """Open a new psycopg2 connection to the database named by SUPABASE_DB_URL.

    The connection uses ``RealDictCursor`` as its cursor factory, so fetched
    rows can be accessed by column name.

    Raises:
        RuntimeError: If SUPABASE_DB_URL is unset or empty.
    """
    url = os.environ.get("SUPABASE_DB_URL")
    if not url:
        raise RuntimeError("SUPABASE_DB_URL environment variable not set")
    return psycopg2.connect(url, cursor_factory=psycopg2.extras.RealDictCursor)


@contextmanager
def _cursor(*, commit: bool):
    """Yield a cursor on a fresh connection and always release both.

    The cursor and connection are closed even when the body raises, so a
    failing statement can no longer leak a connection. When ``commit`` is
    true the transaction is committed only if the body finished without an
    exception; otherwise the connection is closed without committing.
    """
    conn = _get_conn()
    try:
        cur = conn.cursor()
        try:
            yield cur
            if commit:
                conn.commit()
        finally:
            cur.close()
    finally:
        conn.close()


def upsert_file_registry(uid: str, connector_type: str, files: list[dict]):
    """Insert registry rows for ``files``, leaving existing rows untouched.

    files: list of dicts with keys:
        file_id, file_name, is_folder, mime_type,
        file_size_bytes, parent_folder_id, full_path
    ``file_id`` and ``file_name`` are required; ``is_folder`` defaults to
    False and the remaining keys default to None.

    Uses ON CONFLICT DO NOTHING so first_seen_at is never overwritten.
    Returns without opening a connection when ``files`` is empty.
    """
    if not files:
        return

    rows = [
        (
            uid,
            connector_type,
            f["file_id"],
            f["file_name"],
            f.get("is_folder", False),
            f.get("mime_type"),
            f.get("file_size_bytes"),
            f.get("parent_folder_id"),
            f.get("full_path"),
        )
        for f in files
    ]
    with _cursor(commit=True) as cur:
        cur.executemany(
            """
            INSERT INTO sense_file_registry
                (uid, connector_type, file_id, file_name, is_folder,
                 mime_type, file_size_bytes, parent_folder_id, full_path)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
            ON CONFLICT (uid, connector_type, file_id) DO NOTHING
            """,
            rows,
        )


def upsert_scan_state(
    uid: str,
    connector_type: str,
    file_id: str,
    classification: str,
    scan_type: str,
    session_id: int,
):
    """Insert or update the scan-state row for one file.

    On conflict on (uid, connector_type, file_id) the classification, scan
    type and session id are overwritten and last_scanned_at is reset to NOW().
    """
    with _cursor(commit=True) as cur:
        cur.execute(
            """
            INSERT INTO sense_file_scan_state
                (uid, connector_type, file_id, classification, scan_type,
                 last_scanned_at, scan_session_id)
            VALUES (%s,%s,%s,%s,%s,NOW(),%s)
            ON CONFLICT (uid, connector_type, file_id) DO UPDATE SET
                classification=EXCLUDED.classification,
                scan_type=EXCLUDED.scan_type,
                last_scanned_at=NOW(),
                scan_session_id=EXCLUDED.scan_session_id
            """,
            (uid, connector_type, file_id, classification, scan_type, session_id),
        )


def create_scan_session(uid: str, connector_type: str, scan_type: str) -> int:
    """Insert a new row into sense_scan_sessions and return its id."""
    with _cursor(commit=True) as cur:
        cur.execute(
            """
            INSERT INTO sense_scan_sessions (uid, connector_type, scan_type)
            VALUES (%s,%s,%s)
            RETURNING id
            """,
            (uid, connector_type, scan_type),
        )
        session_id = cur.fetchone()["id"]
    return session_id


def finish_scan_session(session_id: int, files_scanned: int, total_pii_found: int):
    """Record final counters for a session and set its status to 'completed'."""
    with _cursor(commit=True) as cur:
        cur.execute(
            """
            UPDATE sense_scan_sessions
            SET files_scanned=%s, total_pii_found=%s, status='completed'
            WHERE id=%s
            """,
            (files_scanned, total_pii_found, session_id),
        )


def detect_scan_type(uid: str, connector_type: str) -> str:
    """Returns 'external' if no registry rows exist, else 'incremental'."""
    with _cursor(commit=False) as cur:
        cur.execute(
            """
            SELECT COUNT(*) as cnt FROM sense_file_registry
            WHERE uid=%s AND connector_type=%s
            """,
            (uid, connector_type),
        )
        count = cur.fetchone()["cnt"]
    return "external" if count == 0 else "incremental"


def get_file_catalog(uid: str, connector_type: str) -> list[dict]:
    """Returns merged registry + scan state for all files.

    Registry rows are LEFT JOINed to their scan state, so scan-state columns
    are None for files without one. Rows are ordered folders first, then by
    file name.
    """
    with _cursor(commit=False) as cur:
        cur.execute(
            """
            SELECT r.file_id, r.file_name, r.is_folder, r.mime_type,
                   r.file_size_bytes, r.parent_folder_id, r.full_path,
                   r.first_seen_at, r.metadata,
                   s.classification, s.scan_type, s.last_scanned_at
            FROM sense_file_registry r
            LEFT JOIN sense_file_scan_state s
                ON r.uid=s.uid
                AND r.connector_type=s.connector_type
                AND r.file_id=s.file_id
            WHERE r.uid=%s AND r.connector_type=%s
            ORDER BY r.is_folder DESC, r.file_name ASC
            """,
            (uid, connector_type),
        )
        rows = cur.fetchall()

    catalog = []
    for row in rows:
        entry = dict(row)
        metadata = entry.get("metadata")
        # metadata may come back as a dict (psycopg2 auto-parses JSONB) or string
        if isinstance(metadata, str):
            try:
                entry["metadata"] = json.loads(metadata)
            except ValueError:
                # json.JSONDecodeError is a ValueError; treat unparseable
                # metadata as empty rather than failing the whole catalog.
                entry["metadata"] = {}
        catalog.append(entry)
    return catalog


def get_scan_sessions(uid: str, connector_type: str) -> list[dict]:
    """Return up to 50 scan sessions for the connector, newest first."""
    with _cursor(commit=False) as cur:
        cur.execute(
            """
            SELECT id, scan_type, triggered_at, files_scanned,
                   total_pii_found, status
            FROM sense_scan_sessions
            WHERE uid=%s AND connector_type=%s
            ORDER BY triggered_at DESC
            LIMIT 50
            """,
            (uid, connector_type),
        )
        rows = cur.fetchall()
    return [dict(row) for row in rows]


# ── DB Connector Persistence ───────────────────────────────────────────────────

def _build_db_file_id(db_name: str, schema_name: str, table_name: str | None) -> str:
    """Deterministic file_id for a DB table or the DB folder itself."""
    if table_name is None:
        return db_name
    if schema_name:
        return f"{db_name}.{schema_name}.{table_name}"
    return f"{db_name}.{table_name}"


def upsert_db_registry_entry(
    uid: str,
    connector_type: str,      # "postgresql" | "mysql" | "mongodb"
    db_name: str,
    schema_name: str,         # "public" for postgres; "" for mysql/mongo
    table_name: str | None,   # None means this is the database folder row
    is_folder: bool,
    metadata: dict,           # {"column_count": N, "row_count_scanned": N, "pii_types": {}}
) -> None:
    """
    Upserts a database or table entry into sense_file_registry.

    - If is_folder=True: represents the database itself.
    - If is_folder=False: represents a single table inside the database.

    file_id is deterministic: "{db_name}.{schema_name}.{table_name}"
    ("{db_name}.{table_name}" when schema_name is empty), or just "{db_name}"
    when table_name is None (the folder row).

    On conflict on (uid, connector_type, file_id) the file_name, metadata and
    full_path of the existing row are overwritten. ``metadata`` is serialized
    with json.dumps before being sent to the database.

    NOTE(review): file_id is derived from table_name while file_name/full_path
    are derived from is_folder; callers are presumably expected to pass
    table_name=None exactly when is_folder=True - confirm against callers.
    """
    file_id = _build_db_file_id(db_name, schema_name, table_name)

    if is_folder:
        file_name = db_name
        parent_folder_id = None
        full_path = db_name
    else:
        file_name = table_name
        parent_folder_id = db_name  # group tables under the db folder
        if schema_name:
            full_path = f"{db_name}/{schema_name}/{table_name}"
        else:
            full_path = f"{db_name}/{table_name}"

    with _cursor(commit=True) as cur:
        cur.execute(
            """
            INSERT INTO sense_file_registry
                (uid, connector_type, file_id, file_name, is_folder,
                 mime_type, file_size_bytes, parent_folder_id, full_path, metadata)
            VALUES (%s,%s,%s,%s,%s,NULL,NULL,%s,%s,%s)
            ON CONFLICT (uid, connector_type, file_id) DO UPDATE SET
                file_name=EXCLUDED.file_name,
                metadata=EXCLUDED.metadata,
                full_path=EXCLUDED.full_path
            """,
            (
                uid,
                connector_type,
                file_id,
                file_name,
                is_folder,
                parent_folder_id,
                full_path,
                json.dumps(metadata),
            ),
        )


def upsert_db_scan_state(
    uid: str,
    connector_type: str,
    db_name: str,
    schema_name: str,
    table_name: str | None,   # None means this is the database folder row
    classification: str,      # "SENSITIVE" | "NON-SENSITIVE" | "unscanned"
    scan_type: str,           # Always "FULL_LOAD" for DB connectors
    scan_session_id: int,
) -> None:
    """
    Upserts a scan state row into sense_file_scan_state.
    file_id must match the one used in upsert_db_registry_entry.
    """
    # Same statement as the file-connector path; only the file_id derivation
    # differs, so delegate instead of duplicating the SQL.
    file_id = _build_db_file_id(db_name, schema_name, table_name)
    upsert_scan_state(
        uid, connector_type, file_id, classification, scan_type, scan_session_id
    )