# -*- coding: utf-8 -*-
from __future__ import print_function, division

import glob
import json
import uuid
from copy import deepcopy
from collections import defaultdict, OrderedDict

import numpy as np

from torchmoji.filter_utils import is_special_token
from torchmoji.word_generator import WordGenerator
from torchmoji.global_variables import SPECIAL_TOKENS, VOCAB_PATH
class VocabBuilder():
    """ Create vocabulary with words extracted from sentences as fed from a
        word generator.
    """
    def __init__(self, word_gen):
        # initialize any new key with value of 0
        self.word_counts = defaultdict(lambda: 0, {})
        self.word_length_limit = 30

        for token in SPECIAL_TOKENS:
            assert len(token) < self.word_length_limit
            self.word_counts[token] = 0
        self.word_gen = word_gen

    def count_words_in_sentence(self, words):
        """ Generates word counts for all tokens in the given sentence.

        # Arguments:
            words: Tokenized sentence whose words should be counted.
        """
        for word in words:
            if 0 < len(word) and len(word) <= self.word_length_limit:
                try:
                    self.word_counts[word] += 1
                except KeyError:
                    self.word_counts[word] = 1

    def save_vocab(self, path=None):
        """ Saves the vocabulary into a file.

        # Arguments:
            path: Where the vocabulary should be saved. If not specified, a
                randomly generated filename is used instead.
        """
        dtype = ([('word', '|S{}'.format(self.word_length_limit)), ('count', 'int')])
        np_dict = np.array(list(self.word_counts.items()), dtype=dtype)

        # sort from highest to lowest frequency
        np_dict[::-1].sort(order='count')
        data = np_dict

        if path is None:
            path = str(uuid.uuid4())

        np.savez_compressed(path, data=data)
        print("Saved dict to {}".format(path))

    def get_next_word(self):
        """ Returns the next tokenized sentence from the word generator.

        # Returns:
            List of strings, representing the next tokenized sentence.
        """
        return next(iter(self.word_gen))

    def count_all_words(self):
        """ Generates word counts for all words in all sentences of the word
            generator.
        """
        for words, _ in self.word_gen:
            self.count_words_in_sentence(words)
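
# Hedged usage sketch (not part of the original module): count_all_words()
# above only requires the word generator to yield (tokenized_sentence, info)
# pairs, so a plain list of such pairs is enough for illustration. The
# sentences and output filename below are made up.
def _example_build_vocab():
    """ Minimal sketch of building and saving a vocabulary. """
    stream = [(['i', 'love', 'emoji'], {}),
              (['emoji', 'are', 'fun'], {})]
    builder = VocabBuilder(word_gen=stream)
    builder.count_all_words()
    # word_counts now maps each token (plus SPECIAL_TOKENS) to its frequency
    builder.save_vocab(path='example_vocab')  # writes example_vocab.npz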
class MasterVocab():
    """ Combines vocabularies.
    """
    def __init__(self):
        # initialize custom tokens
        self.master_vocab = {}

    def populate_master_vocab(self, vocab_path, min_words=1, force_appearance=None):
        """ Populates the master vocabulary using all vocabularies found in the
            given path. Vocabularies should be named *.npz. Expects the
            vocabularies to be numpy arrays with counts. Normalizes the counts
            and combines them.

        # Arguments:
            vocab_path: Path containing vocabularies to be combined.
            min_words: Minimum number of occurrences a word must have in order
                to be included in the master vocabulary.
            force_appearance: Optional vocabulary filename that will be added
                to the master vocabulary no matter what. This vocabulary must
                be present in vocab_path.
        """
        paths = glob.glob(vocab_path + '*.npz')
        sizes = {path: 0 for path in paths}
        dicts = {path: {} for path in paths}

        # set up and get sizes of individual dictionaries
        for path in paths:
            np_data = np.load(path)['data']

            for entry in np_data:
                word, count = entry
                if count < min_words:
                    continue
                if is_special_token(word):
                    continue
                dicts[path][word] = count

            sizes[path] = sum(dicts[path].values())
            print('Overall word count for {} -> {}'.format(path, sizes[path]))
            print('Overall word number for {} -> {}'.format(path, len(dicts[path])))

        vocab_of_max_size = max(sizes, key=sizes.get)
        max_size = sizes[vocab_of_max_size]
        print('Sizes: {}, largest vocabulary: {} ({} words)'.format(sizes, vocab_of_max_size, max_size))

        # can force one vocabulary to always be present
        if force_appearance is not None:
            force_appearance_path = [p for p in paths if force_appearance in p][0]
            force_appearance_vocab = deepcopy(dicts[force_appearance_path])
            print(force_appearance_path)
        else:
            force_appearance_path, force_appearance_vocab = None, None

        # normalize word counts before inserting into master dict
        for path in paths:
            normalization_factor = max_size / sizes[path]
            print('Norm factor for path {} -> {}'.format(path, normalization_factor))

            for word in dicts[path]:
                if is_special_token(word):
                    print("SPECIAL - ", word)
                    continue
                normalized_count = dicts[path][word] * normalization_factor

                # can force one vocabulary to always be present
                if force_appearance_vocab is not None:
                    try:
                        force_word_count = force_appearance_vocab[word]
                    except KeyError:
                        continue
                    # optional count threshold on the forced vocabulary (disabled)
                    # if force_word_count < 5:
                    #     continue

                if word in self.master_vocab:
                    self.master_vocab[word] += normalized_count
                else:
                    self.master_vocab[word] = normalized_count

        print('Size of master_dict {}'.format(len(self.master_vocab)))
        print("Hashes for master dict: {}".format(
            len([w for w in self.master_vocab if '#' in w[0]])))

    def save_vocab(self, path_count, path_vocab, word_limit=100000):
        """ Saves the master vocabulary into a file.
        """
        # reserve space for 10 special tokens
        words = OrderedDict()
        for token in SPECIAL_TOKENS:
            # store -1 instead of np.inf, which can overflow
            words[token] = -1

        # sort words by frequency
        desc_order = OrderedDict(sorted(self.master_vocab.items(),
                                        key=lambda kv: kv[1], reverse=True))
        words.update(desc_order)

        # use encoding of up to 30 characters (no token conversions)
        # use float to store large numbers (we don't care about precision loss)
        np_vocab = np.array(list(words.items()),
                            dtype=([('word', '|S30'), ('count', 'float')]))

        # output count for debugging
        counts = np_vocab[:word_limit]
        np.savez_compressed(path_count, counts=counts)

        # output the index of each word for easy lookup
        final_words = OrderedDict()
        for i, w in enumerate(list(words.keys())[:word_limit]):
            final_words.update({w: i})
        with open(path_vocab, 'w') as f:
            f.write(json.dumps(final_words, indent=4, separators=(',', ': ')))
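
# Hedged usage sketch (illustration only): combining several saved *.npz
# vocabularies into a master vocabulary. Directory and file names are made
# up; note that vocab_path is used as a glob prefix above, so a trailing
# separator is expected when pointing at a directory.
def _example_combine_vocabs():
    """ Minimal sketch of building a combined vocabulary from saved counts. """
    master = MasterVocab()
    # picks up every ./vocabs/*.npz file, drops words seen fewer than 10 times
    master.populate_master_vocab('./vocabs/', min_words=10)
    # writes the top word counts (npz) and a word -> index mapping (json)
    master.save_vocab('./master_counts', './master_vocab.json', word_limit=100000)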
def all_words_in_sentences(sentences):
    """ Extracts all unique words from a given list of sentences.

    # Arguments:
        sentences: List or word generator of sentences to be processed.

    # Returns:
        List of all unique words contained in the given sentences.
    """
    vocab = []
    if isinstance(sentences, WordGenerator):
        sentences = [s for s, _ in sentences]

    for sentence in sentences:
        for word in sentence:
            if word not in vocab:
                vocab.append(word)

    return vocab
def extend_vocab_in_file(vocab, max_tokens=10000, vocab_path=VOCAB_PATH):
    """ Extends JSON-formatted vocabulary with words from vocab that are not
        present in the current vocabulary. Adds up to max_tokens words.
        Overwrites file in vocab_path.

    # Arguments:
        vocab: Vocabulary to be added. MUST have word_counts populated, i.e.
            must have run count_all_words() previously.
        max_tokens: Maximum number of words to be added.
        vocab_path: Path to the vocabulary json which is to be extended.
    """
    try:
        with open(vocab_path, 'r') as f:
            current_vocab = json.load(f)
    except IOError:
        print('Vocabulary file not found, expected at ' + vocab_path)
        return

    extend_vocab(current_vocab, vocab, max_tokens)

    # Save back to file
    with open(vocab_path, 'w') as f:
        json.dump(current_vocab, f, sort_keys=True, indent=4, separators=(',', ': '))
def extend_vocab(current_vocab, new_vocab, max_tokens=10000):
    """ Extends current vocabulary with words from vocab that are not
        present in the current vocabulary. Adds up to max_tokens words.

    # Arguments:
        current_vocab: Current dictionary of tokens.
        new_vocab: Vocabulary to be added. MUST have word_counts populated, i.e.
            must have run count_all_words() previously.
        max_tokens: Maximum number of words to be added.

    # Returns:
        How many new tokens have been added.
    """
    if max_tokens < 0:
        max_tokens = 10000

    words = OrderedDict()

    # sort words by frequency
    desc_order = OrderedDict(sorted(new_vocab.word_counts.items(),
                                    key=lambda kv: kv[1], reverse=True))
    words.update(desc_order)

    base_index = len(current_vocab.keys())
    added = 0
    for word in words:
        if added >= max_tokens:
            break
        if word not in current_vocab.keys():
            current_vocab[word] = base_index + added
            added += 1

    return added
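
# Hedged usage sketch (illustration only): extending an existing word -> index
# mapping with tokens counted by a VocabBuilder. The pre-tokenized stream and
# the starting mapping below are made up and stand in for a real WordGenerator
# and the packaged vocabulary.
if __name__ == '__main__':
    stream = [(['totally', 'new', 'tokens'], {})]
    builder = VocabBuilder(word_gen=stream)
    builder.count_all_words()

    # extend an in-memory mapping; extend_vocab returns the number of tokens added
    current = {'CUSTOM_MASK': 0, 'CUSTOM_UNKNOWN': 1}
    print(extend_vocab(current, builder, max_tokens=1000))

    # or extend (and overwrite) the vocabulary file at VOCAB_PATH:
    # extend_vocab_in_file(builder, max_tokens=1000, vocab_path=VOCAB_PATH)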