import collections
import os
from typing import Dict

import streamlit as st
from datasets import get_dataset_config_names
from jinja2 import Environment, FileSystemLoader

import utils
from configuration import OPTIMIZERS_ACCELERATE, OPTIMIZERS_TRAINER, TASKS, TASKS_TO_PIPELINE_TAG
from utils import (get_dataset_infos_dict, get_datasets, get_model_to_model_id,
                   render_features)

def show_API_component(inputs: Dict[str, str]) -> Dict[str, str]:
    """Let the user pick a Hugging Face API and resolve its template directory."""
    template_dict = collections.defaultdict()
    template_dirs = [
        f for f in os.scandir("templates") if f.is_dir() and f.name != "example"
    ]
    template_dirs = sorted(template_dirs, key=lambda e: e.name)
    for template_dir in template_dirs:
        template_dict[template_dir.name] = template_dir.path

    st.write("## API")
    inputs["api"] = st.selectbox(
        "Which Hugging Face API do you want to use?", list(template_dict.keys())
    )
    inputs["template_dir"] = template_dict.get(inputs["api"])
    return inputs

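# For reference, the scan above expects a layout roughly like the sketch below. Only the
# "templates/<api>/" level and the exclusion of "example" follow from this function; the
# task_templates/ subfolder is implied by show_code_component further down, and the
# concrete API folder names are assumptions for illustration ("Accelerate" is referenced
# later in this file, "Trainer" is a guess):
#
#     templates/
#         example/                                   # skipped by the scan
#         Accelerate/
#             task_templates/<nlp_task>.py.jinja
#         Trainer/
#             task_templates/<nlp_task>.py.jinja
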
def show_model_component(inputs: Dict[str, str]) -> Dict[str, str]:
    """Let the user pick a model whose pipeline tag matches the selected NLP task."""
    model_info = get_model_to_model_id()
    models = model_info["model_to_model_id"]
    models_pipeline = model_info["model_to_pipeline_tag"]

    st.write("## Model")
    models_for_task = [
        model for model in models if models_pipeline[model] == inputs["nlp_task"]
    ]
    model = st.selectbox("Which model?", models_for_task)
    inputs["model_checkpoint"] = models.get(model)
    inputs["pretrained"] = st.checkbox("Use pre-trained model")
    return inputs

def show_task_component(inputs: Dict[str, str]) -> Dict[str, str]:
    """Let the user pick the training task and the corresponding NLP task."""
    st.write("## Task")
    task = st.selectbox("Which task?", TASKS)
    inputs["task"] = task
    inputs["nlp_task"] = st.selectbox(
        "Which NLP task?", TASKS_TO_PIPELINE_TAG[task])
    return inputs

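# For reference, this module uses TASKS as an iterable of task names and
# TASKS_TO_PIPELINE_TAG as a mapping from each task to the pipeline tags it supports.
# The entries below are hypothetical and only illustrate that shape; the real values
# live in configuration.py ("MaskedLM" and "translation" do appear elsewhere in this
# file, the rest is made up for the example):
#
#     TASKS = ["CausalLM", "MaskedLM"]
#     TASKS_TO_PIPELINE_TAG = {"CausalLM": ["text-generation"], "MaskedLM": ["fill-mask"]}
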
def show_input_data_component(inputs: Dict[str, str]) -> Dict[str, str]:
    """Let the user pick a dataset, subset, splits and feature(s) for the selected NLP task."""
    st.write("## Input data")
    english_datasets = get_datasets()
    english_datasets_for_task = []
    for dataset in english_datasets:
        for task_category in english_datasets[dataset]:
            if task_category == inputs["nlp_task"]:
                english_datasets_for_task.append(dataset)
                break  # avoid listing the same dataset more than once

    inputs["dataset"] = st.selectbox(
        "Which one?", tuple(english_datasets_for_task)
    )
    configs = get_dataset_config_names(inputs["dataset"])
    inputs["subset"] = st.selectbox("Which subset?", list(configs))

    data_info_dict = get_dataset_infos_dict(inputs["dataset"], inputs["subset"])
    assert data_info_dict.splits is not None
    splits = list(data_info_dict.splits.keys())

    # Pre-select the "train" split when available, otherwise the first split.
    train_index = splits.index("train") if "train" in splits else 0
    inputs["train"] = st.selectbox(
        "Which split for training?", splits, index=train_index)

    # Pre-select the "validation" split when available, otherwise the last split.
    validation_index = splits.index("validation") if "validation" in splits else len(splits) - 1
    inputs["validation"] = st.selectbox(
        "Which split for validation?", splits, index=validation_index)

    assert data_info_dict.features is not None
    # For translation tasks, pre-select the "translation" feature when present.
    feature_index = 0
    if inputs["nlp_task"] == "translation":
        if "translation" in list(data_info_dict.features.keys()):
            feature_index = list(data_info_dict.features.keys()).index("translation")
    inputs["feature"] = st.selectbox(
        "Which data feature?", list(data_info_dict.features.keys()), index=feature_index)
    if inputs["feature"] == "translation":
        inputs["source_language"] = st.selectbox(
            "Which language for source?", list(data_info_dict.features["translation"].languages))
        inputs["target_language"] = st.selectbox(
            "Which language for target?", list(data_info_dict.features["translation"].languages))
    return inputs

def show_preprocessing_component(inputs: Dict[str, str]) -> Dict[str, str]:
    """Collect preprocessing options (block size and, for MaskedLM, masking settings)."""
    st.write("## Preprocessing")
    inputs["block_size"] = st.number_input(
        "The length of each block (i.e. context size)", 1, None, 128)
    if inputs["task"] == "MaskedLM":
        inputs["mlm_probability"] = st.number_input(
            "The probability with which to (randomly) mask tokens in the input", 0.0, 1.0, 0.15)
        inputs["whole_word_masking"] = st.checkbox("Use whole word masking")
    return inputs

def show_training_comoponent(inputs: Dict[str, str]) -> Dict[str, str]:
    """Collect training hyperparameters (seed, optimizer, scheduler, batch size, epochs)."""
    st.write("## Training")
    # inputs["with_tracker"] = st.selectbox(
    #     "Loggers to monitor the training", ["none", "all", "tensorboard", "wandb", "comet_ml"])
    inputs["seed"] = st.number_input("Seed", 1, None, 4)

    # Each API has its own set of supported optimizers with default learning rates.
    if inputs["api"] == "Accelerate":
        optimizer_dict_to_use = OPTIMIZERS_ACCELERATE
    else:
        optimizer_dict_to_use = OPTIMIZERS_TRAINER
    inputs["optimizer"] = st.selectbox(
        "Optimizer", list(optimizer_dict_to_use.keys()))
    default_lr = optimizer_dict_to_use[inputs["optimizer"]]
    inputs["lr"] = st.number_input(
        "Learning rate", 0.000, None, default_lr, format="%f"
    )

    inputs["use_weight_decay"] = st.checkbox("Use weight decay")
    if inputs["use_weight_decay"]:
        inputs["weight_decay"] = st.number_input(
            "Weight decay", 0.000, None, 0.01, format="%f"
        )
    inputs["gradient_accumulation_steps"] = st.number_input(
        "Gradient Accumulation Steps", 1, None, 8)
    inputs["lr_scheduler_type"] = st.selectbox(
        "The scheduler type to use",
        ["linear", "cosine", "cosine_with_restarts", "polynomial", "constant", "constant_with_warmup"])
    inputs["num_warmup_steps"] = st.number_input("Num warmup steps", 0, None, 0)
    inputs["batch_size"] = st.number_input("Batch size", 1, None, 32)
    inputs["num_epochs"] = st.number_input("Epochs", 1, None, 3)
    return inputs

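# For reference, OPTIMIZERS_ACCELERATE and OPTIMIZERS_TRAINER are used above as mappings
# from an optimizer name to its default learning rate. The entries below are hypothetical
# and only illustrate that shape; the real values live in configuration.py:
#
#     OPTIMIZERS_ACCELERATE = {"AdamW": 5e-5}
#     OPTIMIZERS_TRAINER = {"AdamW": 5e-5, "Adafactor": 1e-3}
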
def show_datset_view_component(inputs: Dict[str, str]) -> Dict[str, str]:
    """Show the description, features and useful links for the selected dataset."""
    data_info_dict = get_dataset_infos_dict(inputs["dataset"], inputs["subset"])
    st.write(f'## Dataset view: {inputs["dataset"]}/{inputs["subset"]}')
    st.markdown(
        "*Homepage*: "
        + data_info_dict.homepage
        + "\n\n*Dataset*: https://github.com/huggingface/datasets/blob/master/datasets/%s/%s.py"
        % (inputs["dataset"], inputs["dataset"])
    )
    s = []
    s.append("dataset=" + inputs["dataset"])
    s.append("config=" + inputs["subset"])
    st.markdown(
        "*Permalink*: https://huggingface.co/datasets/viewer/?" + "&".join(s)
    )
    # https://github.com/huggingface/datasets-viewer/blob/master/run.py#L282
    st.write(data_info_dict.description)
    st.write(render_features(data_info_dict.features))
    # TODO: if the dataset is too big, switch to streaming mode
    # TODO: cache this part of the code
    # selected_dataset = load_dataset(
    #     inputs["dataset"], inputs["subset"], split=inputs["train"], streaming=True)
    # print(selected_dataset)
    # print(next(iter(selected_dataset)))
    return inputs

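# Sketch for the streaming TODO above; it is not wired into the app. The idea is to fall
# back to streaming mode when the selected dataset looks too large to download. The helper
# name, the size_in_bytes check and the ~1 GB threshold are assumptions for illustration,
# not part of the original Space.
def _load_selected_split(inputs: Dict[str, str]):
    from datasets import load_dataset

    data_info_dict = get_dataset_infos_dict(inputs["dataset"], inputs["subset"])
    too_big = (data_info_dict.size_in_bytes or 0) > 1_000_000_000  # assumed ~1 GB cut-off
    return load_dataset(
        inputs["dataset"], inputs["subset"], split=inputs["train"], streaming=too_big)
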
def show_code_component(inputs: Dict[str, str]) -> Dict[str, str]:
    """Render the generated code and notebook from the task template and offer downloads."""
    # Generate code and notebook based on the task template in the selected template dir.
    env = Environment(
        loader=FileSystemLoader(inputs["template_dir"]), trim_blocks=True, lstrip_blocks=True,
    )
    template = env.get_template(f'task_templates/{inputs["nlp_task"]}.py.jinja')
    code = template.render(header=utils.code_header, notebook=False, **inputs)
    notebook_code = template.render(
        header=utils.notebook_header, notebook=True, **inputs)
    notebook = utils.to_notebook(notebook_code)

    st.write(f'## Code view: {inputs["api"]}')
    st.write("")  # add vertical space
    col1, col2 = st.columns(2)
    with col1:
        utils.download_button(code, "generated-code.py", "🐍 Download (.py)")
    with col2:
        utils.download_button(
            notebook, "generated-notebook.ipynb", "📓 Download (.ipynb)")
    colab_error = st.empty()

    # Display code.
    st.code(code)
    return inputs

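# A minimal sketch of how these components might be chained in the page flow: each
# component reads from and writes to the same `inputs` dict, so the calls simply thread
# it through in an order that satisfies the keys each step needs. This function is
# illustrative only and is not called anywhere; the actual wiring of the Space may differ.
def _example_page() -> Dict[str, str]:
    inputs: Dict[str, str] = {}
    inputs = show_task_component(inputs)            # sets inputs["task"] / inputs["nlp_task"]
    inputs = show_API_component(inputs)             # sets inputs["api"] / inputs["template_dir"]
    inputs = show_model_component(inputs)           # needs inputs["nlp_task"]
    inputs = show_input_data_component(inputs)      # needs inputs["nlp_task"]
    inputs = show_preprocessing_component(inputs)   # needs inputs["task"]
    inputs = show_training_comoponent(inputs)       # needs inputs["api"]
    inputs = show_datset_view_component(inputs)     # needs inputs["dataset"] / inputs["subset"]
    inputs = show_code_component(inputs)            # needs inputs["template_dir"] / inputs["nlp_task"]
    return inputs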