# sense-backend / file_handlers / semi_structured_handlers.py
# SHAFI
# implemented presidio level foundation
# aeefdb5
"""
Semi-Structured File Handlers
Extracts plain text from: json, xml, yaml, html, csv, tsv, ini, toml, config, edifact
"""
import io
import json as json_lib
def parse_json(file_bytes: bytes) -> str:
    """Flatten a JSON document into one ``[key.path]: value`` line per scalar.

    The payload is decoded as UTF-8 with undecodable bytes replaced by
    U+FFFD. A leading byte-order mark is dropped during decoding, because
    ``json.loads`` refuses text that starts with a BOM and such files would
    otherwise never be flattened.

    Args:
        file_bytes: Raw bytes of the JSON file.

    Returns:
        The flattened lines joined with newlines. If parsing or flattening
        fails for any reason, the decoded text is returned as-is so the
        caller still receives the file content.
    """
    # "utf-8-sig" behaves like "utf-8" but strips a leading BOM if present.
    text = file_bytes.decode("utf-8-sig", errors="replace")
    try:
        lines = []
        _flatten_json(json_lib.loads(text), lines)
    except Exception:
        # Deliberate best-effort: malformed JSON must not abort extraction,
        # so hand back the decoded text instead of raising.
        return text
    return "\n".join(lines)
def _flatten_json(obj, lines: list, path: str = "", depth: int = 0):
"""Recursively extract all key-value pairs from JSON with context injection."""
if depth > 10:
return
if isinstance(obj, dict):
for k, v in obj.items():
new_path = f"{path}.{k}" if path else str(k)
_flatten_json(v, lines, new_path, depth + 1)
elif isinstance(obj, list):
for i, item in enumerate(obj):
new_path = f"{path}[{i}]"
_flatten_json(item, lines, new_path, depth + 1)
elif obj is not None and str(obj).strip():
# Context Injection: Bind the JSON key path directly to the value
lines.append(f"[{path}]: {obj}")
def parse_xml(file_bytes: bytes) -> str:
    """Return the text nodes of an XML document, one per line.

    BeautifulSoup is tried with the "lxml-xml" parser first and with the
    "lxml" parser second. If both attempts raise (including when bs4 cannot
    be imported), the bytes are decoded as UTF-8 with replacement characters
    and returned unparsed.

    Args:
        file_bytes: Raw bytes of the XML file.

    Returns:
        Extracted text separated by newlines, or the decoded raw content.
    """
    for parser_name in ("lxml-xml", "lxml"):
        try:
            from bs4 import BeautifulSoup

            document = BeautifulSoup(file_bytes, parser_name)
            return document.get_text(separator="\n")
        except Exception:
            # Best-effort: move on to the next parser, then to raw text.
            continue
    return file_bytes.decode("utf-8", errors="replace")
def parse_yaml(file_bytes: bytes) -> str:
    """Flatten a YAML document into ``[key.path]: value`` lines.

    The content is loaded with ``yaml.safe_load`` and passed through
    ``_flatten_json``. Any failure (missing PyYAML, invalid YAML, ...)
    results in the UTF-8 decoded raw content being returned instead.

    Args:
        file_bytes: Raw bytes of the YAML file.

    Returns:
        Newline-joined flattened lines, or the decoded raw content.
    """
    try:
        import yaml

        decoded = file_bytes.decode("utf-8", errors="replace")
        collected = []
        _flatten_json(yaml.safe_load(decoded), collected)
    except Exception:
        # Best-effort: unreadable YAML is returned as plain text.
        return file_bytes.decode("utf-8", errors="replace")
    else:
        return "\n".join(collected)
def parse_html(file_bytes: bytes) -> str:
    """Return the text of an HTML document with non-content tags removed.

    With BeautifulSoup ("lxml" parser) the ``script``, ``style``, ``meta``
    and ``link`` elements are removed before the text is collected, one
    text node per line. If that path raises, the bytes are decoded as UTF-8
    and every ``<...>`` tag is replaced by a single space.

    Args:
        file_bytes: Raw bytes of the HTML file.

    Returns:
        The extracted text.
    """
    try:
        from bs4 import BeautifulSoup

        document = BeautifulSoup(file_bytes, "lxml")
        non_content = document(["script", "style", "meta", "link"])
        for element in non_content:
            element.decompose()
        return document.get_text(separator="\n")
    except Exception:
        # Fallback without bs4/lxml: crude tag stripping on the raw text.
        import re

        raw = file_bytes.decode("utf-8", errors="replace")
        return re.sub(r"<[^>]+>", " ", raw)
def parse_csv(file_bytes: bytes) -> str:
    """Convert comma-separated data to text via the structured-data helper.

    The bytes are read into a pandas DataFrame and rendered with
    ``_df_to_text`` from ``file_handlers.structured_handlers`` (the output
    format is defined by that helper). If importing, reading or rendering
    fails, the UTF-8 decoded raw content is returned.

    Args:
        file_bytes: Raw bytes of the CSV file.

    Returns:
        The rendered table text, or the decoded raw content.
    """
    try:
        import pandas as pd
        from file_handlers.structured_handlers import _df_to_text

        buffer = io.BytesIO(file_bytes)
        rendered = _df_to_text(pd.read_csv(buffer))
    except Exception:
        # Best-effort: anything unreadable is passed through as plain text.
        return file_bytes.decode("utf-8", errors="replace")
    return rendered
def parse_tsv(file_bytes: bytes) -> str:
    """Convert tab-separated data to text via the structured-data helper.

    Same flow as the CSV handler, but pandas is told to split on tabs. The
    DataFrame is rendered with ``_df_to_text`` from
    ``file_handlers.structured_handlers``. If importing, reading or
    rendering fails, the UTF-8 decoded raw content is returned.

    Args:
        file_bytes: Raw bytes of the TSV file.

    Returns:
        The rendered table text, or the decoded raw content.
    """
    try:
        import pandas as pd
        from file_handlers.structured_handlers import _df_to_text

        buffer = io.BytesIO(file_bytes)
        rendered = _df_to_text(pd.read_csv(buffer, sep="\t"))
    except Exception:
        # Best-effort: anything unreadable is passed through as plain text.
        return file_bytes.decode("utf-8", errors="replace")
    return rendered
def parse_ini(file_bytes: bytes) -> str:
    """Render an INI file as ``[section]`` headers and ``key = value`` lines.

    Values are emitted exactly as written in the file (``raw=True``).
    With configparser's default interpolation a single literal ``%`` in any
    value raises and the structured result is lost; reading raw avoids that.
    As a consequence ``%(name)s`` references are no longer expanded.

    Args:
        file_bytes: Raw bytes of the INI file.

    Returns:
        The rendered sections joined with newlines. When the file yields no
        sections, or cannot be parsed at all, the decoded text is returned
        instead.
    """
    # "utf-8-sig" strips a leading BOM, which would otherwise sit in front
    # of the first "[section]" header and make configparser reject the file.
    text = file_bytes.decode("utf-8-sig", errors="replace")
    try:
        import configparser

        config = configparser.ConfigParser()
        config.read_string(text)
        lines = []
        for section in config.sections():
            lines.append(f"[{section}]")
            lines.extend(
                f"{key} = {val}" for key, val in config.items(section, raw=True)
            )
    except Exception:
        # Deliberate best-effort: not every ".ini" file is valid INI.
        return text
    return "\n".join(lines) if lines else text
def parse_toml(file_bytes: bytes) -> str:
    """Flatten a TOML document into ``[key.path]: value`` lines.

    Uses the standard-library ``tomllib`` on Python 3.11+ and ``tomli`` on
    older interpreters, then passes the parsed data through
    ``_flatten_json``. Any failure results in the UTF-8 decoded raw content
    being returned.

    Args:
        file_bytes: Raw bytes of the TOML file.

    Returns:
        Newline-joined flattened lines, or the decoded raw content.
    """
    try:
        import sys

        if sys.version_info >= (3, 11):
            import tomllib as toml_reader
        else:
            import tomli as toml_reader

        document = toml_reader.loads(file_bytes.decode("utf-8", errors="replace"))
        flattened = []
        _flatten_json(document, flattened)
        return "\n".join(flattened)
    except Exception:
        # Best-effort: invalid TOML (or a missing parser) yields plain text.
        return file_bytes.decode("utf-8", errors="replace")
def parse_config(file_bytes: bytes) -> str:
    """Extract text from a generic config file, trying INI syntax first.

    The ``parse_ini`` result is kept when it begins with "[". Otherwise the
    bytes are decoded as UTF-8 (with replacement characters) and returned as
    plain text.

    Args:
        file_bytes: Raw bytes of the config file.

    Returns:
        The INI rendering or the decoded raw content.
    """
    ini_text = parse_ini(file_bytes)
    looks_like_ini = ini_text.startswith("[")
    return ini_text if looks_like_ini else file_bytes.decode("utf-8", errors="replace")
def parse_edifact(file_bytes: bytes) -> str:
    """Render an EDIFACT interchange as one line of text per segment.

    With pydifact available, each segment becomes its tag followed by the
    string form of its elements, separated by spaces. If that path raises,
    the raw text is used instead: every "'" becomes a newline and every "+"
    becomes a space, so field contents stay readable for scanning.

    Args:
        file_bytes: Raw bytes of the EDIFACT file.

    Returns:
        The rendered segments joined with newlines, or the reformatted raw
        text.
    """
    try:
        from pydifact.segmentcollection import RawSegmentCollection

        message = RawSegmentCollection.from_str(
            file_bytes.decode("utf-8", errors="replace")
        )
        rendered = []
        for seg in message:
            # Segment tag first, then each element stringified.
            fields = [seg.tag]
            fields.extend(str(item) for item in (seg.elements or []))
            rendered.append(" ".join(fields))
        return "\n".join(rendered)
    except Exception:
        # Fallback: keep the raw content, since fields such as NAD segments
        # can carry PII. Only the "'" and "+" characters are rewritten.
        raw = file_bytes.decode("utf-8", errors="replace")
        return raw.replace("'", "\n").replace("+", " ")
# Registry mapping a lowercase file extension (without the dot) to the
# handler that extracts text from that format. Aliases share a handler.
SEMI_STRUCTURED_PARSERS = {
    extension: handler
    for handler, extensions in (
        (parse_json, ("json",)),
        (parse_xml, ("xml",)),
        (parse_yaml, ("yaml", "yml")),
        (parse_html, ("html", "htm")),
        (parse_csv, ("csv",)),
        (parse_tsv, ("tsv",)),
        (parse_ini, ("ini",)),
        (parse_toml, ("toml",)),
        (parse_config, ("config", "cfg", "conf")),
        (parse_edifact, ("edifact", "edi")),
    )
    for extension in extensions
}