Spaces:
Running
Running
| """ | |
| Unstructured File Handlers | |
| Extracts plain text from: txt, pdf, docx, rtf, odt, md, log, eml, epub, pptx | |
| """ | |
| import io | |
| import email as email_lib | |
| from typing import Optional | |
def parse_txt(file_bytes: bytes) -> str:
    """Decode plain-text bytes, trying progressively more permissive codecs.

    Codecs are tried in this order:

    1. ``utf-8-sig`` - strict UTF-8 that also drops a leading byte-order mark.
    2. ``cp1252``    - Windows text; strict, so it fails on its unassigned bytes.
    3. ``latin-1``   - maps every byte to a code point, so it cannot fail.

    ``latin-1`` must come last: because it accepts any input, a codec listed
    after it would never be tried.

    Args:
        file_bytes: Raw file contents.

    Returns:
        The decoded text ("" for empty input).
    """
    for enc in ("utf-8-sig", "cp1252"):
        try:
            return file_bytes.decode(enc)
        except UnicodeDecodeError:
            continue
    # Total fallback: every byte value is valid latin-1.
    return file_bytes.decode("latin-1")
def parse_md(file_bytes: bytes) -> str:
    """Decode a Markdown file and strip common syntax, leaving prose.

    The bytes are decoded with ``parse_txt`` and then passed through four
    substitutions, in order:

    * heading markers (``#`` .. ``######``) at the start of a line are removed;
    * ``*``/``**`` emphasis markers are removed, keeping the wrapped text;
    * backtick-delimited code (inline or fenced) is removed *including its
      contents*;
    * ``[label](url)`` links are replaced by ``label``.

    Args:
        file_bytes: Raw file contents.

    Returns:
        The cleaned text with leading/trailing whitespace stripped.
    """
    import re

    text = parse_txt(file_bytes)
    # Headings: anchored to the start of a line (optionally after indentation
    # or a ``>`` quote prefix, which is kept) so a "#" in running text such as
    # "C# developer" is not deleted.
    text = re.sub(r"^([ \t>]*)#{1,6}\s+", r"\1", text, flags=re.MULTILINE)
    text = re.sub(r"\*{1,2}(.*?)\*{1,2}", r"\1", text)  # bold/italic
    # DOTALL lets a fenced block spanning several lines match as one unit.
    text = re.sub(r"`{1,3}.*?`{1,3}", "", text, flags=re.DOTALL)  # code
    text = re.sub(r"\[([^\]]+)\]\([^\)]+\)", r"\1", text)  # links
    return text.strip()
def parse_log(file_bytes: bytes) -> str:
    """Decode a log file; it is handled exactly like plain text."""
    log_text = parse_txt(file_bytes)
    return log_text
def _decode_eml_header(value) -> str:
    """Return a header value as readable text, decoding RFC 2047 encoded words."""
    from email.header import decode_header, make_header

    try:
        return str(make_header(decode_header(value)))
    except Exception:
        # Malformed or unknown-charset encoded word: keep the raw value
        # rather than losing the header altogether.
        return str(value)


def _eml_part_text(part):
    """Decode one message part's payload to text; None when it has no payload."""
    payload = part.get_payload(decode=True)
    if not payload:
        return None
    charset = part.get_content_charset() or "utf-8"
    try:
        text = payload.decode(charset, errors="replace")
    except LookupError:
        # The declared charset has no Python codec; fall back to UTF-8 instead
        # of failing the whole message.
        text = payload.decode("utf-8", errors="replace")
    if part.get_content_type() == "text/html":
        text = _strip_html(text)
    return text


def parse_eml(file_bytes: bytes) -> str:
    """Extract selected headers and the body text from an EML message.

    Output is one line per present header (From, To, Cc, Subject, Reply-To)
    in ``"Name: value"`` form, followed by the decoded body text, all joined
    with newlines. Header values have RFC 2047 encoded words decoded.

    For multipart messages every ``text/plain`` and ``text/html`` part is
    included; for single-part messages the sole payload is included. HTML
    bodies are reduced to text with ``_strip_html`` in both cases.

    Args:
        file_bytes: Raw bytes of the ``.eml`` file.

    Returns:
        The extracted text, or ``"[EML parse error: ...]"`` if parsing fails;
        this function does not raise.
    """
    try:
        msg = email_lib.message_from_bytes(file_bytes)
        parts = []
        # Key headers contain PII
        for header in ("From", "To", "Cc", "Subject", "Reply-To"):
            val = msg.get(header)
            if val:
                parts.append(f"{header}: {_decode_eml_header(val)}")
        # Body
        if msg.is_multipart():
            for part in msg.walk():
                if part.get_content_type() in ("text/plain", "text/html"):
                    text = _eml_part_text(part)
                    if text is not None:
                        parts.append(text)
        else:
            text = _eml_part_text(msg)
            if text is not None:
                parts.append(text)
        return "\n".join(parts)
    except Exception as e:
        return f"[EML parse error: {e}]"
def parse_pdf(file_bytes: bytes) -> str:
    """Extract text from a PDF with PyMuPDF (``fitz``), one chunk per page.

    Args:
        file_bytes: Raw bytes of the PDF file.

    Returns:
        The text of all pages joined with newlines, or
        ``"[PDF parse error: ...]"`` if PyMuPDF is unavailable or the
        document cannot be read; this function does not raise.
    """
    try:
        import fitz

        # The context manager closes the document (and releases the native
        # resources behind it) even if text extraction fails part-way.
        with fitz.open(stream=file_bytes, filetype="pdf") as doc:
            # sort=True asks PyMuPDF to order the extracted text by position
            # on the page rather than by the order stored in the file.
            pages = [page.get_text("text", sort=True) for page in doc]
        return "\n".join(pages)
    except Exception as e:
        return f"[PDF parse error: {e}]"
def parse_docx(file_bytes: bytes) -> str:
    """Extract paragraph and table text from a DOCX file via python-docx.

    Non-blank body paragraphs come first, followed by the stripped text of
    every non-blank table cell (tables in document order, row by row).
    Each merged cell contributes its text once.

    Args:
        file_bytes: Raw bytes of the ``.docx`` file.

    Returns:
        The collected lines joined with newlines, or
        ``"[DOCX parse error: ...]"`` on failure; this function does not raise.
    """
    try:
        from docx import Document

        doc = Document(io.BytesIO(file_bytes))
        lines = [p.text for p in doc.paragraphs if p.text.strip()]
        # Also extract table cell text
        for table in doc.tables:
            # python-docx hands back the same underlying cell for every grid
            # position a merged cell covers; remember the elements already
            # emitted so merged text is not repeated.
            seen = set()
            for row in table.rows:
                for cell in row.cells:
                    key = getattr(cell, "_tc", cell)
                    if key in seen:
                        continue
                    seen.add(key)
                    cell_text = cell.text.strip()
                    if cell_text:
                        lines.append(cell_text)
        return "\n".join(lines)
    except Exception as e:
        return f"[DOCX parse error: {e}]"
def parse_rtf(file_bytes: bytes) -> str:
    """Convert RTF bytes to plain text using ``striprtf``.

    The bytes are decoded as UTF-8 (undecodable bytes replaced) before being
    handed to the converter. Any failure, including a missing ``striprtf``
    package, is reported as an ``"[RTF parse error: ...]"`` string.
    """
    try:
        from striprtf.striprtf import rtf_to_text

        return rtf_to_text(file_bytes.decode("utf-8", errors="replace"))
    except Exception as exc:
        return f"[RTF parse error: {exc}]"
def parse_odt(file_bytes: bytes) -> str:
    """Extract paragraph text from an ODT file via odfpy.

    Every ``text:p`` element in the document is converted to text with
    ``odf.teletype.extractText``, which descends into nested inline elements
    (spans, links, ...) instead of reading only the paragraph's direct text
    nodes. Blank paragraphs are skipped.

    Args:
        file_bytes: Raw bytes of the ``.odt`` file.

    Returns:
        The stripped paragraph texts joined with newlines, or
        ``"[ODT parse error: ...]"`` on failure; this function does not raise.
    """
    try:
        from odf import teletype
        from odf.opendocument import load
        from odf.text import P

        doc = load(io.BytesIO(file_bytes))
        paragraphs = []
        for p in doc.getElementsByType(P):
            content = teletype.extractText(p).strip()
            if content:
                paragraphs.append(content)
        return "\n".join(paragraphs)
    except Exception as e:
        return f"[ODT parse error: {e}]"
def parse_epub(file_bytes: bytes) -> str:
    """Extract text from an EPUB via ebooklib.

    Each document item in the book is decoded as UTF-8 (undecodable bytes
    replaced), reduced to text with ``_strip_html``, and the results are
    joined with newlines. Any failure is reported as an
    ``"[EPUB parse error: ...]"`` string instead of being raised.
    """
    try:
        import ebooklib
        from ebooklib import epub

        book = epub.read_epub(io.BytesIO(file_bytes))
        chapters = [
            _strip_html(doc_item.get_content().decode("utf-8", errors="replace"))
            for doc_item in book.get_items_of_type(ebooklib.ITEM_DOCUMENT)
        ]
        return "\n".join(chapters)
    except Exception as exc:
        return f"[EPUB parse error: {exc}]"
def _pptx_shape_lines(shape):
    """Yield the non-blank text lines of one shape, descending into groups."""
    if getattr(shape, "has_text_frame", False):
        for para in shape.text_frame.paragraphs:
            text = para.text.strip()
            if text:
                yield text
    if getattr(shape, "has_table", False):
        for row in shape.table.rows:
            for cell in row.cells:
                text = cell.text.strip()
                if text:
                    yield text
    # Group shapes carry their members in ``.shapes``; other shapes have no
    # such attribute, so this loop is a no-op for them.
    for child in getattr(shape, "shapes", ()):
        yield from _pptx_shape_lines(child)


def parse_pptx(file_bytes: bytes) -> str:
    """Extract slide text from a PPTX file via python-pptx.

    Each slide contributes a ``"[Slide N]"`` marker line (N starts at 1)
    followed by the non-blank, stripped text of its shapes: text-frame
    paragraphs, table cells, and the members of grouped shapes.

    Args:
        file_bytes: Raw bytes of the ``.pptx`` file.

    Returns:
        All lines joined with newlines, or ``"[PPTX parse error: ...]"`` on
        failure; this function does not raise.
    """
    try:
        from pptx import Presentation

        prs = Presentation(io.BytesIO(file_bytes))
        lines = []
        for slide_num, slide in enumerate(prs.slides, 1):
            lines.append(f"[Slide {slide_num}]")
            for shape in slide.shapes:
                lines.extend(_pptx_shape_lines(shape))
        return "\n".join(lines)
    except Exception as e:
        return f"[PPTX parse error: {e}]"
| def _strip_html(html_text: str) -> str: | |
| """Remove HTML tags, return plain text.""" | |
| try: | |
| from bs4 import BeautifulSoup | |
| soup = BeautifulSoup(html_text, "lxml") | |
| return soup.get_text(separator="\n") | |
| except Exception: | |
| import re | |
| return re.sub(r"<[^>]+>", " ", html_text) | |
# Dispatch table: lower-case file extension (no leading dot) -> parser.
# Every parser listed here takes the raw file bytes and returns the extracted
# text as a ``str``.
# NOTE(review): callers presumably normalise the extension before lookup;
# confirm at the call site.
UNSTRUCTURED_PARSERS = {
    "txt": parse_txt,
    "pdf": parse_pdf,
    "docx": parse_docx,
    "rtf": parse_rtf,
    "odt": parse_odt,
    "md": parse_md,
    "log": parse_log,
    "eml": parse_eml,
    "epub": parse_epub,
    "pptx": parse_pptx,
}