# sense-backend/file_handlers/structured_handlers.py
# SHAFI
# implemented presidio level foundation
# aeefdb5
"""
Structured File Handlers
Extracts plain text from: sqlite, sql, parquet, avro, xlsx, xls, orc, hdf5, feather, dta
"""
import io
def _df_to_text(df, max_rows: int = 500) -> str:
    """Render the leading rows of a DataFrame as column-labelled text.

    Instead of 'Val1 | Val2', each row is written as
    'Row N: [Col1]: Val1, [Col2]: Val2.' so that NLP models reading the text
    know what every value represents ("context injection"). Null/NaN cells
    and empty strings are skipped to save token space, and a row with nothing
    left to show produces no line at all.

    Args:
        df: DataFrame to render.
        max_rows: Maximum number of leading rows to include.

    Returns:
        One line per non-empty row, joined with newlines. If rendering fails
        for any reason the plain ``str(df.head(max_rows))`` is returned
        instead, so this function is best-effort.
    """
    try:
        import pandas as pd

        preview = df.head(max_rows)
        labels = [str(col).strip() for col in preview.columns]
        lines = []
        # Rows are numbered by position rather than by index label, so frames
        # with string/datetime indexes render instead of failing on `idx + 1`.
        # itertuples keeps each column's own dtype; iterrows would upcast
        # integers to floats in mixed numeric frames (123 -> 123.0).
        rows = preview.itertuples(index=False, name=None)
        for row_number, values in enumerate(rows, start=1):
            row_parts = []
            for label, val in zip(labels, values):
                if isinstance(val, str):
                    if val == "":
                        continue
                # Only scalars are null-checked: pd.isna on a list/array cell
                # returns an array whose truth value is ambiguous.
                elif pd.api.types.is_scalar(val) and pd.isna(val):
                    continue
                row_parts.append(f"[{label}]: {val}")
            if row_parts:
                lines.append(f"Row {row_number}: " + ", ".join(row_parts) + ".")
        return "\n".join(lines)
    except Exception:
        # Deliberate best-effort fallback: some text is better than none.
        return str(df.head(max_rows))
def parse_parquet(file_bytes: bytes) -> str:
    """Extract text from an in-memory Parquet file.

    The bytes are loaded with ``pandas.read_parquet`` and rendered through
    ``_df_to_text``. Never raises: any failure is reported in the returned
    string as ``[Parquet parse error: ...]``.
    """
    try:
        import pandas as pd

        buffer = io.BytesIO(file_bytes)
        frame = pd.read_parquet(buffer)
        text = _df_to_text(frame)
    except Exception as err:
        return f"[Parquet parse error: {err}]"
    return text
def parse_avro(file_bytes: bytes) -> str:
    """Extract text from an in-memory Avro container file using fastavro.

    At most the first 500 records are read; they are loaded into a DataFrame
    and rendered through ``_df_to_text``. Never raises: any failure is
    reported in the returned string as ``[Avro parse error: ...]``.
    """
    try:
        import fastavro

        collected = []
        with io.BytesIO(file_bytes) as stream:
            for count, rec in enumerate(fastavro.reader(stream), start=1):
                collected.append(rec)
                # Stop once 500 records have been gathered.
                if count >= 500:
                    break

        import pandas as pd

        frame = pd.DataFrame(collected)
        return _df_to_text(frame)
    except Exception as err:
        return f"[Avro parse error: {err}]"
def parse_xlsx(file_bytes: bytes) -> str:
    """Extract text from an in-memory XLSX workbook (pandas + openpyxl).

    Every sheet is emitted as a ``[Sheet: <name>]`` header line followed by
    the sheet's rows rendered through ``_df_to_text``.

    Args:
        file_bytes: Raw contents of the .xlsx file.

    Returns:
        The text of all sheets joined with newlines. Never raises: any
        failure is reported as ``[XLSX parse error: ...]``.
    """
    try:
        import pandas as pd

        texts = []
        # ExcelFile keeps the opened workbook alive; the with-block closes it
        # even when parsing one of the sheets fails.
        with pd.ExcelFile(io.BytesIO(file_bytes), engine="openpyxl") as xl:
            for sheet in xl.sheet_names:
                df = xl.parse(sheet)
                texts.append(f"[Sheet: {sheet}]")
                texts.append(_df_to_text(df))
        return "\n".join(texts)
    except Exception as e:
        return f"[XLSX parse error: {e}]"
def parse_xls(file_bytes: bytes) -> str:
    """Extract text from an in-memory legacy XLS workbook (pandas + xlrd).

    Every sheet is emitted as a ``[Sheet: <name>]`` header line followed by
    the sheet's rows rendered through ``_df_to_text``.

    Args:
        file_bytes: Raw contents of the .xls file.

    Returns:
        The text of all sheets joined with newlines. Never raises: any
        failure is reported as ``[XLS parse error: ...]``.
    """
    try:
        import pandas as pd

        texts = []
        # ExcelFile keeps the opened workbook alive; the with-block closes it
        # even when parsing one of the sheets fails.
        with pd.ExcelFile(io.BytesIO(file_bytes), engine="xlrd") as xl:
            for sheet in xl.sheet_names:
                df = xl.parse(sheet)
                texts.append(f"[Sheet: {sheet}]")
                texts.append(_df_to_text(df))
        return "\n".join(texts)
    except Exception as e:
        return f"[XLS parse error: {e}]"
def parse_orc(file_bytes: bytes) -> str:
    """Extract text from an in-memory ORC file using pyarrow.

    The table is read with ``pyarrow.orc.read_table``, converted to pandas
    and rendered through ``_df_to_text``. Never raises: any failure is
    reported in the returned string as ``[ORC parse error: ...]``.
    """
    try:
        import pyarrow.orc as orc

        source = io.BytesIO(file_bytes)
        frame = orc.read_table(source).to_pandas()
        text = _df_to_text(frame)
    except Exception as err:
        return f"[ORC parse error: {err}]"
    return text
def parse_feather(file_bytes: bytes) -> str:
    """Extract text from an in-memory Feather / Arrow IPC file using pyarrow.

    The table is read with ``pyarrow.feather.read_table``, converted to
    pandas and rendered through ``_df_to_text``. Never raises: any failure is
    reported in the returned string as ``[Feather parse error: ...]``.
    """
    try:
        import pyarrow.feather as feather

        source = io.BytesIO(file_bytes)
        frame = feather.read_table(source).to_pandas()
        text = _df_to_text(frame)
    except Exception as err:
        return f"[Feather parse error: {err}]"
    return text
def parse_hdf5(file_bytes: bytes) -> str:
    """Extract text from an in-memory HDF5 file using h5py.

    Every dataset in the file is visited. Each contributes a
    ``[Dataset: <name>]`` header line followed by either up to 200 string
    items (one per line, bytes decoded as UTF-8 with replacement) or, for
    other dtypes, the ``str()`` of the first 50 flattened values.

    Args:
        file_bytes: Raw contents of the HDF5 file.

    Returns:
        The collected lines joined with newlines, or
        ``[HDF5: no text data found]`` when the file has no datasets. Never
        raises: any failure is reported as ``[HDF5 parse error: ...]``.
    """
    try:
        import h5py
        import numpy as np

        lines = []

        def _visit(name, obj):
            # Groups carry no data of their own; only datasets are rendered.
            # Returning None keeps visititems walking the rest of the file.
            if not isinstance(obj, h5py.Dataset):
                return
            lines.append(f"[Dataset: {name}]")
            # A scalar string dataset reads back as a plain bytes/str object
            # with no .dtype or .flatten(); asarray normalises every read to
            # an ndarray so one such dataset no longer aborts the whole file.
            data = np.asarray(obj[()])
            if data.dtype.kind in ("S", "O", "U"):
                for item in data.flatten()[:200]:
                    if isinstance(item, bytes):
                        lines.append(item.decode("utf-8", errors="replace"))
                    else:
                        lines.append(str(item))
            else:
                lines.append(str(data.flatten()[:50]))

        with h5py.File(io.BytesIO(file_bytes), "r") as f:
            f.visititems(_visit)
        return "\n".join(lines) if lines else "[HDF5: no text data found]"
    except Exception as e:
        return f"[HDF5 parse error: {e}]"
def parse_sqlite(file_bytes: bytes) -> str:
    """Extract text from an in-memory SQLite database by dumping its tables.

    For every table listed in ``sqlite_master`` the output contains a
    ``[Table: <name>]`` header, a ``col | col`` line with the column names,
    and up to 500 rows formatted the same way.

    Args:
        file_bytes: Raw contents of the SQLite database file.

    Returns:
        All lines joined with newlines. Never raises: any failure is reported
        as ``[SQLite parse error: ...]``.
    """
    import os
    import sqlite3
    import tempfile

    tmp_path = None
    conn = None
    try:
        # SQLite requires a file path; write to temp file. The path is
        # recorded before writing so a failed write is still cleaned up.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".db") as tmp:
            tmp_path = tmp.name
            tmp.write(file_bytes)
        conn = sqlite3.connect(tmp_path)
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [row[0] for row in cursor.fetchall()]
        lines = []
        for table in tables:
            lines.append(f"[Table: {table}]")
            # Table names come from the (untrusted) file and cannot be bound
            # as parameters, so quote them as identifiers: this handles names
            # with spaces/keywords and prevents SQL injection.
            quoted = '"' + table.replace('"', '""') + '"'
            cursor.execute(f"SELECT * FROM {quoted} LIMIT 500")
            col_names = [d[0] for d in cursor.description]
            lines.append(" | ".join(col_names))
            for row in cursor.fetchall():
                lines.append(" | ".join(str(v) for v in row))
        return "\n".join(lines)
    except Exception as e:
        return f"[SQLite parse error: {e}]"
    finally:
        # Close on every path: an open handle keeps the temp file locked on
        # Windows and would make the unlink below fail.
        if conn is not None:
            try:
                conn.close()
            except sqlite3.Error:
                pass
        if tmp_path and os.path.exists(tmp_path):
            try:
                os.unlink(tmp_path)
            except OSError:
                # Best-effort cleanup; a leftover temp file is not fatal.
                pass
def parse_sql(file_bytes: bytes) -> str:
    """Return the text of a SQL script file.

    SQL scripts are already plain text (DDL/DML contains table/column names
    and sample data), so the bytes are only decoded. A leading byte-order
    mark selects UTF-8, UTF-16 or UTF-32 and is stripped; without one the
    data is treated as UTF-8. Undecodable bytes become U+FFFD rather than
    raising.

    Args:
        file_bytes: Raw contents of the .sql file.

    Returns:
        The decoded script text.
    """
    import codecs

    # UTF-32 must be tested before UTF-16: the UTF-32-LE BOM (FF FE 00 00)
    # begins with the UTF-16-LE BOM (FF FE).
    if file_bytes.startswith((codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE)):
        encoding = "utf-32"
    elif file_bytes.startswith((codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE)):
        encoding = "utf-16"
    elif file_bytes.startswith(codecs.BOM_UTF8):
        encoding = "utf-8-sig"
    else:
        encoding = "utf-8"
    return file_bytes.decode(encoding, errors="replace")
def parse_dta(file_bytes: bytes) -> str:
    """Extract text from an in-memory Stata .dta file using pyreadstat.

    The data is read into a DataFrame and rendered through ``_df_to_text``;
    the metadata returned by pyreadstat is not used.

    Args:
        file_bytes: Raw contents of the .dta file.

    Returns:
        The rendered rows. Never raises: any failure is reported as
        ``[DTA parse error: ...]``.
    """
    import os
    import tempfile

    tmp_path = None
    try:
        import pyreadstat

        # NOTE(review): pyreadstat's read_dta is documented as taking a file
        # path, and releases that only accept paths reject a BytesIO, so the
        # bytes are spilled to a temp file. Confirm against the pinned
        # pyreadstat version.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".dta") as tmp:
            tmp_path = tmp.name
            tmp.write(file_bytes)
        df, _meta = pyreadstat.read_dta(tmp_path)
        return _df_to_text(df)
    except Exception as e:
        return f"[DTA parse error: {e}]"
    finally:
        if tmp_path and os.path.exists(tmp_path):
            try:
                os.unlink(tmp_path)
            except OSError:
                # Best-effort cleanup; a leftover temp file is not fatal.
                pass
# Registry of structured-file parsers keyed by file extension (no leading
# dot). Extensions that share a parser are declared together so the aliasing
# is explicit; insertion order matches the original literal.
STRUCTURED_PARSERS = {
    "parquet": parse_parquet,
    "avro": parse_avro,
    "xlsx": parse_xlsx,
    "xls": parse_xls,
    "orc": parse_orc,
    **dict.fromkeys(("feather", "arrow"), parse_feather),
    **dict.fromkeys(("hdf5", "h5"), parse_hdf5),
    **dict.fromkeys(("sqlite", "db"), parse_sqlite),
    "sql": parse_sql,
    "dta": parse_dta,
}