""" Structured File Handlers Extracts plain text from: sqlite, sql, parquet, avro, xlsx, xls, orc, hdf5, feather, dta """ import io def _df_to_text(df, max_rows: int = 500) -> str: """ Convert a DataFrame to a contextual text representation. Instead of 'Val1 | Val2', outputs 'Row X: [Col1]: Val1, [Col2]: Val2'. This 'Context Injection' ensures NLP models know what the numbers represent. """ try: import pandas as pd lines = [] columns = list(df.columns) for idx, row in df.head(max_rows).iterrows(): row_parts = [] for col_idx, val in enumerate(row): # Ignore nulls/nans to save token space if pd.isna(val) or val == "": continue col_name = str(columns[col_idx]).strip() row_parts.append(f"[{col_name}]: {val}") if row_parts: lines.append(f"Row {idx+1}: " + ", ".join(row_parts) + ".") return "\n".join(lines) except Exception as e: return str(df.head(max_rows)) def parse_parquet(file_bytes: bytes) -> str: """Parquet — pyarrow.""" try: import pandas as pd df = pd.read_parquet(io.BytesIO(file_bytes)) return _df_to_text(df) except Exception as e: return f"[Parquet parse error: {e}]" def parse_avro(file_bytes: bytes) -> str: """Avro — fastavro.""" try: import fastavro records = [] with io.BytesIO(file_bytes) as buf: reader = fastavro.reader(buf) for i, record in enumerate(reader): records.append(record) if i >= 499: break import pandas as pd df = pd.DataFrame(records) return _df_to_text(df) except Exception as e: return f"[Avro parse error: {e}]" def parse_xlsx(file_bytes: bytes) -> str: """XLSX — pandas + openpyxl.""" try: import pandas as pd xl = pd.ExcelFile(io.BytesIO(file_bytes), engine="openpyxl") texts = [] for sheet in xl.sheet_names: df = xl.parse(sheet) texts.append(f"[Sheet: {sheet}]") texts.append(_df_to_text(df)) return "\n".join(texts) except Exception as e: return f"[XLSX parse error: {e}]" def parse_xls(file_bytes: bytes) -> str: """XLS (legacy Excel) — pandas + xlrd.""" try: import pandas as pd xl = pd.ExcelFile(io.BytesIO(file_bytes), engine="xlrd") texts = [] for sheet in xl.sheet_names: df = xl.parse(sheet) texts.append(f"[Sheet: {sheet}]") texts.append(_df_to_text(df)) return "\n".join(texts) except Exception as e: return f"[XLS parse error: {e}]" def parse_orc(file_bytes: bytes) -> str: """ORC — pyarrow.""" try: import pyarrow.orc as orc table = orc.read_table(io.BytesIO(file_bytes)) df = table.to_pandas() return _df_to_text(df) except Exception as e: return f"[ORC parse error: {e}]" def parse_feather(file_bytes: bytes) -> str: """Feather / Arrow IPC — pyarrow.""" try: import pyarrow.feather as feather table = feather.read_table(io.BytesIO(file_bytes)) df = table.to_pandas() return _df_to_text(df) except Exception as e: return f"[Feather parse error: {e}]" def parse_hdf5(file_bytes: bytes) -> str: """HDF5 — h5py.""" try: import h5py import numpy as np lines = [] with h5py.File(io.BytesIO(file_bytes), "r") as f: def _visit(name, obj): if isinstance(obj, h5py.Dataset): lines.append(f"[Dataset: {name}]") data = obj[()] # Decode bytes if needed if data.dtype.kind in ("S", "O"): flat = data.flatten()[:200] for item in flat: if isinstance(item, bytes): lines.append(item.decode("utf-8", errors="replace")) else: lines.append(str(item)) else: lines.append(str(data.flatten()[:50])) f.visititems(_visit) return "\n".join(lines) if lines else "[HDF5: no text data found]" except Exception as e: return f"[HDF5 parse error: {e}]" def parse_sqlite(file_bytes: bytes) -> str: """SQLite database — read all tables.""" import tempfile, os, sqlite3 tmp_path = None try: # SQLite requires a file path; write to temp file with tempfile.NamedTemporaryFile(delete=False, suffix=".db") as tmp: tmp.write(file_bytes) tmp_path = tmp.name conn = sqlite3.connect(tmp_path) cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") tables = [row[0] for row in cursor.fetchall()] lines = [] for table in tables: lines.append(f"[Table: {table}]") cursor.execute(f"SELECT * FROM {table} LIMIT 500") col_names = [d[0] for d in cursor.description] lines.append(" | ".join(col_names)) for row in cursor.fetchall(): lines.append(" | ".join(str(v) for v in row)) conn.close() return "\n".join(lines) except Exception as e: return f"[SQLite parse error: {e}]" finally: if tmp_path and os.path.exists(tmp_path): try: os.unlink(tmp_path) except Exception: pass def parse_sql(file_bytes: bytes) -> str: """SQL script files — plain text (DDL/DML contains table/column names, sample data).""" return file_bytes.decode("utf-8", errors="replace") def parse_dta(file_bytes: bytes) -> str: """Stata .dta files — pyreadstat.""" try: import pyreadstat df, meta = pyreadstat.read_dta(io.BytesIO(file_bytes)) return _df_to_text(df) except Exception as e: return f"[DTA parse error: {e}]" STRUCTURED_PARSERS = { "parquet": parse_parquet, "avro": parse_avro, "xlsx": parse_xlsx, "xls": parse_xls, "orc": parse_orc, "feather": parse_feather, "arrow": parse_feather, "hdf5": parse_hdf5, "h5": parse_hdf5, "sqlite": parse_sqlite, "db": parse_sqlite, "sql": parse_sql, "dta": parse_dta, }