Spaces:
Build error
Build error
| import os | |
| from pathlib import Path | |
| from typing import Literal | |
| from fastapi import BackgroundTasks, HTTPException | |
| from huggingface_hub import ( | |
| CommitOperationAdd, | |
| CommitOperationDelete, | |
| comment_discussion, | |
| create_commit, | |
| create_repo, | |
| delete_repo, | |
| get_repo_discussions, | |
| snapshot_download, | |
| space_info, | |
| ) | |
| from huggingface_hub.repocard import RepoCard | |
| from requests import HTTPError | |
| from gradio_webhooks import GradioWebhookApp, WebhookPayload | |
# Hub access token read from the environment (None when HF_TOKEN is unset).
# It is passed as `token=` to the huggingface_hub calls in this module.
HF_TOKEN = os.getenv("HF_TOKEN")
# Webhook application object provided by the local `gradio_webhooks` module.
app = GradioWebhookApp()
async def post_webhook(payload: WebhookPayload, task_queue: BackgroundTasks):
    """Dispatch an incoming Hub webhook to the matching CI-Space action.

    Three kinds of events are acted upon:

    * a pull request was just opened -> schedule a sync of its CI Space
      (unless it is already up to date);
    * a pull request was merged or closed -> schedule deletion of its CI Space;
    * the repo content changed -> walk every open pull request and schedule a
      sync for each one whose CI Space is out of date.

    Any other event is ignored. The actual work is pushed onto ``task_queue``
    so that it runs as a background task.

    Raises:
        HTTPException: with status 400 when the payload is not about a Space.

    Returns:
        ``{"processed": True}`` for every accepted payload.
    """
    repo = payload.repo
    if repo.type != "space":
        print("HTTP 400: not a space")
        raise HTTPException(400, f"Must be a Space, not {repo.type}")

    space_id = repo.name
    event = payload.event
    pr = payload.discussion

    # Shared precondition of the two discussion-related branches below.
    is_pr_event = (
        event.scope.startswith("discussion")
        and pr is not None
        and pr.isPullRequest
    )

    if is_pr_event and event.action == "create" and pr.status == "open":
        # New PR!
        if is_pr_synced(space_id=space_id, pr_num=pr.num):
            print("New comment on PR but CI space already synced")
        else:
            task_queue.add_task(
                sync_ci_space,
                space_id=space_id,
                pr_num=pr.num,
                private=repo.private,
            )
            print("New PR! Sync task scheduled")
    elif (
        is_pr_event
        and event.action == "update"
        and pr.status in ("merged", "closed")
    ):
        # PR merged or closed!
        task_queue.add_task(delete_ci_space, space_id=space_id, pr_num=pr.num)
        print("PR is merged (or closed)! Delete task scheduled")
    elif event.scope.startswith("repo.content") and event.action == "update":
        # The payload does not say which revision changed, so every open PR
        # is checked for commits that have not been synced yet.
        print("New repo content update. Checking PRs state.")
        all_discussions = get_repo_discussions(
            repo_id=space_id, repo_type="space", token=HF_TOKEN
        )
        for candidate in all_discussions:
            if not (candidate.is_pull_request and candidate.status == "open"):
                continue
            if is_pr_synced(space_id=space_id, pr_num=candidate.num):
                continue
            task_queue.add_task(
                sync_ci_space,
                space_id=space_id,
                pr_num=candidate.num,
                private=repo.private,
            )
            print(f"Scheduled update for PR {candidate.num}.")
        print("Done looping over PRs.")
    else:
        print("Webhook ignored.")

    print("Done.")
    return {"processed": True}
def is_pr_synced(space_id: str, pr_num: int) -> bool:
    """Tell whether the CI Space of a pull request is up to date.

    The sha of the last synced commit is read from the ``synced_sha`` field of
    the CI Space's repo card and compared with the current head of the
    ``refs/pr/<pr_num>`` revision of the source Space.

    Args:
        space_id: Id of the Space the pull request belongs to.
        pr_num: Number of the pull request.

    Returns:
        True only when a synced sha is recorded and equals the PR head sha.
    """
    # What is the last synced commit for this PR?
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    try:
        card = RepoCard.load(
            repo_id_or_path=ci_space_id, repo_type="space", token=HF_TOKEN
        )
    except HTTPError:
        # The card could not be fetched (e.g. the CI Space does not exist
        # yet): nothing has been synced so far.
        last_synced_sha = None
    else:
        last_synced_sha = getattr(card.data, "synced_sha", None)

    # What is the last commit id for this PR?
    # The token is required here as well, otherwise the PR revision of a
    # private Space cannot be read.
    info = space_info(
        repo_id=space_id, revision=f"refs/pr/{pr_num}", token=HF_TOKEN
    )

    # Is it up to date? A missing synced sha never counts as "synced", even
    # if the Hub were to return no sha for the PR revision.
    return last_synced_sha is not None and last_synced_sha == info.sha
def sync_ci_space(space_id: str, pr_num: int, private: bool) -> None:
    """Create or update the CI Space so that it mirrors a pull request.

    Steps: create the CI Space if it does not exist, download a snapshot of
    the PR revision, replace the whole content of the CI Space with that
    snapshot in a single commit, then post a comment on the PR.

    Args:
        space_id: Id of the Space the pull request belongs to.
        pr_num: Number of the pull request.
        private: Visibility passed to ``create_repo`` for the CI Space.

    Raises:
        HTTPError: if creating the CI Space fails for any reason other than
            "already exists" (HTTP 409).
    """
    # Create a temporary space for CI if didn't exist
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    try:
        create_repo(
            ci_space_id,
            repo_type="space",
            space_sdk="docker",
            private=private,
            token=HF_TOKEN,
        )
    except HTTPError as err:
        # 409 means the CI Space already exists; anything else is a real
        # error. `response` may be missing on some HTTPError instances.
        if err.response is None or err.response.status_code != 409:
            raise
        is_new = False
    else:
        is_new = True

    # Download space codebase from PR revision
    snapshot_path = Path(
        snapshot_download(
            repo_id=space_id,
            revision=f"refs/pr/{pr_num}",
            repo_type="space",
            token=HF_TOKEN,
        )
    )

    # Sync space codebase with PR revision
    operations = [  # little aggressive but works
        CommitOperationDelete(".", is_folder=True)
    ]
    for filepath in snapshot_path.glob("**/*"):
        if not filepath.is_file():
            continue
        # Paths in a repo always use "/" separators, whatever the local OS.
        path_in_repo = filepath.relative_to(snapshot_path).as_posix()

        # Upload all files without changes except for the README file, which
        # additionally records the synced commit in its metadata.
        if path_in_repo == "README.md":
            card = RepoCard.load(filepath)
            # The snapshot folder name is used as the latest sha.
            setattr(card.data, "synced_sha", snapshot_path.name)
            path_or_fileobj = str(card).encode()
        else:
            path_or_fileobj = filepath

        operations.append(
            CommitOperationAdd(
                path_in_repo=path_in_repo, path_or_fileobj=path_or_fileobj
            )
        )

    create_commit(
        repo_id=ci_space_id,
        repo_type="space",
        operations=operations,
        commit_message=f"Sync CI Space with PR {pr_num}.",
        token=HF_TOKEN,
    )

    # Post a comment on the PR
    notify_pr(
        space_id=space_id,
        pr_num=pr_num,
        action="create" if is_new else "update",
    )
def delete_ci_space(space_id: str, pr_num: int) -> None:
    """Delete the CI Space of a pull request and comment on the PR.

    If the CI Space does not exist (HTTP 404), there is nothing to delete:
    the function returns without posting a comment.

    Args:
        space_id: Id of the Space the pull request belongs to.
        pr_num: Number of the pull request.

    Raises:
        HTTPError: if the deletion fails for any reason other than HTTP 404.
    """
    # Delete
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    try:
        delete_repo(repo_id=ci_space_id, repo_type="space", token=HF_TOKEN)
    except HTTPError as err:
        # Same pattern as the 409 handling in `sync_ci_space`: only the
        # "not found" case is expected, everything else is re-raised.
        if err.response is not None and err.response.status_code == 404:
            print(f"CI Space {ci_space_id} not found. Nothing to delete.")
            return
        raise

    # Notify about deletion
    notify_pr(space_id=space_id, pr_num=pr_num, action="delete")
def notify_pr(
    space_id: str, pr_num: int, action: Literal["create", "update", "delete"]
) -> None:
    """Post an automated comment on a pull request about its CI Space.

    Args:
        space_id: Id of the Space the pull request belongs to.
        pr_num: Number of the pull request (also the discussion number).
        action: What happened to the CI Space; selects the message template.

    Raises:
        ValueError: if ``action`` is not one of the supported values.
    """
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)

    if action == "delete":
        # The deletion message has no placeholder and is posted as-is.
        message = NOTIFICATION_TEMPLATE_DELETE
    elif action == "create" or action == "update":
        template = (
            NOTIFICATION_TEMPLATE_CREATE
            if action == "create"
            else NOTIFICATION_TEMPLATE_UPDATE
        )
        message = template.format(ci_space_id=ci_space_id)
    else:
        raise ValueError(f"Status {action} not handled.")

    comment_discussion(
        repo_id=space_id,
        repo_type="space",
        discussion_num=pr_num,
        comment=message,
        token=HF_TOKEN,
    )
| def _get_ci_space_id(space_id: str, pr_num: int) -> str: | |
| return f"{space_id}-ci-pr-{pr_num}" | |
# Comment posted by `notify_pr` for action "create".
# Formatted with `ci_space_id` (used both as link text and in the URL).
NOTIFICATION_TEMPLATE_CREATE = """\
Hey there!
Following the creation of this PR, a temporary test Space [{ci_space_id}](https://huggingface.co/spaces/{ci_space_id}) has been launched.
Any changes pushed to this PR will be synced with the test Space.
(This is an automated message)
"""
# Comment posted by `notify_pr` for action "update".
# Formatted with `ci_space_id`, like the creation template.
NOTIFICATION_TEMPLATE_UPDATE = """\
Hey there!
Following new commits that happened in this PR, the temporary test Space [{ci_space_id}](https://huggingface.co/spaces/{ci_space_id}) has been updated.
(This is an automated message)
"""
# Comment posted by `notify_pr` for action "delete".
# Used as-is: it has no placeholder and is not passed through `.format`.
NOTIFICATION_TEMPLATE_DELETE = """\
Hey there!
PR is now merged/closed. The temporary test Space has been deleted.
(This is an automated message)
"""
# Called last, once every function and template above is defined.
# NOTE(review): presumably this finalizes/launches the webhook app — confirm
# against the `gradio_webhooks` module, which is not visible here.
app.ready()