Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import logging | |
| import os | |
| from datetime import datetime | |
| from typing import List, Optional, Tuple | |
| import polars as pl | |
| import requests | |
| import stamina | |
| from chromadb.utils import embedding_functions | |
| from dotenv import load_dotenv | |
| from huggingface_hub import InferenceClient | |
| from tqdm.contrib.concurrent import thread_map | |
| from utils import get_collection, get_chroma_client | |
# Turn on the HF_HUB_ENABLE_HF_TRANSFER flag for this process.
# NOTE(review): this runs after `huggingface_hub` has already been imported
# above; confirm the library still honours the flag when set this late.
os.environ.update({"HF_HUB_ENABLE_HF_TRANSFER": "1"})

# Set up logging
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Pull variables from a local .env file (if any) before reading the token.
load_dotenv()
HF_TOKEN = os.environ.get("HF_TOKEN")  # None when the variable is not set

# Embedding model, pinned to an exact revision.
EMBEDDING_MODEL_NAME = "Alibaba-NLP/gte-large-en-v1.5"
EMBEDDING_MODEL_REVISION = "104333d6af6f97649377c2afbde10a7704870c7b"

# Endpoint handed to InferenceClient in get_inference_client().
INFERENCE_MODEL_URL = "https://spwy1g6626yhjhpr.us-east-1.aws.endpoints.huggingface.cloud"

# Parquet shards read by load_cards().
DATASET_PARQUET_URL = (
    "hf://datasets/librarian-bots/dataset_cards_with_metadata/data/train-*.parquet"
)

# Name of the collection requested in refresh_card_data().
COLLECTION_NAME = "dataset_cards"

# Upper bound used by embed_card() when slicing card text (a character slice).
MAX_EMBEDDING_LENGTH = 8192
def card_embedding_function():
    """Build the SentenceTransformer embedding function used for dataset cards.

    The model name and revision are taken from ``EMBEDDING_MODEL_NAME`` and
    ``EMBEDDING_MODEL_REVISION``; ``trust_remote_code`` is enabled.
    """
    logger.info(f"Initializing embedding function with model: {EMBEDDING_MODEL_NAME}")
    model_options = {
        "model_name": EMBEDDING_MODEL_NAME,
        "trust_remote_code": True,
        "revision": EMBEDDING_MODEL_REVISION,
    }
    return embedding_functions.SentenceTransformerEmbeddingFunction(**model_options)
def get_last_modified_in_collection(collection) -> datetime | None:
    """Return the newest ``last_modified`` timestamp stored in *collection*.

    The metadata of every item is fetched and each ``last_modified`` value is
    parsed with ``datetime.fromisoformat``. Items that have no metadata, no
    ``last_modified`` key, or a value that cannot be parsed are skipped
    individually, so a single bad entry does not discard the timestamps of
    all the others (which would make the caller reload every card).

    Args:
        collection: Object exposing ``get(include=[...])`` that returns a
            mapping with a ``"metadatas"`` entry.

    Returns:
        The maximum parsed timestamp, or ``None`` when the collection holds
        no usable timestamp or the metadata could not be fetched/compared.
    """
    logger.info("Fetching last modified date from collection")
    try:
        all_items = collection.get(include=["metadatas"])
        metadatas = all_items["metadatas"] or []
    except Exception as e:
        # Best effort: the caller treats None as "no previous refresh".
        logger.error(f"Error fetching last modified date: {str(e)}")
        return None

    last_modified = []
    skipped = 0
    for item in metadatas:
        raw_value = item.get("last_modified") if item else None
        if raw_value is None:
            skipped += 1
            continue
        try:
            last_modified.append(datetime.fromisoformat(raw_value))
        except (TypeError, ValueError):
            skipped += 1
    if skipped:
        logger.warning(f"Skipped {skipped} items without a usable last_modified value")

    if not last_modified:
        logger.info("No last modified date found")
        return None

    try:
        last_mod = max(last_modified)
    except TypeError as e:
        # Raised when naive and timezone-aware datetimes are mixed.
        logger.error(f"Error fetching last modified date: {str(e)}")
        return None
    logger.info(f"Last modified date: {last_mod}")
    return last_mod
def parse_markdown_column(
    df: pl.DataFrame, markdown_column: str, dataset_id_column: str
) -> pl.DataFrame:
    """Add ``parsed_markdown`` and ``prepended_markdown`` columns to *df*.

    ``parsed_markdown`` is the text of *markdown_column* with a leading
    ``--- ... ---`` block removed and surrounding whitespace stripped; when
    the pattern does not match, the original text is used instead.
    ``prepended_markdown`` is ``"Dataset ID "`` + the dataset id + a blank
    line + that same parsed text.
    """
    logger.info("Parsing markdown column")
    raw_card = pl.col(markdown_column)
    # Everything after the first "--- ... ---" block, or the raw text if absent.
    card_body = (
        raw_card.str.extract(r"(?s)^---.*?---\s*(.*)", group_index=1)
        .fill_null(raw_card)
        .str.strip_chars()
    )
    card_with_id = pl.concat_str(
        [
            pl.lit("Dataset ID "),
            pl.col(dataset_id_column).cast(pl.Utf8),
            pl.lit("\n\n"),
            card_body,
        ]
    )
    return df.with_columns(
        parsed_markdown=card_body,
        prepended_markdown=card_with_id,
    )
def is_unmodified_template(
    card: str, *, min_indicators: int = 3, min_placeholders: int = 7
) -> bool:
    """Return True when *card* looks like the untouched dataset-card template.

    A card is flagged when EITHER of these holds (the two checks are
    alternatives, not a combined requirement):

    * at least *min_indicators* of the four template phrases occur in it, or
    * ``"[More Information Needed]"`` occurs at least *min_placeholders* times.

    Args:
        card: Full card text to inspect.
        min_indicators: Number of distinct template phrases that is enough to
            flag the card on its own.
        min_placeholders: Number of ``"[More Information Needed]"`` occurrences
            that is enough to flag the card on its own.
    """
    # Phrases taken from the stock template; the placeholder text also counts
    # as one indicator when it appears at least once.
    template_indicators = (
        "# Dataset Card for Dataset Name",
        "<!-- Provide a quick summary of the dataset. -->",
        "This dataset card aims to be a base template for new datasets",
        "[More Information Needed]",
    )
    # Count how many distinct indicators are present
    indicator_count = sum(indicator in card for indicator in template_indicators)
    # Count every occurrence of the placeholder text
    more_info_needed_count = card.count("[More Information Needed]")
    return (
        indicator_count >= min_indicators
        or more_info_needed_count >= min_placeholders
    )
def load_cards(
    min_len: int = 50,
    min_likes: int | None = None,
    last_modified: Optional[datetime] = None,
) -> Optional[Tuple[List[str], List[str], List[datetime]]]:
    """Load dataset cards from ``DATASET_PARQUET_URL`` and filter them.

    Rows are dropped when they are tagged ``not-for-all-audiences``, when the
    parsed card is not longer than *min_len* characters, or when the card is
    recognised by ``is_unmodified_template``.

    Args:
        min_len: Parsed cards must be strictly longer than this.
        min_likes: When truthy, keep only rows with strictly more likes; a
            value of ``None`` or ``0`` disables this filter.
        last_modified: When truthy, keep only rows modified strictly after it.

    Returns:
        ``(cards, ids, last_modifieds)`` as three parallel lists, where
        *cards* holds the ``prepended_markdown`` text, or ``None`` when no row
        survives the filters.
    """
    logger.info(
        f"Loading cards with min_len={min_len}, min_likes={min_likes}, last_modified={last_modified}"
    )
    frame = pl.read_parquet(DATASET_PARQUET_URL)

    is_flagged = pl.col("tags").list.contains("not-for-all-audiences")
    frame = frame.filter(~is_flagged)

    frame = parse_markdown_column(frame, "card", "datasetId")
    frame = frame.with_columns(card_len=pl.col("parsed_markdown").str.len_chars())
    frame = frame.filter(pl.col("card_len") > min_len)

    if min_likes:
        frame = frame.filter(pl.col("likes") > min_likes)
    if last_modified:
        frame = frame.filter(pl.col("last_modified") > last_modified)

    # Filter out unmodified template cards
    looks_like_template = pl.col("prepended_markdown").map_elements(
        is_unmodified_template, return_dtype=pl.Boolean
    )
    frame = frame.filter(~looks_like_template)

    if frame.height == 0:
        logger.info("No cards found matching criteria")
        return None

    cards, model_ids, last_modifieds = (
        frame.get_column(column_name).to_list()
        for column_name in ("prepended_markdown", "datasetId", "last_modified")
    )
    logger.info(f"Loaded {len(cards)} cards")
    return cards, model_ids, last_modifieds
def embed_card(text, client):
    """Embed *text* with *client*, after cutting it to ``MAX_EMBEDDING_LENGTH`` characters.

    Returns whatever ``client.feature_extraction`` returns for the truncated text.
    """
    truncated = text[:MAX_EMBEDDING_LENGTH]
    embedding = client.feature_extraction(truncated)
    return embedding
def get_inference_client():
    """Create an ``InferenceClient`` for ``INFERENCE_MODEL_URL`` using ``HF_TOKEN``."""
    logger.info(f"Initializing inference client with model: {INFERENCE_MODEL_URL}")
    client = InferenceClient(model=INFERENCE_MODEL_URL, token=HF_TOKEN)
    return client
def refresh_card_data(min_len: int = 250, min_likes: Optional[int] = None):
    """Embed and upsert the cards modified since the newest stored timestamp.

    Opens the ``COLLECTION_NAME`` collection, reads the latest stored
    ``last_modified`` value, loads the cards that pass the *min_len* /
    *min_likes* filters and are newer than it, and hands them to
    ``_create_and_upsert_embeddings``. Only logs a message when nothing new
    is found.
    """
    logger.info(f"Starting data refresh with min_len={min_len}, min_likes={min_likes}")
    embedder = card_embedding_function()
    client = get_chroma_client()
    collection = get_collection(client, embedder, COLLECTION_NAME)
    newest_stored = get_last_modified_in_collection(collection)

    data = load_cards(min_len=min_len, min_likes=min_likes, last_modified=newest_stored)
    if not data:
        logger.info("No new data to refresh")
        return
    _create_and_upsert_embeddings(data, collection)
def _create_and_upsert_embeddings(data, collection):
    """Embed every card in *data* and upsert the results into *collection*.

    Args:
        data: ``(cards, ids, last_modifieds)`` as returned by ``load_cards``.
        collection: Target collection; receives one ``upsert`` call with all
            ids, embeddings and ``{"last_modified": ...}`` metadata.
    """
    cards, model_ids, last_modifieds = data
    logger.info("Embedding cards...")
    inference_client = get_inference_client()

    def _embed(card):
        # All worker threads share the one client created above.
        return embed_card(card, inference_client)

    results = thread_map(_embed, cards)
    logger.info(f"Upserting {len(model_ids)} items to collection")
    # Each result is converted to nested lists and its first row is stored.
    vectors = [result.tolist()[0] for result in results]
    # Timestamps are stored as strings in the item metadata.
    metadata = [{"last_modified": str(stamp)} for stamp in last_modifieds]
    collection.upsert(ids=model_ids, embeddings=vectors, metadatas=metadata)
    logger.info("Data refresh completed successfully")
# Run a refresh with the default filters when executed as a script.
if __name__ == "__main__":
    refresh_card_data()