# NOTE(review): removed paste artifacts ("Spaces:", "Runtime error" x2) that
# were not part of the original source code.
| import re | |
| import string | |
| import numpy as np | |
| from collections import Counter | |
| from typing import List, Set, Tuple, Union | |
| from scipy.optimize import linear_sum_assignment | |
| from word2number.w2n import word_to_num | |
| import json | |
| # copy from https://github.com/allenai/multimodalqa/blob/master/baselines/evaluate.py | |
# Question-type taxonomy for MultimodalQA: single-hop questions over one
# modality (text / table / image) plus composed / intersected / compared
# two-hop forms.
ALL_QUESTION_TYPES = [
    'TextQ',
    'TableQ',
    'ImageQ',
    'ImageListQ',
    'Compose(TableQ,ImageListQ)',
    'Compose(TextQ,ImageListQ)',
    'Compose(ImageQ,TableQ)',
    'Compose(ImageQ,TextQ)',
    'Compose(TextQ,TableQ)',
    'Compose(TableQ,TextQ)',
    'Intersect(TableQ,TextQ)',
    'Intersect(ImageListQ,TableQ)',
    'Intersect(ImageListQ,TextQ)',
    'Compare(Compose(TableQ,ImageQ),TableQ)',
    'Compare(Compose(TableQ,ImageQ),Compose(TableQ,TextQ))',
    'Compare(TableQ,Compose(TableQ,TextQ))',
]

# Where the TEXT modality participates in each question type.
TEXT_SINGLE_HOP_QUESTION_TYPES = ['TextQ']
TEXT_AS_FIRST_HOP_QUESTION_TYPES = [
    'Compare(TableQ,Compose(TableQ,TextQ))',
    'Compose(ImageQ,TextQ)',
    'Compose(TableQ,TextQ)',
    'Intersect(TableQ,TextQ)',
    'Intersect(ImageListQ,TextQ)',
]
TEXT_AS_SECOND_HOP_QUESTION_TYPES = [
    'Compare(Compose(TableQ,ImageQ),Compose(TableQ,TextQ))',
    'Compose(TextQ,ImageListQ)',
    'Compose(TextQ,TableQ)',
]

# Where the TABLE modality participates.
TABLE_SINGLE_HOP_QUESTION_TYPES = ["TableQ"]
TABLE_AS_FIRST_HOP_QUESTION_TYPES = [
    'Compose(ImageQ,TableQ)',
    'Compose(TextQ,TableQ)',
]
TABLE_AS_SECOND_HOP_QUESTION_TYPES = [
    'Compare(Compose(TableQ,ImageQ),TableQ)',
    'Compare(TableQ,Compose(TableQ,TextQ))',
    'Compose(TableQ,ImageListQ)',
    'Compose(TableQ,TextQ)',
    'Intersect(ImageListQ,TableQ)',
    'Intersect(TableQ,TextQ)',
]

# Where the IMAGE modality participates.
IMAGE_SINGLE_HOP_QUESTION_TYPES = ['ImageQ', 'ImageListQ']
IMAGE_AS_FIRST_HOP_QUESTION_TYPES = [
    'Compare(Compose(TableQ,ImageQ),Compose(TableQ,TextQ))',
    'Compare(Compose(TableQ,ImageQ),TableQ)',
    'Compose(TableQ,ImageListQ)',
    'Compose(TextQ,ImageListQ)',
    'Intersect(ImageListQ,TableQ)',
]
IMAGE_AS_SECOND_HOP_QUESTION_TYPES = [
    'Compose(ImageQ,TableQ)',
    'Compose(ImageQ,TextQ)',
    'Intersect(ImageListQ,TextQ)',
]

# Sanity check: every question is answered either as a single-hop question
# or as a two-hop question.
assert set(
    TEXT_SINGLE_HOP_QUESTION_TYPES + TEXT_AS_SECOND_HOP_QUESTION_TYPES
    + TABLE_SINGLE_HOP_QUESTION_TYPES + TABLE_AS_SECOND_HOP_QUESTION_TYPES
    + IMAGE_SINGLE_HOP_QUESTION_TYPES + IMAGE_AS_SECOND_HOP_QUESTION_TYPES
) == set(ALL_QUESTION_TYPES)
assert set(TEXT_SINGLE_HOP_QUESTION_TYPES).isdisjoint(TEXT_AS_SECOND_HOP_QUESTION_TYPES)
assert set(TABLE_SINGLE_HOP_QUESTION_TYPES).isdisjoint(TABLE_AS_SECOND_HOP_QUESTION_TYPES)
assert set(IMAGE_SINGLE_HOP_QUESTION_TYPES).isdisjoint(IMAGE_AS_SECOND_HOP_QUESTION_TYPES)

SINGLE_HOP_QUESTION_TYPES = (
    TEXT_SINGLE_HOP_QUESTION_TYPES
    + TABLE_SINGLE_HOP_QUESTION_TYPES
    + IMAGE_SINGLE_HOP_QUESTION_TYPES
)
MULTI_HOP_QUESTION_TYPES = (
    TEXT_AS_SECOND_HOP_QUESTION_TYPES
    + TABLE_AS_SECOND_HOP_QUESTION_TYPES
    + IMAGE_AS_SECOND_HOP_QUESTION_TYPES
)
# No duplicated multi-hop question types.
assert len(MULTI_HOP_QUESTION_TYPES) == len(set(MULTI_HOP_QUESTION_TYPES))
# The first-hop views cover exactly the multi-hop question types.
assert set(
    TEXT_AS_FIRST_HOP_QUESTION_TYPES
    + TABLE_AS_FIRST_HOP_QUESTION_TYPES
    + IMAGE_AS_FIRST_HOP_QUESTION_TYPES
) == set(MULTI_HOP_QUESTION_TYPES)
# single + multi = all
assert set(SINGLE_HOP_QUESTION_TYPES + MULTI_HOP_QUESTION_TYPES) == set(ALL_QUESTION_TYPES)
def process_question_for_implicit_decomp(question, question_type, hop=0, bridge_entity='', sep_token='[SEP]'):
    """Format a question for implicit-decomposition models.

    Joins the question type, hop index, bridge entity and question text with
    `sep_token`. A list/set of bridge entities is flattened to a single
    '; '-separated string first.
    """
    if isinstance(bridge_entity, (list, set)):
        bridge_entity = "; ".join(bridge_entity)
    fields = [question_type, f'HOP={hop}', bridge_entity, question]
    return f' {sep_token} '.join(fields)
def extract_numbers_from_str(s):
    """Extract numeric tokens from a whitespace-split string, in order.

    Each token is first tried as an int with commas stripped (so "1,234"
    parses as 1234), then as a float.  Non-numeric tokens are skipped.

    Fixes vs. the original:
    - zero values (0, 0.0) are no longer dropped; the old `if num:`
      truthiness test silently skipped them,
    - only ValueError is caught, so unrelated errors are not swallowed
      by bare `except:` clauses.
    """
    numbers = []
    for token in s.split():
        try:
            num = int(token.replace(",", ""))
        except ValueError:
            try:
                num = float(token)
            except ValueError:
                num = None
        if num is not None:
            numbers.append(num)
    return numbers
def read_jsonl(filename):
    """Read a JSON-lines file and return the list of parsed records."""
    with open(filename, 'r') as f:
        return [json.loads(line.strip()) for line in f]
| # From here through _match_numbers_if_present was originally copied from the evaluation code of DROP dataset: | |
| # https://github.com/allenai/allennlp-reading-comprehension/blob/master/allennlp_rc/eval/drop_eval.py | |
| def _remove_articles(text: str) -> str: | |
| regex = re.compile(r"\b(a|an|the)\b", re.UNICODE) | |
| return re.sub(regex, " ", text) | |
| def _white_space_fix(text: str) -> str: | |
| return " ".join(text.split()) | |
# Punctuation characters stripped from non-numeric tokens.
EXCLUDE = set(string.punctuation)

def _remove_punc(text: str) -> str:
    """Strip punctuation, but leave numeric tokens untouched so decimal
    points and signs survive normalization."""
    if _is_number(text):
        return text
    return "".join(ch for ch in text if ch not in EXCLUDE)
| def _lower(text: str) -> str: | |
| return text.lower() | |
| def _tokenize(text: str) -> List[str]: | |
| return re.split(" |-", text) | |
def _normalize_answer(text: str) -> str:
    """Lower text and remove punctuation, articles and extra whitespace."""
    pieces = []
    for token in _tokenize(text):
        # Per-token pipeline: lowercase -> strip punctuation -> canonicalize
        # numbers -> drop articles -> collapse whitespace.
        cleaned = _white_space_fix(
            _remove_articles(_normalize_number(_remove_punc(_lower(token))))
        )
        if cleaned.strip():
            pieces.append(cleaned)
    return " ".join(pieces).strip()
| def _is_number(text: str) -> bool: | |
| try: | |
| float(text) | |
| return True | |
| except ValueError: | |
| return False | |
def _is_word_number(text: str) -> bool:
    """Return True when word2number can parse the text (e.g. "three")."""
    try:
        word_to_num(text)
    except ValueError:
        return False
    return True
def _normalize_number(text: str) -> str:
    """Canonicalize numeric tokens to `str(float(...))`; others pass through.

    NOTE: the word-number branch is not part of the original DROP evaluation
    script; it extends normalization to spelled-out numbers like "three".
    """
    if _is_number(text):
        return str(float(text))
    if _is_word_number(text):
        return str(float(word_to_num(text)))
    return text
def _answer_to_bags(
    answer: Union[str, List[str], Tuple[str, ...]]
) -> Tuple[List[str], List[Set[str]]]:
    """Normalize an answer (a span or several) into two parallel lists:
    the normalized span strings, and each span's bag (set) of tokens."""
    spans = answer if isinstance(answer, (list, tuple)) else [answer]
    normalized: List[str] = []
    bags: List[Set[str]] = []
    for span in spans:
        norm = _normalize_answer(span)
        normalized.append(norm)
        bags.append(set(norm.split()))
    return normalized, bags
def _align_bags(predicted: List[Set[str]], gold: List[Set[str]]) -> List[float]:
    """
    Takes gold and predicted answer sets and first finds the optimal 1-1 alignment
    between them and gets maximum metric values over all the answers.
    """
    scores = np.zeros([len(gold), len(predicted)])
    for g_idx, g_bag in enumerate(gold):
        for p_idx, p_bag in enumerate(predicted):
            # Only score pairs whose numeric tokens (if any) agree.
            if _match_numbers_if_present(g_bag, p_bag):
                scores[g_idx, p_idx] = _compute_f1(p_bag, g_bag)
    # Hungarian assignment maximizes total F1 (scipy minimizes, hence -scores).
    row_ind, col_ind = linear_sum_assignment(-scores)
    max_scores = np.zeros([max(len(gold), len(predicted))])
    for r, c in zip(row_ind, col_ind):
        max_scores[r] = max(max_scores[r], scores[r, c])
    return max_scores
| def _compute_f1(predicted_bag: Set[str], gold_bag: Set[str]) -> float: | |
| intersection = len(gold_bag.intersection(predicted_bag)) | |
| if not predicted_bag: | |
| precision = 1.0 | |
| else: | |
| precision = intersection / float(len(predicted_bag)) | |
| if not gold_bag: | |
| recall = 1.0 | |
| else: | |
| recall = intersection / float(len(gold_bag)) | |
| f1 = ( | |
| (2 * precision * recall) / (precision + recall) | |
| if not (precision == 0.0 and recall == 0.0) | |
| else 0.0 | |
| ) | |
| return f1 | |
def _match_numbers_if_present(gold_bag: Set[str], predicted_bag: Set[str]) -> bool:
    """True when the gold bag has no numeric tokens, or when the two bags
    share at least one numeric token.  Used to gate F1 scoring so answers
    with mismatched numbers get zero credit."""
    gold_numbers = {word for word in gold_bag if _is_number(word)}
    predicted_numbers = {word for word in predicted_bag if _is_number(word)}
    return not gold_numbers or bool(gold_numbers & predicted_numbers)
def acc(predicted, gold):
    """Exact match over normalized spans, order-insensitive: 1.0 when both
    answers yield the same set of spans with the same count, else 0.0."""
    pred_spans = _answer_to_bags(predicted)[0]
    gold_spans = _answer_to_bags(gold)[0]
    exact = set(pred_spans) == set(gold_spans) and len(pred_spans) == len(gold_spans)
    return 1.0 if exact else 0.0
def f1(predicted, gold):
    """Mean aligned bag-F1 over answer spans, rounded to 2 decimal places."""
    pred_bags = _answer_to_bags(predicted)[1]
    gold_bags = _answer_to_bags(gold)[1]
    per_bag_scores = _align_bags(pred_bags, gold_bags)
    return round(np.mean(per_bag_scores), 2)
def metric_max_over_ground_truths(metric_fn, prediction, gold_answers):
    """Return the best `metric_fn(prediction, ref)` over all reference answers."""
    return max(metric_fn(prediction, gold_answer) for gold_answer in gold_answers)
def evaluate_predictions(predictions, gold_answers, example_types=None):
    """To support multiple gold annotations, `gold_answers` should be a list,
    with each item (either a string or a list) corresponding to one valid reference answer.

    Returns (overall scores, per-instance scores) and, when `example_types`
    maps qids to categories, also per-category scores."""
    eval_funcs = {
        "acc": acc,
        "f1": f1
    }
    instance_eval_results = {}
    instance_eval_results_by_types = {}
    for qas_id, ref_answers in gold_answers.items():
        if qas_id in predictions:
            pred_answer = predictions[qas_id]
            instance_eval_results[qas_id] = {
                metric: metric_max_over_ground_truths(func, pred_answer, ref_answers)
                for metric, func in eval_funcs.items()
            }
        else:
            # Unanswered questions score zero on every metric.
            print(f"Missing prediction for question {qas_id}, and all scores for this question are set to zero")
            instance_eval_results[qas_id] = {metric: 0.0 for metric in eval_funcs}
        if example_types is not None:
            example_type = example_types[qas_id]
            instance_eval_results_by_types.setdefault(example_type, {})[qas_id] = \
                instance_eval_results[qas_id]

    eval_scores = {
        metric: np.mean([result[metric] for result in instance_eval_results.values()])
        for metric in eval_funcs
    }
    if example_types is None:
        return eval_scores, instance_eval_results

    eval_scores_by_types = {
        example_type: {
            metric: np.mean([result[metric] for result in type_results.values()])
            for metric in eval_funcs
        }
        for example_type, type_results in instance_eval_results_by_types.items()
    }
    return eval_scores, instance_eval_results, eval_scores_by_types
def evaluate_prediction_file(prediction_path, gold_path):
    """Score a prediction file against gold MultimodalQA examples.

    Args:
        prediction_path: JSON file mapping qid -> predicted answer.
        gold_path: JSONL file of gold examples (qid, answers, metadata).

    Prints overall scores plus breakdowns by answer modality, hop type and
    question type, and returns the overall `eval_scores` dict.
    """
    # Fix vs. original: use a context manager instead of the unclosed
    # `json.load(open(...))`, which leaked the file handle.
    with open(prediction_path, encoding="utf-8") as f:
        predicted_answers = json.load(f)
    examples = read_jsonl(gold_path)
    gold_answers, answer_modalities, hop_types, question_types = {}, {}, {}, {}
    for example in examples:
        qid = example["qid"]
        # Currently we only have one ground truth answer.
        # Even if there are multiple entries in example["answers"], the whole list should be regarded as one ref answer.
        # However, our script supports evaluation with multiple ref answers.
        # So, we will use an outer bracket here to pretend we have a list of ref answers.
        gold_answer = [str(item["answer"]) for item in example["answers"]]
        gold_answers[qid] = [gold_answer]
        answer_modality = set([item["modality"] for item in example["answers"]])
        # All spans of one answer are expected to come from a single modality.
        assert len(answer_modality) == 1
        answer_modalities[qid] = answer_modality.pop()
        question_types[qid] = example["metadata"]["type"]
        hop_types[qid] = "Multi-hop" if example["metadata"]["type"] in MULTI_HOP_QUESTION_TYPES else "Single-hop"

    eval_scores, instance_eval_results = evaluate_predictions(predicted_answers, gold_answers)
    print("\n\nOverall result with different metrics: ")
    for metric, value in eval_scores.items():
        print(f"{metric}: {value}")

    modality_counts = Counter(answer_modalities.values())
    _, _, eval_scores_by_modalities = \
        evaluate_predictions(predicted_answers, gold_answers, answer_modalities)
    print("\n\nEval results for different modalities:")
    for answer_modality in sorted(eval_scores_by_modalities.keys()):
        result = eval_scores_by_modalities[answer_modality]
        print(f"{answer_modality}")
        print(f"# of examples: {modality_counts[answer_modality]}")
        for metric, value in result.items():
            print(f"{metric}: {value}")

    hop_type_counts = Counter(hop_types.values())
    _, _, eval_scores_by_hop_types = evaluate_predictions(predicted_answers, gold_answers, hop_types)
    print("\n\nType\tCount\tEM\tF1")
    for hop_type in sorted(eval_scores_by_hop_types.keys()):
        result = eval_scores_by_hop_types[hop_type]
        print(f"{hop_type}\t{hop_type_counts[hop_type]}\t{result['acc']}\t{result['f1']}")

    question_type_counts = Counter(question_types.values())
    _, _, eval_scores_by_qtypes = evaluate_predictions(predicted_answers, gold_answers, question_types)
    print("\n\nType\tCount\tEM\tF1")
    for question_type in sorted(eval_scores_by_qtypes.keys()):
        result = eval_scores_by_qtypes[question_type]
        print(f"{question_type}\t{question_type_counts[question_type]}\t{result['acc']}\t{result['f1']}")
    return eval_scores
class EvaluateTool(object):
    """Adapter exposing the MultimodalQA metrics to the surrounding framework."""

    def __init__(self, args):
        # Kept for interface compatibility with the framework's tool protocol.
        self.args = args

    def evaluate(self, preds, golds, section):
        """Score predictions against golds; `section` is unused but kept for
        interface compatibility.  Answers are '|'-separated span lists."""
        gold_answers, predicted_answers = {}, {}
        for pred, gold in zip(preds, golds):
            qid = gold["id"]
            # Wrap the gold span list once more: the evaluator expects a list
            # of reference answers per qid.
            gold_answers[qid] = [[span.strip() for span in gold["answer_text"].split("|")]]
            predicted_answers[qid] = [span.strip() for span in pred.split("|")]
        eval_scores, _ = evaluate_predictions(predicted_answers, gold_answers)
        summary = dict(eval_scores)
        return summary