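# NOTE: the next four lines are repository-page residue, kept as comments so
# that the module parses.
# legal-eye / tau_rag /encoding /hebrew_encoder.py
# Legal-i's picture
# Initial deploy: legal-eye Hebrew legal RAG (17K corpus, verbatim-from-precedent)
# 3be54c6 verified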
"""
═══════════════════════════════════════════════════════════════════════════════
TAU Platform v4.0 - Enhanced Hebrew Encoder
═══════════════════════════════════════════════════════════════════════════════
Unified Hebrew text encoder combining:
1. Gematria encoding (log-scale + bigrams + position weights)
2. Spherical trajectory encoding (with curvature/torsion)
3. Morphological analysis (root extraction)
4. τ complexity metrics (Heaps' Law adjusted)
5. Vocabulary encoding (word-to-index)
6. Compression integration
Author: Avri Barzel
Date: November 2025
═══════════════════════════════════════════════════════════════════════════════
"""
import numpy as np
import re
import hashlib
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, field
from collections import Counter
from functools import lru_cache
# ═══════════════════════════════════════════════════════════════════════════════
# PRE-COMPILED REGEX PATTERNS (Performance optimization)
# ═══════════════════════════════════════════════════════════════════════════════
# Hebrew marks in U+0591..U+05C7 (cantillation signs and niqqud vowel points).
NIQQUD_PATTERN = re.compile(r"[\u0591-\u05C7]")
# A run of Hebrew letters alef..tav (U+05D0..U+05EA, final forms included).
HEBREW_WORD_PATTERN = re.compile(r"[\u05D0-\u05EA]+")

# Universal multilingual word pattern: one alternative per script. Alternatives
# are tried left to right at each position, so the plain Hebrew-letter run is
# preferred over the wider "Hebrew extended" block listed later.
_SCRIPT_RUN_ALTERNATIVES = (
    r"[\u05D0-\u05EA]+",  # Hebrew
    r"[a-zA-Z]+",  # Latin/English
    r"[\u0400-\u04FF]+",  # Cyrillic (Russian, etc.)
    r"[\u0600-\u06FF]+",  # Arabic
    r"[\u4E00-\u9FFF]+",  # Chinese (CJK)
    r"[\u3040-\u309F]+",  # Japanese Hiragana
    r"[\u30A0-\u30FF]+",  # Japanese Katakana
    r"[\uAC00-\uD7AF]+",  # Korean Hangul
    r"[\u0900-\u097F]+",  # Hindi/Devanagari
    r"[\u0370-\u03FF]+",  # Greek
    r"[\u0E00-\u0E7F]+",  # Thai
    r"[\u0590-\u05FF]+",  # Hebrew extended
    r"\d+",  # Numbers
)
MULTILINGUAL_WORD_PATTERN = re.compile("|".join(_SCRIPT_RUN_ALTERNATIVES))

# Either a run of word characters or one non-space, non-word character.
WORD_OR_PUNCT_PATTERN = re.compile(r"[\w]+|[^\w\s]")
# ═══════════════════════════════════════════════════════════════════════════════
# CONSTANTS - UNIVERSAL CHARACTER VALUES
# ═══════════════════════════════════════════════════════════════════════════════
# Hebrew Gematria (original, 1-400). Each final letter form carries the same
# value as its regular form and is listed right after it.
_GEMATRIA_LETTERS = "א ב ג ד ה ו ז ח ט י כ ך ל מ ם נ ן ס ע פ ף צ ץ ק ר ש ת".split()
_GEMATRIA_NUMBERS = (
    1, 2, 3, 4, 5, 6, 7, 8, 9,
    10, 20, 20, 30, 40, 40, 50, 50, 60, 70, 80, 80, 90, 90,
    100, 200, 300, 400,
)
GEMATRIA = dict(zip(_GEMATRIA_LETTERS, _GEMATRIA_NUMBERS))
# English/Latin letters, ordered from most to least frequent; more common
# letters receive higher values (400 down to 10).
_LATIN_BY_FREQUENCY = "etaoinshrdlcumwfgypbvkjxqz"
_LATIN_SCORES = (
    400, 380, 360, 340, 320, 300, 280, 260, 240, 220, 200, 180, 160,
    140, 120, 100, 90, 80, 70, 60, 50, 40, 30, 20, 15, 10,
)
LATIN_VALUES = dict(zip(_LATIN_BY_FREQUENCY, _LATIN_SCORES))
# Uppercase letters share the value of their lowercase form.
for _lower, _score in list(LATIN_VALUES.items()):
    LATIN_VALUES[_lower.upper()] = _score
# Cyrillic: the 33 letters of the Russian alphabet in alphabetical order,
# spread over the 1-400 range.
_CYRILLIC_LETTERS = (
    "а б в г д е ё ж з и й к л м н о п р с т у ф х ц ч ш щ ъ ы ь э ю я".split()
)
_CYRILLIC_SCORES = (
    1, 15, 30, 45, 60, 75, 80, 95, 110, 125, 140,
    155, 170, 185, 200, 215, 230, 245, 260, 275, 290, 305,
    320, 335, 350, 365, 375, 380, 385, 390, 393, 396, 400,
)
CYRILLIC_VALUES = dict(zip(_CYRILLIC_LETTERS, _CYRILLIC_SCORES))
# Uppercase letters share the value of their lowercase form.
for _lower, _score in list(CYRILLIC_VALUES.items()):
    CYRILLIC_VALUES[_lower.upper()] = _score
# Arabic letters in alphabetical order, spread over the same 1-400 range that
# Hebrew gematria uses.
_ARABIC_LETTERS = "ا ب ت ث ج ح خ د ذ ر ز س ش ص ض ط ظ ع غ ف ق ك ل م ن ه و ي".split()
_ARABIC_SCORES = (
    1, 15, 30, 45, 60, 75, 90, 105, 120, 135, 150, 165, 180, 195,
    210, 225, 240, 255, 270, 285, 300, 315, 330, 345, 360, 375, 385, 400,
)
ARABIC_VALUES = dict(zip(_ARABIC_LETTERS, _ARABIC_SCORES))
# Greek letters in alphabetical order. Values step by 15 except for pi, which
# is pinned to 314 (3.14...), and omega, which is pinned to 400.
_GREEK_LETTERS = "α β γ δ ε ζ η θ ι κ λ μ ν ξ ο π ρ σ τ υ φ χ ψ ω".split()
_GREEK_SCORES = (
    1, 15, 30, 45, 60, 75, 90, 105, 120, 135, 150, 165,
    180, 195, 210, 314, 240, 255, 270, 285, 300, 315, 330, 400,
)
GREEK_VALUES = dict(zip(_GREEK_LETTERS, _GREEK_SCORES))
# Uppercase letters share the value of their lowercase form.
for _lower, _score in list(GREEK_VALUES.items()):
    GREEK_VALUES[_lower.upper()] = _score
# Special: capital pi keeps the same special value as lowercase pi.
GREEK_VALUES["Π"] = 314
def _consecutive_code_points(first, count, step):
    """Map `count` consecutive code points from `first` to step, 2*step, ..."""
    return {chr(first + offset): (offset + 1) * step for offset in range(count)}


# Hindi/Devanagari: 50 code points U+0905..U+0936, values 8..400.
DEVANAGARI_VALUES = _consecutive_code_points(0x0905, 50, 8)

# Chinese/CJK: a short list of common characters with hand-assigned values
# in the 1-400 range.
_CJK_CHARACTERS = "的 一 是 不 了 在 人 有 我 他 这 个 们 中 来 上 大 为 和 国 地".split()
_CJK_SCORES = (
    400, 1, 380, 360, 340, 320, 300, 280, 260, 240, 220,
    200, 180, 160, 140, 120, 100, 80, 60, 40, 20,
)
CJK_COMMON = dict(zip(_CJK_CHARACTERS, _CJK_SCORES))

# Japanese Hiragana: 83 code points U+3041..U+3093, values 8..664.
HIRAGANA_VALUES = _consecutive_code_points(0x3041, 83, 8)
# Japanese Katakana: 83 code points U+30A1..U+30F3, values 8..664.
KATAKANA_VALUES = _consecutive_code_points(0x30A1, 83, 8)
# Korean Hangul Jamo: 40 code points U+1100..U+1127, values 10..400.
# NOTE(review): MULTILINGUAL_WORD_PATTERN matches precomposed syllables
# (U+AC00..U+D7AF), which are not keys of this table - confirm intent.
HANGUL_JAMO = _consecutive_code_points(0x1100, 40, 10)
# Thai: 58 code points U+0E01..U+0E3A, values 8..464.
THAI_VALUES = _consecutive_code_points(0x0E01, 58, 8)
# Mathematical symbols with semantic values
MATH_SYMBOLS = {
    # Operators
    "+": 100, "-": 100, "*": 150, "/": 150, "=": 200,
    "×": 150, "÷": 150, "±": 100, "∓": 100,
    # Comparison
    "<": 75, ">": 75, "≤": 80, "≥": 80, "≠": 85, "≈": 90, "≡": 95,
    # Brackets
    "(": 50, ")": 50, "[": 50, "]": 50, "{": 50, "}": 50, "⟨": 50, "⟩": 50,
    # Vertical bar. The table used to list "|" twice (50 as a bracket and 100
    # as a programming operator); in a dict literal the later entry wins, so
    # the effective value has always been 100. It is now listed once.
    "|": 100,
    # Punctuation
    ".": 25, ",": 25, ":": 25, ";": 25, "?": 50, "!": 50, "@": 75, "#": 75,
    # Math symbols
    "%": 125, "^": 175, "√": 200, "∞": 400, "∑": 300,
    "∏": 280, "∫": 350, "∂": 250, "∇": 270, "∆": 260,
    # Logic
    "∧": 150, "∨": 150, "¬": 100, "→": 180, "↔": 190, "∀": 200, "∃": 200,
    # Set theory
    "∈": 160, "∉": 165, "⊂": 170, "⊃": 170, "∪": 180, "∩": 180, "∅": 50,
    # Currency
    "$": 200, "€": 200, "₪": 200, "£": 200, "¥": 200, "₹": 200, "₽": 200,
    # Programming
    "&": 100, "~": 80, "`": 30, "_": 40, "\\": 60,
}
# Programming/Code symbols: multi-character operators and their values.
# No other code in the visible part of this module reads this table.
CODE_SYMBOLS = dict(
    [
        ("==", 200), ("!=", 200), ("<=", 200), (">=", 200),
        ("&&", 150), ("||", 150),
        ("++", 120), ("--", 120),
        ("->", 180), ("=>", 180),
        ("::", 160),
        ("...", 100),
    ]
)
# Language identifiers for embedding separation.
# Each language gets a unique ID (its position in this tuple) that selects the
# embedding dimension and word-value offset used for that language.
_LANGUAGE_ORDER = (
    "hebrew",  # Primary - original gematria
    "latin",  # English/Latin
    "cyrillic",  # Russian etc.
    "arabic",  # Arabic
    "greek",  # Greek
    "devanagari",  # Hindi
    "cjk",  # Chinese
    "hiragana",  # Japanese
    "katakana",  # Japanese
    "hangul",  # Korean
    "thai",  # Thai
    "math",  # Mathematical symbols
    "unknown",  # Unknown scripts
)
LANGUAGE_IDS = {name: index for index, name in enumerate(_LANGUAGE_ORDER)}
# Character to language mapping (for embedding separation).
# Tables are applied in the order listed; if a character appeared in more
# than one table, the later table's language would win.
CHAR_TO_LANGUAGE = {}
for _table, _language in (
    (GEMATRIA, "hebrew"),
    (LATIN_VALUES, "latin"),
    (CYRILLIC_VALUES, "cyrillic"),
    (ARABIC_VALUES, "arabic"),
    (GREEK_VALUES, "greek"),
    (DEVANAGARI_VALUES, "devanagari"),
    (CJK_COMMON, "cjk"),
    (HIRAGANA_VALUES, "hiragana"),
    (KATAKANA_VALUES, "katakana"),
    (HANGUL_JAMO, "hangul"),
    (THAI_VALUES, "thai"),
    (MATH_SYMBOLS, "math"),
):
    CHAR_TO_LANGUAGE.update(dict.fromkeys(_table, _language))
# Combined character values (all scripts + math symbols).
# Every table keeps its own raw values here; the per-language offsets that
# separate scripts are added later, per word, in `_word_gematria_cached`.
# Tables are merged in the order listed, so on a key collision the LATER
# table's value would win.
CHAR_VALUES = {}
for _table in (
    GEMATRIA,  # Hebrew (original 1-400)
    LATIN_VALUES,  # English/Latin
    CYRILLIC_VALUES,  # Russian etc.
    ARABIC_VALUES,  # Arabic
    GREEK_VALUES,  # Greek
    DEVANAGARI_VALUES,  # Hindi
    CJK_COMMON,  # Chinese common
    HIRAGANA_VALUES,  # Japanese
    KATAKANA_VALUES,  # Japanese
    HANGUL_JAMO,  # Korean
    THAI_VALUES,  # Thai
    MATH_SYMBOLS,  # Math
):
    CHAR_VALUES.update(_table)
# A pair of coordinates in [0, 1] for each Hebrew letter (final forms
# included). No other code in the visible part of this module reads it.
LETTER_COORDS = {
    letter: (first, second)
    for letter, first, second in (
        ("א", 0.05, 0.85),
        ("ה", 0.10, 0.95),
        ("ח", 0.15, 0.45),
        ("ע", 0.08, 0.55),
        ("ג", 0.30, 0.25),
        ("י", 0.35, 0.75),
        ("כ", 0.38, 0.50),
        ("ך", 0.38, 0.20),
        ("ק", 0.32, 0.40),
        ("ד", 0.50, 0.45),
        ("ט", 0.52, 0.20),
        ("ת", 0.55, 0.70),
        ("ז", 0.58, 0.25),
        ("ס", 0.54, 0.35),
        ("צ", 0.56, 0.30),
        ("ץ", 0.56, 0.10),
        ("ש", 0.53, 0.65),
        ("ל", 0.62, 0.80),
        ("נ", 0.65, 0.55),
        ("ן", 0.65, 0.25),
        ("ר", 0.68, 0.70),
        ("ב", 0.85, 0.60),
        ("ו", 0.88, 0.90),
        ("מ", 0.90, 0.75),
        ("ם", 0.90, 0.35),
        ("פ", 0.92, 0.40),
        ("ף", 0.92, 0.15),
    )
}
# Known Hebrew roots with an English gloss. Insertion order matters:
# MorphologyLayer.extract_root returns the FIRST root found inside a word.
_LEGAL_ROOTS = (
    ("שפט", "judge"),
    ("דין", "law"),
    ("חוק", "statute"),
    ("עבר", "violate"),
    ("טען", "claim"),
    ("ערר", "appeal"),
    ("פסק", "rule"),
    ("תבע", "sue"),
    ("זכה", "acquit"),
    ("חייב", "convict"),
    ("ענש", "punish"),
    ("קנס", "fine"),
    ("חתם", "sign"),
    ("הסכם", "agree"),
    ("בטל", "cancel"),
    ("תקף", "valid"),
)
_GENERAL_ROOTS = (
    ("כתב", "write"),
    ("קרא", "read"),
    ("אמר", "say"),
    ("דבר", "speak"),
    ("הלך", "walk"),
    ("בוא", "come"),
    ("ראה", "see"),
    ("שמע", "hear"),
    ("ידע", "know"),
    ("חשב", "think"),
    ("רצה", "want"),
    ("יכל", "can"),
    ("נתן", "give"),
    ("לקח", "take"),
    ("עשה", "make"),
    ("היה", "be"),
)
HEBREW_ROOTS = dict(_LEGAL_ROOTS + _GENERAL_ROOTS)
def _channel_weights(gematria, trajectory, morphology):
    """Bundle the three per-channel mixing weights used by `encode`."""
    return {
        "gematria": gematria,
        "trajectory": trajectory,
        "morphology": morphology,
    }


# Per-domain mixing weights for the three embedding channels; "default" is
# the fallback for unrecognised domain names.
DOMAIN_WEIGHTS = {
    "hebrew_legal": _channel_weights(0.50, 0.30, 0.20),
    "hebrew_medical": _channel_weights(0.40, 0.30, 0.30),
    "hebrew_general": _channel_weights(0.40, 0.40, 0.20),
    "default": _channel_weights(0.45, 0.35, 0.20),
}

# Heaps' Law parameters: expected distinct words = HEAPS_K * n ** HEAPS_BETA.
HEAPS_K = 10.0
HEAPS_BETA = 0.5
# ═══════════════════════════════════════════════════════════════════════════════
# DATA CLASSES
# ═══════════════════════════════════════════════════════════════════════════════
@dataclass
class GematriaFeatures:
    """Gematria-channel features produced by the encoder for one text."""

    # 50-bin histogram of log1p(word value), normalised to sum to 1
    histogram: np.ndarray
    # 20 slots: 8 summary statistics of adjacent-word values, zero-padded
    bigram_stats: np.ndarray
    # Per-word character-value sums, in text order
    word_values: List[int]
    # Sum of word_values
    total: int
    # Mean of word_values
    mean: float
    # Standard deviation of word_values
    std: float
@dataclass
class TrajectoryFeatures:
    """Trajectory-channel features: a path on the unit sphere and its geometry."""

    # (trajectory_points, 3) array of x, y, z coordinates
    coords: np.ndarray
    # Sum of the distances between consecutive points
    path_length: float
    # Mean of |d2| / |d1|**2 along the path
    curvature: float
    # Mean absolute torsion estimate along the path
    torsion: float
    # Mean of coords over all points (3 values)
    centroid: np.ndarray

    @property
    def arc_length(self) -> float:
        """Alias for path_length to maintain consistency with multimodal encoders."""
        return self.path_length
@dataclass
class MorphologyFeatures:
    """Result of MorphologyLayer.analyze for a list of words."""

    # One entry per input word: the extracted root, or None
    roots: List[Optional[str]]
    # Distinct non-empty roots (unordered)
    unique_roots: List[str]
    # Percentage (0-100) of words for which a root was found
    coverage: float
    # Number of prefixes stripped across all words
    prefix_count: int
    # Number of suffixes stripped across all words
    suffix_count: int
@dataclass
class TAUMetrics:
    """Complexity metrics computed by EnhancedHebrewEncoder._calculate_tau."""

    # complexity * activity * (1 - coherence) / 100
    tau: float
    # Distinct-word count relative to the Heaps' Law expectation, capped at 200
    complexity: float
    # Character entropy scaled by log2(average word length + 1)
    activity: float
    # 1 minus (twice the std of sliding-window distinct-word ratios, capped at 1)
    coherence: float
@dataclass
class VocabularyFeatures:
    """Vocabulary-channel features for one text."""

    # Vocabulary index of every token, in text order
    indices: List[int]
    # Fraction (0-1) of tokens that did not map to <UNK>
    coverage: float
    # Number of tokens that mapped to <UNK>
    oov_count: int
@dataclass
class EncodingResult:
    """Everything EnhancedHebrewEncoder.encode returns for one text."""

    # Final combined embedding vector
    embedding: np.ndarray
    # Per-channel feature bundles
    gematria: GematriaFeatures
    trajectory: TrajectoryFeatures
    morphology: MorphologyFeatures
    vocabulary: VocabularyFeatures
    tau_metrics: TAUMetrics
    # text_length, word_count, domain, weights and an md5 hash of the text
    metadata: Dict[str, Any]
# ═══════════════════════════════════════════════════════════════════════════════
# VOCABULARY LAYER
# ═══════════════════════════════════════════════════════════════════════════════
class VocabularyLayer:
    """Word-to-index mapping with OOV handling.

    Indices 0-5 are reserved for SPECIAL_TOKENS in a fresh vocabulary. Words
    are lower-cased and stripped before being stored or looked up.
    """

    SPECIAL_TOKENS = ["<PAD>", "<UNK>", "<START>", "<END>", "<NUM>", "<PUNC>"]

    def __init__(self, vocab_file: Optional[str] = None):
        """Create a vocabulary holding the special tokens.

        Args:
            vocab_file: Optional path to a JSON file written by `save`; when
                given, its contents replace the fresh vocabulary.
        """
        self.word2idx: Dict[str, int] = {}
        self.idx2word: Dict[int, str] = {}
        self.frequencies: Counter = Counter()
        # Add special tokens
        for i, token in enumerate(self.SPECIAL_TOKENS):
            self.word2idx[token] = i
            self.idx2word[i] = token
        self.next_idx = len(self.SPECIAL_TOKENS)
        if vocab_file:
            self.load(vocab_file)

    def add_word(self, word: str) -> int:
        """Count one occurrence of `word`, register it if new, return its index.

        An empty (after stripping) word is not stored and yields <PAD>.
        """
        word = word.lower().strip()
        if not word:
            return self.word2idx["<PAD>"]
        self.frequencies[word] += 1
        if word not in self.word2idx:
            self.word2idx[word] = self.next_idx
            self.idx2word[self.next_idx] = word
            self.next_idx += 1
        return self.word2idx[word]

    def get_index(self, word: str) -> int:
        """Return the index for `word` without modifying the vocabulary.

        Empty -> <PAD>, all digits -> <NUM>, anything containing a
        non-alphanumeric character -> <PUNC>, unknown word -> <UNK>.
        """
        word = word.lower().strip()
        if not word:
            return self.word2idx["<PAD>"]
        if word.isdigit():
            return self.word2idx["<NUM>"]
        if not word.isalnum():
            return self.word2idx["<PUNC>"]
        return self.word2idx.get(word, self.word2idx["<UNK>"])

    def encode(self, text: str) -> Tuple[List[int], float]:
        """Tokenise `text` and map every token to an index.

        Returns:
            (indices, coverage) where coverage is the fraction of tokens that
            did not map to <UNK> (1.0 for text with no tokens).
        """
        words = WORD_OR_PUNCT_PATTERN.findall(text)  # Use pre-compiled pattern
        unk_idx = self.word2idx["<UNK>"]  # Cache lookup
        indices = [self.get_index(word) for word in words]
        oov_count = sum(1 for idx in indices if idx == unk_idx)
        coverage = 1 - (oov_count / len(words)) if words else 1.0
        return indices, coverage

    def size(self) -> int:
        """Return the number of entries, special tokens included."""
        return len(self.word2idx)

    def save(self, path: str):
        """Write the mapping and the 50000 most frequent counts to `path` as JSON."""
        import json

        data = {
            "word2idx": self.word2idx,
            "frequencies": dict(self.frequencies.most_common(50000)),
        }
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False)

    def load(self, path: str):
        """Replace the vocabulary with the contents of a JSON file.

        The file must contain a "word2idx" object; "frequencies" is optional.
        Special tokens missing from the file are appended after the highest
        loaded index so that `get_index` keeps working; files written by
        `save` already contain them and load unchanged.

        Raises:
            KeyError: if the file has no "word2idx" entry.
        """
        import json

        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        self.word2idx = data["word2idx"]
        self.idx2word = {int(v): k for k, v in self.word2idx.items()}
        self.frequencies = Counter(data.get("frequencies", {}))
        # default=-1 keeps an empty mapping from raising ValueError.
        self.next_idx = max(self.word2idx.values(), default=-1) + 1
        for token in self.SPECIAL_TOKENS:
            if token not in self.word2idx:
                self.word2idx[token] = self.next_idx
                self.idx2word[self.next_idx] = token
                self.next_idx += 1
# ═══════════════════════════════════════════════════════════════════════════════
# MORPHOLOGY LAYER
# ═══════════════════════════════════════════════════════════════════════════════
class MorphologyLayer:
    """Hebrew morphological analysis: heuristic root extraction by affix stripping."""

    PREFIXES = ["ב", "כ", "ל", "מ", "ש", "ה", "ו", "וב", "וכ", "ול", "ומ", "וש", "וה"]
    SUFFIXES = [
        "ים", "ות", "ה", "י", "ך", "ו", "נו",
        "כם", "כן", "הם", "הן", "תי", "תם", "תן",
    ]

    def __init__(self):
        # Known roots; iteration order decides which one wins in extract_root.
        self.roots = HEBREW_ROOTS

    def extract_root(self, word: str) -> Tuple[Optional[str], int, int]:
        """Guess the root of one word.

        Returns:
            (root, prefixes_stripped, suffixes_stripped). A known root that
            occurs anywhere inside the word is returned immediately with both
            counts 0. Otherwise at most one prefix and one suffix are removed
            (longest candidates first, and only when more than two letters
            would remain) and the first three letters of the remainder are the
            root; root is None when fewer than three letters remain.
        """
        if not word or len(word) < 2:
            return None, 0, 0

        # Check known roots
        known = next((root for root in self.roots if root in word), None)
        if known is not None:
            return known, 0, 0

        stem = word

        # Remove at most one prefix, longest candidates first
        prefix_hit = next(
            (
                p
                for p in sorted(self.PREFIXES, key=len, reverse=True)
                if stem.startswith(p) and len(stem) > len(p) + 2
            ),
            None,
        )
        stripped_prefixes = 0
        if prefix_hit is not None:
            stem = stem[len(prefix_hit):]
            stripped_prefixes = 1

        # Remove at most one suffix, longest candidates first
        suffix_hit = next(
            (
                s
                for s in sorted(self.SUFFIXES, key=len, reverse=True)
                if stem.endswith(s) and len(stem) > len(s) + 2
            ),
            None,
        )
        stripped_suffixes = 0
        if suffix_hit is not None:
            stem = stem[: -len(suffix_hit)]
            stripped_suffixes = 1

        guessed = stem[:3] if len(stem) >= 3 else None
        return guessed, stripped_prefixes, stripped_suffixes

    def analyze(self, words: List[str]) -> MorphologyFeatures:
        """Run extract_root over every word and aggregate the results.

        Coverage is the percentage (0-100) of words that yielded a root, and 0
        for an empty word list.
        """
        per_word = [self.extract_root(word) for word in words]
        roots = [root for root, _, _ in per_word]
        found = [root for root in roots if root]
        return MorphologyFeatures(
            roots=roots,
            unique_roots=list(set(found)),
            coverage=len(found) / len(words) * 100 if words else 0,
            prefix_count=sum(pref for _, pref, _ in per_word),
            suffix_count=sum(suff for _, _, suff in per_word),
        )
# ═══════════════════════════════════════════════════════════════════════════════
# ENHANCED HEBREW ENCODER
# ═══════════════════════════════════════════════════════════════════════════════
class EnhancedHebrewEncoder:
    """
    Complete Hebrew encoder with all features.

    Encoding pipeline:
    1. Preprocessing (clean, tokenize)
    2. Vocabulary encoding (word → index)
    3. Gematria encoding (numerical + bigrams)
    4. Trajectory encoding (3D spherical)
    5. Morphology encoding (root extraction)
    6. τ metrics calculation
    7. Combined embedding
    """

    def __init__(
        self,
        embedding_dim: int = 256,
        trajectory_points: int = 100,
        vocab_file: Optional[str] = None,
    ):
        """Build the layers and the fixed random projection matrices.

        Args:
            embedding_dim: Length of every channel embedding and of the result.
            trajectory_points: Number of points the word sequence is resampled
                to for the trajectory channel.
            vocab_file: Optional vocabulary JSON passed to VocabularyLayer.
        """
        self.embedding_dim = embedding_dim
        self.trajectory_points = trajectory_points
        # Initialize layers
        self.vocab_layer = VocabularyLayer(vocab_file)
        self.morph_layer = MorphologyLayer()
        # Projection matrices. A private generator seeded with 42 draws the
        # same numbers as seeding the global NumPy RNG did, without resetting
        # random state for the rest of the process.
        rng = np.random.RandomState(42)
        # 50 histogram bins + 20 bigram slots + 8 basic statistics
        self.W_gematria = rng.randn(78, embedding_dim).astype(np.float32) * 0.1
        # 3 coordinates per trajectory point + 6 geometric features (306 rows
        # for the default 100 points). The row count follows trajectory_points
        # so that non-default values project without a shape error.
        self.W_trajectory = (
            rng.randn(3 * trajectory_points + 6, embedding_dim).astype(np.float32)
            * 0.1
        )
        # 100 root-bucket bins + 10 statistics
        self.W_morphology = rng.randn(110, embedding_dim).astype(np.float32) * 0.1

    # ───────────────────────────── shared helpers ─────────────────────────────
    @staticmethod
    def _project(features: np.ndarray, weights: np.ndarray) -> np.ndarray:
        """Project a feature vector and L2-normalise it (zero vectors stay zero)."""
        embedding = features @ weights
        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding /= norm
        return embedding.astype(np.float32)

    @staticmethod
    def _bigram_summary(bigram_vals) -> np.ndarray:
        """Summarise adjacent-word values into a 20-slot float32 vector.

        Slots 0-7 hold mean, std, median, min, max, 25th and 75th percentile
        and count, each divided by 1000; the rest is zero padding. An empty
        input yields all zeros.
        """
        if len(bigram_vals) == 0:
            return np.zeros(20, dtype=np.float32)
        stats = (
            np.array(
                [
                    np.mean(bigram_vals),
                    np.std(bigram_vals),
                    np.median(bigram_vals),
                    np.min(bigram_vals),
                    np.max(bigram_vals),
                    np.percentile(bigram_vals, 25),
                    np.percentile(bigram_vals, 75),
                    len(bigram_vals),
                ],
                dtype=np.float32,
            )
            / 1000
        )
        return np.pad(stats, (0, 20 - len(stats)))

    @staticmethod
    def _root_bucket(root: str) -> int:
        """Map a root to a bucket in [0, 10000) that is stable across processes.

        The built-in hash() of a str is salted per interpreter run, so it
        cannot be used for features that must be reproducible.
        """
        digest = hashlib.md5(root.encode("utf-8")).digest()
        return int.from_bytes(digest[:4], "big") % 10000

    def _empty_gematria(self) -> Tuple[np.ndarray, GematriaFeatures]:
        """Return the all-zero gematria result used for text without words."""
        return np.zeros(self.embedding_dim, dtype=np.float32), GematriaFeatures(
            histogram=np.zeros(50),
            bigram_stats=np.zeros(20),
            word_values=[],
            total=0,
            mean=0.0,
            std=0.0,
        )

    def _empty_trajectory(self) -> Tuple[np.ndarray, TrajectoryFeatures]:
        """Return the all-zero trajectory result used for text without words."""
        return np.zeros(self.embedding_dim, dtype=np.float32), TrajectoryFeatures(
            coords=np.zeros((self.trajectory_points, 3)),
            path_length=0.0,
            curvature=0.0,
            torsion=0.0,
            centroid=np.zeros(3),
        )

    def _trajectory_from_samples(
        self, resampled: np.ndarray
    ) -> Tuple[np.ndarray, TrajectoryFeatures]:
        """Turn resampled word values into a spherical path and embed it.

        `resampled` holds `trajectory_points` word values. They are scaled by
        their maximum, mapped to angles (theta accumulates along the sequence,
        phi stays within [0.1*pi, 0.9*pi]) and converted to unit-sphere x, y, z.
        """
        normalized = resampled / (np.max(resampled) + 1e-8)
        # Spherical coordinates
        theta = np.cumsum(normalized) * 2 * np.pi / self.trajectory_points
        phi = normalized * np.pi * 0.8 + 0.1 * np.pi
        sin_phi = np.sin(phi)
        coords = np.stack(
            [sin_phi * np.cos(theta), sin_phi * np.sin(theta), np.cos(phi)],
            axis=1,
        ).astype(np.float32)
        # Geometric features
        diffs = np.diff(coords, axis=0)
        diff_norms = np.linalg.norm(diffs, axis=1)
        path_length = float(diff_norms.sum())
        # Finite-difference derivatives along the path (d1 -> d2 -> d3)
        d1 = np.gradient(coords, axis=0)
        d2 = np.gradient(d1, axis=0)
        d3 = np.gradient(d2, axis=0)
        d1_norm = np.linalg.norm(d1, axis=1) + 1e-10  # epsilon avoids 0-division
        d2_norm = np.linalg.norm(d2, axis=1)
        pointwise_curvature = d2_norm / (d1_norm**2)
        curvature = float(np.mean(pointwise_curvature))
        cross = np.cross(d1, d2)
        cross_norm_sq = np.sum(cross**2, axis=1) + 1e-10
        torsion_vals = np.abs(np.sum(cross * d3, axis=1) / cross_norm_sq)
        torsion = float(torsion_vals.mean())
        centroid = coords.mean(axis=0)
        # Project
        geom_features = np.array(
            [
                path_length / self.trajectory_points,
                curvature,
                torsion,
                np.std(np.linalg.norm(coords - centroid, axis=1)),
                np.std(diff_norms),
                np.std(pointwise_curvature),
            ],
            dtype=np.float32,
        )
        features = np.concatenate([coords.flatten(), geom_features])
        embedding = self._project(features, self.W_trajectory)
        return embedding, TrajectoryFeatures(
            coords=coords,
            path_length=path_length,
            curvature=curvature,
            torsion=torsion,
            centroid=centroid,
        )

    # ───────────────────────────── preprocessing ─────────────────────────────
    def _clean_text(self, text: str) -> str:
        """Remove niqqud/cantillation marks and collapse whitespace runs."""
        text = NIQQUD_PATTERN.sub("", text)  # Use pre-compiled pattern
        return " ".join(text.split())

    def _extract_words(self, text: str) -> List[str]:
        """Extract single-script word runs and digit runs from text."""
        return MULTILINGUAL_WORD_PATTERN.findall(text)

    @staticmethod
    @lru_cache(maxsize=50000)
    def _word_gematria_cached(word: str) -> int:
        """
        Cached word value with LANGUAGE SEPARATION.

        The value is the sum of per-character values plus ONE offset per word:
        - characters found in CHAR_VALUES contribute their table value;
        - other decimal digits contribute digit * 10;
        - any other character contributes ord(c) % 100;
        - the offset is LANGUAGE_IDS[language] * 1000, where the language is
          that of the first character found in CHAR_TO_LANGUAGE. Hebrew has
          id 0, so Hebrew words keep their plain gematria sum.

        Returns at least 1.
        """
        total = 0
        lang_offset = 0
        detected_lang = None
        for c in word:
            # Detect language from first recognized character
            if detected_lang is None and c in CHAR_TO_LANGUAGE:
                detected_lang = CHAR_TO_LANGUAGE[c]
                lang_offset = LANGUAGE_IDS.get(detected_lang, 12) * 1000
            if c in CHAR_VALUES:
                total += CHAR_VALUES[c]
            elif c.isdecimal():
                # isdecimal() rather than isdigit(): int() rejects characters
                # such as superscript digits that isdigit() accepts.
                total += int(c) * 10
            else:
                # Unknown character: use ord value scaled
                total += ord(c) % 100
        # Apply language offset (Hebrew stays pure, others get offset)
        total += lang_offset
        return total if total > 0 else 1  # Ensure non-zero

    @staticmethod
    def detect_language(text: str) -> str:
        """Detect primary language of text based on character distribution.

        Returns the language with the most recognised characters (the one seen
        first wins a tie), or "unknown" when no character is recognised.
        """
        counts = Counter(CHAR_TO_LANGUAGE[c] for c in text if c in CHAR_TO_LANGUAGE)
        if not counts:
            return "unknown"
        return max(counts, key=counts.get)

    def _word_gematria(self, word: str) -> int:
        """Return the cached value of one word."""
        return self._word_gematria_cached(word)

    def calculate_gematria(self, text: str) -> int:
        """Calculate the summed word value of a text (public API); 0 if no words."""
        return sum(self._word_gematria(w) for w in self._extract_words(text))

    def _entropy(self, items: List) -> float:
        """Return the Shannon entropy (bits) of the item frequencies; 0.0 if empty."""
        if not items:
            return 0.0
        counts = Counter(items)
        probs = np.array(list(counts.values())) / len(items)
        return float(-np.sum(probs * np.log2(probs + 1e-10)))

    # ──────────────────────────── channel encoders ────────────────────────────
    def _encode_gematria(self, words: List[str]) -> Tuple[np.ndarray, GematriaFeatures]:
        """Encode the gematria channel, computing word values itself.

        Bigram values here are the value of each adjacent word pair
        concatenated into one string.
        """
        if not words:
            return self._empty_gematria()
        values = [self._word_gematria(w) for w in words]
        # Log-scale histogram. NOTE: values above 1000 fall outside the
        # histogram range and are not counted in any bin.
        log_vals = np.log1p(values)
        hist, _ = np.histogram(log_vals, bins=50, range=(0, np.log1p(1000)))
        hist = hist.astype(np.float32)
        if hist.sum() > 0:
            hist /= hist.sum()
        # Bigram stats
        bigram_vals = [
            self._word_gematria(left + right) for left, right in zip(words, words[1:])
        ]
        bigram_stats = self._bigram_summary(bigram_vals)
        # Basic stats
        basic = np.array(
            [
                np.mean(values) / 1000,
                np.std(values) / 1000,
                np.median(values) / 1000,
                np.min(values) / 1000,
                np.max(values) / 1000,
                len(words) / 100,
                sum(values) / 10000,
                len(set(values)) / max(len(values), 1),
            ],
            dtype=np.float32,
        )
        # Combine and project
        features = np.concatenate([hist, bigram_stats, basic])
        embedding = self._project(features, self.W_gematria)
        return embedding, GematriaFeatures(
            histogram=hist,
            bigram_stats=bigram_stats,
            word_values=values,
            total=sum(values),
            mean=float(np.mean(values)),
            std=float(np.std(values)),
        )

    def _encode_trajectory(
        self, words: List[str]
    ) -> Tuple[np.ndarray, TrajectoryFeatures]:
        """Encode the trajectory channel, computing word values itself."""
        if not words:
            return self._empty_trajectory()
        seq = [self._word_gematria(w) for w in words]
        # Resample: interpolate short sequences, subsample long ones
        if len(seq) < self.trajectory_points:
            idx = np.linspace(0, len(seq) - 1, self.trajectory_points)
            resampled = np.interp(idx, range(len(seq)), seq)
        else:
            idx = np.linspace(0, len(seq) - 1, self.trajectory_points).astype(int)
            resampled = np.array([seq[i] for i in idx])
        return self._trajectory_from_samples(resampled)

    def _encode_morphology(
        self, words: List[str]
    ) -> Tuple[np.ndarray, MorphologyFeatures]:
        """Encode the morphology channel: root-bucket histogram + 10 statistics."""
        morph = self.morph_layer.analyze(words)
        if not words:
            return np.zeros(self.embedding_dim, dtype=np.float32), morph
        # Root histogram over process-independent buckets
        root_indices = [self._root_bucket(r) for r in morph.roots if r]
        if root_indices:
            hist, _ = np.histogram(root_indices, bins=100, range=(0, 10000))
            hist = hist.astype(np.float32)
            if hist.sum() > 0:
                hist /= hist.sum()
        else:
            hist = np.zeros(100, dtype=np.float32)
        # Stats
        word_lengths = [len(w) for w in words]
        root_lengths = [len(r) for r in morph.unique_roots]
        n_words = max(len(words), 1)
        stats = np.array(
            [
                morph.coverage / 100,
                len(morph.unique_roots) / n_words,
                morph.prefix_count / n_words,
                morph.suffix_count / n_words,
                np.mean(word_lengths) / 10,
                np.std(word_lengths) / 5,
                np.mean(root_lengths) / 5 if morph.unique_roots else 0,
                np.std(root_lengths) / 3 if len(morph.unique_roots) > 1 else 0,
                sum(1 for r in morph.unique_roots if r in HEBREW_ROOTS)
                / max(len(morph.unique_roots), 1),
                len(words) / 100,
            ],
            dtype=np.float32,
        )
        features = np.concatenate([hist, stats])
        return self._project(features, self.W_morphology), morph

    # ═══════════════════════════════════════════════════════════════════════════
    # OPTIMIZED ENCODING METHODS (avoid duplicate gematria calculation)
    # ═══════════════════════════════════════════════════════════════════════════
    def _encode_gematria_optimized(
        self, words: List[str], word_values: List[int]
    ) -> Tuple[np.ndarray, GematriaFeatures]:
        """Optimized gematria encoding with pre-computed values.

        `word_values` must hold one value per word. Unlike `_encode_gematria`,
        bigram values here are the SUM of two adjacent word values.
        """
        if not words:
            return self._empty_gematria()
        values = word_values  # Use pre-computed values
        # Log-scale histogram (vectorized). NOTE: values above 1000 fall
        # outside the histogram range and are not counted in any bin.
        values_arr = np.array(values, dtype=np.float32)
        log_vals = np.log1p(values_arr)
        hist, _ = np.histogram(log_vals, bins=50, range=(0, np.log1p(1000)))
        hist = hist.astype(np.float32)
        hist_sum = hist.sum()
        if hist_sum > 0:
            hist /= hist_sum
        # Bigram stats - sums of adjacent pre-computed values
        bigram_vals = np.array(
            [left + right for left, right in zip(values, values[1:])],
            dtype=np.float32,
        )
        bigram_stats = self._bigram_summary(bigram_vals)
        # Basic stats (vectorized)
        basic = np.array(
            [
                values_arr.mean() / 1000,
                values_arr.std() / 1000,
                np.median(values_arr) / 1000,
                values_arr.min() / 1000,
                values_arr.max() / 1000,
                len(words) / 100,
                values_arr.sum() / 10000,
                len(np.unique(values_arr)) / len(values_arr),
            ],
            dtype=np.float32,
        )
        # Combine and project
        features = np.concatenate([hist, bigram_stats, basic])
        embedding = self._project(features, self.W_gematria)
        return embedding, GematriaFeatures(
            histogram=hist,
            bigram_stats=bigram_stats,
            word_values=values,
            # Exact integer sum; a float32 sum is inexact above 2**24.
            total=int(sum(values)),
            mean=float(values_arr.mean()),
            std=float(values_arr.std()),
        )

    def _encode_trajectory_optimized(
        self, words: List[str], word_values: List[int]
    ) -> Tuple[np.ndarray, TrajectoryFeatures]:
        """Optimized trajectory encoding with pre-computed gematria values."""
        if not words:
            return self._empty_trajectory()
        seq = np.array(word_values, dtype=np.float32)  # Use pre-computed values
        # Resample (vectorized): interpolate short sequences, subsample long ones
        n_points = len(seq)
        if n_points < self.trajectory_points:
            idx = np.linspace(0, n_points - 1, self.trajectory_points)
            resampled = np.interp(idx, np.arange(n_points), seq)
        else:
            idx = np.linspace(0, n_points - 1, self.trajectory_points).astype(int)
            resampled = seq[idx]
        return self._trajectory_from_samples(resampled)

    # ─────────────────────────────── τ metrics ───────────────────────────────
    def _calculate_tau(self, words: List[str]) -> TAUMetrics:
        """Compute complexity, activity, coherence and their combination τ.

        All four values are 0.0 for an empty word list; coherence is fixed at
        0.5 for texts shorter than the 5-word window.
        """
        if not words:
            return TAUMetrics(tau=0.0, complexity=0.0, activity=0.0, coherence=0.0)
        n = len(words)
        unique = len(set(words))
        # Complexity (Heaps' Law): distinct words vs. expectation, capped at 200
        expected = HEAPS_K * (n**HEAPS_BETA)
        complexity = min((unique / max(expected, 1)) * 100, 200)
        # Activity (entropy)
        char_entropy = self._entropy(list("".join(words)))
        avg_len = np.mean([len(w) for w in words])
        activity = char_entropy * np.log2(avg_len + 1)
        # Coherence (sliding window)
        window = 5
        if n >= window:
            ratios = [
                len(set(words[i : i + window])) / window for i in range(n - window + 1)
            ]
            coherence = 1 - min(np.std(ratios) * 2, 1)
        else:
            coherence = 0.5
        tau = complexity * activity * (1 - coherence) / 100
        return TAUMetrics(
            tau=round(tau, 4),
            complexity=round(complexity, 4),
            activity=round(activity, 4),
            coherence=round(coherence, 4),
        )

    def _encode_language_signature(self, text: str) -> np.ndarray:
        """
        Create language signature embedding.

        Returns one entry per LANGUAGE_IDS id holding that language's share of
        the recognised characters, with SIGNED values:
        - Hebrew: POSITIVE share
        - Other languages: NEGATIVE share
        All zeros when no character is recognised.
        """
        lang_vec = np.zeros(len(LANGUAGE_IDS), dtype=np.float32)
        # Count characters per language
        lang_counts = Counter(
            CHAR_TO_LANGUAGE[c] for c in text if c in CHAR_TO_LANGUAGE
        )
        total_chars = sum(lang_counts.values())
        if total_chars == 0:
            return lang_vec
        for lang, count in lang_counts.items():
            lang_id = LANGUAGE_IDS.get(lang, LANGUAGE_IDS["unknown"])
            proportion = count / total_chars
            # Hebrew (id=0) positive, every other language negative
            lang_vec[lang_id] = proportion if lang_id == 0 else -proportion
        return lang_vec

    # ─────────────────────────────── public API ───────────────────────────────
    def encode(self, text: str, domain: str = "default") -> EncodingResult:
        """
        Encode multilingual text with language-aware embedding.

        Args:
            text: Text to encode (Hebrew, English, or any supported language)
            domain: Domain for weight adjustment; unknown names use "default"

        Returns:
            EncodingResult with all features including language signature
        """
        text = self._clean_text(text)
        words = self._extract_words(text)
        weights = DOMAIN_WEIGHTS.get(domain, DOMAIN_WEIGHTS["default"])
        # OPTIMIZATION: Calculate gematria values once, reuse everywhere
        word_values = [self._word_gematria(w) for w in words]
        # Encode all channels with pre-computed gematria values
        gematria_emb, gematria_feat = self._encode_gematria_optimized(
            words, word_values
        )
        traj_emb, traj_feat = self._encode_trajectory_optimized(words, word_values)
        morph_emb, morph_feat = self._encode_morphology(words)
        # Language signature (SIGNED: Hebrew=positive, others=negative)
        lang_signature = self._encode_language_signature(text)
        # Vocabulary
        indices, coverage = self.vocab_layer.encode(text)
        unk_idx = self.vocab_layer.word2idx["<UNK>"]
        oov_count = sum(1 for idx in indices if idx == unk_idx)
        vocab_feat = VocabularyFeatures(
            indices=indices, coverage=coverage, oov_count=oov_count
        )
        # τ metrics
        tau_metrics = self._calculate_tau(words)
        # Combined embedding (before language signature)
        embedding = (
            weights["gematria"] * gematria_emb
            + weights["trajectory"] * traj_emb
            + weights["morphology"] * morph_emb
        )
        # Inject the language signature into the last dimensions (30%
        # influence). The width is clipped so that an embedding shorter than
        # the signature does not raise a shape error.
        num_lang_dims = min(len(LANGUAGE_IDS), embedding.shape[0])
        embedding[-num_lang_dims:] += lang_signature[:num_lang_dims] * 0.3
        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding /= norm
        metadata = {
            "text_length": len(text),
            "word_count": len(words),
            "domain": domain,
            "weights": weights,
            "hash": hashlib.md5(text.encode()).hexdigest(),
        }
        return EncodingResult(
            embedding=embedding,
            gematria=gematria_feat,
            trajectory=traj_feat,
            morphology=morph_feat,
            vocabulary=vocab_feat,
            tau_metrics=tau_metrics,
            metadata=metadata,
        )

    def encode_batch(
        self, texts: List[str], domain: str = "default"
    ) -> List[EncodingResult]:
        """Encode every text in order with the same domain."""
        return [self.encode(text, domain) for text in texts]

    def similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Return the cosine similarity, or 0.0 if either vector has zero norm."""
        norm1, norm2 = np.linalg.norm(vec1), np.linalg.norm(vec2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return float(np.dot(vec1, vec2) / (norm1 * norm2))

    def find_similar(
        self, query: str, corpus: List[str], domain: str = "default", top_k: int = 10
    ) -> List[Tuple[int, float, str]]:
        """Rank corpus texts by cosine similarity to the query.

        Every corpus text is encoded on each call.

        Returns:
            Up to `top_k` tuples (corpus index, similarity, preview), best
            first; the preview is the text cut to 100 characters plus "...".
        """
        query_emb = self.encode(query, domain).embedding
        results = []
        for i, text in enumerate(corpus):
            sim = self.similarity(query_emb, self.encode(text, domain).embedding)
            preview = text[:100] + "..." if len(text) > 100 else text
            results.append((i, sim, preview))
        results.sort(key=lambda item: item[1], reverse=True)
        return results[:top_k]
# Alias for backward compatibility
HebrewEncoder = EnhancedHebrewEncoder


# ═══════════════════════════════════════════════════════════════════════════════
# DEMO
# ═══════════════════════════════════════════════════════════════════════════════
def _run_demo() -> None:
    """Encode one sample Hebrew legal sentence and print every feature group."""
    rule = "=" * 70
    print(rule)
    print("TAU Platform v4.0 - Enhanced Hebrew Encoder Demo")
    print(rule)
    print()

    encoder = EnhancedHebrewEncoder()
    text = "בית המשפט העליון פסק כי הערעור יתקבל. השופטים קבעו פה אחד כי יש לבטל את ההחלטה."
    print(f"Text: {text}")
    print()

    result = encoder.encode(text, domain="hebrew_legal")
    print(f"Embedding shape: {result.embedding.shape}")
    print(f"Embedding norm: {np.linalg.norm(result.embedding):.4f}")
    print()

    tau = result.tau_metrics
    print("τ Metrics:")
    print(f" τ: {tau.tau}")
    print(f" Complexity: {tau.complexity}")
    print(f" Activity: {tau.activity}")
    print(f" Coherence: {tau.coherence}")
    print()

    print("Gematria:")
    print(f" Total: {result.gematria.total}")
    print(f" Mean: {result.gematria.mean:.2f}")
    print()

    print("Trajectory:")
    print(f" Path length: {result.trajectory.path_length:.4f}")
    print(f" Curvature: {result.trajectory.curvature:.4f}")
    print()

    print("Morphology:")
    print(f" Coverage: {result.morphology.coverage:.1f}%")
    print(f" Unique roots: {result.morphology.unique_roots[:5]}")
    print()

    print("Vocabulary:")
    print(f" Coverage: {result.vocabulary.coverage:.1%}")
    print(f" OOV count: {result.vocabulary.oov_count}")
    print()

    print(rule)
    print("✅ Demo Complete!")
    print(rule)


if __name__ == "__main__":
    _run_demo()