#!/usr/bin/env python3
"""
ASCII-limited English-first vocab bundle builder for a tiny LLM.

Design goals
------------
- English-only as much as reasonably possible
- Keep text intact instead of creating holes
- Fold uppercase -> lowercase
- Fold accented Latin letters -> plain ASCII where reasonable
- Drop emoji and non-Latin scripts
- Keep only a small practical punctuation set
- Learn multi-character tokens from LETTERS ONLY
- Keep digits and punctuation atomic as single-character tokens
- Stream from Hugging Face without local dataset files

Default source
--------------
Streams:
    HuggingFaceFW/fineweb-edu
    config=sample-10BT
    split=train

Outputs
-------
Creates a bundle directory containing:
    manifest.json
    vocab.json
    token_stats.npz
    pair_stats.npz

What gets kept
--------------
- letters: a-z
- digits: 0-9
- whitespace: space + newline
- limited punctuation: . , ! ? ' " - ( ) : ; @ # + % = / \\ *

Tokenization policy
-------------------
- learned multi-character tokens: letters only
- digits remain atomic single-character tokens
- punctuation remains atomic single-character tokens

Examples
--------
PowerShell smoke test:
    python F:\\TokenizerUltra\\build_ascii_vocab_bundle_v9.py --output "F:\\TokenizerUltra\\vocab_bundle_test" --max-examples 5000 --bpe-train-chars 2000000 --final-token-budget 2000000

PowerShell full build:
    python F:\\TokenizerUltra\\build_ascii_vocab_bundle_v9.py --output "F:\\TokenizerUltra\\vocab_bundle" --bpe-train-chars 100000000 --final-token-budget 100000000

Dependencies
------------
python -m pip install numpy datasets
"""

from __future__ import annotations

import argparse
import json
import re
import unicodedata
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Sequence, Tuple

import numpy as np

DEFAULT_DATASET = "HuggingFaceFW/fineweb-edu"
DEFAULT_CONFIG = "sample-10BT"
DEFAULT_SPLIT = "train"

# Special tokens must be distinct strings: the vocab is turned into a
# token -> id dict, so identical names would silently collapse to one id.
PAD_TOKEN = "[PAD]"
UNK_TOKEN = "[UNK]"
BOS_TOKEN = "[BOS]"
EOS_TOKEN = "[EOS]"
SPECIAL_TOKENS = [PAD_TOKEN, UNK_TOKEN, BOS_TOKEN, EOS_TOKEN]

ASCII_LETTERS = "abcdefghijklmnopqrstuvwxyz"
ASCII_DIGITS = "0123456789"
ALLOWED_PUNCT = ".,!?\'\"-():;@#+%=/\\*"
SPACE_TOKEN = " "
NEWLINE_TOKEN = "\n"

ALLOWED_CHARS = set(ASCII_LETTERS + ASCII_DIGITS + ALLOWED_PUNCT + SPACE_TOKEN + NEWLINE_TOKEN)
TEXT_FIELDS = ("text", "content", "body", "document", "raw_content", "message")

ESCAPED_PUNCT = re.escape(ALLOWED_PUNCT)
TOKEN_RE = re.compile(rf"\n| +|[a-z]+|[0-9]|[{ESCAPED_PUNCT}]")
MULTISPACE_RE = re.compile(r"[ \t\f\v]+")
MULTINEWLINE_RE = re.compile(r"\n{3,}")
_SPACES_AROUND_NEWLINE_RE = re.compile(r" *\n *")

# Single-character rewrites applied before Unicode normalization.
# Every key is exactly one code point so the table can drive str.translate.
SEQUENCE_REPLACEMENTS = {
    # Typographic quotes, dashes and ellipsis.
    "\u2018": "'",
    "\u2019": "'",
    "\u201c": '"',
    "\u201d": '"',
    "\u2013": "-",
    "\u2014": "-",
    "\u2015": "-",
    "\u2212": "-",
    "\u2026": "...",
    # Bullets and special spaces.
    "\u2022": " ",
    "\u00b7": " ",
    "\u00a0": " ",
    # Zero-width / invisible characters.
    "\u200b": "",
    "\u200c": "",
    "\u200d": "",
    "\ufeff": "",
    "\u00ad": "",
    "\t": " ",
    "\r": "\n",
    # ASCII bracket-like marks fold to parentheses.
    "[": "(",
    "]": ")",
    "{": "(",
    "}": ")",
    "<": "(",
    ">": ")",
    # Fullwidth brackets. They must be folded here: NFKD would turn the
    # square/curly ones into ASCII "[" / "{", which are not allowed
    # characters and would end up as spaces.
    "\uff08": "(",
    "\uff09": ")",
    "\uff3b": "(",
    "\uff3d": ")",
    "\uff5b": "(",
    "\uff5d": ")",
    # CJK bracket pairs.
    "\u3010": "(",
    "\u3011": ")",
    "\u3008": "(",
    "\u3009": ")",
    "\u300a": "(",
    "\u300b": ")",
    "\u300c": "(",
    "\u300d": ")",
    "\u300e": "(",
    "\u300f": ")",
    "\u3014": "(",
    "\u3015": ")",
    "\u3016": "(",
    "\u3017": ")",
}

# Latin letters that NFKD cannot decompose into "ASCII base + combining mark".
LATIN_FOLD_REPLACEMENTS = {
    "\u00df": "ss",  # sharp s
    "\u1e9e": "ss",  # capital sharp s
    "\u00e6": "ae",  # ae ligature
    "\u01fd": "ae",  # ae ligature with acute
    "\u0153": "oe",  # oe ligature
    "\u00f8": "o",   # o with stroke
    "\u00f0": "d",   # eth
    "\u00fe": "th",  # thorn
    "\u0142": "l",   # l with stroke
    "\u0111": "d",   # d with stroke
    "\u0127": "h",   # h with stroke
    "\u0131": "i",   # dotless i
}

_SEQUENCE_TABLE = str.maketrans(SEQUENCE_REPLACEMENTS)
_LATIN_FOLD_TABLE = str.maketrans(LATIN_FOLD_REPLACEMENTS)

# Unicode major categories (letter, number, punctuation, symbol) that are
# replaced by a single space when the character itself is not allowed.
_SPACE_REPLACED_CATEGORIES = frozenset("LNPS")


@dataclass
class BundleConfig:
    """Settings for one bundle build (mirrors the command-line options)."""

    output: Path
    dataset: str = DEFAULT_DATASET
    config: str = DEFAULT_CONFIG
    split: str = DEFAULT_SPLIT
    vocab_size: int = 2000
    bpe_train_chars: int = 100_000_000
    final_token_budget: int = 100_000_000
    max_examples: Optional[int] = None
    min_pair_count: int = 5
    token_prior_clip: float = 3.0
    pair_prior_clip: float = 3.0
    word_cache_size: int = 200000


def _import_load_dataset():
    """Import ``datasets.load_dataset`` lazily, exiting with a hint if missing."""
    try:
        from datasets import load_dataset
    except ImportError as exc:
        raise SystemExit(
            "Missing dependency: datasets. Install with:\n"
            "  python -m pip install datasets numpy"
        ) from exc
    return load_dataset


def normalize_text(text: str) -> str:
    """Reduce *text* to the lowercase ASCII alphabet in ``ALLOWED_CHARS``.

    Steps, in order:

    1. single-character rewrites from ``SEQUENCE_REPLACEMENTS``;
    2. NFKD decomposition, then ``casefold()``.  Case folding runs *after*
       NFKD so that compatibility characters which decompose to uppercase
       ASCII (for example mathematical bold capitals) are lowercased too;
    3. Latin folds from ``LATIN_FOLD_REPLACEMENTS``.  These run after case
       folding so uppercase forms such as U+00C6 are folded as well;
    4. per-character filtering: allowed characters are kept, combining marks
       are dropped, other whitespace / letters / numbers / punctuation /
       symbols become one space, and everything else is dropped;
    5. whitespace clean-up: runs of spaces collapse, spaces around newlines
       are removed, 3+ newlines become 2, and outer spaces are stripped.

    Returns an empty string for empty input.
    """
    if not text:
        return ""

    text = text.translate(_SEQUENCE_TABLE)
    text = unicodedata.normalize("NFKD", text).casefold()
    text = text.translate(_LATIN_FOLD_TABLE)

    out_chars: List[str] = []
    last_was_space = False
    for ch in text:
        # Fast path first: for mostly-ASCII text this avoids the category
        # lookup entirely (combining marks are never in ALLOWED_CHARS).
        if ch in ALLOWED_CHARS:
            out_chars.append(ch)
            last_was_space = ch == " "
            continue

        category = unicodedata.category(ch)
        if category[0] == "M":
            # Combining mark left over from NFKD: drop it, keep the base letter.
            continue

        if ch.isspace() or category[0] in _SPACE_REPLACED_CATEGORIES:
            # Unsupported visible character: leave one space so the words on
            # either side do not get glued together.
            if not last_was_space:
                out_chars.append(" ")
                last_was_space = True
        # Anything else (control / format / unassigned) is dropped silently.

    normalized = "".join(out_chars)
    normalized = MULTISPACE_RE.sub(" ", normalized)
    normalized = _SPACES_AROUND_NEWLINE_RE.sub("\n", normalized)
    normalized = MULTINEWLINE_RE.sub("\n\n", normalized)
    return normalized.strip(" ")


def _extract_text(row: object) -> Optional[str]:
    """Pull the text payload out of one streamed dataset row, if any.

    A dict row yields its first string-valued field from ``TEXT_FIELDS``;
    failing that, the string ``content`` values of a ``messages`` list joined
    by newlines.  A plain string row is returned unchanged.
    """
    if isinstance(row, str):
        return row
    if not isinstance(row, dict):
        return None

    for field in TEXT_FIELDS:
        value = row.get(field)
        if isinstance(value, str):
            return value

    messages = row.get("messages")
    if isinstance(messages, list):
        chunks = [
            msg["content"]
            for msg in messages
            if isinstance(msg, dict) and isinstance(msg.get("content"), str)
        ]
        if chunks:
            return "\n".join(chunks)
    return None


def iter_stream_examples(
    dataset_name: str,
    config_name: str,
    split: str,
    max_examples: Optional[int],
) -> Iterator[str]:
    """Stream non-empty raw text examples from a Hugging Face dataset.

    Stops after ``max_examples`` examples when that is not ``None``; a value
    of zero or less yields nothing.
    """
    if max_examples is not None and max_examples <= 0:
        return

    load_dataset = _import_load_dataset()
    ds = load_dataset(dataset_name, config_name, split=split, streaming=True)

    seen = 0
    for row in ds:
        text = _extract_text(row)
        if not text:
            continue
        yield text
        seen += 1
        if max_examples is not None and seen >= max_examples:
            break


def iter_normalized_text(cfg: BundleConfig) -> Iterator[str]:
    """Yield each streamed example after normalization, skipping empty results."""
    for raw in iter_stream_examples(cfg.dataset, cfg.config, cfg.split, cfg.max_examples):
        text = normalize_text(raw)
        if text:
            yield text


def iter_pre_tokens(text: str) -> Iterator[str]:
    """Split normalized text into newline / space-run / letter-run / digit / punct pieces."""
    yield from TOKEN_RE.findall(text)


def count_words_for_bpe(cfg: BundleConfig) -> Counter[str]:
    """Count letter-only words until ``cfg.bpe_train_chars`` characters were read."""
    word_freq: Counter[str] = Counter()
    char_budget = 0
    for text in iter_normalized_text(cfg):
        char_budget += len(text)
        word_freq.update(piece for piece in iter_pre_tokens(text) if piece.isalpha())
        if char_budget >= cfg.bpe_train_chars:
            break
    return word_freq


def word_to_symbols(word: str) -> Tuple[str, ...]:
    """Return *word* as a tuple of single-character symbols."""
    return tuple(word)


def compute_pair_counts_from_vocab(
    vocab_words: Dict[Tuple[str, ...], int]
) -> Counter[Tuple[str, str]]:
    """Count adjacent alphabetic symbol pairs, weighted by word frequency."""
    pair_counts: Counter[Tuple[str, str]] = Counter()
    for symbols, freq in vocab_words.items():
        for left, right in zip(symbols, symbols[1:]):
            if left.isalpha() and right.isalpha():
                pair_counts[(left, right)] += freq
    return pair_counts


def merge_word_symbols(
    symbols: Tuple[str, ...],
    pair: Tuple[str, str],
) -> Tuple[str, ...]:
    """Merge every non-overlapping, left-to-right occurrence of *pair* in *symbols*."""
    first, second = pair
    merged: List[str] = []
    last_index = len(symbols) - 1
    i = 0
    while i <= last_index:
        if i < last_index and symbols[i] == first and symbols[i + 1] == second:
            merged.append(first + second)
            i += 2
        else:
            merged.append(symbols[i])
            i += 1
    return tuple(merged)


def train_bpe_from_words(
    word_freq: Counter[str],
    vocab_size: int,
) -> Tuple[List[str], List[Tuple[str, str]]]:
    """Learn letter-only BPE merges and assemble the final vocabulary.

    Returns ``(vocab, merges)``.  The vocabulary is laid out as special
    tokens, space, newline, digits, punctuation, then the alphabetically
    sorted letter pieces, and is cut to ``vocab_size`` entries.  ``merges``
    lists the learned pairs in the order they were learned.
    """
    fixed_non_alpha_count = len(SPECIAL_TOKENS) + 2 + len(ASCII_DIGITS) + len(ALLOWED_PUNCT)
    # The 26 single letters are always part of the target, even when
    # vocab_size is too small to leave room for them.
    target_alpha_piece_count = max(vocab_size - fixed_non_alpha_count, len(ASCII_LETTERS))

    vocab_words: Dict[Tuple[str, ...], int] = {
        word_to_symbols(word): freq for word, freq in word_freq.items()
    }
    current_symbols = set(ASCII_LETTERS)
    merges: List[Tuple[str, str]] = []

    while len(current_symbols) < target_alpha_piece_count:
        pair_counts = compute_pair_counts_from_vocab(vocab_words)
        if not pair_counts:
            break
        best_pair, best_count = pair_counts.most_common(1)[0]
        if best_count < 2:
            break
        merges.append(best_pair)

        # Re-segment every word with the new merge; words that become
        # identical symbol sequences pool their frequencies.
        new_vocab_words: Dict[Tuple[str, ...], int] = {}
        for symbols, freq in vocab_words.items():
            merged_symbols = merge_word_symbols(symbols, best_pair)
            new_vocab_words[merged_symbols] = new_vocab_words.get(merged_symbols, 0) + freq
        vocab_words = new_vocab_words

        size_before = len(current_symbols)
        current_symbols.add(best_pair[0] + best_pair[1])
        # Report only when the set actually grew, so a merge that recreates
        # an existing piece does not repeat the same progress line.
        if len(current_symbols) != size_before and len(current_symbols) % 100 == 0:
            print(f"[bpe] learned alpha pieces: {len(current_symbols)}", flush=True)

    learned_alpha_pieces = sorted(current_symbols)
    final_vocab = (
        SPECIAL_TOKENS
        + [SPACE_TOKEN, NEWLINE_TOKEN]
        + list(ASCII_DIGITS)
        + list(ALLOWED_PUNCT)
        + learned_alpha_pieces
    )
    # NOTE: if vocab_size is smaller than the fixed tokens plus 26 letters,
    # this cut removes pieces from the end of the alphabetical list and the
    # tokenizer will map them to the unknown token.
    final_vocab = final_vocab[:vocab_size]
    return final_vocab, merges


class GreedyTokenizer:
    """Ranked-BPE tokenizer for normalized text with a bounded word cache.

    Letter runs are segmented by repeatedly applying the earliest-learned
    applicable merge; spaces, newlines, digits and punctuation each map to a
    single token.  Pieces missing from the vocabulary map to ``UNK_TOKEN``.
    """

    def __init__(
        self,
        vocab: Sequence[str],
        merges: Sequence[Tuple[str, str]],
        word_cache_size: int = 200000,
    ) -> None:
        """Index *vocab* and *merges*.

        Raises ``KeyError`` if the vocabulary does not contain ``UNK_TOKEN``.
        """
        self.vocab = list(vocab)
        self.merges = list(merges)
        self.token_to_id = {tok: i for i, tok in enumerate(self.vocab)}
        self.unk_id = self.token_to_id[UNK_TOKEN]
        self.alpha_token_ids = {
            tid for tok, tid in self.token_to_id.items() if tok.isalpha()
        }
        # Lower rank == learned earlier == applied first.
        self.merge_ranks: Dict[Tuple[str, str], int] = {
            pair: rank for rank, pair in enumerate(self.merges)
        }
        self.word_cache_size = max(int(word_cache_size), 0)
        self._word_cache: Dict[str, Tuple[int, ...]] = {}

    def _get_pairs(self, symbols: Tuple[str, ...]) -> set[Tuple[str, str]]:
        """Return the set of adjacent symbol pairs in *symbols*."""
        return set(zip(symbols[:-1], symbols[1:]))

    def _merge_once(self, symbols: Tuple[str, ...], pair: Tuple[str, str]) -> Tuple[str, ...]:
        """Apply one merge using the same routine the trainer uses."""
        return merge_word_symbols(symbols, pair)

    def tokenize_alpha_run(self, span: str) -> List[int]:
        """Segment one letter-only run into token ids (cached per distinct run)."""
        if not span:
            return []

        cached = self._word_cache.get(span)
        if cached is not None:
            return list(cached)

        ranks = self.merge_ranks
        unranked = float("inf")
        symbols: Tuple[str, ...] = tuple(span)
        while len(symbols) > 1:
            best_pair = min(
                self._get_pairs(symbols),
                key=lambda pair: ranks.get(pair, unranked),
            )
            if best_pair not in ranks:
                # No remaining pair was ever learned as a merge.
                break
            symbols = self._merge_once(symbols, best_pair)

        token_ids = tuple(self.token_to_id.get(piece, self.unk_id) for piece in symbols)

        if self.word_cache_size > 0:
            # Crude bound: drop the whole cache once it is full.
            if len(self._word_cache) >= self.word_cache_size:
                self._word_cache.clear()
            self._word_cache[span] = token_ids

        return list(token_ids)

    def is_alpha_id(self, token_id: int) -> bool:
        """Return True if *token_id* belongs to a letters-only token."""
        return token_id in self.alpha_token_ids

    def encode(self, text: str) -> List[int]:
        """Encode already-normalized *text* into token ids.

        A run of several spaces is encoded as one space token.
        """
        newline_id = self.token_to_id[NEWLINE_TOKEN]
        space_id = self.token_to_id[SPACE_TOKEN]

        ids: List[int] = []
        for piece in iter_pre_tokens(text):
            if piece == "\n":
                ids.append(newline_id)
            elif piece.isspace():
                ids.append(space_id)
            elif piece.isalpha():
                ids.extend(self.tokenize_alpha_run(piece))
            else:
                ids.append(self.token_to_id.get(piece, self.unk_id))
        return ids


def _safe_zscore(values: np.ndarray) -> np.ndarray:
    """Z-score *values* in float32; all zeros when the spread is ~0."""
    values = values.astype(np.float32, copy=False)
    mean = float(values.mean())
    std = float(values.std())
    if std < 1e-8:
        return np.zeros_like(values, dtype=np.float32)
    return (values - mean) / std


def _clipped_prior(surprisal: np.ndarray, clip_value: float) -> np.ndarray:
    """Map surprisal to [0, 1]: z-score, clip to +/- clip_value, rescale.

    Raises ``ValueError`` when ``clip_value`` is not positive, because the
    rescale would otherwise divide by zero and yield NaN/inf priors.
    """
    if clip_value <= 0:
        raise ValueError("clip_value must be positive.")
    z = np.clip(_safe_zscore(surprisal), -clip_value, clip_value)
    return (z + clip_value) / (2.0 * clip_value)


def build_priors_from_counts(
    counts: np.ndarray,
    clip_value: float,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Turn token counts into ``(probs, surprisal, prior)`` float32 arrays.

    Probabilities use add-one smoothing; ``prior`` is the clipped, rescaled
    z-score of the surprisal, so rarer tokens get values closer to 1.

    Raises ``ValueError`` if the counts sum to zero or ``clip_value <= 0``.
    """
    counts = counts.astype(np.float64, copy=False)
    total = counts.sum()
    if total <= 0:
        raise ValueError("Counts are empty; cannot build priors.")

    probs = (counts + 1.0) / (total + counts.size)
    surprisal = -np.log(probs)
    prior = _clipped_prior(surprisal.astype(np.float32), clip_value)
    return probs.astype(np.float32), surprisal.astype(np.float32), prior.astype(np.float32)


def build_pair_priors(
    pair_counts: np.ndarray,
    min_pair_count: int,
    clip_value: float,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Turn a square pair-count matrix into ``(probs, surprisal, prior, valid_mask)``.

    Probabilities are add-one smoothed per row.  The z-score behind ``prior``
    is taken over the whole matrix.  Cells whose count is below
    ``min_pair_count`` get the neutral prior 0.5 and ``valid_mask == 0``.

    Raises ``ValueError`` if ``clip_value <= 0``.
    """
    row_sums = pair_counts.sum(axis=1, keepdims=True).astype(np.float64)
    vocab_size = pair_counts.shape[0]

    probs = (pair_counts.astype(np.float64) + 1.0) / (row_sums + vocab_size)
    surprisal = -np.log(probs)
    valid_mask = (pair_counts >= min_pair_count).astype(np.uint8)

    flat_surprisal = surprisal.astype(np.float32).reshape(-1)
    prior = _clipped_prior(flat_surprisal, clip_value).reshape(pair_counts.shape)
    prior = np.where(valid_mask == 1, prior, 0.5)

    return (
        probs.astype(np.float32),
        surprisal.astype(np.float32),
        prior.astype(np.float32),
        valid_mask,
    )


def accumulate_id_stats(
    ids: Sequence[int],
    alpha_mask: np.ndarray,
    token_counts: np.ndarray,
    pair_counts: np.ndarray,
) -> None:
    """Add one encoded text's token and alpha-alpha pair counts, in place.

    ``alpha_mask`` is a boolean array indexed by token id.  ``token_counts``
    (1-D) and ``pair_counts`` (2-D, indexed ``[previous, current]``) are
    updated in place.  Only adjacent pairs where both ids are flagged in
    ``alpha_mask`` are counted.
    """
    id_array = np.asarray(ids, dtype=np.intp)
    if id_array.size == 0:
        return

    token_counts += np.bincount(id_array, minlength=token_counts.size)

    left, right = id_array[:-1], id_array[1:]
    both_alpha = alpha_mask[left] & alpha_mask[right]
    # add.at accumulates correctly when the same (left, right) cell repeats.
    np.add.at(pair_counts, (left[both_alpha], right[both_alpha]), 1)


def second_pass_stats(
    cfg: BundleConfig,
    tokenizer: GreedyTokenizer,
) -> Tuple[np.ndarray, np.ndarray]:
    """Stream the dataset again and count tokens and alpha-alpha token pairs.

    Stops once ``cfg.final_token_budget`` tokens have been counted.
    Returns ``(token_counts, pair_counts)`` as int64 and int32 arrays.
    """
    vocab_size = len(tokenizer.vocab)
    token_counts = np.zeros(vocab_size, dtype=np.int64)
    pair_counts = np.zeros((vocab_size, vocab_size), dtype=np.int32)

    alpha_mask = np.zeros(vocab_size, dtype=bool)
    alpha_mask[np.fromiter(tokenizer.alpha_token_ids, dtype=np.intp)] = True

    token_budget = 0
    for text in iter_normalized_text(cfg):
        ids = tokenizer.encode(text)
        if not ids:
            continue
        accumulate_id_stats(ids, alpha_mask, token_counts, pair_counts)
        token_budget += len(ids)
        if token_budget >= cfg.final_token_budget:
            break

    return token_counts, pair_counts


def save_bundle(
    cfg: BundleConfig,
    vocab: Sequence[str],
    merges: Sequence[Tuple[str, str]],
    token_counts: np.ndarray,
    token_probs: np.ndarray,
    token_surprisal: np.ndarray,
    token_prior: np.ndarray,
    pair_counts: np.ndarray,
    pair_probs: np.ndarray,
    pair_surprisal: np.ndarray,
    pair_prior: np.ndarray,
    pair_valid_mask: np.ndarray,
) -> None:
    """Write vocab.json, manifest.json, token_stats.npz and pair_stats.npz.

    The output directory ``cfg.output`` is created if it does not exist.
    """
    cfg.output.mkdir(parents=True, exist_ok=True)

    vocab_json = {
        "token_to_id": {tok: i for i, tok in enumerate(vocab)},
        "id_to_token": {str(i): tok for i, tok in enumerate(vocab)},
        "special_tokens": SPECIAL_TOKENS,
        "space_token": SPACE_TOKEN,
        "newline_token": NEWLINE_TOKEN,
        "merges": [[a, b] for a, b in merges],
    }
    (cfg.output / "vocab.json").write_text(
        json.dumps(vocab_json, indent=2, ensure_ascii=True),
        encoding="utf-8",
    )

    manifest = {
        "bundle_version": 9,
        "description": "english-first ascii-limited vocab bundle with letter-only learned tokens, atomic digits and punctuation, latin accent folding, bracket folding, faster ranked-bpe runtime tokenization, and alpha-only pair priors",
        "dataset": cfg.dataset,
        "config": cfg.config,
        "split": cfg.split,
        "vocab_size": len(vocab),
        "requested_vocab_size": cfg.vocab_size,
        "special_tokens": SPECIAL_TOKENS,
        "allowed_ascii_letters": ASCII_LETTERS,
        "allowed_ascii_digits": ASCII_DIGITS,
        "allowed_ascii_punctuation": ALLOWED_PUNCT,
        "normalization": {
            "casefold_uppercase_to_lowercase": True,
            "latin_accent_folding": True,
            "bracket_like_marks_folded_to_parentheses": True,
            "non_latin_scripts_to_space": True,
            "emoji_removed": True,
            "unsupported_symbols_to_space": True,
            "collapse_spaces": True,
            "trim_long_newlines": True,
            "runtime_tokenization": "ranked_bpe_letters_only_with_word_cache",
        },
        "token_shape_policy": {
            "learned_multi_character_tokens": "letters_only",
            "digits": "atomic_single_character",
            "punctuation": "atomic_single_character",
            "spaces": "atomic_single_character",
            "newlines": "atomic_single_character",
        },
        "pair_prior_scope": {
            "counted_pairs": "alpha_to_alpha_only",
            "non_alpha_pairs": "neutral_default_prior",
        },
        "bpe_train_chars": cfg.bpe_train_chars,
        "final_token_budget": cfg.final_token_budget,
        "min_pair_count": cfg.min_pair_count,
        "token_prior_clip": cfg.token_prior_clip,
        "pair_prior_clip": cfg.pair_prior_clip,
        "word_cache_size": cfg.word_cache_size,
    }
    (cfg.output / "manifest.json").write_text(
        json.dumps(manifest, indent=2, ensure_ascii=True),
        encoding="utf-8",
    )

    np.savez_compressed(
        cfg.output / "token_stats.npz",
        count=token_counts,
        prob=token_probs,
        surprisal=token_surprisal,
        importance_prior=token_prior,
    )
    np.savez_compressed(
        cfg.output / "pair_stats.npz",
        pair_count=pair_counts,
        pair_prob=pair_probs,
        pair_surprisal=pair_surprisal,
        pair_importance_prior=pair_prior,
        pair_valid_mask=pair_valid_mask,
    )


def build_bundle(cfg: BundleConfig) -> None:
    """Run the full pipeline: count words, train BPE, collect stats, save.

    Raises ``SystemExit`` when the stream yields no usable text or tokens.
    """
    print("[1/4] Counting normalized words for letter-only BPE training...", flush=True)
    word_freq = count_words_for_bpe(cfg)
    if not word_freq:
        raise SystemExit("No usable normalized text found in the stream.")
    print(f"[1/4] Unique normalized letter-words: {len(word_freq):,}", flush=True)

    print("[2/4] Training letter-only BPE-style subword vocab...", flush=True)
    vocab, merges = train_bpe_from_words(word_freq, cfg.vocab_size)
    print(f"[2/4] Final vocab size: {len(vocab)}", flush=True)

    print("[3/4] Streaming second pass for token and pair stats...", flush=True)
    tokenizer = GreedyTokenizer(vocab, merges, word_cache_size=cfg.word_cache_size)
    token_counts, pair_counts = second_pass_stats(cfg, tokenizer)
    if token_counts.sum() <= 0:
        raise SystemExit("Second pass produced no tokens. Check dataset fields or normalization rules.")
    print(f"[3/4] Final token count: {int(token_counts.sum()):,}", flush=True)

    print("[4/4] Building priors and saving bundle...", flush=True)
    token_probs, token_surprisal, token_prior = build_priors_from_counts(
        token_counts,
        cfg.token_prior_clip,
    )
    pair_probs, pair_surprisal, pair_prior, pair_valid_mask = build_pair_priors(
        pair_counts,
        cfg.min_pair_count,
        cfg.pair_prior_clip,
    )

    save_bundle(
        cfg=cfg,
        vocab=vocab,
        merges=merges,
        token_counts=token_counts,
        token_probs=token_probs,
        token_surprisal=token_surprisal,
        token_prior=token_prior,
        pair_counts=pair_counts,
        pair_probs=pair_probs,
        pair_surprisal=pair_surprisal,
        pair_prior=pair_prior,
        pair_valid_mask=pair_valid_mask,
    )
    print(f"Done. Bundle written to: {cfg.output}", flush=True)


def parse_args(argv: Optional[Sequence[str]] = None) -> BundleConfig:
    """Parse command-line arguments into a ``BundleConfig``.

    Exits through ``argparse`` when a prior clip value is not positive, so
    the problem is reported before any data is streamed.
    """
    parser = argparse.ArgumentParser(description="Build an ASCII-limited English vocab + prior bundle from a streamed dataset.")
    parser.add_argument("--output", required=True, help="Output directory for the bundle.")
    parser.add_argument("--dataset", default=DEFAULT_DATASET, help=f"Hugging Face dataset name. Default: {DEFAULT_DATASET}")
    parser.add_argument("--config", default=DEFAULT_CONFIG, help=f"Hugging Face dataset config. Default: {DEFAULT_CONFIG}")
    parser.add_argument("--split", default=DEFAULT_SPLIT, help=f"Dataset split. Default: {DEFAULT_SPLIT}")
    parser.add_argument("--vocab-size", type=int, default=2000, help="Final vocab size including special tokens.")
    parser.add_argument("--bpe-train-chars", type=int, default=100_000_000, help="Normalized character budget for vocab learning.")
    parser.add_argument("--final-token-budget", type=int, default=100_000_000, help="Final tokenizer token budget for priors.")
    parser.add_argument("--max-examples", type=int, default=None, help="Optional cap on streamed examples for testing.")
    parser.add_argument("--min-pair-count", type=int, default=5, help="Minimum pair count to trust a pair prior.")
    parser.add_argument("--token-prior-clip", type=float, default=3.0, help="Clip for token prior z-scores.")
    parser.add_argument("--pair-prior-clip", type=float, default=3.0, help="Clip for pair prior z-scores.")
    parser.add_argument("--word-cache-size", type=int, default=200000, help="Max cached normalized words for faster runtime tokenization.")

    args = parser.parse_args(argv)
    if args.token_prior_clip <= 0 or args.pair_prior_clip <= 0:
        parser.error("--token-prior-clip and --pair-prior-clip must be positive.")

    return BundleConfig(
        output=Path(args.output),
        dataset=args.dataset,
        config=args.config,
        split=args.split,
        vocab_size=args.vocab_size,
        bpe_train_chars=args.bpe_train_chars,
        final_token_budget=args.final_token_budget,
        max_examples=args.max_examples,
        min_pair_count=args.min_pair_count,
        token_prior_clip=args.token_prior_clip,
        pair_prior_clip=args.pair_prior_clip,
        word_cache_size=args.word_cache_size,
    )


def main(argv: Optional[Sequence[str]] = None) -> int:
    """Command-line entry point; returns the process exit code."""
    cfg = parse_args(argv)
    build_bundle(cfg)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())