Spaces:
Running
Running
| import json | |
| import logging | |
| import datasets | |
| import huggingface_hub | |
| import pandas as pd | |
| from transformers import pipeline | |
| import requests | |
| import os | |
| from app_env import HF_WRITE_TOKEN | |
# Endpoint queried by check_hf_token_validity to verify a bearer token.
AUTH_CHECK_URL = "https://huggingface.co/api/whoami-v2"

# Single module-level logger. The original assigned `logger` twice and the
# second assignment (keyed on __file__) silently replaced the first; keep one
# logger named after the module so it fits the dotted logger hierarchy.
logger = logging.getLogger(__name__)
class HuggingFaceInferenceAPIResponse:
    """Wrapper around a human-readable message.

    get_example_prediction returns instances of this class in place of a
    prediction dict when the inference API answered with an error.
    """

    def __init__(self, message):
        # Stored as given; no validation or formatting happens here.
        self.message = message
def get_labels_and_features_from_dataset(ds):
    """Split a dataset's columns into label columns and feature columns.

    Columns whose name starts with "label" are treated as label columns;
    every other column is a feature column.

    Returns a 3-tuple ``(labels, features, label_keys)``:
      * no label column found -> ``(all_columns, all_columns, None)`` so the
        caller can post-process the full column list;
      * otherwise ``labels`` comes from the first label column: its
        ``names`` if it is a ``datasets.ClassLabel``, the ``names`` of its
        nested ``feature`` if it has one, else ``ds.unique(column)``;
      * any exception -> ``(None, None, None)`` after logging a warning.
    """
    try:
        schema = ds.features
        column_names = list(schema.keys())
        label_keys = [name for name in column_names if name.startswith("label")]
        features = [name for name in column_names if not name.startswith("label")]

        if not label_keys:
            # No labels found: hand back everything for post processing.
            return list(column_names), list(column_names), None

        # Only the first label column is inspected.
        label_feature = schema[label_keys[0]]
        if isinstance(label_feature, datasets.ClassLabel):
            labels = label_feature.names
        elif hasattr(label_feature, "feature"):
            labels = label_feature.feature.names
        else:
            labels = ds.unique(label_keys[0])
        return labels, features, label_keys
    except Exception as e:
        logging.warning(f"Get Labels/Features Failed for dataset: {e}")
        return None, None, None
def check_model_task(model_id):
    """Return the ``pipeline_tag`` reported by the Hub for ``model_id``.

    Returns ``None`` when the model info carries no pipeline tag or when the
    lookup raises for any reason.
    """
    try:
        return huggingface_hub.model_info(model_id).pipeline_tag
    except Exception:
        return None
def get_model_labels(model_id, example_input):
    """Query the inference API once and collect every "label" value in the reply.

    The token is read from the environment variable whose name is held in
    ``HF_WRITE_TOKEN`` (empty string when unset). Returns ``None`` if the
    reply contains "error", otherwise the list produced by
    ``extract_from_response(reply, "label")``.
    """
    token = os.getenv(HF_WRITE_TOKEN, "")
    request_body = {"inputs": example_input, "options": {"use_cache": True}}
    reply = hf_inference_api(model_id, token, request_body)
    return None if "error" in reply else extract_from_response(reply, "label")
def extract_from_response(data, key):
    """Recursively collect every non-None value stored under ``key``.

    Walks nested dicts and lists depth-first. For a dict, its own ``key``
    value (if not None) is recorded before descending into all of its
    values. Anything that is neither a dict nor a list contributes nothing.
    """
    if isinstance(data, dict):
        own_value = data.get(key)
        collected = [] if own_value is None else [own_value]
        children = data.values()
    elif isinstance(data, list):
        collected = []
        children = data
    else:
        return []

    for child in children:
        collected += extract_from_response(child, key)
    return collected
def hf_inference_api(model_id, hf_token, payload, timeout=60):
    """POST ``payload`` to the inference endpoint for ``model_id``.

    The base URL comes from the ``HF_INFERENCE_ENDPOINT`` environment
    variable and defaults to ``https://api-inference.huggingface.co``.

    Args:
        model_id: Model identifier appended to ``<endpoint>/models/``.
        hf_token: Token sent as ``Authorization: Bearer <token>``.
        payload: JSON-serialisable mapping sent as the request body. It is
            never modified; the truncation retry works on a copy.
        timeout: Seconds passed to ``requests.post``. Without a timeout a
            stalled connection would block the caller indefinitely.

    Returns:
        The decoded JSON body. If the first reply carries an "error" that
        mentions "Input is too long", the request is retried once with
        truncation parameters and that reply's JSON is returned instead.
        If decoding or the retry raises, ``{"error": <raw body bytes>}`` of
        the most recent response is returned.
    """
    base_url = os.environ.get(
        "HF_INFERENCE_ENDPOINT", default="https://api-inference.huggingface.co"
    )
    url = f"{base_url}/models/{model_id}"
    headers = {"Authorization": f"Bearer {hf_token}"}

    response = requests.post(url, headers=headers, json=payload, timeout=timeout)
    if not hasattr(response, "status_code") or response.status_code != 200:
        logger.warning(f"Request to inference API returns {response}")

    try:
        output = response.json()
        if "error" in output and "Input is too long" in output["error"]:
            # Retry with truncation on a copy so the caller's dict is left
            # untouched (the original updated `payload` in place).
            retry_payload = {
                **payload,
                "parameters": {"truncation": True, "max_length": 512},
            }
            response = requests.post(
                url, headers=headers, json=retry_payload, timeout=timeout
            )
            if not hasattr(response, "status_code") or response.status_code != 200:
                logger.warning(f"Request to inference API returns {response}")
            output = response.json()
        return output
    except Exception:
        return {"error": response.content}
def preload_hf_inference_api(model_id):
    """Fire a fixed test request at the inference API and discard the reply.

    The token is read from the environment variable whose name is held in
    ``HF_WRITE_TOKEN`` (empty string when unset).
    NOTE(review): presumably this exists to make the API start loading the
    model ahead of real use -- confirm with the caller.
    """
    warmup_request = {"inputs": "This is a test", "options": {"use_cache": True}}
    token = os.getenv(HF_WRITE_TOKEN, "")
    hf_inference_api(model_id, token, warmup_request)
def check_model_pipeline(model_id):
    """Build a transformers pipeline for ``model_id`` using its Hub pipeline tag.

    Returns the pipeline object, or ``None`` if either the Hub lookup or the
    pipeline construction raises.
    """
    try:
        hub_task = huggingface_hub.model_info(model_id).pipeline_tag
        return pipeline(task=hub_task, model=model_id)
    except Exception:
        # Both failure modes are reported the same way to the caller.
        return None
def text_classificaiton_match_label_case_unsensative(id2label_mapping, label):
    """Find the first key of ``id2label_mapping`` equal to ``label`` ignoring case.

    Comparison is done on the upper-cased strings. Returns
    ``(matching_key, label)``, or ``(None, label)`` when nothing matches.
    """
    matched = next(
        (
            candidate
            for candidate in id2label_mapping.keys()
            if candidate.upper() == label.upper()
        ),
        None,
    )
    return matched, label
def text_classification_map_model_and_dataset_labels(id2label, dataset_features):
    """Pair each model label with a dataset label of the same name.

    Args:
        id2label: Mapping whose values are the model's label names.
        dataset_features: Mapping of column name to feature object.

    Only ``datasets.ClassLabel`` features with exactly as many names as the
    model has labels are considered. A dataset label is matched to a model
    label first by exact name, then case-insensitively; unmatched labels are
    reported with ``print``.

    Returns:
        ``(id2label_mapping, dataset_labels)`` where ``id2label_mapping`` maps
        model label -> matched dataset label (``None`` if unmatched) and
        ``dataset_labels`` is the ``names`` list of the last qualifying
        feature, or ``None`` if no feature qualified.
    """
    id2label_mapping = dict.fromkeys(id2label[k] for k in id2label.keys())
    dataset_labels = None

    for feature in dataset_features.values():
        is_class_label = isinstance(feature, datasets.ClassLabel)
        if not is_class_label or len(feature.names) != len(id2label_mapping):
            continue

        dataset_labels = feature.names
        for label in feature.names:
            if label in id2label_mapping:
                model_label = label
            else:
                # Fall back to a case-insensitive lookup.
                model_label, label = text_classificaiton_match_label_case_unsensative(
                    id2label_mapping, label
                )

            if model_label is None:
                print(f"Label {label} is not found in model labels")
            else:
                id2label_mapping[model_label] = label

    return id2label_mapping, dataset_labels
| """ | |
| params: | |
| column_mapping: dict | |
| example: { | |
| "text": "sentences", | |
| "label": { | |
| "label0": "LABEL_0", | |
| "label1": "LABEL_1" | |
| } | |
| } | |
| ppl: pipeline | |
| """ | |
def check_column_mapping_keys_validity(column_mapping, ppl):
    """Check that a user-supplied label mapping covers exactly the model's labels.

    Args:
        column_mapping: JSON string. When it has a "data" entry, that entry
            is read as a sequence of ``[user_label, model_label]`` pairs.
        ppl: Pipeline whose ``model.config.id2label`` holds the model labels.

    Returns:
        ``True`` when there is no "data" entry. Otherwise ``True`` only if
        the set of user labels, the set of model labels in the pairs, and
        the set of ``id2label`` values are all equal.
    """
    parsed = json.loads(column_mapping)
    if "data" not in parsed.keys():
        return True

    pairs = parsed["data"]
    user_labels = {pair[0] for pair in pairs}
    model_labels = {pair[1] for pair in pairs}
    original_labels = set(ppl.model.config.id2label.values())

    return user_labels == model_labels == original_labels
| """ | |
| params: | |
| column_mapping: dict | |
| dataset_features: dict | |
| example: { | |
| 'text': Value(dtype='string', id=None), | |
| 'label': ClassLabel(names=['negative', 'neutral', 'positive'], id=None) | |
| } | |
| """ | |
def infer_text_input_column(column_mapping, dataset_features):
    """Make sure ``column_mapping["text"]`` names a real dataset column.

    Args:
        column_mapping: Dict that may already contain a "text" entry. It is
            updated in place when a column is inferred.
        dataset_features: Mapping of column name to feature object.

    Returns:
        ``(column_mapping, feature_map_df)``. ``feature_map_df`` is ``None``
        when the provided "text" column already exists in the dataset, and
        also when no string-typed column could be found. Otherwise the first
        column whose feature ``dtype`` is "string" is written to
        ``column_mapping["text"]`` and ``feature_map_df`` is a one-row
        DataFrame recording that choice.
    """
    # Renamed from the original flag, which shadowed this function's name.
    needs_inference = True
    feature_map_df = None

    if "text" in column_mapping.keys():
        dataset_text_column = column_mapping["text"]
        if dataset_text_column in dataset_features.keys():
            needs_inference = False
        else:
            logging.warning(f"Provided {dataset_text_column} is not in Dataset columns")

    if not needs_inference:
        return column_mapping, feature_map_df

    # getattr: not every feature object is guaranteed to expose `dtype`.
    candidates = [
        name
        for name in dataset_features
        if getattr(dataset_features[name], "dtype", None) == "string"
    ]
    if not candidates:
        # The original indexed candidates[0] before checking for emptiness
        # and raised IndexError here; report "nothing inferred" instead.
        logging.warning("No string column found in dataset to use as text input")
        return column_mapping, None

    logging.debug(f"Candidates are {candidates}")
    column_mapping["text"] = candidates[0]
    feature_map_df = pd.DataFrame(
        {"Dataset Features": [candidates[0]], "Model Input Features": ["text"]}
    )
    return column_mapping, feature_map_df
| """ | |
| params: | |
| column_mapping: dict | |
| id2label_mapping: dict | |
| example: | |
| id2label_mapping: { | |
| 'negative': 'negative', | |
| 'neutral': 'neutral', | |
| 'positive': 'positive' | |
| } | |
| """ | |
def infer_output_label_column(
    column_mapping, id2label_mapping, id2label, dataset_labels
):
    """Fill in the label part of ``column_mapping`` and tabulate the pairing.

    Args:
        column_mapping: Dict, updated in place. May carry a user mapping
            under "data" as ``[user_label, model_label]`` pairs.
        id2label_mapping: Dict of model label -> dataset label, updated in
            place from the user pairs when "data" is a list.
        id2label: The model's id -> label mapping.
        dataset_labels: Sequence of dataset label names.

    Returns:
        ``(column_mapping, id2label_df)``. Without a "data" entry and with at
        least one unmatched label (``None`` value) in ``id2label_mapping``,
        ``column_mapping["label"]`` is set to all-``None`` and the DataFrame
        is ``None``. Otherwise a DataFrame with "Dataset Labels" and
        "Model Prediction Labels" columns is returned.
    """
    user_supplied = "data" in column_mapping.keys()

    if user_supplied:
        user_pairs = column_mapping["data"]
        if isinstance(user_pairs, list):
            # Trust the mapping passed by the user.
            for user_label, model_label in user_pairs:
                id2label_mapping[model_label] = user_label
    else:
        if None in id2label_mapping.values():
            # Some label could not be matched: nothing can be inferred.
            column_mapping["label"] = dict.fromkeys(id2label.keys())
            return column_mapping, None

        # Column mapping is keyed by the stringified model label ids.
        column_mapping["label"] = {
            str(model_id): id2label_mapping[label]
            for model_id, label in zip(id2label.keys(), dataset_labels)
        }

    predicted_labels = [id2label_mapping[label] for label in dataset_labels]
    id2label_df = pd.DataFrame(
        {
            "Dataset Labels": dataset_labels,
            "Model Prediction Labels": predicted_labels,
        }
    )
    return column_mapping, id2label_df
def check_dataset_features_validity(d_id, config, split):
    """Load a dataset split and return it as ``(DataFrame, features)``.

    The dataset itself is assumed to be loadable; errors from
    ``datasets.load_dataset`` propagate. Returns ``(None, None)`` when the
    loaded object has no ``features`` attribute, meaning the caller has to
    provide everything itself.
    """
    # NOTE(review): trust_remote_code=True is passed through to the loader;
    # confirm that running dataset-provided code is acceptable here.
    ds = datasets.load_dataset(d_id, config, split=split, trust_remote_code=True)
    try:
        dataset_features = ds.features
    except AttributeError:
        return None, None
    return ds.to_pandas(), dataset_features
def select_the_first_string_column(ds):
    """Return the first column name whose value in row 0 is a ``str``.

    Columns are visited in ``ds.features`` order. Returns ``None`` when no
    column of the first row holds a string.
    """
    return next(
        (column for column in ds.features.keys() if isinstance(ds[0][column], str)),
        None,
    )
def get_example_prediction(
    model_id, dataset_id, dataset_config, dataset_split, hf_token
):
    """Run the first dataset row through the inference API as a smoke test.

    The input is row 0 of the "text" column, or of the first string-valued
    column when the dataset has no "text" column.

    Returns ``(prediction_input, result)`` where ``result`` is:
      * a ``{label: score}`` dict on success;
      * a ``HuggingFaceInferenceAPIResponse`` carrying a message when the API
        replied with an error dict (an estimated wait time is reported when
        the reply includes "estimated_time");
      * ``None`` when anything raised; the error is logged.
    """
    prediction_input = None
    try:
        # NOTE(review): trust_remote_code=True is passed through to the
        # loader; confirm that running dataset-provided code is acceptable.
        ds = datasets.load_dataset(
            dataset_id, dataset_config, split=dataset_split, trust_remote_code=True
        )
        if "text" in ds.features.keys():
            prediction_input = ds[0]["text"]
        else:
            # No text column: fall back to the first string column.
            prediction_input = ds[0][select_the_first_string_column(ds)]

        results = hf_inference_api(
            model_id,
            hf_token,
            {"inputs": prediction_input, "options": {"use_cache": True}},
        )

        if isinstance(results, dict) and "error" in results.keys():
            if "estimated_time" in results.keys():
                message = f"Estimated time: {int(results['estimated_time'])}s. Please try again later."
            else:
                message = f"Inference Error: {results['error']}."
            return prediction_input, HuggingFaceInferenceAPIResponse(message)

        # Peel nested lists until the first element is a dict of label/score.
        while isinstance(results, list) and not isinstance(results[0], dict):
            results = results[0]

        prediction_result = {
            f'{result["label"]}': result["score"] for result in results
        }
    except Exception as e:
        # Inference API prediction failed; surface the error in the log.
        logger.error(f"Get example prediction failed {e}")
        return prediction_input, None

    return prediction_input, prediction_result
def get_sample_prediction(ppl, df, column_mapping, id2label_mapping):
    """Run the first DataFrame row through a local pipeline.

    Args:
        ppl: Callable invoked as ``ppl({"text": value}, top_k=None)`` and
            expected to yield dicts with "label" and "score".
        df: DataFrame; the cell at index 0 of the mapped text column is used.
        column_mapping: Dict whose "text" entry names the text column.
        id2label_mapping: Dict used to show the mapped name of each label.

    Returns:
        ``(prediction_input, scores)``. ``scores`` is ``None`` if reading the
        input, calling the pipeline, or reading "label"/"score" from a result
        raised. Otherwise it maps
        ``"<label>(original) - <mapped>(mapped)"`` to the score.
    """
    prediction_input = None
    try:
        text_column = column_mapping["text"]
        prediction_input = df.head(1).at[0, text_column]
        results = ppl({"text": prediction_input}, top_k=None)
        # Built only so that malformed results fail inside this try block.
        _validated = {f'{result["label"]}': result["score"] for result in results}
    except Exception:
        # Pipeline prediction failed; the caller needs to provide labels.
        return prediction_input, None

    # Show each score under both the original and the mapped label name.
    # A label missing from id2label_mapping raises KeyError to the caller.
    prediction_result = {
        f'{result["label"]}(original) - {id2label_mapping[result["label"]]}(mapped)': result[
            "score"
        ]
        for result in results
    }
    return prediction_input, prediction_result
def text_classification_fix_column_mapping(column_mapping, ppl, d_id, config, split):
    """Complete a column mapping for a text-classification model/dataset pair.

    Steps: load the dataset, settle the text input column, pair model labels
    with dataset labels, then run one sample prediction.

    Args:
        column_mapping: Dict to complete (may be updated in place).
        ppl: Pipeline; ``ppl.model.config.id2label`` supplies model labels.
        d_id, config, split: Dataset id, config name and split to load.

    Returns:
        A 5-tuple ``(column_mapping, prediction_input, prediction_result,
        id2label_df, feature_map_df)``:
          * all ``None`` when the dataset exposes no features or no text
            column mapping table was produced;
          * ``(column_mapping, None, None, None, feature_map_df)`` when the
            output labels could not be inferred;
          * otherwise the full tuple, with ``prediction_result`` set to
            ``None`` if the sample prediction failed.
    """
    df, dataset_features = check_dataset_features_validity(d_id, config, split)
    if dataset_features is None:
        # check_dataset_features_validity signals "no features" with
        # (None, None); passing None on would raise in the inference step.
        return None, None, None, None, None

    column_mapping, feature_map_df = infer_text_input_column(
        column_mapping, dataset_features
    )
    if feature_map_df is None:
        # No text-column mapping table available.
        return None, None, None, None, None

    # Pair the model's labels with the dataset's labels.
    id2label = ppl.model.config.id2label
    id2label_mapping, dataset_labels = text_classification_map_model_and_dataset_labels(
        id2label, dataset_features
    )
    column_mapping, id2label_df = infer_output_label_column(
        column_mapping, id2label_mapping, id2label, dataset_labels
    )
    if id2label_df is None:
        # Unable to infer the output label column.
        return column_mapping, None, None, None, feature_map_df

    # prediction_result is None when the sample prediction failed; the tuple
    # has the same shape either way, so one return covers both outcomes.
    prediction_input, prediction_result = get_sample_prediction(
        ppl, df, column_mapping, id2label_mapping
    )
    return (
        column_mapping,
        prediction_input,
        prediction_result,
        id2label_df,
        feature_map_df,
    )
def check_hf_token_validity(hf_token, timeout=10):
    """Return True if ``hf_token`` is accepted by the Hugging Face API.

    Args:
        hf_token: Candidate token. Empty strings and non-string values are
            rejected without any network call.
        timeout: Seconds passed to ``requests.get`` so a stalled connection
            cannot block the caller indefinitely.

    Returns:
        ``True`` only when ``AUTH_CHECK_URL`` answers with HTTP 200 for the
        bearer token. ``False`` for any other status, and also when the
        request itself fails (a warning is logged in that case).
    """
    if hf_token == "" or not isinstance(hf_token, str):
        return False

    headers = {"Authorization": f"Bearer {hf_token}"}
    try:
        response = requests.get(AUTH_CHECK_URL, headers=headers, timeout=timeout)
    except requests.RequestException as e:
        # The token could not be verified; treat that as "not valid" rather
        # than letting a network failure crash the caller.
        logger.warning(f"Token validity check failed: {e}")
        return False
    return response.status_code == 200