""" BPE Tokenizer in the style of GPT-4. Two implementations are available: 1) HuggingFace Tokenizer that can do both training and inference but is really confusing 2) Our own RustBPE Tokenizer for training and tiktoken for efficient inference """ import os import copy from functools import lru_cache SPECIAL_TOKENS = [ # every document begins with the Beginning of Sequence (BOS) token that delimits documents "<|bos|>", # tokens below are only used during finetuning to render Conversations into token ids "<|user_start|>", # user messages "<|user_end|>", "<|assistant_start|>", # assistant messages "<|assistant_end|>", "<|python_start|>", # assistant invokes python REPL tool "<|python_end|>", "<|output_start|>", # python REPL outputs back to assistant "<|output_end|>", ] # NOTE: this split pattern deviates from GPT-4 in that we use \p{N}{1,2} instead of \p{N}{1,3} # I did this because I didn't want to "waste" too many tokens on numbers for smaller vocab sizes. # I haven't validated that this is actually a good idea, TODO. SPLIT_PATTERN = r"""'(?i:[sdmt]|ll|ve|re)|[^\r\n\p{L}\p{N}]?+\p{L}+|\p{N}{1,2}| ?[^\s\p{L}\p{N}]++[\r\n]*|\s*[\r\n]|\s+(?!\S)|\s+""" # ----------------------------------------------------------------------------- # Generic GPT-4-style tokenizer based on HuggingFace Tokenizer from tokenizers import Tokenizer as HFTokenizer from tokenizers import pre_tokenizers, decoders, Regex from tokenizers.models import BPE from tokenizers.trainers import BpeTrainer class HuggingFaceTokenizer: """Light wrapper around HuggingFace Tokenizer for some utilities""" def __init__(self, tokenizer): self.tokenizer = tokenizer @classmethod def from_pretrained(cls, hf_path): # init from a HuggingFace pretrained tokenizer (e.g. "gpt2") tokenizer = HFTokenizer.from_pretrained(hf_path) return cls(tokenizer) @classmethod def from_directory(cls, tokenizer_dir): # init from a local directory on disk (e.g. "out/tokenizer") tokenizer_path = os.path.join(tokenizer_dir, "tokenizer.json") tokenizer = HFTokenizer.from_file(tokenizer_path) return cls(tokenizer) @classmethod def train_from_iterator(cls, text_iterator, vocab_size): # train from an iterator of text # Configure the HuggingFace Tokenizer tokenizer = HFTokenizer(BPE( byte_fallback=True, # needed! unk_token=None, fuse_unk=False, )) # Normalizer: None tokenizer.normalizer = None # Pre-tokenizer: GPT-4 style # the regex pattern used by GPT-4 to split text into groups before BPE # NOTE: The pattern was changed from \p{N}{1,3} to \p{N}{1,2} because I suspect it is harmful to # very small models and smaller vocab sizes, because it is a little bit wasteful in the token space. # (but I haven't validated this! TODO) gpt4_split_regex = Regex(SPLIT_PATTERN) # huggingface demands that you wrap it in Regex!! tokenizer.pre_tokenizer = pre_tokenizers.Sequence([ pre_tokenizers.Split(pattern=gpt4_split_regex, behavior="isolated", invert=False), pre_tokenizers.ByteLevel(add_prefix_space=False, use_regex=False) ]) # Decoder: ByteLevel (it pairs together with the ByteLevel pre-tokenizer) tokenizer.decoder = decoders.ByteLevel() # Post-processor: None tokenizer.post_processor = None # Trainer: BPE trainer = BpeTrainer( vocab_size=vocab_size, show_progress=True, min_frequency=0, # no minimum frequency initial_alphabet=pre_tokenizers.ByteLevel.alphabet(), special_tokens=SPECIAL_TOKENS, ) # Kick off the training tokenizer.train_from_iterator(text_iterator, trainer) return cls(tokenizer) def get_vocab_size(self): return self.tokenizer.get_vocab_size() def get_special_tokens(self): special_tokens_map = self.tokenizer.get_added_tokens_decoder() special_tokens = [w.content for w in special_tokens_map.values()] return special_tokens def id_to_token(self, id): return self.tokenizer.id_to_token(id) def _encode_one(self, text, prepend=None, append=None): # encode a single string # prepend/append can be either a string of a special token or a token id directly. assert isinstance(text, str) ids = [] if prepend is not None: prepend_id = prepend if isinstance(prepend, int) else self.encode_special(prepend) ids.append(prepend_id) ids.extend(self.tokenizer.encode(text, add_special_tokens=False).ids) if append is not None: append_id = append if isinstance(append, int) else self.encode_special(append) ids.append(append_id) return ids def encode_special(self, text): # encode a single special token via exact match return self.tokenizer.token_to_id(text) def get_bos_token_id(self): bos = self.encode_special("<|bos|>") return bos def encode(self, text, *args, **kwargs): if isinstance(text, str): return self._encode_one(text, *args, **kwargs) elif isinstance(text, list): return [self._encode_one(t, *args, **kwargs) for t in text] else: raise ValueError(f"Invalid input type: {type(text)}") def __call__(self, *args, **kwargs): return self.encode(*args, **kwargs) def decode(self, ids): return self.tokenizer.decode(ids, skip_special_tokens=False) def save(self, tokenizer_dir): # save the tokenizer to disk os.makedirs(tokenizer_dir, exist_ok=True) tokenizer_path = os.path.join(tokenizer_dir, "tokenizer.json") self.tokenizer.save(tokenizer_path) print(f"Saved tokenizer to {tokenizer_path}") # ----------------------------------------------------------------------------- # Tokenizer based on rustbpe + tiktoken combo import pickle import rustbpe import tiktoken class RustBPETokenizer: """Light wrapper around tiktoken (for efficient inference) but train with rustbpe""" def __init__(self, enc, bos_token): self.enc = enc self.bos_token_id = self.encode_special(bos_token) @classmethod def train_from_iterator(cls, text_iterator, vocab_size): # 1) train using rustbpe tokenizer = rustbpe.Tokenizer() # the special tokens are inserted later in __init__, we don't train them here vocab_size_no_special = vocab_size - len(SPECIAL_TOKENS) assert vocab_size_no_special >= 256, f"vocab_size_no_special must be at least 256, got {vocab_size_no_special}" tokenizer.train_from_iterator(text_iterator, vocab_size_no_special, pattern=SPLIT_PATTERN) # 2) construct the associated tiktoken encoding for inference pattern = tokenizer.get_pattern() mergeable_ranks_list = tokenizer.get_mergeable_ranks() mergeable_ranks = {bytes(k): v for k, v in mergeable_ranks_list} tokens_offset = len(mergeable_ranks) special_tokens = {name: tokens_offset + i for i, name in enumerate(SPECIAL_TOKENS)} enc = tiktoken.Encoding( name="rustbpe", pat_str=pattern, mergeable_ranks=mergeable_ranks, # dict[bytes, int] (token bytes -> merge priority rank) special_tokens=special_tokens, # dict[str, int] (special token name -> token id) ) return cls(enc, "<|bos|>") @classmethod def from_directory(cls, tokenizer_dir): pickle_path = os.path.join(tokenizer_dir, "tokenizer.pkl") with open(pickle_path, "rb") as f: enc = pickle.load(f) return cls(enc, "<|bos|>") @classmethod def from_pretrained(cls, tiktoken_name): # https://github.com/openai/tiktoken/blob/eedc8563/tiktoken_ext/openai_public.py enc = tiktoken.get_encoding(tiktoken_name) # tiktoken calls the special document delimiter token "<|endoftext|>" # yes this is confusing because this token is almost always PREPENDED to the beginning of the document # it most often is used to signal the start of a new sequence to the LLM during inference etc. # so in nanoChat we always use "<|bos|>" short for "beginning of sequence", but historically it is often called "<|endoftext|>". return cls(enc, "<|endoftext|>") def get_vocab_size(self): return self.enc.n_vocab def get_special_tokens(self): return self.enc.special_tokens_set def id_to_token(self, id): return self.enc.decode([id]) @lru_cache(maxsize=32) def encode_special(self, text): return self.enc.encode_single_token(text) def get_bos_token_id(self): return self.bos_token_id def encode(self, text, prepend=None, append=None, num_threads=8): # text can be either a string or a list of strings if prepend is not None: prepend_id = prepend if isinstance(prepend, int) else self.encode_special(prepend) if append is not None: append_id = append if isinstance(append, int) else self.encode_special(append) if isinstance(text, str): ids = self.enc.encode_ordinary(text) if prepend is not None: ids.insert(0, prepend_id) # TODO: slightly inefficient here? :( hmm if append is not None: ids.append(append_id) elif isinstance(text, list): ids = self.enc.encode_ordinary_batch(text, num_threads=num_threads) if prepend is not None: for ids_row in ids: ids_row.insert(0, prepend_id) # TODO: same if append is not None: for ids_row in ids: ids_row.append(append_id) else: raise ValueError(f"Invalid input type: {type(text)}") return ids def __call__(self, *args, **kwargs): return self.encode(*args, **kwargs) def decode(self, ids): return self.enc.decode(ids) def save(self, tokenizer_dir): # save the encoding object to disk os.makedirs(tokenizer_dir, exist_ok=True) pickle_path = os.path.join(tokenizer_dir, "tokenizer.pkl") with open(pickle_path, "wb") as f: pickle.dump(self.enc, f) print(f"Saved tokenizer encoding to {pickle_path}") def render_conversation(self, conversation, max_tokens=2048): """ Tokenize a single Chat conversation (which we call a "doc" or "document" here). Returns: - ids: list[int] is a list of token ids of this rendered conversation - mask: list[int] of same length, mask = 1 for tokens that the Assistant is expected to train on. """ # ids, masks that we will return and a helper function to help build them up. ids, mask = [], [] def add_tokens(token_ids, mask_val): if isinstance(token_ids, int): token_ids = [token_ids] ids.extend(token_ids) mask.extend([mask_val] * len(token_ids)) # sometimes the first message is a system message... # => just merge it with the second (user) message if conversation["messages"][0]["role"] == "system": # some conversation surgery is necessary here for now... conversation = copy.deepcopy(conversation) # avoid mutating the original messages = conversation["messages"] assert messages[1]["role"] == "user", "System message must be followed by a user message" messages[1]["content"] = messages[0]["content"] + "\n\n" + messages[1]["content"] messages = messages[1:] else: messages = conversation["messages"] assert len(messages) >= 1, f"Conversation has less than 1 message: {messages}" # fetch all the special tokens we need bos = self.get_bos_token_id() user_start, user_end = self.encode_special("<|user_start|>"), self.encode_special("<|user_end|>") assistant_start, assistant_end = self.encode_special("<|assistant_start|>"), self.encode_special("<|assistant_end|>") python_start, python_end = self.encode_special("<|python_start|>"), self.encode_special("<|python_end|>") output_start, output_end = self.encode_special("<|output_start|>"), self.encode_special("<|output_end|>") # now we can tokenize the conversation add_tokens(bos, 0) for i, message in enumerate(messages): # some sanity checking here around assumptions, to prevent footguns must_be_from = "user" if i % 2 == 0 else "assistant" assert message["role"] == must_be_from, f"Message {i} is from {message['role']} but should be from {must_be_from}" # content can be either a simple string or a list of parts (e.g. containing tool calls) content = message["content"] if message["role"] == "user": assert isinstance(content, str), "User messages are simply expected to be strings" value_ids = self.encode(content) add_tokens(user_start, 0) add_tokens(value_ids, 0) add_tokens(user_end, 0) elif message["role"] == "assistant": add_tokens(assistant_start, 0) if isinstance(content, str): # simple string => simply add the tokens value_ids = self.encode(content) add_tokens(value_ids, 1) elif isinstance(content, list): for part in content: value_ids = self.encode(part["text"]) if part["type"] == "text": # string part => simply add the tokens add_tokens(value_ids, 1) elif part["type"] == "python": # python tool call => add the tokens inside <|python_start|> and <|python_end|> add_tokens(python_start, 1) add_tokens(value_ids, 1) add_tokens(python_end, 1) elif part["type"] == "python_output": # python output => add the tokens inside <|output_start|> and <|output_end|> # none of these tokens are supervised because the tokens come from Python at test time add_tokens(output_start, 0) add_tokens(value_ids, 0) add_tokens(output_end, 0) else: raise ValueError(f"Unknown part type: {part['type']}") else: raise ValueError(f"Unknown content type: {type(content)}") add_tokens(assistant_end, 1) # truncate to max_tokens tokens MAX (helps prevent OOMs) ids = ids[:max_tokens] mask = mask[:max_tokens] return ids, mask def visualize_tokenization(self, ids, mask): """Small helper function useful in debugging: visualize the tokenization of render_conversation""" RED = '\033[91m' GREEN = '\033[92m' RESET = '\033[0m' tokens = [] for i, (token_id, mask_val) in enumerate(zip(ids, mask)): token_str = self.decode([token_id]) color = GREEN if mask_val == 1 else RED tokens.append(f"{color}{token_str}{RESET}") return '|'.join(tokens) def render_for_completion(self, conversation): """ Used during Reinforcement Learning. In that setting, we want to render the conversation priming the Assistant for a completion. Unlike the Chat SFT case, we don't need to return the mask. """ # We have some surgery to do: we need to pop the last message (of the Assistant) conversation = copy.deepcopy(conversation) # avoid mutating the original messages = conversation["messages"] assert messages[-1]["role"] == "assistant", "Last message must be from the Assistant" messages.pop() # remove the last message (of the Assistant) inplace # Now tokenize the conversation ids, mask = self.render_conversation(conversation) # Finally, to prime the Assistant for a completion, append the Assistant start token assistant_start = self.encode_special("<|assistant_start|>") ids.append(assistant_start) return ids # ----------------------------------------------------------------------------- # nanochat-specific convenience functions def get_tokenizer(): from nanochat.common import get_base_dir base_dir = get_base_dir() tokenizer_dir = os.path.join(base_dir, "tokenizer") # return HuggingFaceTokenizer.from_directory(tokenizer_dir) return RustBPETokenizer.from_directory(tokenizer_dir) def get_token_bytes(device="cpu"): import torch from nanochat.common import get_base_dir base_dir = get_base_dir() tokenizer_dir = os.path.join(base_dir, "tokenizer") token_bytes_path = os.path.join(tokenizer_dir, "token_bytes.pt") assert os.path.exists(token_bytes_path), f"Token bytes not found at {token_bytes_path}? It gets written by tok_train.py" with open(token_bytes_path, "rb") as f: token_bytes = torch.load(f, map_location=device) return token_bytes