Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import random | |
| from datasets import load_dataset, Features, Value | |
| import json | |
| import os | |
| import uuid | |
| from huggingface_hub import HfApi | |
| import time | |
| # # Sample dataset with unique 10-digit IDs | |
| # qa_dataset = { | |
| # "1234567890": { | |
| # "question": "What is the capital of France?", | |
| # "choices": ["A. Berlin", "B. Madrid", "C. Paris", "D. Lisbon"], | |
| # "answer": "C. Paris" | |
| # }, | |
| # "0987654321": { | |
| # "question": "What is the largest planet in our solar system?", | |
| # "choices": ["A. Earth", "B. Jupiter", "C. Saturn", "D. Mars", "E. Venus"], | |
| # "answer": "B. Jupiter" | |
| # }, | |
| # # Add more questions with unique IDs as needed | |
| # } | |
| truth_data = load_dataset("commonsense-index-dev/commonsense-candidates", "iter7-0520", split="train") | |
| LAST_LOG_UPDATE = time.time() | |
| logs = None | |
| qa_dataset = {} | |
| for item in truth_data: | |
| qa_dataset[item["id"]] = { | |
| "question": item["task"], | |
| "choices": item["choices"], | |
| "answer": item["answer"] | |
| } | |
| if "metadata" in item: | |
| qa_dataset[item["id"]]["reason"] = item["metadata"].get("reasoning", "N/A") | |
| def update_logs(): | |
| global LAST_LOG_UPDATE | |
| global logs | |
| # if logs is None or time.time() - LAST_LOG_UPDATE > 3600: | |
| if logs is None: | |
| # update logs for every 60 minutes | |
| logs = load_dataset("commonsense-index-dev/DemoFeedback", split="train") | |
| LAST_LOG_UPDATE = time.time() | |
| update_logs() | |
| def get_random_question(user_name="Anonymous"): | |
| global logs | |
| update_logs() | |
| # if user_name == "": | |
| # user_name = "Anonymous" | |
| # question_id = random.choice(list(qa_dataset.keys())) | |
| # else: | |
| # logs = load_dataset("commonsense-index-dev/DemoFeedback", split="train") | |
| feedback_counts = {qid: 0 for qid in qa_dataset.keys()} | |
| user_seen_data = set() | |
| for item in logs: | |
| feedback_counts[item["question_id"]] += 1 | |
| if True or item["user_name"] == user_name: # TODO: remove this when data is almost all annotated | |
| user_seen_data.add(item["question_id"]) | |
| # sample a question that has the least feedback, and if there are multiple, sample randomly | |
| min_feedback = min(feedback_counts.values()) | |
| question_ids = [k for k, v in feedback_counts.items() if v == min_feedback] | |
| question_ids = list(set(question_ids) - user_seen_data) | |
| question_id = random.choice(question_ids) | |
| question_data = qa_dataset[question_id] | |
| reasoning = question_data["reason"] | |
| return question_id, question_data["question"], question_data["choices"], reasoning | |
| def get_question_by_id(question_id): | |
| if question_id in qa_dataset: | |
| question_data = qa_dataset[question_id] | |
| return question_id, question_data["question"], question_data["choices"] | |
| else: | |
| return None, "Invalid question ID", [] | |
| def check_answer(question_id, choice, reasoning): | |
| correct_answer = qa_dataset[question_id]["answer"] | |
| text = "" | |
| if choice[3:] == correct_answer: | |
| text += "### β Correct!" | |
| text += "\n### Reasoning: " + reasoning | |
| else: | |
| text += "### β Incorrect. Try again!" | |
| return text | |
| def load_question(question_id=None, user_name="Anonymous"): | |
| question_id, question, choices, reasoning = get_random_question(user_name) | |
| question = f"---\n#### QID: {question_id}\n## {question} \n---" | |
| choices_markdown = "\n".join(choices) | |
| return question_id, question, choices_markdown, \ | |
| gr.update(value="", visible=True), reasoning, \ | |
| gr.update(value="", visible=True), \ | |
| gr.update(value="Submit your feedback! π", interactive=True) | |
| def show_buttons(choices_markdown): | |
| choices = choices_markdown.split("\n") | |
| visibility = [gr.update(visible=False)] * 10 | |
| for i in range(len(choices)): | |
| # generate ABCDEFGHIJ labels | |
| choices[i] = chr(65 + i) + ") " + choices[i] | |
| visibility[i] = gr.update(visible=True, value=choices[i]) | |
| return visibility | |
| def submit_feedback(question_id, user_reason, user_revision, example_quality, user_name_text): | |
| if "N/A" in question_id or "N/A" in example_quality: | |
| # send a message to the user to sample an example and select a choice first | |
| return { | |
| submit_button: {"interactive": True, "__type__": "update", "value": "Submit your feedback! π Please sample an example and select a choice!"}, | |
| } | |
| # create a jsonl file and upload it to hf | |
| if user_name_text == "": | |
| user_name_text = "Anonymous" | |
| feedback_item = { | |
| "question_id": question_id, | |
| "user_name": user_name_text, | |
| "user_reason": user_reason, | |
| "revision": user_revision, | |
| "example_quality": example_quality, | |
| } | |
| jsonl_str = json.dumps(feedback_item) | |
| api = HfApi() | |
| token = os.getenv("HF_TOKEN") | |
| if token is None: | |
| raise ValueError("Hugging Face token not found. Ensure the HF_TOKEN environment variable is set.") | |
| # Generate a random filename using UUID | |
| filename = f"{uuid.uuid4()}.json" | |
| # Define the repository | |
| repo_id = "commonsense-index-dev/DemoFeedback" | |
| # Upload the json_str as a file directly to the specified path in your dataset repository | |
| api.upload_file( | |
| token=token, | |
| repo_id=repo_id, | |
| repo_type="dataset", | |
| path_or_fileobj=jsonl_str.encode("utf-8"), # Convert string to bytes | |
| path_in_repo=filename, | |
| commit_message=f"{user_name_text}'s feedback on {question_id}", | |
| ) | |
| return { | |
| submit_button: {"interactive": False, "__type__": "update", "value": "Submitted! β \n Please sample the next one."} | |
| } | |
| def refresh_feedback(question_id): | |
| return gr.update(value="", visible=True), gr.update(value="", visible=True), gr.update(value="", visible=True), gr.update(value="", visible=True) | |
| with gr.Blocks() as app: | |
| gr.Markdown("# Commonsense Index Data Viewer") | |
| with gr.Row(): | |
| # question_id_input = gr.Textbox(label="Enter Question ID", placeholder="leave empty for random sampling") | |
| random_button = gr.Button("π² Click here to randomly sample an example") | |
| question_display = gr.Markdown(visible=True) | |
| choices_markdown = gr.Markdown(visible=False) | |
| choice_buttons = [gr.Button(visible=False) for _ in range(10)] | |
| result_display = gr.Markdown(visible=True) | |
| reasoning_display = gr.Markdown(visible=False) | |
| question_id = gr.Textbox(label="Question ID:", interactive=False, visible=False) | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| reason_textbox = gr.Textbox(label="Reason", placeholder="Please talk why the correct answer is correct and why the others are wrong. If you think this is a bad example, please explain too.", type="text", elem_classes="", max_lines=5, lines=3, show_copy_button=False, visible=True, scale=4, interactive=True) | |
| revision_textbox = gr.Textbox(label="Revision", placeholder="Please suggest a revision to the question.", type="text", elem_classes="", max_lines=5, lines=3, show_copy_button=False, visible=True, scale=4, interactive=True) | |
| with gr.Column(): | |
| example_quality = gr.Radio(label="Quality", choices=["Good", "Bad"], interactive=True, visible=True) | |
| user_name = gr.Textbox(label="Your username", placeholder="Your username", type="text", elem_classes="", max_lines=1, show_copy_button=False, visible=True, interactive=True, show_label=False) | |
| submit_button = gr.Button("Submit your feedback! π", elem_classes="btn_boderline", visible=True, interactive=True) | |
| random_button.click(fn=load_question, inputs=[user_name], outputs=[question_id, question_display, choices_markdown, result_display, reasoning_display, example_quality, submit_button]) | |
| choices_markdown.change(fn=show_buttons, inputs=choices_markdown, outputs=choice_buttons) | |
| question_id.change(fn=refresh_feedback, inputs=[question_id], outputs=[reason_textbox, revision_textbox, example_quality]) | |
| submit_button.click(fn=submit_feedback, inputs=[question_id, reason_textbox, revision_textbox, example_quality, user_name], outputs=[submit_button]) | |
| for i, button in enumerate(choice_buttons): | |
| button.click(fn=check_answer, inputs=[question_id, button, reasoning_display], outputs=result_display) | |
| app.launch() |