""" Segmento Chunker Implements Presidio's sliding-window chunking logic for deep learning NLP models. Prevents the 512-token limit truncation and deduplicates overlapping entities using IoU. """ from typing import List, Dict, Any, Callable, Tuple import logging logger = logging.getLogger("segmento.chunker") class TextChunk: def __init__(self, text: str, start: int, end: int): self.text = text self.start = start self.end = end class SegmentoChunker: """Character-based text chunker with word boundary preservation.""" def __init__(self, chunk_size: int = 2000, chunk_overlap: int = 200): self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap self.boundary_chars = (" ", "\n", "\t") def chunk(self, text: str) -> List[TextChunk]: """Split text into overlapping chunks, extending to the nearest word boundary.""" if not text: return [] chunks = [] start = 0 while start < len(text): end = min(start + self.chunk_size, len(text)) # Extend to complete word boundary while end < len(text) and text[end] not in self.boundary_chars: end += 1 chunks.append(TextChunk(text=text[start:end], start=start, end=end)) if end >= len(text): break # Slide window forward by (size - overlap) # Actually, standard Presidio does: start = end - chunk_overlap start = end - self.chunk_overlap return chunks def _calculate_iou(start1: int, end1: int, start2: int, end2: int) -> float: """Calculate Intersection-over-Union for two text spans.""" intersection = max(0, min(end1, end2) - max(start1, start2)) if intersection == 0: return 0.0 union = (end1 - start1) + (end2 - start2) - intersection return intersection / union if union > 0 else 0.0 def deduplicate_overlapping_entities(entities: List[Dict[str, Any]], iou_threshold: float = 0.5) -> List[Dict[str, Any]]: """ Remove duplicate entities that overlap across chunks. Keeps the entity with the highest score. """ if not entities: return [] # Sort by score descending so we keep the highest confidence matches # If score is missing, default to 1.0 entities = sorted(entities, key=lambda x: x.get('score', 1.0), reverse=True) unique_entities = [] for entity in entities: is_duplicate = False for unique_ent in unique_entities: # If same label and high IoU overlap if entity.get("label") == unique_ent.get("label"): iou = _calculate_iou( entity["start"], entity["end"], unique_ent["start"], unique_ent["end"] ) if iou >= iou_threshold: is_duplicate = True break if not is_duplicate: unique_entities.append(entity) # Sort back by original text position unique_entities = sorted(unique_entities, key=lambda x: x["start"]) return unique_entities def run_model_with_chunking( model: Any, text: str, chunk_size: int = 1500, overlap: int = 150 ) -> List[Dict[str, Any]]: """ Wrapper function to pass large text through a Deep Learning model safely. Splits text, runs the model on chunks (using fast batching if supported), maps coordinates back, and deduplicates. """ if not text: return [] chunker = SegmentoChunker(chunk_size=chunk_size, chunk_overlap=overlap) chunks = chunker.chunk(text) all_detections = [] # Check if the model supports fast Batch Vectorization if hasattr(model, "scan_batch"): batch_texts = [c.text for c in chunks] try: batch_results = model.scan_batch(batch_texts) for i, chunk_results in enumerate(batch_results): chunk = chunks[i] for res in chunk_results: # Map local coordinates to absolute document coordinates mapped_res = res.copy() mapped_res["start"] = res["start"] + chunk.start mapped_res["end"] = res["end"] + chunk.start all_detections.append(mapped_res) except Exception as e: logger.error(f"Error scanning batch: {e}") # Fallback to slower sequential loop if batching is not supported else: # We assume the model object itself is a callable function, or it has a .scan() method scan_func = model.scan if hasattr(model, "scan") else model for chunk in chunks: try: chunk_results = scan_func(chunk.text) for res in chunk_results: mapped_res = res.copy() mapped_res["start"] = res["start"] + chunk.start mapped_res["end"] = res["end"] + chunk.start all_detections.append(mapped_res) except Exception as e: logger.error(f"Error scanning chunk: {e}") continue # Deduplicate overlapping entities caused by the sliding window deduplicated = deduplicate_overlapping_entities(all_detections, iou_threshold=0.5) return deduplicated