	| """ | |
| BPE Tokenizer in the style of GPT-4. | |
| Two implementations are available: | |
| 1) HuggingFace Tokenizer that can do both training and inference but is really confusing | |
| 2) Our own RustBPE Tokenizer for training and tiktoken for efficient inference | |
| """ | |

import os
import copy
from functools import lru_cache

SPECIAL_TOKENS = [
    # every document begins with the Beginning of Sequence (BOS) token that delimits documents
    "<|bos|>",
    # tokens below are only used during finetuning to render Conversations into token ids
    "<|user_start|>", # user messages
    "<|user_end|>",
    "<|assistant_start|>", # assistant messages
    "<|assistant_end|>",
    "<|python_start|>", # assistant invokes python REPL tool
    "<|python_end|>",
    "<|output_start|>", # python REPL outputs back to assistant
    "<|output_end|>",
]
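
# For illustration, a simple two-turn chat renders roughly as (see render_conversation below):
#   <|bos|><|user_start|>Hi<|user_end|><|assistant_start|>Hello!<|assistant_end|>
# where only the assistant's tokens (and <|assistant_end|>) are supervised with mask = 1.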

# NOTE: this split pattern deviates from GPT-4 in that we use \p{N}{1,2} instead of \p{N}{1,3}
# I did this because I didn't want to "waste" too many tokens on numbers for smaller vocab sizes.
# I haven't validated that this is actually a good idea, TODO.
SPLIT_PATTERN = r"""'(?i:[sdmt]|ll|ve|re)|[^\r\n\p{L}\p{N}]?+\p{L}+|\p{N}{1,2}| ?[^\s\p{L}\p{N}]++[\r\n]*|\s*[\r\n]|\s+(?!\S)|\s+"""
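
# Illustration of how the split pattern chunks text before BPE (note that it needs
# the third-party `regex` module, not the stdlib `re`, for \p{...} classes and
# possessive quantifiers):
#   >>> import regex
#   >>> regex.findall(SPLIT_PATTERN, "Hello world 2024!")
#   ['Hello', ' world', ' ', '20', '24', '!']
# i.e. "2024" becomes chunks of at most 2 digits each, due to \p{N}{1,2}.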

# -----------------------------------------------------------------------------
# Generic GPT-4-style tokenizer based on HuggingFace Tokenizer
from tokenizers import Tokenizer as HFTokenizer
from tokenizers import pre_tokenizers, decoders, Regex
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer

class HuggingFaceTokenizer:
    """Light wrapper around HuggingFace Tokenizer for some utilities"""

    def __init__(self, tokenizer):
        self.tokenizer = tokenizer

    @classmethod
    def from_pretrained(cls, hf_path):
        # init from a HuggingFace pretrained tokenizer (e.g. "gpt2")
        tokenizer = HFTokenizer.from_pretrained(hf_path)
        return cls(tokenizer)

    @classmethod
    def from_directory(cls, tokenizer_dir):
        # init from a local directory on disk (e.g. "out/tokenizer")
        tokenizer_path = os.path.join(tokenizer_dir, "tokenizer.json")
        tokenizer = HFTokenizer.from_file(tokenizer_path)
        return cls(tokenizer)

    @classmethod
    def train_from_iterator(cls, text_iterator, vocab_size):
        # train from an iterator of text
        # Configure the HuggingFace Tokenizer
        tokenizer = HFTokenizer(BPE(
            byte_fallback=True, # needed!
            unk_token=None,
            fuse_unk=False,
        ))
        # Normalizer: None
        tokenizer.normalizer = None
        # Pre-tokenizer: GPT-4 style
        # the regex pattern used by GPT-4 to split text into groups before BPE
        # NOTE: The pattern was changed from \p{N}{1,3} to \p{N}{1,2} because I suspect it is harmful to
        # very small models and smaller vocab sizes, because it is a little bit wasteful in the token space.
        # (but I haven't validated this! TODO)
        gpt4_split_regex = Regex(SPLIT_PATTERN) # huggingface demands that you wrap it in Regex!!
        tokenizer.pre_tokenizer = pre_tokenizers.Sequence([
            pre_tokenizers.Split(pattern=gpt4_split_regex, behavior="isolated", invert=False),
            pre_tokenizers.ByteLevel(add_prefix_space=False, use_regex=False)
        ])
        # Decoder: ByteLevel (it pairs together with the ByteLevel pre-tokenizer)
        tokenizer.decoder = decoders.ByteLevel()
        # Post-processor: None
        tokenizer.post_processor = None
        # Trainer: BPE
        trainer = BpeTrainer(
            vocab_size=vocab_size,
            show_progress=True,
            min_frequency=0, # no minimum frequency
            initial_alphabet=pre_tokenizers.ByteLevel.alphabet(),
            special_tokens=SPECIAL_TOKENS,
        )
        # Kick off the training
        tokenizer.train_from_iterator(text_iterator, trainer)
        return cls(tokenizer)

    def get_vocab_size(self):
        return self.tokenizer.get_vocab_size()

    def get_special_tokens(self):
        special_tokens_map = self.tokenizer.get_added_tokens_decoder()
        special_tokens = [w.content for w in special_tokens_map.values()]
        return special_tokens

    def id_to_token(self, id):
        return self.tokenizer.id_to_token(id)

    def _encode_one(self, text, prepend=None, append=None):
        # encode a single string
        # prepend/append can be either a string of a special token or a token id directly.
        assert isinstance(text, str)
        ids = []
        if prepend is not None:
            prepend_id = prepend if isinstance(prepend, int) else self.encode_special(prepend)
            ids.append(prepend_id)
        ids.extend(self.tokenizer.encode(text, add_special_tokens=False).ids)
        if append is not None:
            append_id = append if isinstance(append, int) else self.encode_special(append)
            ids.append(append_id)
        return ids

    def encode_special(self, text):
        # encode a single special token via exact match
        return self.tokenizer.token_to_id(text)

    def get_bos_token_id(self):
        bos = self.encode_special("<|bos|>")
        return bos

    def encode(self, text, *args, **kwargs):
        if isinstance(text, str):
            return self._encode_one(text, *args, **kwargs)
        elif isinstance(text, list):
            return [self._encode_one(t, *args, **kwargs) for t in text]
        else:
            raise ValueError(f"Invalid input type: {type(text)}")

    def __call__(self, *args, **kwargs):
        return self.encode(*args, **kwargs)

    def decode(self, ids):
        return self.tokenizer.decode(ids, skip_special_tokens=False)

    def save(self, tokenizer_dir):
        # save the tokenizer to disk
        os.makedirs(tokenizer_dir, exist_ok=True)
        tokenizer_path = os.path.join(tokenizer_dir, "tokenizer.json")
        self.tokenizer.save(tokenizer_path)
        print(f"Saved tokenizer to {tokenizer_path}")

# -----------------------------------------------------------------------------
# Tokenizer based on rustbpe + tiktoken combo
import pickle
import rustbpe
import tiktoken

class RustBPETokenizer:
    """Light wrapper around tiktoken (for efficient inference) but train with rustbpe"""

    def __init__(self, enc, bos_token):
        self.enc = enc
        self.bos_token_id = self.encode_special(bos_token)

    @classmethod
    def train_from_iterator(cls, text_iterator, vocab_size):
        # 1) train using rustbpe
        tokenizer = rustbpe.Tokenizer()
        # the special tokens are inserted later in __init__, we don't train them here
        vocab_size_no_special = vocab_size - len(SPECIAL_TOKENS)
        assert vocab_size_no_special >= 256, f"vocab_size_no_special must be at least 256, got {vocab_size_no_special}"
        tokenizer.train_from_iterator(text_iterator, vocab_size_no_special, pattern=SPLIT_PATTERN)
        # 2) construct the associated tiktoken encoding for inference
        pattern = tokenizer.get_pattern()
        mergeable_ranks_list = tokenizer.get_mergeable_ranks()
        mergeable_ranks = {bytes(k): v for k, v in mergeable_ranks_list}
        tokens_offset = len(mergeable_ranks)
        special_tokens = {name: tokens_offset + i for i, name in enumerate(SPECIAL_TOKENS)}
        enc = tiktoken.Encoding(
            name="rustbpe",
            pat_str=pattern,
            mergeable_ranks=mergeable_ranks, # dict[bytes, int] (token bytes -> merge priority rank)
            special_tokens=special_tokens, # dict[str, int] (special token name -> token id)
        )
        return cls(enc, "<|bos|>")
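
    # Resulting id layout (illustration): with e.g. vocab_size=65536 and 9 special tokens,
    # ids 0..65526 are the byte/merge tokens from BPE training and ids 65527..65535 are
    # the special tokens, i.e. the specials always sit at the top of the id space.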

    @classmethod
    def from_directory(cls, tokenizer_dir):
        pickle_path = os.path.join(tokenizer_dir, "tokenizer.pkl")
        with open(pickle_path, "rb") as f:
            enc = pickle.load(f)
        return cls(enc, "<|bos|>")

    @classmethod
    def from_pretrained(cls, tiktoken_name):
        # https://github.com/openai/tiktoken/blob/eedc8563/tiktoken_ext/openai_public.py
        enc = tiktoken.get_encoding(tiktoken_name)
        # tiktoken calls the special document delimiter token "<|endoftext|>".
        # yes, this is confusing because this token is almost always PREPENDED to the beginning of a document,
        # and is most often used to signal the start of a new sequence to the LLM during inference etc.
        # so in nanochat we always use "<|bos|>", short for "beginning of sequence", but historically it is often called "<|endoftext|>".
        return cls(enc, "<|endoftext|>")

    def get_vocab_size(self):
        return self.enc.n_vocab

    def get_special_tokens(self):
        return self.enc.special_tokens_set

    def id_to_token(self, id):
        return self.enc.decode([id])

    def encode_special(self, text):
        return self.enc.encode_single_token(text)

    def get_bos_token_id(self):
        return self.bos_token_id

    def encode(self, text, prepend=None, append=None, num_threads=8):
        # text can be either a string or a list of strings
        if prepend is not None:
            prepend_id = prepend if isinstance(prepend, int) else self.encode_special(prepend)
        if append is not None:
            append_id = append if isinstance(append, int) else self.encode_special(append)
        if isinstance(text, str):
            ids = self.enc.encode_ordinary(text)
            if prepend is not None:
                ids.insert(0, prepend_id) # TODO: slightly inefficient here? :( hmm
            if append is not None:
                ids.append(append_id)
        elif isinstance(text, list):
            ids = self.enc.encode_ordinary_batch(text, num_threads=num_threads)
            if prepend is not None:
                for ids_row in ids:
                    ids_row.insert(0, prepend_id) # TODO: same
            if append is not None:
                for ids_row in ids:
                    ids_row.append(append_id)
        else:
            raise ValueError(f"Invalid input type: {type(text)}")
        return ids
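
    # Usage illustration (the plain token ids shown are hypothetical):
    #   tok.encode("Hi there")                        # -> [5, 17, 942]
    #   tok.encode("Hi there", prepend="<|bos|>")     # -> [bos_id, 5, 17, 942]
    #   tok.encode(["a", "b"], append="<|user_end|>") # -> [[..., user_end_id], [..., user_end_id]]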

    def __call__(self, *args, **kwargs):
        return self.encode(*args, **kwargs)

    def decode(self, ids):
        return self.enc.decode(ids)

    def save(self, tokenizer_dir):
        # save the encoding object to disk
        os.makedirs(tokenizer_dir, exist_ok=True)
        pickle_path = os.path.join(tokenizer_dir, "tokenizer.pkl")
        with open(pickle_path, "wb") as f:
            pickle.dump(self.enc, f)
        print(f"Saved tokenizer encoding to {pickle_path}")

    def render_conversation(self, conversation, max_tokens=2048):
        """
        Tokenize a single Chat conversation (which we call a "doc" or "document" here).
        Returns:
        - ids: list[int] is a list of token ids of this rendered conversation
        - mask: list[int] of same length, mask = 1 for tokens that the Assistant is expected to train on.
        """
        # ids, masks that we will return and a helper function to help build them up.
        ids, mask = [], []
        def add_tokens(token_ids, mask_val):
            if isinstance(token_ids, int):
                token_ids = [token_ids]
            ids.extend(token_ids)
            mask.extend([mask_val] * len(token_ids))
        # sometimes the first message is a system message...
        # => just merge it with the second (user) message
        if conversation["messages"][0]["role"] == "system":
            # some conversation surgery is necessary here for now...
            conversation = copy.deepcopy(conversation) # avoid mutating the original
            messages = conversation["messages"]
            assert messages[1]["role"] == "user", "System message must be followed by a user message"
            messages[1]["content"] = messages[0]["content"] + "\n\n" + messages[1]["content"]
            messages = messages[1:]
        else:
            messages = conversation["messages"]
        assert len(messages) >= 1, f"Conversation has less than 1 message: {messages}"
        # fetch all the special tokens we need
        bos = self.get_bos_token_id()
        user_start, user_end = self.encode_special("<|user_start|>"), self.encode_special("<|user_end|>")
        assistant_start, assistant_end = self.encode_special("<|assistant_start|>"), self.encode_special("<|assistant_end|>")
        python_start, python_end = self.encode_special("<|python_start|>"), self.encode_special("<|python_end|>")
        output_start, output_end = self.encode_special("<|output_start|>"), self.encode_special("<|output_end|>")
        # now we can tokenize the conversation
        add_tokens(bos, 0)
        for i, message in enumerate(messages):
            # some sanity checking here around assumptions, to prevent footguns
            must_be_from = "user" if i % 2 == 0 else "assistant"
            assert message["role"] == must_be_from, f"Message {i} is from {message['role']} but should be from {must_be_from}"
            # content can be either a simple string or a list of parts (e.g. containing tool calls)
            content = message["content"]
            if message["role"] == "user":
                assert isinstance(content, str), "User messages are simply expected to be strings"
                value_ids = self.encode(content)
                add_tokens(user_start, 0)
                add_tokens(value_ids, 0)
                add_tokens(user_end, 0)
            elif message["role"] == "assistant":
                add_tokens(assistant_start, 0)
                if isinstance(content, str):
                    # simple string => simply add the tokens
                    value_ids = self.encode(content)
                    add_tokens(value_ids, 1)
                elif isinstance(content, list):
                    for part in content:
                        value_ids = self.encode(part["text"])
                        if part["type"] == "text":
                            # string part => simply add the tokens
                            add_tokens(value_ids, 1)
                        elif part["type"] == "python":
                            # python tool call => add the tokens inside <|python_start|> and <|python_end|>
                            add_tokens(python_start, 1)
                            add_tokens(value_ids, 1)
                            add_tokens(python_end, 1)
                        elif part["type"] == "python_output":
                            # python output => add the tokens inside <|output_start|> and <|output_end|>
                            # none of these tokens are supervised because the tokens come from Python at test time
                            add_tokens(output_start, 0)
                            add_tokens(value_ids, 0)
                            add_tokens(output_end, 0)
                        else:
                            raise ValueError(f"Unknown part type: {part['type']}")
                else:
                    raise ValueError(f"Unknown content type: {type(content)}")
                add_tokens(assistant_end, 1)
        # truncate to max_tokens tokens MAX (helps prevent OOMs)
        ids = ids[:max_tokens]
        mask = mask[:max_tokens]
        return ids, mask
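
    # Worked example (token strings shown instead of ids; the exact chunking of
    # "Hello!" is illustrative):
    #   {"messages": [{"role": "user", "content": "Hi"},
    #                 {"role": "assistant", "content": "Hello!"}]}
    # renders to:
    #   ids:  <|bos|> <|user_start|> Hi <|user_end|> <|assistant_start|> Hello ! <|assistant_end|>
    #   mask:    0          0         0       0              0             1   1        1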

    def visualize_tokenization(self, ids, mask):
        """Small helper function useful in debugging: visualize the tokenization of render_conversation"""
        RED = '\033[91m'
        GREEN = '\033[92m'
        RESET = '\033[0m'
        tokens = []
        for token_id, mask_val in zip(ids, mask):
            token_str = self.decode([token_id])
            color = GREEN if mask_val == 1 else RED
            tokens.append(f"{color}{token_str}{RESET}")
        return '|'.join(tokens)

    def render_for_completion(self, conversation):
        """
        Used during Reinforcement Learning. In that setting, we want to
        render the conversation priming the Assistant for a completion.
        Unlike the Chat SFT case, we don't need to return the mask.
        """
        # We have some surgery to do: we need to pop the last message (of the Assistant)
        conversation = copy.deepcopy(conversation) # avoid mutating the original
        messages = conversation["messages"]
        assert messages[-1]["role"] == "assistant", "Last message must be from the Assistant"
        messages.pop() # remove the last message (of the Assistant) inplace
        # Now tokenize the conversation
        ids, mask = self.render_conversation(conversation)
        # Finally, to prime the Assistant for a completion, append the Assistant start token
        assistant_start = self.encode_special("<|assistant_start|>")
        ids.append(assistant_start)
        return ids
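
    # Sketch of the intended RL usage: for a conversation whose last turn is from the
    # Assistant, this returns everything up to and including <|assistant_start|>, from
    # which the policy then samples a completion:
    #   ids = tok.render_for_completion(conversation)
    #   # ...feed ids to the model, sample until <|assistant_end|>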

# -----------------------------------------------------------------------------
# nanochat-specific convenience functions

@lru_cache(maxsize=1) # cache so repeated calls don't re-load the tokenizer from disk
def get_tokenizer():
    from nanochat.common import get_base_dir
    base_dir = get_base_dir()
    tokenizer_dir = os.path.join(base_dir, "tokenizer")
    # return HuggingFaceTokenizer.from_directory(tokenizer_dir)
    return RustBPETokenizer.from_directory(tokenizer_dir)

def get_token_bytes(device="cpu"):
    import torch
    from nanochat.common import get_base_dir
    base_dir = get_base_dir()
    tokenizer_dir = os.path.join(base_dir, "tokenizer")
    token_bytes_path = os.path.join(tokenizer_dir, "token_bytes.pt")
    assert os.path.exists(token_bytes_path), f"Token bytes not found at {token_bytes_path}? It gets written by tok_train.py"
    with open(token_bytes_path, "rb") as f:
        token_bytes = torch.load(f, map_location=device)
    return token_bytes
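
if __name__ == "__main__":
    # Minimal smoke test, a sketch: assumes a tokenizer has already been trained and
    # saved to <base_dir>/tokenizer (e.g. by tok_train.py), otherwise this will fail.
    tok = get_tokenizer()
    ids = tok.encode("Hello world!", prepend="<|bos|>")
    print(ids)
    print(tok.decode(ids))
    conversation = {"messages": [
        {"role": "user", "content": "Hi"},
        {"role": "assistant", "content": "Hello!"},
    ]}
    ids, mask = tok.render_conversation(conversation)
    print(tok.visualize_tokenization(ids, mask))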