"""
BPE (Byte Pair Encoding) Tokenizer - Built from Scratch

Indonesian-language tokenizer for Hugging Face.

Author: Jekardah AI Lab
"""

import json
import re
import os
from collections import Counter, defaultdict
from typing import List, Dict, Tuple, Optional


class BPETokenizer:
    """
    Byte Pair Encoding tokenizer built from scratch.

    Learns subword units from raw text data without requiring any
    dictionary: text is split into chunks by a regex, every chunk starts as
    a sequence of characters, and the most frequent adjacent pair is merged
    repeatedly until the target vocabulary size is reached.
    """

    # Special-token spellings. Ids 0-3 are reserved for them, in this order.
    PAD_TOKEN = "<pad>"
    UNK_TOKEN = "<unk>"
    BOS_TOKEN = "<s>"
    EOS_TOKEN = "</s>"

    # Stand-in for a literal space inside the space-separated merges format.
    _SPACE_ESCAPE = "\u2581"

    def __init__(self, vocab_size: int = 32000, do_lower_case: bool = True):
        """
        Create an untrained tokenizer.

        Args:
            vocab_size: Target vocabulary size (special tokens + characters
                + merged tokens).
            do_lower_case: Lower-case text before training/encoding.
        """
        self.vocab_size = vocab_size
        self.do_lower_case = do_lower_case
        self.vocab = {}            # token -> id
        self.inverse_vocab = {}    # id -> token
        self.merges = []           # list of (pair_a, pair_b) merge rules
        self._merge_priority = {}  # (pair) -> priority index for fast lookup
        # Pre-tokenization pattern: a few Indonesian clitics, then words,
        # digit runs, punctuation runs (each optionally preceded by one
        # space) and whitespace runs.
        self.pattern = re.compile(
            r"""'nya|'kan|'lah|'kah|'pun| ?\w+| ?\d+| ?[^\s\w\d]+|\s+(?!\S)|\s+"""
        )
        # Special tokens. Each one needs a distinct key: identical keys in a
        # dict literal silently collapse into a single entry.
        self.special_tokens = {
            self.PAD_TOKEN: 0,
            self.UNK_TOKEN: 1,
            self.BOS_TOKEN: 2,
            self.EOS_TOKEN: 3,
        }

    @staticmethod
    def _merge_sequence(symbols, pair: Tuple[str, str]) -> List[str]:
        """Return `symbols` with each left-to-right occurrence of `pair` fused."""
        first, second = pair
        fused = first + second
        merged = []
        last = len(symbols) - 1
        i = 0
        while i <= last:
            if i < last and symbols[i] == first and symbols[i + 1] == second:
                merged.append(fused)
                i += 2
            else:
                merged.append(symbols[i])
                i += 1
        return merged

    def _get_pairs(self, word: List[str]) -> Counter:
        """Get frequency of adjacent pairs in a word."""
        return Counter(zip(word, word[1:]))

    def _get_corpus_pairs(self, corpus: Dict[tuple, int]) -> Counter:
        """Get frequency of all adjacent pairs across the entire corpus."""
        pairs = Counter()
        for word, freq in corpus.items():
            for adjacent in zip(word, word[1:]):
                pairs[adjacent] += freq
        return pairs

    def _merge_pair(self, pair: Tuple[str, str], corpus: Dict[tuple, int]) -> Dict[tuple, int]:
        """
        Merge all occurrences of a pair in the corpus.

        Args:
            pair: The two adjacent symbols to fuse.
            corpus: Mapping of symbol tuples to their frequency.

        Returns:
            A new corpus mapping; the input is not modified.
        """
        new_corpus = {}
        for word, freq in corpus.items():
            merged_word = tuple(self._merge_sequence(word, pair))
            # Accumulate rather than overwrite so that two entries collapsing
            # to the same symbol tuple keep their combined frequency.
            new_corpus[merged_word] = new_corpus.get(merged_word, 0) + freq
        return new_corpus

    def _pre_tokenize(self, text: str) -> List[str]:
        """Split text into initial words/chunks."""
        return self.pattern.findall(text)

    def train(self, texts: List[str], min_frequency: int = 2, verbose: bool = True):
        """
        Train BPE tokenizer on a list of texts.

        Replaces any previously learned merges and vocabulary.

        Args:
            texts: List of training text strings
            min_frequency: Minimum pair frequency to consider for merging
            verbose: Print progress during training
        """
        if verbose:
            print("=" * 60)
            print("🚀 Training BPE Tokenizer")
            print(f" Target vocab size: {self.vocab_size}")
            print(f" Training texts: {len(texts)}")
            print("=" * 60)

        # Step 1: Pre-tokenize and build initial corpus
        if verbose:
            print("\n📝 Step 1: Pre-tokenizing text...")
        word_freqs = Counter()
        for text in texts:
            text_input = text.lower() if self.do_lower_case else text
            word_freqs.update(self._pre_tokenize(text_input))
        if verbose:
            print(f" Found {len(word_freqs)} unique words")

        # Step 2: Initialize corpus as character-level splits
        if verbose:
            print("\n🔤 Step 2: Initializing character-level tokens...")
        corpus = {tuple(word): freq for word, freq in word_freqs.items()}

        # Build initial character vocabulary
        char_vocab = {char for word in corpus for char in word}
        if verbose:
            print(f" Initial character vocab: {len(char_vocab)} characters")

        # Step 3: Iteratively merge most frequent pairs
        if verbose:
            print(f"\n🔗 Step 3: Learning merges (target: {self.vocab_size} tokens)...")
        # A non-positive budget (vocab_size too small) simply learns no merges.
        num_merges = self.vocab_size - len(char_vocab) - len(self.special_tokens)
        self.merges = []
        for i in range(num_merges):
            pairs = self._get_corpus_pairs(corpus)
            if not pairs:
                if verbose:
                    print(f" No more pairs to merge at step {i}")
                break
            pair, pair_freq = pairs.most_common(1)[0]
            if pair_freq < min_frequency:
                if verbose:
                    print(f" Stopping at step {i}: min frequency {min_frequency} reached")
                break
            self.merges.append(pair)
            corpus = self._merge_pair(pair, corpus)
            if verbose and (i + 1) % 500 == 0:
                merged_token = pair[0] + pair[1]
                print(f" Merge {i + 1}/{num_merges}: '{pair[0]}' + '{pair[1]}' → '{merged_token}' (freq: {pair_freq})")
        if verbose:
            print(f" Total merges learned: {len(self.merges)}")

        # Step 4: Build final vocabulary
        if verbose:
            print("\n📚 Step 4: Building final vocabulary...")
        self.vocab = dict(self.special_tokens)
        idx = len(self.special_tokens)
        # Add individual characters (sorted so ids are deterministic)
        for char in sorted(char_vocab):
            if char not in self.vocab:
                self.vocab[char] = idx
                idx += 1
        # Add merged tokens in the order they were learned
        for pair in self.merges:
            merged = pair[0] + pair[1]
            if merged not in self.vocab:
                self.vocab[merged] = idx
                idx += 1
        self.inverse_vocab = {v: k for k, v in self.vocab.items()}
        self._merge_priority = {pair: i for i, pair in enumerate(self.merges)}
        if verbose:
            print(f" Final vocab size: {len(self.vocab)}")
            print("\n✅ Training complete!")
            print("=" * 60)

    def _apply_merges(self, tokens: List[str]) -> List[str]:
        """Apply learned merge rules to a list of tokens using greedy-by-priority."""
        priority = self._merge_priority
        unranked = float("inf")
        while len(tokens) >= 2:
            # Adjacent pair with the highest priority (lowest index); min()
            # keeps the leftmost candidate on ties.
            best_pair = min(
                zip(tokens, tokens[1:]),
                key=lambda candidate: priority.get(candidate, unranked),
            )
            if best_pair not in priority:
                break  # no learned rule applies any more
            tokens = self._merge_sequence(tokens, best_pair)
        return tokens

    def _unk_id(self) -> int:
        """Return the id used for tokens that are missing from the vocabulary."""
        if self.UNK_TOKEN in self.special_tokens:
            return self.special_tokens[self.UNK_TOKEN]
        # Directories saved before the special tokens had distinct names hold
        # a single special token keyed by the empty string.
        return self.special_tokens[""]

    def encode(self, text: str) -> List[int]:
        """
        Encode text to token IDs.

        Args:
            text: Input text string

        Returns:
            List of token IDs; tokens missing from the vocabulary map to the
            unknown-token id.
        """
        vocab = self.vocab
        ids = []
        for token in self.tokenize(text):
            token_id = vocab.get(token)
            if token_id is None:
                token_id = self._unk_id()
            ids.append(token_id)
        return ids

    def decode(self, ids: List[int]) -> str:
        """
        Decode token IDs back to text.

        Args:
            ids: List of token IDs

        Returns:
            Decoded text string; ids that are not in the vocabulary
            contribute an empty string.
        """
        inverse_vocab = self.inverse_vocab
        return "".join(inverse_vocab.get(token_id, "") for token_id in ids)

    def tokenize(self, text: str) -> List[str]:
        """
        Tokenize text into subword tokens (string form).

        Args:
            text: Input text string

        Returns:
            List of token strings
        """
        text_input = text.lower() if self.do_lower_case else text
        all_tokens = []
        for word in self._pre_tokenize(text_input):
            all_tokens.extend(self._apply_merges(list(word)))
        return all_tokens

    @classmethod
    def _is_plain_merge_token(cls, token: str) -> bool:
        """Tell whether `token` survives the space-separated merges format."""
        if not token or cls._SPACE_ESCAPE in token:
            return False
        # Only the plain space can be escaped; any other whitespace would be
        # stripped or would break the line structure when read back.
        return all(ch == " " or not ch.isspace() for ch in token)

    @classmethod
    def _format_merge_line(cls, left: str, right: str) -> str:
        """Serialize one merge rule as a single line (without newline)."""
        plain = (
            cls._is_plain_merge_token(left)
            and cls._is_plain_merge_token(right)
            and not left.startswith("#")  # would look like a header/comment
        )
        if plain:
            escaped_left = left.replace(" ", cls._SPACE_ESCAPE)
            escaped_right = right.replace(" ", cls._SPACE_ESCAPE)
            return f"{escaped_left} {escaped_right}"
        # JSON+tab form: json.dumps escapes every awkward character, so the
        # only literal tab on the line is the separator.
        return f"{json.dumps(left)}\t{json.dumps(right)}"

    @classmethod
    def _parse_merge_line(cls, line: str) -> Optional[Tuple[str, str]]:
        """Parse one merges.txt line; return None for blank/header/bad lines."""
        line = line.strip()
        # Only the version header is metadata; other lines starting with '#'
        # are genuine merges such as "# #".
        if not line or line.startswith("#version"):
            return None
        if "\t" in line:
            # JSON+tab format
            parts = line.split("\t")
            if len(parts) != 2:
                return None
            return json.loads(parts[0]), json.loads(parts[1])
        # Space-separated format with U+2581 escape
        parts = line.split(" ", 1)
        if len(parts) != 2:
            return None
        return (
            parts[0].replace(cls._SPACE_ESCAPE, " "),
            parts[1].replace(cls._SPACE_ESCAPE, " "),
        )

    def save(self, directory: str):
        """
        Save tokenizer to directory (HuggingFace compatible format).

        Writes vocab.json, merges.txt, tokenizer_config.json,
        special_tokens_map.json and tokenizer.json, creating the directory
        if needed.

        Args:
            directory: Target directory path.
        """
        os.makedirs(directory, exist_ok=True)

        def dump_json(filename, payload):
            with open(os.path.join(directory, filename), "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)

        # 1. vocab.json
        dump_json("vocab.json", self.vocab)

        # 2. merges.txt (space-separated with U+2581 for literal spaces;
        #    pairs that format cannot represent use the JSON+tab form)
        with open(os.path.join(directory, "merges.txt"), "w", encoding="utf-8") as f:
            f.write("#version: 0.3\n")
            f.writelines(
                self._format_merge_line(left, right) + "\n"
                for left, right in self.merges
            )

        # 3. tokenizer_config.json
        dump_json(
            "tokenizer_config.json",
            {
                "tokenizer_class": "BPETokenizer",
                "vocab_size": len(self.vocab),
                "model_type": "bpe",
                "special_tokens": self.special_tokens,
                "do_lower_case": self.do_lower_case,
                "language": "id",
            },
        )

        # 4. special_tokens_map.json
        dump_json(
            "special_tokens_map.json",
            {
                "pad_token": self.PAD_TOKEN,
                "unk_token": self.UNK_TOKEN,
                "bos_token": self.BOS_TOKEN,
                "eos_token": self.EOS_TOKEN,
            },
        )

        # 5. tokenizer.json (HuggingFace format)
        hf_tokenizer = {
            "version": "1.0",
            "model": {
                "type": "BPE",
                "vocab": self.vocab,
                "merges": [
                    f"{p[0].replace(' ', chr(0x2581))} {p[1].replace(' ', chr(0x2581))}"
                    for p in self.merges
                ],
            },
            "pre_tokenizer": {
                "type": "Split",
                "pattern": {"Regex": self.pattern.pattern},
                "behavior": "Isolated",
            },
            "decoder": {
                "type": "Fuse",
            },
            "added_tokens": [
                {"id": v, "content": k, "special": True}
                for k, v in self.special_tokens.items()
            ],
        }
        if self.do_lower_case:
            hf_tokenizer["normalizer"] = {"type": "Lowercase"}
        dump_json("tokenizer.json", hf_tokenizer)

        print(f"💾 Tokenizer saved to: {directory}")

    @classmethod
    def from_pretrained(cls, directory: str) -> "BPETokenizer":
        """
        Load tokenizer from directory.

        Reads vocab.json, merges.txt and tokenizer_config.json as written
        by `save`.

        Args:
            directory: Directory previously passed to `save`.

        Returns:
            A tokenizer with vocabulary, merges and settings restored.
        """
        tokenizer = cls()

        # Load vocab
        with open(os.path.join(directory, "vocab.json"), "r", encoding="utf-8") as f:
            tokenizer.vocab = json.load(f)
        tokenizer.inverse_vocab = {v: k for k, v in tokenizer.vocab.items()}

        # Load merges (supports both JSON+tab and space-separated lines)
        tokenizer.merges = []
        with open(os.path.join(directory, "merges.txt"), "r", encoding="utf-8") as f:
            for line in f:
                rule = cls._parse_merge_line(line)
                if rule is not None:
                    tokenizer.merges.append(rule)
        tokenizer._merge_priority = {
            pair: i for i, pair in enumerate(tokenizer.merges)
        }

        # Load config
        with open(os.path.join(directory, "tokenizer_config.json"), "r", encoding="utf-8") as f:
            config = json.load(f)
        tokenizer.special_tokens = config.get("special_tokens", tokenizer.special_tokens)
        tokenizer.vocab_size = config.get("vocab_size", len(tokenizer.vocab))
        tokenizer.do_lower_case = config.get("do_lower_case", True)

        print(f"✅ Tokenizer loaded from: {directory}")
        return tokenizer


def main() -> None:
    """Quick test: train on two sentences and round-trip a sample."""
    tokenizer = BPETokenizer(vocab_size=1000)
    sample_texts = [
        "Saya suka makan nasi goreng di Jakarta",
        "Indonesia adalah negara kepulauan terbesar di dunia",
    ]
    tokenizer.train(sample_texts, min_frequency=1)

    test = "saya makan nasi goreng"
    tokens = tokenizer.tokenize(test)
    ids = tokenizer.encode(test)
    decoded = tokenizer.decode(ids)
    print(f"\nInput: {test}")
    print(f"Tokens: {tokens}")
    print(f"IDs: {ids}")
    print(f"Decoded: {decoded}")


if __name__ == "__main__":
    main()