#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
import pandas as pd
from transformers import (
    AutoTokenizer,
    AutoModel,
    AutoConfig,
    TFAutoModelForSequenceClassification,
)
from tensorflow import keras
from sklearn.model_selection import train_test_split
import logging
import time

from .models import Models, ModelsByFamily  # noqa: F401
from .split_strategies import (  # noqa: F401
    SplitStrategy,
    SplitStrategies,
    RegexExpressions,
)
from .aggregation_strategies import (  # noqa: F401
    AggregationStrategy,
    AggregationStrategies,
)
from .helper import (
    get_features,
    softmax,
    remove_dir,
    make_dir,
    copy_dir,
)

AUTOSAVE_PATH = './ernie-autosave/'
def clean_autosave():
    remove_dir(AUTOSAVE_PATH)
class SentenceClassifier:
    def __init__(self,
                 model_name=Models.BertBaseUncased,
                 model_path=None,
                 max_length=64,
                 labels_no=2,
                 tokenizer_kwargs=None,
                 model_kwargs=None):
        self._loaded_data = False
        self._model_path = None

        if model_kwargs is None:
            model_kwargs = {}
        model_kwargs['num_labels'] = labels_no

        if tokenizer_kwargs is None:
            tokenizer_kwargs = {}
        # transformers renamed the deprecated `max_len` tokenizer argument
        # to `model_max_length`
        tokenizer_kwargs['model_max_length'] = max_length

        if model_path is not None:
            self._load_local_model(model_path)
        else:
            self._load_remote_model(model_name, tokenizer_kwargs,
                                    model_kwargs)
    # Exposed as properties: the rest of the class accesses them without
    # parentheses (e.g. `self.tokenizer` in `predict`).
    @property
    def model(self):
        return self._model

    @property
    def tokenizer(self):
        return self._tokenizer
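    # Illustrative sketch (not part of the module): constructing a classifier
    # and reading its components back through the properties above.
    #
    #   classifier = SentenceClassifier(model_name=Models.BertBaseUncased,
    #                                   max_length=128,
    #                                   labels_no=2)
    #   print(classifier.tokenizer)  # read-only property, no parentheses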
    def load_dataset(self,
                     dataframe=None,
                     validation_split=0.1,
                     random_state=None,
                     stratify=None,
                     csv_path=None,
                     read_csv_kwargs=None):
        if dataframe is None and csv_path is None:
            raise ValueError('Either dataframe or csv_path must be provided.')

        if csv_path is not None:
            if read_csv_kwargs is None:
                read_csv_kwargs = {}
            dataframe = pd.read_csv(csv_path, **read_csv_kwargs)

        # First column: text; second column: label
        sentences = list(dataframe[dataframe.columns[0]])
        labels = dataframe[dataframe.columns[1]].values

        (
            training_sentences,
            validation_sentences,
            training_labels,
            validation_labels,
        ) = train_test_split(
            sentences,
            labels,
            test_size=validation_split,
            shuffle=True,
            random_state=random_state,
            stratify=stratify,
        )

        self._training_features = get_features(
            self._tokenizer, training_sentences, training_labels)
        self._training_size = len(training_sentences)

        self._validation_features = get_features(
            self._tokenizer, validation_sentences, validation_labels)
        self._validation_split = len(validation_sentences)

        logging.info(f'training_size: {self._training_size}')
        logging.info(f'validation_split: {self._validation_split}')

        self._loaded_data = True
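    # Illustrative sketch: loading from a CSV instead of a dataframe
    # ('dataset.csv' is a hypothetical two-column file: text, label).
    #
    #   classifier.load_dataset(csv_path='dataset.csv',
    #                           validation_split=0.2,
    #                           random_state=42)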
    def fine_tune(self,
                  epochs=4,
                  learning_rate=2e-5,
                  epsilon=1e-8,
                  clipnorm=1.0,
                  optimizer_function=keras.optimizers.Adam,
                  optimizer_kwargs=None,
                  loss_function=keras.losses.SparseCategoricalCrossentropy,
                  loss_kwargs=None,
                  accuracy_function=keras.metrics.SparseCategoricalAccuracy,
                  accuracy_kwargs=None,
                  training_batch_size=32,
                  validation_batch_size=64,
                  **kwargs):
        if not self._loaded_data:
            raise RuntimeError(
                'Data has not been loaded; call load_dataset first.')

        if optimizer_kwargs is None:
            optimizer_kwargs = {
                'learning_rate': learning_rate,
                'epsilon': epsilon,
                'clipnorm': clipnorm,
            }
        optimizer = optimizer_function(**optimizer_kwargs)

        if loss_kwargs is None:
            loss_kwargs = {'from_logits': True}
        loss = loss_function(**loss_kwargs)

        if accuracy_kwargs is None:
            accuracy_kwargs = {'name': 'accuracy'}
        accuracy = accuracy_function(**accuracy_kwargs)

        self._model.compile(optimizer=optimizer,
                            loss=loss,
                            metrics=[accuracy])

        training_features = self._training_features.shuffle(
            self._training_size).batch(training_batch_size).repeat(-1)
        validation_features = self._validation_features.batch(
            validation_batch_size)

        training_steps = self._training_size // training_batch_size
        if training_steps == 0:
            training_steps = self._training_size
        logging.info(f'training_steps: {training_steps}')

        validation_steps = self._validation_split // validation_batch_size
        if validation_steps == 0:
            validation_steps = self._validation_split
        logging.info(f'validation_steps: {validation_steps}')

        for _ in range(epochs):
            self._model.fit(training_features,
                            epochs=1,
                            validation_data=validation_features,
                            steps_per_epoch=training_steps,
                            validation_steps=validation_steps,
                            **kwargs)

        # The fine-tuned model does not expose the same input interface
        # after being exported and loaded again, so reload it once here.
        self._reload_model()
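    # Illustrative sketch: extra keyword arguments are forwarded verbatim to
    # keras Model.fit (e.g. callbacks, verbose).
    #
    #   classifier.fine_tune(epochs=2,
    #                        learning_rate=3e-5,
    #                        training_batch_size=16,
    #                        verbose=1)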
    def predict_one(self,
                    text,
                    split_strategy=None,
                    aggregation_strategy=None):
        return next(
            self.predict([text],
                         batch_size=1,
                         split_strategy=split_strategy,
                         aggregation_strategy=aggregation_strategy))
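    # Illustrative sketch: the result is one softmax distribution per text,
    # with one probability per label.
    #
    #   probabilities = classifier.predict_one('I loved this movie')
    #   predicted_label = int(np.argmax(probabilities))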
    def predict(self,
                texts,
                batch_size=32,
                split_strategy=None,
                aggregation_strategy=None):
        if split_strategy is None:
            yield from self._predict_batch(texts, batch_size)
        else:
            if aggregation_strategy is None:
                aggregation_strategy = AggregationStrategies.Mean

            split_indexes = [0]
            sentences = []
            for text in texts:
                new_sentences = split_strategy.split(text, self.tokenizer)
                if not new_sentences:
                    continue
                split_indexes.append(split_indexes[-1] + len(new_sentences))
                sentences.extend(new_sentences)

            predictions = list(self._predict_batch(sentences, batch_size))
            for i, split_index in enumerate(split_indexes[:-1]):
                stop_index = split_indexes[i + 1]
                yield aggregation_strategy.aggregate(
                    predictions[split_index:stop_index])
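    # Illustrative sketch, assuming the split_strategies module exposes a
    # sentence-level strategy such as SplitStrategies.SentencesWithoutUrls:
    # long texts are split into chunks, predicted in batches, and the chunk
    # probabilities aggregated back into one prediction per input text.
    #
    #   predictions = classifier.predict(
    #       long_texts,
    #       split_strategy=SplitStrategies.SentencesWithoutUrls,
    #       aggregation_strategy=AggregationStrategies.Mean)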
    def dump(self, path):
        if self._model_path:
            copy_dir(self._model_path, path)
        else:
            self._dump(path)

    def _dump(self, path):
        make_dir(path)
        make_dir(path + '/tokenizer')
        self._model.save_pretrained(path)
        self._tokenizer.save_pretrained(path + '/tokenizer')
        self._config.save_pretrained(path + '/tokenizer')
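    # Illustrative sketch: persist a fine-tuned model and restore it later
    # through the model_path constructor argument.
    #
    #   classifier.dump('/tmp/sentiment-model')
    #   restored = SentenceClassifier(model_path='/tmp/sentiment-model')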
    def _predict_batch(self, sentences: list, batch_size: int):
        sentences_number = len(sentences)
        if batch_size > sentences_number:
            batch_size = sentences_number

        for i in range(0, sentences_number, batch_size):
            input_ids_list = []
            attention_mask_list = []

            stop_index = min(i + batch_size, sentences_number)
            for j in range(i, stop_index):
                # Truncate to the tokenizer's maximum length so the padded
                # arrays below always fit
                features = self._tokenizer.encode_plus(
                    sentences[j],
                    add_special_tokens=True,
                    max_length=self._tokenizer.model_max_length,
                    truncation=True,
                )
                input_ids = self._list_to_padded_array(features['input_ids'])
                attention_mask = self._list_to_padded_array(
                    features['attention_mask'])
                input_ids_list.append(input_ids)
                attention_mask_list.append(attention_mask)

            input_dict = {
                'input_ids': np.array(input_ids_list),
                'attention_mask': np.array(attention_mask_list),
            }
            logit_predictions = self._model.predict_on_batch(input_dict)
            yield from [
                softmax(logit_prediction)
                for logit_prediction in logit_predictions[0]
            ]
    def _list_to_padded_array(self, items):
        array = np.array(items)
        # np.int was removed in NumPy 1.24; use an explicit integer dtype
        padded_array = np.zeros(self._tokenizer.model_max_length,
                                dtype=np.int64)
        padded_array[:array.shape[0]] = array
        return padded_array
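    # Worked example, assuming model_max_length == 8:
    #   [101, 2023, 102] -> [101, 2023, 102, 0, 0, 0, 0, 0]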
    def _get_temporary_path(self, name=''):
        return f'{AUTOSAVE_PATH}{name}/{int(round(time.time() * 1000))}'
    def _reload_model(self):
        self._model_path = self._get_temporary_path(
            name=self._get_model_family())
        self._dump(self._model_path)
        self._load_local_model(self._model_path)
    def _load_local_model(self, model_path):
        try:
            self._tokenizer = AutoTokenizer.from_pretrained(
                model_path + '/tokenizer')
            self._config = AutoConfig.from_pretrained(
                model_path + '/tokenizer')
        # Older dumps did not ship a tokenizer folder
        except OSError:
            self._tokenizer = AutoTokenizer.from_pretrained(model_path)
            self._config = AutoConfig.from_pretrained(model_path)

        self._model = TFAutoModelForSequenceClassification.from_pretrained(
            model_path,
            from_pt=False,
        )
    def _get_model_family(self):
        # E.g. 'tf_bert_for_sequence_classification' -> 'bert'
        model_family = ''.join(self._model.name[2:].split('_')[:2])
        return model_family
    def _load_remote_model(self, model_name, tokenizer_kwargs, model_kwargs):
        do_lower_case = 'uncased' in model_name.lower()
        tokenizer_kwargs.update({'do_lower_case': do_lower_case})

        self._tokenizer = AutoTokenizer.from_pretrained(
            model_name, **tokenizer_kwargs)
        self._config = AutoConfig.from_pretrained(model_name)

        temporary_path = self._get_temporary_path()
        make_dir(temporary_path)

        # TensorFlow model
        try:
            self._model = \
                TFAutoModelForSequenceClassification.from_pretrained(
                    model_name,
                    from_pt=False,
                )
        # PyTorch model
        except TypeError:
            try:
                self._model = \
                    TFAutoModelForSequenceClassification.from_pretrained(
                        model_name,
                        from_pt=True,
                    )
            # Loading a TF model from a PyTorch checkpoint is not supported
            # when using a model identifier name
            except OSError:
                model = AutoModel.from_pretrained(model_name)
                model.save_pretrained(temporary_path)
                self._model = \
                    TFAutoModelForSequenceClassification.from_pretrained(
                        temporary_path,
                        from_pt=True,
                    )

        # Replace the model's last layer if the requested properties differ
        # from the checkpoint's configuration
        clean_last_layer = False
        for key, value in model_kwargs.items():
            if (not hasattr(self._model.config, key)
                    or getattr(self._model.config, key) != value):
                clean_last_layer = True
                break

        if clean_last_layer:
            try:
                getattr(self._model,
                        self._get_model_family()).save_pretrained(
                            temporary_path)
                self._model = self._model.__class__.from_pretrained(
                    temporary_path,
                    from_pt=False,
                    **model_kwargs,
                )
            # The model is itself the main layer
            except AttributeError:
                # TensorFlow model
                try:
                    self._model = self._model.__class__.from_pretrained(
                        model_name,
                        from_pt=False,
                        **model_kwargs,
                    )
                # PyTorch model
                except (OSError, TypeError):
                    model = AutoModel.from_pretrained(model_name)
                    model.save_pretrained(temporary_path)
                    self._model = self._model.__class__.from_pretrained(
                        temporary_path,
                        from_pt=True,
                        **model_kwargs,
                    )

        remove_dir(temporary_path)
        assert self._tokenizer and self._model
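# Minimal end-to-end sketch (illustrative only; assumes this file is part of
# the ernie package and that model downloads are available):
#
#   classifier = SentenceClassifier(max_length=64)
#   df = pd.DataFrame([('Whatever works', 1), ('It was terrible', 0)])
#   classifier.load_dataset(df, validation_split=0.5)
#   classifier.fine_tune(epochs=1, training_batch_size=2)
#   print(classifier.predict_one('A delightful film'))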