""" Unstructured File Handlers Extracts plain text from: txt, pdf, docx, rtf, odt, md, log, eml, epub, pptx """ import io import email as email_lib from typing import Optional def parse_txt(file_bytes: bytes) -> str: """Plain text — UTF-8 with fallback.""" for enc in ("utf-8", "latin-1", "cp1252"): try: return file_bytes.decode(enc) except UnicodeDecodeError: continue return file_bytes.decode("utf-8", errors="replace") def parse_md(file_bytes: bytes) -> str: """Markdown — treat as plain text (strip common syntax).""" text = parse_txt(file_bytes) import re # Strip markdown syntax so models see clean prose text = re.sub(r"#{1,6}\s+", "", text) # headings text = re.sub(r"\*{1,2}(.*?)\*{1,2}", r"\1", text) # bold/italic text = re.sub(r"`{1,3}.*?`{1,3}", "", text, flags=re.DOTALL) # code text = re.sub(r"\[([^\]]+)\]\([^\)]+\)", r"\1", text) # links return text.strip() def parse_log(file_bytes: bytes) -> str: """Log files — plain text.""" return parse_txt(file_bytes) def parse_eml(file_bytes: bytes) -> str: """EML email files — extract headers + body text.""" try: msg = email_lib.message_from_bytes(file_bytes) parts = [] # Key headers contain PII for header in ("From", "To", "Cc", "Subject", "Reply-To"): val = msg.get(header) if val: parts.append(f"{header}: {val}") # Body if msg.is_multipart(): for part in msg.walk(): ct = part.get_content_type() if ct in ("text/plain", "text/html"): payload = part.get_payload(decode=True) if payload: charset = part.get_content_charset() or "utf-8" decoded = payload.decode(charset, errors="replace") if ct == "text/html": decoded = _strip_html(decoded) parts.append(decoded) else: payload = msg.get_payload(decode=True) if payload: charset = msg.get_content_charset() or "utf-8" parts.append(payload.decode(charset, errors="replace")) return "\n".join(parts) except Exception as e: return f"[EML parse error: {e}]" def parse_pdf(file_bytes: bytes) -> str: """PDF — use PyMuPDF (fitz) with layout-aware sorting.""" try: import fitz doc = fitz.open(stream=file_bytes, filetype="pdf") # sort=True tells PyMuPDF to analyze the layout geometry and read in natural reading order # This prevents two-column documents from being merged into broken sentences horizontally. pages = [doc[i].get_text("text", sort=True) for i in range(len(doc))] return "\n".join(pages) except Exception as e: return f"[PDF parse error: {e}]" def parse_docx(file_bytes: bytes) -> str: """DOCX — python-docx.""" try: from docx import Document doc = Document(io.BytesIO(file_bytes)) paragraphs = [p.text for p in doc.paragraphs if p.text.strip()] # Also extract table cell text for table in doc.tables: for row in table.rows: for cell in row.cells: if cell.text.strip(): paragraphs.append(cell.text.strip()) return "\n".join(paragraphs) except Exception as e: return f"[DOCX parse error: {e}]" def parse_rtf(file_bytes: bytes) -> str: """RTF — striprtf.""" try: from striprtf.striprtf import rtf_to_text rtf_str = file_bytes.decode("utf-8", errors="replace") return rtf_to_text(rtf_str) except Exception as e: return f"[RTF parse error: {e}]" def parse_odt(file_bytes: bytes) -> str: """ODT — odfpy.""" try: from odf.opendocument import load from odf.text import P from odf import text as odf_text doc = load(io.BytesIO(file_bytes)) paragraphs = [] for p in doc.getElementsByType(P): content = "".join( node.data for node in p.childNodes if node.nodeType == node.TEXT_NODE ) if content.strip(): paragraphs.append(content.strip()) return "\n".join(paragraphs) except Exception as e: return f"[ODT parse error: {e}]" def parse_epub(file_bytes: bytes) -> str: """EPUB — ebooklib.""" try: import ebooklib from ebooklib import epub book = epub.read_epub(io.BytesIO(file_bytes)) texts = [] for item in book.get_items_of_type(ebooklib.ITEM_DOCUMENT): content = item.get_content() texts.append(_strip_html(content.decode("utf-8", errors="replace"))) return "\n".join(texts) except Exception as e: return f"[EPUB parse error: {e}]" def parse_pptx(file_bytes: bytes) -> str: """PPTX — python-pptx.""" try: from pptx import Presentation prs = Presentation(io.BytesIO(file_bytes)) lines = [] for slide_num, slide in enumerate(prs.slides, 1): lines.append(f"[Slide {slide_num}]") for shape in slide.shapes: if shape.has_text_frame: for para in shape.text_frame.paragraphs: text = para.text.strip() if text: lines.append(text) return "\n".join(lines) except Exception as e: return f"[PPTX parse error: {e}]" def _strip_html(html_text: str) -> str: """Remove HTML tags, return plain text.""" try: from bs4 import BeautifulSoup soup = BeautifulSoup(html_text, "lxml") return soup.get_text(separator="\n") except Exception: import re return re.sub(r"<[^>]+>", " ", html_text) UNSTRUCTURED_PARSERS = { "txt": parse_txt, "pdf": parse_pdf, "docx": parse_docx, "rtf": parse_rtf, "odt": parse_odt, "md": parse_md, "log": parse_log, "eml": parse_eml, "epub": parse_epub, "pptx": parse_pptx, }