from __future__ import annotations import os import shutil from pathlib import Path import tiktoken from tiktoken.load import load_tiktoken_bpe from transformers import PreTrainedTokenizer BASE_VOCAB_SIZE = 65536 IT_VOCAB_SIZE = BASE_VOCAB_SIZE + 4 _PAT_STR = "|".join( [ r"""[^\r\n\p{L}\p{N}]?[\p{Lu}\p{Lt}\p{Lm}\p{Lo}\p{M}]*[\p{Ll}\p{Lm}\p{Lo}\p{M}]+(?i:'s|'t|'re|'ve|'m|'ll|'d)?""", r"""[^\r\n\p{L}\p{N}]?[\p{Lu}\p{Lt}\p{Lm}\p{Lo}\p{M}]+[\p{Ll}\p{Lm}\p{Lo}\p{M}]*(?i:'s|'t|'re|'ve|'m|'ll|'d)?""", r"""\p{N}{1,3}""", r""" ?[^\s\p{L}\p{N}]+[\r\n/]*""", r"""\s*[\r\n]+""", r"""\s+(?!\S)""", r"""\s+""", ] ) _BASE_SPECIAL_TOKENS = { "<|endoftext|>": BASE_VOCAB_SIZE - 1, } _IT_SPECIAL_TOKENS = { "<|endoftext|>": BASE_VOCAB_SIZE - 1, "<|end|>": BASE_VOCAB_SIZE, "<|user|>": BASE_VOCAB_SIZE + 1, "<|assistant|>": BASE_VOCAB_SIZE + 2, "<|system|>": BASE_VOCAB_SIZE + 3, } class TalkieTokenizer(PreTrainedTokenizer): vocab_files_names = {"vocab_file": "vocab.txt"} model_input_names = ["input_ids", "attention_mask"] def __init__( self, vocab_file: str, style: str = "base", model_max_length: int = 2048, **kwargs, ): self.vocab_file = str(vocab_file) self.style = style mergeable_ranks = load_tiktoken_bpe(self.vocab_file) mergeable_ranks = { key: value for key, value in mergeable_ranks.items() if value < BASE_VOCAB_SIZE - 1 } if style == "it": special_tokens = dict(_IT_SPECIAL_TOKENS) vocab_size = IT_VOCAB_SIZE name = "talkie-it" elif style == "base": special_tokens = dict(_BASE_SPECIAL_TOKENS) vocab_size = BASE_VOCAB_SIZE name = "talkie-base" else: raise ValueError(f"unknown Talkie tokenizer style: {style!r}") self.encoder = tiktoken.Encoding( name=name, pat_str=_PAT_STR, mergeable_ranks=mergeable_ranks, special_tokens=special_tokens, ) self._vocab_size = vocab_size self._special_token_to_id = special_tokens self._id_to_special_token = {value: key for key, value in special_tokens.items()} if style == "it": kwargs.setdefault("eos_token", "<|end|>") kwargs.setdefault( "additional_special_tokens", ["<|endoftext|>", "<|user|>", "<|assistant|>", "<|system|>"], ) else: kwargs.setdefault("eos_token", "<|endoftext|>") super().__init__(model_max_length=model_max_length, **kwargs) @property def vocab_size(self) -> int: return self._vocab_size def get_vocab(self) -> dict[str, int]: vocab = {str(index): index for index in range(self._vocab_size)} vocab.update(self._special_token_to_id) vocab.update(self.get_added_vocab()) return vocab def _tokenize(self, text: str, **kwargs) -> list[str]: return [str(token_id) for token_id in self.encoder.encode(text, allowed_special="all")] def _convert_token_to_id(self, token: str) -> int: if token in self._special_token_to_id: return self._special_token_to_id[token] try: token_id = int(token) except ValueError: return self.eos_token_id if 0 <= token_id < self._vocab_size: return token_id return self.eos_token_id def _convert_id_to_token(self, index: int) -> str: index = int(index) return self._id_to_special_token.get(index, str(index)) def convert_tokens_to_string(self, tokens: list[str]) -> str: ids = [self._convert_token_to_id(token) for token in tokens] return self.encoder.decode(ids) def _decode( self, token_ids, skip_special_tokens: bool = False, clean_up_tokenization_spaces: bool | None = None, **kwargs, ) -> str: if isinstance(token_ids, int): token_ids = [token_ids] ids = [int(token_id) for token_id in token_ids] if skip_special_tokens: specials = set(self._special_token_to_id.values()) ids = [token_id for token_id in ids if token_id not in specials] return self.encoder.decode(ids) def build_inputs_with_special_tokens( self, token_ids_0: list[int], token_ids_1: list[int] | None = None ) -> list[int]: if token_ids_1 is None: return list(token_ids_0) return list(token_ids_0) + list(token_ids_1) def get_special_tokens_mask( self, token_ids_0: list[int], token_ids_1: list[int] | None = None, already_has_special_tokens: bool = False, ) -> list[int]: special_ids = set(self._special_token_to_id.values()) if already_has_special_tokens: return [1 if token_id in special_ids else 0 for token_id in token_ids_0] token_ids = list(token_ids_0) if token_ids_1 is None else list(token_ids_0) + list(token_ids_1) return [1 if token_id in special_ids else 0 for token_id in token_ids] def create_token_type_ids_from_sequences( self, token_ids_0: list[int], token_ids_1: list[int] | None = None ) -> list[int]: length = len(token_ids_0) if token_ids_1 is None else len(token_ids_0) + len(token_ids_1) return [0] * length def save_vocabulary(self, save_directory: str, filename_prefix: str | None = None): if not os.path.isdir(save_directory): raise ValueError(f"Vocabulary path {save_directory!r} is not a directory") name = "vocab.txt" if filename_prefix is None else f"{filename_prefix}-vocab.txt" out = Path(save_directory) / name if Path(self.vocab_file).resolve() != out.resolve(): shutil.copyfile(self.vocab_file, out) return (str(out),)