import base64
import concurrent.futures
import html
import io
import json
import os
import pickle
import re
from collections import Counter
from typing import Any, Dict, List, Optional
from urllib.parse import quote_plus

import fitz  # PyMuPDF
import nltk
import pandas as pd
from bs4 import BeautifulSoup
from sqlalchemy import create_engine

# --- IMPORT CLASSIFIERS (always-on) ---
from classifier_manager.spacy_model import PiiSpacyAnalyzer
from classifier_manager.presidio_model import PiiPresidioAnalyzer
from classifier_manager.gliner_model import PiiGlinerAnalyzer
from classifier_manager.deberta_model import PiiDebertaAnalyzer
from classifier_manager.inspector import ModelInspector
from classifier_manager.regex_scanner import RegexScanner

# --- IMPORT CLASSIFIERS (lazy-loaded — NOT instantiated at startup) ---
from classifier_manager.pasteproof_model import PiiPasteproofAnalyzer
from classifier_manager.piiranha_model import PiiPiiranhaAnalyzer
from classifier_manager.nvidia_gliner_model import PiiNvidiaGlinerAnalyzer
from classifier_manager.mmbert_model import PiiMmbertAnalyzer
from classifier_manager.nerguard_model import PiiNerguardAnalyzer
from classifier_manager.gliner_pii_large_model import PiiGlinerLargeAnalyzer

# --- IMPORT FILE HANDLERS ---
from file_handlers.ocr_engine import OcrEngine
from file_handlers.avro_handler import AvroHandler
from file_handlers.parquet_handler import ParquetHandler
from file_handlers.json_handler import JsonHandler
from file_handlers.pdf_handler import PdfHandler

# --- IMPORT CONNECTORS ---
from connectors.postgres_handler import PostgresHandler
from connectors.mysql_handler import MysqlHandler
from connectors.gmail_handler import GmailHandler
from connectors.drive_handler import DriveHandler
from connectors.aws_s3_handler import S3Handler
from connectors.azure_handler import AzureBlobHandler
from connectors.gcp_storage_handler import GcpStorageHandler
from connectors.slack_handler import SlackHandler
from connectors.confluence_handler import ConfluenceHandler
from connectors.mongo_handler import MongoHandler

# --- DEPENDENCY CHECKS ---
try:
    from googleapiclient.discovery import build
    GOOGLE_AVAILABLE = True
except ImportError:
    GOOGLE_AVAILABLE = False
    print("Google Libraries not installed.")

# Optional dependency checks: each flag records whether the SDK imports;
# a missing SDK must not prevent this module from loading.
try:
    import pymongo
    MONGO_AVAILABLE = True
except ImportError:
    MONGO_AVAILABLE = False

try:
    import boto3
    AWS_AVAILABLE = True
except ImportError:
    AWS_AVAILABLE = False

try:
    from azure.storage.blob import BlobServiceClient
    AZURE_AVAILABLE = True
except ImportError:
    AZURE_AVAILABLE = False

try:
    from google.cloud import storage
    GCS_AVAILABLE = True
except ImportError:
    GCS_AVAILABLE = False

# NLTK Setup: ensure every resource used by scan_with_nltk is present.
# Each resource is checked on its own so a partially provisioned data
# directory (e.g. punkt present but the tagger missing) still gets completed.
_NLTK_RESOURCES = {
    "punkt": "tokenizers/punkt",
    "averaged_perceptron_tagger": "taggers/averaged_perceptron_tagger",
    "maxent_ne_chunker": "chunkers/maxent_ne_chunker",
    "words": "corpora/words",
    "punkt_tab": "tokenizers/punkt_tab",
}
for _nltk_package, _nltk_path in _NLTK_RESOURCES.items():
    try:
        nltk.data.find(_nltk_path)
    except LookupError:
        nltk.download(_nltk_package)


class RegexClassifier:
    """
    Main orchestrator class.

    Central controller for all PII detection operations. It owns the
    always-on classifiers, lazily loads the optional ones on first use, and
    exposes thin wrappers around the file handlers and data-source connectors
    for the UI.
    """

    # -----------------------------------------------------------------------
    # MODEL REGISTRY: maps user-facing key -> analyzer class.
    #   "regex" and "nltk" map to None because they are handled inline.
    #   Keys listed in _ALWAYS_ON are instantiated at startup; all other keys
    #   are lazy-loaded only when the user enables the toggle.
    # -----------------------------------------------------------------------
    _MODEL_CLASSES = {
        "regex": None,   # handled inline, not a class instance
        "nltk": None,    # handled inline, not a class instance
        "spacy": PiiSpacyAnalyzer,
        "presidio": PiiPresidioAnalyzer,
        "gliner": PiiGlinerAnalyzer,
        "deberta": PiiDebertaAnalyzer,
        "pasteproof": PiiPasteproofAnalyzer,
        "piiranha": PiiPiiranhaAnalyzer,
        "nvidia_gliner": PiiNvidiaGlinerAnalyzer,
        "mmbert": PiiMmbertAnalyzer,
        "nerguard": PiiNerguardAnalyzer,
        "gliner_large": PiiGlinerLargeAnalyzer,
    }

    # Models that load at startup (core ensemble, low memory)
    _ALWAYS_ON = {"regex", "nltk", "spacy", "presidio", "gliner", "deberta"}

    # Inspector-table labels for lazy-loaded models (unknown keys fall back
    # to the raw key).
    _LAZY_LABELS = {
        "pasteproof": "📋 Pasteproof",
        "piiranha": "🐟 Piiranha",
        "nvidia_gliner": "⚡ NVIDIA-GLiNER",
        "mmbert": "🌐 mmbert32k",
        "nerguard": "🧷 NERGuard",
        "gliner_large": "🦅 GLiNER-Large",
    }

    # Timeouts (seconds). Lazy models get extra time for first download.
    # NOTE: scan_with_models enforces a single wall-clock deadline of
    # _LAZY_MODEL_TIMEOUT + 5 for the whole batch; _ALWAYS_ON_TIMEOUT is kept
    # for backward compatibility and is not enforced per model.
    _ALWAYS_ON_TIMEOUT = 30
    _LAZY_MODEL_TIMEOUT = 100  # first load can take 60-90 s on HF Spaces CPU

    def __init__(self):
        # 1. Always-on classifiers (loaded at boot)
        self.regex_scanner = RegexScanner()
        self.spacy_analyzer = PiiSpacyAnalyzer()
        self.presidio_analyzer = PiiPresidioAnalyzer()
        self.gliner_analyzer = PiiGlinerAnalyzer()
        self.deberta_analyzer = PiiDebertaAnalyzer()
        self.inspector = ModelInspector()

        # 2. Lazy model registry: models are instantiated + loaded only when
        #    first requested and then stay cached for the object's lifetime.
        self._lazy_models: Dict[str, Any] = {}
        self._lazy_classes: Dict[str, type] = {
            "pasteproof": PiiPasteproofAnalyzer,
            "piiranha": PiiPiiranhaAnalyzer,
            "nvidia_gliner": PiiNvidiaGlinerAnalyzer,
            "mmbert": PiiMmbertAnalyzer,
            "nerguard": PiiNerguardAnalyzer,
            "gliner_large": PiiGlinerLargeAnalyzer,
        }

        # 3. File handlers
        self.ocr_engine = OcrEngine()
        self.avro_handler = AvroHandler()
        self.parquet_handler = ParquetHandler()
        self.json_handler = JsonHandler()
        self.pdf_handler = PdfHandler(self.ocr_engine)

        # 4. Connectors
        self.pg_handler = PostgresHandler()
        self.mysql_handler = MysqlHandler()
        self.mongo_handler = MongoHandler()
        self.gmail_handler = GmailHandler()
        self.drive_handler = DriveHandler()
        self.s3_handler = S3Handler()
        self.azure_handler = AzureBlobHandler()
        self.gcp_handler = GcpStorageHandler()
        self.slack_handler = SlackHandler()
        self.confluence_handler = ConfluenceHandler()

        # Shortcut for UI colors
        self.colors = self.regex_scanner.colors

    # --- LAZY MODEL LOADER ---
    def _get_lazy_model(self, key: str):
        """
        Return a loaded lazy model instance by key.

        Instantiates the class and calls ``.load()`` on first access; later
        calls reuse the cached instance. Returns None if the key is unknown
        or ``.load()`` returns a falsy value. Failed loads are not cached, so
        the next call retries.
        """
        cached = self._lazy_models.get(key)
        if cached is not None:
            return cached

        cls = self._lazy_classes.get(key)
        if cls is None:
            print(f"[LazyLoader] Unknown model key: {key}")
            return None

        instance = cls()
        if not instance.load():
            print(f"[LazyLoader] Failed to load model: {key}")
            return None

        self._lazy_models[key] = instance
        print(f"[LazyLoader] Model '{key}' loaded and cached.")
        return instance

    def scan_with_models(
        self,
        text: str,
        model_keys: List[str],
    ) -> Dict[str, Any]:
        """
        Run the selected models on ``text`` in parallel using a thread pool.

        Args:
            text: Text to scan.
            model_keys: Model keys to run; may include the special key
                "ensemble" (runs run_weighted_ensemble).

        Returns:
            Dict mapping each key to
            ``{"detections": [...], "error": str | None, "timed_out": bool}``.
            Failed or unknown models return an empty detection list and an
            error message. Models still running when the wall-clock deadline
            (_LAZY_MODEL_TIMEOUT + 5 s) expires are reported with
            ``timed_out=True``. Python threads cannot be killed, so such a
            model keeps running in the background, but this call returns
            without waiting for it.
        """
        from chunking_engine import run_model_with_chunking

        if not model_keys:
            # ThreadPoolExecutor(max_workers=0) would raise ValueError.
            return {}

        # Models that process the entire string at once (no chunking needed)
        always_on_no_chunk = {
            "regex": lambda t: self.regex_scanner.scan(t),
            "nltk": lambda t: self.scan_with_nltk(t),
            "spacy": lambda t: self.spacy_analyzer.scan(t),
            "presidio": lambda t: self.presidio_analyzer.scan(t),
        }
        # Models that need chunking due to token limits
        always_on_chunked = {
            "gliner": lambda t: run_model_with_chunking(self.gliner_analyzer, t),
            "deberta": lambda t: run_model_with_chunking(self.deberta_analyzer, t),
        }

        def _run_one(key: str):
            try:
                if key == "ensemble":
                    # The ensemble calls the always-on models itself.
                    detections = self.run_weighted_ensemble(text)
                elif key in always_on_no_chunk:
                    detections = always_on_no_chunk[key](text)
                elif key in always_on_chunked:
                    detections = always_on_chunked[key](text)
                else:
                    model = self._get_lazy_model(key)
                    if model is None:
                        return key, {
                            "detections": [],
                            "error": f"Model '{key}' is unknown or failed to load",
                            "timed_out": False,
                        }
                    detections = run_model_with_chunking(model, text)
                return key, {"detections": detections, "error": None, "timed_out": False}
            except Exception as e:
                print(f"[scan_with_models] Error in '{key}': {e}")
                return key, {"detections": [], "error": str(e), "timed_out": False}

        results: Dict[str, Any] = {}
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=min(len(model_keys), 4))
        try:
            future_map = {pool.submit(_run_one, key): key for key in model_keys}
            done, not_done = concurrent.futures.wait(
                future_map.keys(),
                timeout=self._LAZY_MODEL_TIMEOUT + 5,  # outer wall
            )
            for future in done:
                key, payload = future.result()
                results[key] = payload
            for future in not_done:
                key = future_map[future]
                future.cancel()  # only effective for models that never started
                print(f"[scan_with_models] TIMEOUT: model '{key}' exceeded wall time")
                results[key] = {"detections": [], "error": "Model timed out", "timed_out": True}
        finally:
            # Do not block on stragglers; a `with` block would wait for them
            # here and defeat the timeout above.
            pool.shutdown(wait=False)
        return results

    # --- PATTERN MANAGEMENT PROXY ---
    def list_patterns(self):
        return self.regex_scanner.patterns

    def add_pattern(self, n, r):
        self.regex_scanner.add_pattern(n, r)

    def remove_pattern(self, n):
        self.regex_scanner.remove_pattern(n)

    # --- CORE ANALYSIS ---
    def scan_with_regex(self, text: str) -> List[dict]:
        return self.regex_scanner.scan(text)

    def scan_with_nltk(self, text: str) -> List[dict]:
        """
        Detect PERSON and GPE entities with NLTK's named-entity chunker.

        PERSON entities are labelled "FIRST_NAME" and GPE entities "LOCATION".
        Character offsets are found by searching ``text`` for the
        space-joined entity tokens, advancing a cursor so repeated names get
        their own positions. Entities whose joined tokens do not appear
        verbatim in ``text`` are skipped. NLTK failures (e.g. missing data
        resources) are logged and yield an empty list.
        """
        detections: List[dict] = []
        try:
            tree = nltk.ne_chunk(nltk.pos_tag(nltk.word_tokenize(text)))
        except Exception as e:
            print(f"[scan_with_nltk] NLTK processing failed: {e}")
            return detections

        cursor = 0
        for chunk in tree:
            if not hasattr(chunk, 'label') or chunk.label() not in ('PERSON', 'GPE'):
                continue
            val = " ".join(c[0] for c in chunk)
            start = text.find(val, cursor)
            if start == -1:
                # Fall back to the first occurrence anywhere in the text.
                start = text.find(val)
            if start == -1:
                continue
            end = start + len(val)
            cursor = max(cursor, end)
            detections.append({
                "label": "LOCATION" if chunk.label() == 'GPE' else "FIRST_NAME",
                "text": val,
                "start": start,
                "end": end,
                "source": "NLTK",
            })
        return detections

    @staticmethod
    def _dedupe_keep_longest(matches: List[dict]) -> List[dict]:
        """Sort matches by start and collapse overlapping spans, keeping the one with the longer 'text'."""
        if not matches:
            return []
        ordered = sorted(matches, key=lambda m: m['start'])
        unique = []
        current = ordered[0]
        for candidate in ordered[1:]:
            if candidate['start'] < current['end']:
                if len(candidate['text']) > len(current['text']):
                    current = candidate
            else:
                unique.append(current)
                current = candidate
        unique.append(current)
        return unique

    def analyze_text_hybrid(self, text: str, selected_models: Optional[List[str]] = None) -> List[dict]:
        """
        Run PII detection using only the models named in ``selected_models``.

        If ``selected_models`` is None or empty, every always-on model runs.
        Any key registered in the lazy registry is loaded on first use.
        Overlapping detections are collapsed, keeping the longest match.
        """
        if not text:
            return []

        # Default: run all always-on models
        if not selected_models:
            selected_models = list(self._ALWAYS_ON)

        from chunking_engine import run_model_with_chunking

        all_matches: List[dict] = []

        # --- Always-on models (no lazy loading needed) ---
        if "regex" in selected_models:
            all_matches.extend(self.regex_scanner.scan(text))
        if "nltk" in selected_models:
            all_matches.extend(self.scan_with_nltk(text))
        if "spacy" in selected_models:
            all_matches.extend(self.spacy_analyzer.scan(text))
        if "presidio" in selected_models:
            all_matches.extend(self.presidio_analyzer.scan(text))
        if "gliner" in selected_models:
            all_matches.extend(run_model_with_chunking(self.gliner_analyzer, text))
        if "deberta" in selected_models:
            all_matches.extend(run_model_with_chunking(self.deberta_analyzer, text))

        # --- Lazy-loaded models: iterate the registry so every registered
        #     model (including later additions) can actually be selected ---
        for lazy_key in self._lazy_classes:
            if lazy_key in selected_models:
                model = self._get_lazy_model(lazy_key)
                if model:
                    all_matches.extend(run_model_with_chunking(model, text))

        return self._dedupe_keep_longest(all_matches)

    def run_full_inspection(self, text: str, selected_models: Optional[List[str]] = None):
        """
        Build a per-model match dictionary and hand it to the inspector.

        The keys are display labels, so the Inspector table reflects exactly
        which models were activated. Defaults to the always-on models.
        """
        if not selected_models:
            selected_models = list(self._ALWAYS_ON)

        from chunking_engine import run_model_with_chunking

        model_results: Dict[str, list] = {}

        if "regex" in selected_models:
            model_results["🛠️ Regex"] = self.regex_scanner.scan(text)
        if "nltk" in selected_models:
            model_results["🧠 NLTK"] = self.scan_with_nltk(text)
        if "spacy" in selected_models:
            model_results["🤖 SpaCy"] = self.spacy_analyzer.scan(text)
        if "presidio" in selected_models:
            model_results["🛡️ Presidio"] = self.presidio_analyzer.scan(text)
        if "gliner" in selected_models:
            model_results["🦅 GLiNER"] = run_model_with_chunking(self.gliner_analyzer, text)
        if "deberta" in selected_models:
            model_results["🚀 DeBERTa"] = run_model_with_chunking(self.deberta_analyzer, text)

        for lazy_key in self._lazy_classes:
            if lazy_key in selected_models:
                model = self._get_lazy_model(lazy_key)
                if model:
                    label = self._LAZY_LABELS.get(lazy_key, lazy_key)
                    model_results[label] = run_model_with_chunking(model, text)

        return self.inspector.compare_models_dynamic(model_results)

    def run_weighted_ensemble(self, text: str) -> List[dict]:
        """
        Run the weighted ensemble over the always-on models (except NLTK).

        Detections from Regex, Presidio, DeBERTa, GLiNER and SpaCy are tagged
        with a trust weight. They are then clustered by span overlap, and
        each cluster is resolved to the label with the highest summed weight.
        The surviving spans are finally passed through
        ``deduplicate_overlapping_entities`` (IoU threshold 0.3).

        Returned dicts are copies carrying extra "weight" and "source" keys;
        the scanners' own result dicts are not modified.
        """
        from chunking_engine import run_model_with_chunking, deduplicate_overlapping_entities

        # 1. Run all models and assign trust weights based on their architecture.
        weighted_sources = [
            # Highly trusted deterministic / rule-based models
            (self.regex_scanner.scan(text), 1.0, "Ensemble (Regex)"),
            (self.presidio_analyzer.scan(text), 0.95, "Ensemble (Presidio)"),
            # Context-aware deep learning models (chunked)
            (run_model_with_chunking(self.deberta_analyzer, text), 0.85, "Ensemble (DeBERTa)"),
            (run_model_with_chunking(self.gliner_analyzer, text), 0.75, "Ensemble (GLiNER)"),
            # Baseline statistical model
            (self.spacy_analyzer.scan(text), 0.5, "Ensemble (SpaCy)"),
        ]
        raw_detections = [
            dict(m, weight=weight, source=source)
            for matches, weight, source in weighted_sources
            for m in matches
        ]
        if not raw_detections:
            return []

        # 2. Cluster overlapping detections (sorted by start). Spans that
        #    merely touch (start == current cluster end) are also grouped.
        raw_detections.sort(key=lambda x: x["start"])
        clusters = []
        current_cluster = [raw_detections[0]]
        cluster_end = raw_detections[0]["end"]
        for det in raw_detections[1:]:
            if det["start"] <= cluster_end:
                current_cluster.append(det)
                cluster_end = max(cluster_end, det["end"])
            else:
                clusters.append(current_cluster)
                current_cluster = [det]
                cluster_end = det["end"]
        clusters.append(current_cluster)

        # 3. Resolve conflicts within each cluster.
        final_detections = []
        for cluster in clusters:
            if len(cluster) == 1:
                final_detections.append(cluster[0])
                continue

            # Aggregate weights by label and pick the winning label.
            label_weights: Dict[str, float] = {}
            for det in cluster:
                label_weights[det["label"]] = label_weights.get(det["label"], 0) + det["weight"]
            winning_label = max(label_weights.items(), key=lambda x: x[1])[0]

            # Highest-weight detection carrying the winning label, expanded to
            # cover every span of that label in the cluster.
            winners = [c for c in cluster if c["label"] == winning_label]
            best_det = max(winners, key=lambda x: x["weight"])
            min_start = min(c["start"] for c in winners)
            max_end = max(c["end"] for c in winners)
            best_det["start"] = min_start
            best_det["end"] = max_end
            best_det["text"] = text[min_start:max_end]
            final_detections.append(best_det)

        # 4. Final IoU deduplication to clean up any remaining sloppy edges.
        return deduplicate_overlapping_entities(final_detections, iou_threshold=0.3)

    # --- WRAPPERS FOR UI ---
    def get_json_data(self, file_obj) -> pd.DataFrame:
        return self.json_handler.read_file(file_obj)

    def get_pdf_page_text(self, file_bytes, page_num):
        return self.pdf_handler.get_page_text(file_bytes, page_num)

    def get_pdf_total_pages(self, file_bytes) -> int:
        return self.pdf_handler.get_total_pages(file_bytes)

    def get_labeled_pdf_image(self, file_bytes, page_num):
        """Render a PDF page with detections (default always-on models) drawn on it."""
        text = self.get_pdf_page_text(file_bytes, page_num)
        matches = self.analyze_text_hybrid(text)
        return self.pdf_handler.render_labeled_image(file_bytes, page_num, matches, self.colors)

    def get_avro_data(self, file_bytes) -> pd.DataFrame:
        return self.avro_handler.convert_to_dataframe(file_bytes)

    def get_parquet_data(self, file_bytes) -> pd.DataFrame:
        return self.parquet_handler.convert_to_dataframe(file_bytes)

    def get_ocr_text_from_image(self, file_bytes) -> str:
        return self.ocr_engine.extract_text(file_bytes)

    def get_pii_counts_dataframe(self, df: pd.DataFrame, selected_models: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Count PII types in a DataFrame.

        At most 100 rows are sampled (random_state=42). Only object/string
        columns are used, and the flattened text is truncated to 50,000
        characters to keep transformer memory bounded.
        """
        if df.empty:
            return pd.DataFrame(columns=["PII Type", "Count"])

        # Sample at most 100 rows to prevent massive string flattening
        df_sample = df.sample(n=min(100, len(df)), random_state=42)

        # Select only object/string columns to avoid useless numbers
        str_cols = df_sample.select_dtypes(include=['object', 'string'])
        if str_cols.empty:
            return pd.DataFrame(columns=["PII Type", "Count"])

        # Truncate to a safe maximum (approx 50k chars) to prevent Transformer OOM
        text = " ".join(str_cols.fillna("").astype(str).values.flatten())
        return self.get_pii_counts(text[:50000], selected_models)

    def get_pii_counts(self, text: str, selected_models: Optional[List[str]] = None) -> pd.DataFrame:
        """Return a two-column DataFrame ("PII Type", "Count") of detected labels."""
        matches = self.analyze_text_hybrid(str(text), selected_models)
        if not matches:
            return pd.DataFrame(columns=["PII Type", "Count"])
        counts = Counter(m['label'] for m in matches)
        return pd.DataFrame(list(counts.items()), columns=["PII Type", "Count"])

    def mask_dataframe(self, df: pd.DataFrame, selected_models: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Return a string copy of ``df`` with detected PII replaced by "******".

        All columns are converted with ``astype(str)`` and processed, so numeric
        PII such as phone numbers is caught too. Cells whose string form is
        "nan" or "None" become "". Each distinct value is analysed once and the
        result is reused for repeats.
        """
        df_masked = df.copy().astype(str)

        def mask_text(text):
            if pd.isna(text) or text == "nan" or text == "None":
                return ""
            if not text.strip():
                return text
            try:
                matches = self.analyze_text_hybrid(text, selected_models)
                if not matches:
                    return text
                # Replace right-to-left so earlier offsets stay valid.
                matches.sort(key=lambda x: x['start'], reverse=True)
                for m in matches:
                    if "***" not in text[m['start']:m['end']]:
                        text = text[:m['start']] + "******" + text[m['end']:]
                return text
            except Exception as e:
                print(f"Masking error: {e}")
                return text

        for col in df_masked.columns:
            masked_map = {val: mask_text(val) for val in df_masked[col].unique()}
            df_masked[col] = df_masked[col].map(masked_map)
        return df_masked

    def scan_dataframe_with_html(self, df: pd.DataFrame, selected_models: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Return a string copy of ``df`` whose cells are HTML with PII wrapped in ``<mark>``.

        Cell text is HTML-escaped, because it comes from external sources and
        is emitted as markup. Highlight colors come from ``self.colors``,
        keyed by label (assumed to be a dict label -> CSS color — TODO
        confirm against RegexScanner). Cells whose string form is "nan" or
        "None" become "". Each distinct value is analysed once.
        """
        df_scanned = df.copy().astype(str)
        default_color = "#ffff00"  # used when a label has no configured color

        def highlight(text):
            if pd.isna(text) or text == "nan" or text == "None":
                return ""
            if not text.strip():
                return text
            try:
                matches = self.analyze_text_hybrid(text, selected_models)
                if not matches:
                    return html.escape(text, quote=False)
                # Build the output left-to-right, escaping the plain text
                # between matches so offsets never shift mid-edit.
                pieces = []
                cursor = 0
                for m in sorted(matches, key=lambda x: x['start']):
                    start, end = m['start'], m['end']
                    if start < cursor:
                        continue  # overlaps a span already highlighted
                    pieces.append(html.escape(text[cursor:start], quote=False))
                    color = self.colors.get(m['label'], default_color)
                    pieces.append(
                        f'<mark style="background-color: {html.escape(str(color))}; '
                        f'padding: 0 2px; border-radius: 3px;" '
                        f'title="{html.escape(str(m["label"]))}">'
                        f'{html.escape(text[start:end], quote=False)}</mark>'
                    )
                    cursor = end
                pieces.append(html.escape(text[cursor:], quote=False))
                return "".join(pieces)
            except Exception as e:
                print(f"Highlight error: {e}")
                return html.escape(text, quote=False)

        for col in df_scanned.columns:
            highlight_map = {val: highlight(val) for val in df_scanned[col].unique()}
            df_scanned[col] = df_scanned[col].map(highlight_map)
        return df_scanned

    def get_data_schema(self, df):
        return pd.DataFrame({"Column": df.columns, "Type": df.dtypes.astype(str)})

    # --- CONNECTOR WRAPPERS ---
    def get_postgres_data(self, host, port, db, user, pw, table):
        return self.pg_handler.fetch_data(host, port, db, user, pw, table)

    def get_mysql_data(self, host, port, db, user, pw, table):
        return self.mysql_handler.fetch_data(host, port, db, user, pw, table)

    def get_mongodb_data(self, host, port, db, user, pw, collection):
        return self.mongo_handler.fetch_data(host, port, db, user, pw, collection)

    def get_gmail_data(self, credentials_file, num_emails=10) -> pd.DataFrame:
        return self.gmail_handler.fetch_emails(credentials_file, num_emails)

    def get_google_drive_files(self, credentials_dict):
        return self.drive_handler.list_files(credentials_dict)

    def download_drive_file(self, file_id, mime_type, credentials_dict):
        return self.drive_handler.download_file(file_id, mime_type, credentials_dict)

    def get_s3_buckets(self, a, s, r):
        return self.s3_handler.get_buckets(a, s, r)

    def get_s3_files(self, a, s, r, b):
        return self.s3_handler.get_files(a, s, r, b)

    def download_s3_file(self, a, s, r, b, k):
        return self.s3_handler.download_file(a, s, r, b, k)

    def get_azure_containers(self, c):
        return self.azure_handler.get_containers(c)

    def get_azure_blobs(self, c, n):
        return self.azure_handler.get_blobs(c, n)

    def download_azure_blob(self, c, n, b):
        return self.azure_handler.download_blob(c, n, b)

    def get_gcs_buckets(self, c):
        return self.gcp_handler.get_buckets(c)

    def get_gcs_files(self, c, b):
        return self.gcp_handler.get_files(c, b)

    def download_gcs_file(self, c, b, n):
        return self.gcp_handler.download_file(c, b, n)

    # --- ENTERPRISE WRAPPERS ---
    def get_slack_messages(self, token, channel_id):
        return self.slack_handler.fetch_messages(token, channel_id)

    def get_confluence_page(self, url, username, token, page_id):
        return self.confluence_handler.fetch_page_content(url, username, token, page_id)