Spaces:
Sleeping
Sleeping
| """ | |
| This file has functions to update the meilisearch index with new comments. | |
| A payload from the HF webhook looks like this: | |
| { | |
| "event": { | |
| "action": "update", | |
| "scope": "discussion.comment" | |
| }, | |
| "repo": { | |
| "type": "dataset", | |
| "name": "allenai/objaverse", | |
| "id": "63977bb96bdef8095268ded0", | |
| "private": false, | |
| "url": { | |
| "web": "https://huggingface.co/datasets/allenai/objaverse", | |
| "api": "https://huggingface.co/api/datasets/allenai/objaverse" | |
| }, | |
| "owner": { | |
| "id": "5e70f3648ce3c604d78fe132" | |
| } | |
| }, | |
| "discussion": { | |
| "id": "66f1a1092eb1ea2422555d24", | |
| "title": "PullRequest", | |
| "url": { | |
| "web": "https://huggingface.co/datasets/allenai/objaverse/discussions/63", | |
| "api": "https://huggingface.co/api/datasets/allenai/objaverse/discussions/63" | |
| }, | |
| "status": "draft", | |
| "author": { | |
| "id": "6673e848436907f83a815ab0" | |
| }, | |
| "num": 63, | |
| "isPullRequest": true, | |
| "changes": { | |
| "base": "refs/heads/main" | |
| } | |
| }, | |
| "comment": { | |
| "id": "66f1a1092eb1ea2422555d25", | |
| "author": { | |
| "id": "6673e848436907f83a815ab0" | |
| }, | |
| "hidden": true, | |
| "url": { | |
| "web": "https://huggingface.co/datasets/allenai/objaverse/discussions/63#66f1a1092eb1ea2422555d25" | |
| } | |
| }, | |
| "webhook": { | |
| "id": "66d7991f9b7da501cd100d95", | |
| "version": 3 | |
| } | |
| } | |
| """ | |
| import time | |
| import json | |
| import os | |
| from datetime import datetime, timezone | |
| import requests | |
| from dotenv import load_dotenv | |
| from huggingface_hub import HfApi | |
| from meilisearch import Client | |
| from huggingface_hub import HfApi | |
| from constants import MeilisearchIndexFields | |
# Load settings from the local .env file; override=True lets values from the
# file replace variables that are already set in the environment.
load_dotenv(".env", override=True)

# Shared secret compared against the X-Webhook-Secret header of incoming
# requests and passed along when the webhook is updated.
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET")

# Connection settings for the Meilisearch instance holding the comment index.
MEILISEARCH_URL = os.environ.get("MS_URL")
MEILISEARCH_KEY = os.environ.get("MS_ADMIN_KEY")

# Module-level clients shared by every function in this file.
ms_client = Client(MEILISEARCH_URL, MEILISEARCH_KEY)
# HF_WEBHOOK_TOKEN is mandatory: a missing variable raises KeyError on import.
api = HfApi(token=os.environ["HF_WEBHOOK_TOKEN"])
async def process_webhook(request):
    """Authenticate an incoming HF webhook call and update the search index.

    The ``X-Webhook-Secret`` header is verified before the body is decoded,
    logged or parsed, so unauthenticated payloads are never processed.
    Only events on public model repos that carry a non-pull-request
    discussion touch the index: an "update" event without a comment is
    treated as a discussion status change, anything else as a new comment.

    Args:
        request: Incoming request exposing an awaitable ``body()`` and a
            ``headers`` mapping.

    Returns:
        A ``(body, status_code)`` tuple:
        ``({"error": ...}, 400)`` for a bad secret or an unparsable body,
        ``({"status": "skipped"}, 200)`` for ignored model events, and
        ``({"status": "success"}, 200)`` otherwise.
    """
    import hmac  # function-scope import: only needed for the secret check

    secret = request.headers.get("X-Webhook-Secret")
    # Both sides must be present. With a plain `!=`, an unset WEBHOOK_SECRET
    # equals a missing header (None == None) and every request would pass.
    # Comparing bytes keeps compare_digest from raising on non-ASCII input.
    if (
        not WEBHOOK_SECRET
        or not secret
        or not hmac.compare_digest(
            secret.encode("utf-8"), WEBHOOK_SECRET.encode("utf-8")
        )
    ):
        print("Invalid secret")
        return {"error": "Invalid secret"}, 400

    raw_body = await request.body()
    try:
        body_text = raw_body.decode("utf-8")
        payload = json.loads(body_text)
    except ValueError as e:
        # UnicodeDecodeError and json.JSONDecodeError both derive from ValueError.
        print(f"Invalid payload: {e}")
        return {"error": "Invalid payload"}, 400
    print(body_text)

    # Events for anything but model repos are acknowledged without indexing.
    if payload["repo"]["type"] != "model":
        return {"status": "success"}, 200

    if (
        "discussion" not in payload
        or payload["discussion"]["isPullRequest"]
        or payload["repo"]["private"]
    ):
        return {"status": "skipped"}, 200

    changing_status = (
        "comment" not in payload and payload["event"]["action"] == "update"
    )
    # NOTE(review): both handlers are synchronous (network calls, and a
    # time.sleep in update_discussion_status), so they block this coroutine
    # while they run.
    if changing_status:
        update_discussion_status(payload)
    else:
        add_new_comment(payload)
    return {"status": "success"}, 200
def user_id_to_username(user_id):
    """Resolve a Hugging Face user id to a username, on a best-effort basis.

    Queries the public ``/api/users/<id>/overview`` endpoint and returns the
    ``"user"`` field of its JSON response.

    Args:
        user_id: Identifier placed into the overview URL.

    Returns:
        The value of the ``"user"`` field, or ``user_id`` unchanged when the
        request fails, times out, or the response has an unexpected shape.
    """
    api_url = f"https://huggingface.co/api/users/{user_id}/overview"
    try:
        # Without a timeout a stalled connection would hang the caller forever.
        response = requests.get(api_url, timeout=10)
        return response.json()["user"]
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        # Network errors, non-JSON bodies and missing/odd "user" fields all
        # fall back to the raw id so indexing can still proceed.
        print(f"Couldn't get username for id {user_id}: {e}")
        return user_id
def add_new_comment(payload):
    """Store the comment carried by a webhook payload in the Meilisearch index.

    One document is built per comment, keyed by the comment id, and holds
    the discussion title, status and web URL, the repo name, the author
    (resolved through ``user_id_to_username``), the comment text (empty
    when the payload has no ``content``) and the current UTC time as an
    integer timestamp.

    Args:
        payload: Parsed webhook payload containing ``comment``,
            ``discussion`` and ``repo`` sections.
    """
    comment_info = payload["comment"]
    text = comment_info.get("content", "")
    doc_id = comment_info["id"]
    repo_name = payload["repo"]["name"]

    discussion_info = payload["discussion"]
    discussion_title = discussion_info["title"]

    author_name = user_id_to_username(comment_info["author"]["id"])

    web_url = discussion_info["url"]["web"]
    # Whole seconds since the epoch, taken from a timezone-aware UTC clock.
    now_ts = int(datetime.now(timezone.utc).timestamp())
    discussion_status = discussion_info["status"]

    fields = MeilisearchIndexFields
    document = {
        fields.ID.value: doc_id,
        fields.TITLE.value: discussion_title,
        fields.STATUS.value: discussion_status,
        fields.AUTHOR.value: author_name,
        fields.URL.value: web_url,
        fields.REPO_ID.value: repo_name,
        fields.CONTENT.value: text,
        fields.UPDATED_AT.value: now_ts,
    }

    index = ms_client.index(fields.INDEX_NAME.value)
    index.add_documents([document])
def update_discussion_status(payload):
    """Propagate a discussion's new status to all of its indexed comments.

    Looks up every document whose ``url`` equals the discussion's web URL
    and rewrites its status field with the status from the payload.

    Args:
        payload: Parsed webhook payload containing a ``discussion`` section
            with ``url.web`` and ``status``.
    """
    # If closing and commenting at the same time,
    # the comment comes with status = open after the webhook that says the
    # discussion is closed. Adding the sleep ensures the update comes afterwards.
    time.sleep(1)

    # Meilisearch returns only 20 hits unless a limit is given, which would
    # leave comments of longer discussions with a stale status.
    max_hits = 1000

    url = payload["discussion"]["url"]["web"]
    status = payload["discussion"]["status"]
    index = ms_client.index(MeilisearchIndexFields.INDEX_NAME.value)

    existing_results = index.search(
        query="",
        opt_params={"filter": f"url = '{url}'", "limit": max_hits},
    )
    hits = existing_results["hits"]
    if not hits:
        return

    id_field = MeilisearchIndexFields.ID.value
    status_field = MeilisearchIndexFields.STATUS.value
    # Partial documents: only the id and the field to overwrite are sent.
    docs2update = [{id_field: d[id_field], status_field: status} for d in hits]
    update_request = index.update_documents(docs2update)
    print("Update request:", update_request)
def is_user(user_or_org):
    """Tell whether a Hub namespace belongs to a user.

    Args:
        user_or_org: Namespace placed into the user overview URL.

    Returns:
        True when the ``/api/users/<name>/overview`` endpoint answers with
        HTTP 200, False for any other status code.

    Raises:
        requests.RequestException: If the request fails or times out.
    """
    api_url = f"https://huggingface.co/api/users/{user_or_org}/overview"
    # Without a timeout a stalled connection would hang the caller forever.
    response = requests.get(api_url, timeout=10)
    return response.status_code == 200
def update_webhooks():
    """Extend the existing webhook's watch list with owners of trending models.

    Finds the webhook whose URL equals ``HF_WEBHOOK_URL``, lists up to 1000
    models sorted by ``likes7d``, and adds the owner of each model (as a
    ``user`` or ``org`` item) unless a watched item with that name already
    exists. The webhook is then updated with the combined list. Does
    nothing when no webhook matches the configured URL.

    Raises:
        KeyError: If ``HF_WEBHOOK_URL`` is not set in the environment.
    """
    print("Updating webhook")
    existing_webhooks = api.list_webhooks()
    webhook_url = os.environ["HF_WEBHOOK_URL"]
    webhook2update = [x for x in existing_webhooks if x.url == webhook_url]
    if not webhook2update:
        # Previously this fell through to an IndexError on webhook2update[0].
        print("No webhook found for the configured HF_WEBHOOK_URL")
        return
    if len(webhook2update) > 1:
        print("More than one webhook found")
        print(webhook2update)
        print("updating the first one")
    target = webhook2update[0]

    # Names already watched; also grows during the loop so an owner with
    # several trending models is looked up and added only once.
    known_names = {ww.name for ww in target.watched}

    # get trending models
    trending_models = api.list_models(sort="likes7d", direction=-1, limit=1000)
    to_add = []
    for m in trending_models:
        org_or_user = m.id.split("/")[0]
        if org_or_user in known_names:
            continue
        known_names.add(org_or_user)
        # A user/org watch item is named after the owner, not the full model
        # id; using the owner also makes the membership check above effective
        # on later runs.
        item_type = "user" if is_user(org_or_user) else "org"
        to_add.append({"name": org_or_user, "type": item_type})

    new_watched = list(target.watched) + to_add
    print("There are now", len(new_watched), "items in the watched list")
    api.update_webhook(
        id=target.id,
        url=webhook_url,
        watched=new_watched,
        domains=["discussion"],
        secret=WEBHOOK_SECRET,
    )