Spaces:
Running
Running
| # Copyright 2024 the LlamaFactory team. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| import os | |
| from functools import partial | |
| from typing import TYPE_CHECKING, Any, Dict, List, Union | |
| from datasets import Features | |
| from ..extras.logging import get_logger | |
| from .data_utils import Role | |
| if TYPE_CHECKING: | |
| from datasets import Dataset, IterableDataset | |
| from transformers import Seq2SeqTrainingArguments | |
| from ..hparams import DataArguments | |
| from .parser import DatasetAttr | |
| logger = get_logger(__name__) | |
| def _convert_images(images: List[Any], dataset_attr: "DatasetAttr", data_args: "DataArguments") -> List[Any]: | |
| r""" | |
| Optionally concatenates image path to dataset dir when loading from local disk. | |
| """ | |
| outputs = [] | |
| if dataset_attr.load_from in ["script", "file"]: | |
| for image in images: | |
| if isinstance(image, str) and os.path.isfile(os.path.join(data_args.dataset_dir, image)): | |
| outputs.append(os.path.join(data_args.dataset_dir, image)) | |
| else: | |
| outputs.append(image) | |
| return outputs | |
| def convert_alpaca( | |
| examples: Dict[str, List[Any]], dataset_attr: "DatasetAttr", data_args: "DataArguments" | |
| ) -> Dict[str, List[Any]]: | |
| r""" | |
| Converts alpaca format dataset to the standard format. | |
| """ | |
| outputs = {"prompt": [], "response": [], "system": [], "tools": [], "images": []} | |
| convert_images = partial(_convert_images, dataset_attr=dataset_attr, data_args=data_args) | |
| for i in range(len(examples[dataset_attr.prompt])): | |
| prompt = [] | |
| if dataset_attr.history and isinstance(examples[dataset_attr.history][i], list): | |
| for old_prompt, old_response in examples[dataset_attr.history][i]: | |
| prompt.append({"role": Role.USER.value, "content": old_prompt}) | |
| prompt.append({"role": Role.ASSISTANT.value, "content": old_response}) | |
| content = [] | |
| if dataset_attr.prompt and examples[dataset_attr.prompt][i]: | |
| content.append(examples[dataset_attr.prompt][i]) | |
| if dataset_attr.query and examples[dataset_attr.query][i]: | |
| content.append(examples[dataset_attr.query][i]) | |
| prompt.append({"role": Role.USER.value, "content": "\n".join(content)}) # "prompt\nquery" | |
| if dataset_attr.kto_tag and isinstance(examples[dataset_attr.kto_tag][i], bool): # kto example | |
| response = [{"role": Role.ASSISTANT.value, "content": examples[dataset_attr.response][i]}] | |
| if examples[dataset_attr.kto_tag][i]: | |
| response = response + [{"role": Role.ASSISTANT.value, "content": ""}] | |
| else: | |
| response = [{"role": Role.ASSISTANT.value, "content": ""}] + response | |
| elif ( | |
| dataset_attr.ranking | |
| and isinstance(examples[dataset_attr.chosen][i], str) | |
| and isinstance(examples[dataset_attr.rejected][i], str) | |
| ): # pairwise example | |
| response = [ | |
| {"role": Role.ASSISTANT.value, "content": examples[dataset_attr.chosen][i]}, | |
| {"role": Role.ASSISTANT.value, "content": examples[dataset_attr.rejected][i]}, | |
| ] | |
| elif dataset_attr.response and isinstance(examples[dataset_attr.response][i], str): # normal example | |
| response = [{"role": Role.ASSISTANT.value, "content": examples[dataset_attr.response][i]}] | |
| else: # unsupervised | |
| response = [] | |
| outputs["prompt"].append(prompt) | |
| outputs["response"].append(response) | |
| outputs["system"].append(examples[dataset_attr.system][i] if dataset_attr.system else "") | |
| outputs["tools"].append(examples[dataset_attr.tools][i] if dataset_attr.tools else "") | |
| outputs["images"].append(convert_images(examples[dataset_attr.images][i]) if dataset_attr.images else []) | |
| return outputs | |
| def convert_sharegpt( | |
| examples: Dict[str, List[Any]], dataset_attr: "DatasetAttr", data_args: "DataArguments" | |
| ) -> Dict[str, List[Any]]: | |
| r""" | |
| Converts sharegpt format dataset to the standard format. | |
| """ | |
| outputs = {"prompt": [], "response": [], "system": [], "tools": [], "images": []} | |
| convert_images = partial(_convert_images, dataset_attr=dataset_attr, data_args=data_args) | |
| tag_mapping = { | |
| dataset_attr.user_tag: Role.USER.value, | |
| dataset_attr.assistant_tag: Role.ASSISTANT.value, | |
| dataset_attr.observation_tag: Role.OBSERVATION.value, | |
| dataset_attr.function_tag: Role.FUNCTION.value, | |
| dataset_attr.system_tag: Role.SYSTEM.value, | |
| } | |
| odd_tags = (dataset_attr.user_tag, dataset_attr.observation_tag) | |
| even_tags = (dataset_attr.assistant_tag, dataset_attr.function_tag) | |
| accept_tags = (odd_tags, even_tags) | |
| for i, messages in enumerate(examples[dataset_attr.messages]): | |
| if dataset_attr.system_tag and messages[0][dataset_attr.role_tag] == dataset_attr.system_tag: | |
| system = messages[0][dataset_attr.content_tag] | |
| messages = messages[1:] | |
| else: | |
| system = examples[dataset_attr.system][i] if dataset_attr.system else "" | |
| if len(messages) == 0: | |
| continue | |
| aligned_messages = [] | |
| broken_data = False | |
| for turn_idx, message in enumerate(messages): | |
| if message[dataset_attr.role_tag] not in accept_tags[turn_idx % 2]: | |
| logger.warning("Invalid role tag in {}.".format(messages)) | |
| broken_data = True | |
| aligned_messages.append( | |
| {"role": tag_mapping[message[dataset_attr.role_tag]], "content": message[dataset_attr.content_tag]} | |
| ) | |
| if (not dataset_attr.ranking and len(aligned_messages) % 2 != 0) or ( | |
| dataset_attr.ranking and len(aligned_messages) % 2 == 0 | |
| ): | |
| logger.warning("Invalid message count in {}.".format(messages)) | |
| broken_data = True | |
| if dataset_attr.kto_tag and isinstance(examples[dataset_attr.kto_tag][i], bool): # kto example | |
| prompt = aligned_messages[:-1] | |
| response = aligned_messages[-1:] | |
| if examples[dataset_attr.kto_tag][i]: | |
| response = response + [{"role": Role.ASSISTANT.value, "content": ""}] | |
| else: | |
| response = [{"role": Role.ASSISTANT.value, "content": ""}] + response | |
| elif ( | |
| dataset_attr.ranking | |
| and isinstance(examples[dataset_attr.chosen][i], dict) | |
| and isinstance(examples[dataset_attr.rejected][i], dict) | |
| ): # pairwise example | |
| chosen = examples[dataset_attr.chosen][i] | |
| rejected = examples[dataset_attr.rejected][i] | |
| if ( | |
| chosen[dataset_attr.role_tag] not in accept_tags[-1] | |
| or rejected[dataset_attr.role_tag] not in accept_tags[-1] | |
| ): | |
| logger.warning("Invalid role tag in {}.".format([chosen, rejected])) | |
| broken_data = True | |
| prompt = aligned_messages | |
| response = [ | |
| {"role": tag_mapping[chosen[dataset_attr.role_tag]], "content": chosen[dataset_attr.content_tag]}, | |
| {"role": tag_mapping[rejected[dataset_attr.role_tag]], "content": rejected[dataset_attr.content_tag]}, | |
| ] | |
| else: # normal example | |
| prompt = aligned_messages[:-1] | |
| response = aligned_messages[-1:] | |
| if broken_data: | |
| logger.warning("Skipping this abnormal example.") | |
| continue | |
| outputs["prompt"].append(prompt) | |
| outputs["response"].append(response) | |
| outputs["system"].append(system) | |
| outputs["tools"].append(examples[dataset_attr.tools][i] if dataset_attr.tools else "") | |
| outputs["images"].append(convert_images(examples[dataset_attr.images][i]) if dataset_attr.images else []) | |
| return outputs | |
| def align_dataset( | |
| dataset: Union["Dataset", "IterableDataset"], | |
| dataset_attr: "DatasetAttr", | |
| data_args: "DataArguments", | |
| training_args: "Seq2SeqTrainingArguments", | |
| ) -> Union["Dataset", "IterableDataset"]: | |
| r""" | |
| Aligned dataset: | |
| prompt: [{"role": "user", "content": "..."}] * (2T - 1) | |
| response: [{"role": "assistant", "content": "..."}] * N (N > 1 for ranking dataset) | |
| system: "..." | |
| tools: "...", | |
| images: [], | |
| """ | |
| if dataset_attr.formatting == "alpaca": | |
| convert_func = partial(convert_alpaca, dataset_attr=dataset_attr, data_args=data_args) | |
| else: | |
| convert_func = partial(convert_sharegpt, dataset_attr=dataset_attr, data_args=data_args) | |
| column_names = list(next(iter(dataset)).keys()) | |
| features = Features.from_dict( | |
| { | |
| "prompt": [ | |
| {"role": {"dtype": "string", "_type": "Value"}, "content": {"dtype": "string", "_type": "Value"}} | |
| ], | |
| "response": [ | |
| {"role": {"dtype": "string", "_type": "Value"}, "content": {"dtype": "string", "_type": "Value"}} | |
| ], | |
| "system": {"dtype": "string", "_type": "Value"}, | |
| "tools": {"dtype": "string", "_type": "Value"}, | |
| "images": [{"_type": "Image"}], | |
| } | |
| ) | |
| kwargs = {} | |
| if not data_args.streaming: | |
| kwargs = dict( | |
| num_proc=data_args.preprocessing_num_workers, | |
| load_from_cache_file=(not data_args.overwrite_cache) or (training_args.local_process_index != 0), | |
| desc="Converting format of dataset", | |
| ) | |
| return dataset.map( | |
| convert_func, | |
| batched=True, | |
| remove_columns=column_names, | |
| features=features, | |
| **kwargs, | |
| ) | |