Spaces:
Runtime error
Runtime error
| #!/usr/bin/python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| https://huggingface.co/spaces/fffiloni/langchain-chat-with-pdf-openai | |
| """ | |
| import argparse | |
| import httpx | |
| import importlib | |
| import json | |
| import logging | |
| import os | |
| import platform | |
| import shutil | |
| import time | |
| from typing import List, Tuple | |
# Root logger configuration: timestamped, level-tagged records at INFO.
# The level does not depend on the platform.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
| import gradio as gr | |
| import openai | |
| from openai import OpenAI | |
| from PIL import Image | |
| import project_settings as settings | |
| from project_settings import project_path | |
| logger = logging.getLogger(__name__) | |
def get_args():
    """Parse the command-line options of the demo.

    Options (all strings): ``--examples_json_file``, ``--history_json_file``,
    ``--description_md_file`` and ``--openai_api_key``. The default API key
    is looked up through ``settings.environment``.

    Returns:
        The namespace produced by ``ArgumentParser.parse_args()``.
    """
    parser = argparse.ArgumentParser()

    # File-name options and their defaults, registered in a fixed order.
    file_options = (
        ("--examples_json_file", "examples.json"),
        ("--history_json_file", "history.json"),
        ("--description_md_file", "description.md"),
    )
    for flag, default_value in file_options:
        parser.add_argument(flag, default=default_value, type=str)

    parser.add_argument(
        "--openai_api_key",
        default=settings.environment.get("openai_api_key", default=None, dtype=str),
        type=str,
    )
    return parser.parse_args()
def dynamic_import_function(package_name: str, function_name: str):
    """Load ``functions.<package_name>`` and return one attribute from it.

    Args:
        package_name: Module name inside the ``functions`` package.
        function_name: Attribute to fetch from that module.

    Returns:
        The attribute named ``function_name`` of the imported module.

    Raises:
        ModuleNotFoundError: If ``functions.<package_name>`` cannot be found.
        AttributeError: If the module has no attribute ``function_name``.
    """
    # NOTE(review): importing a module executes its top-level code. Callers
    # that pass externally supplied names should restrict them to trusted
    # scripts.
    module = importlib.import_module("functions.{}".format(package_name))
    return getattr(module, function_name)
def _split_nonblank_lines(text) -> List[str]:
    """Return the stripped, non-empty lines of *text*; ``None`` gives ``[]``."""
    if text is None:
        return []
    return [line.strip() for line in str(text).split("\n") if line.strip()]


def _upload_assistant_files(client, paths) -> list:
    """Upload each path with purpose ``assistants``, closing every handle."""
    uploaded = []
    for path in paths or []:
        with open(path, "rb") as handle:
            uploaded.append(
                client.files.create(file=handle, purpose="assistants")
            )
    return uploaded


def click_create_assistant(openai_api_key: str,
                           name: str,
                           instructions: str,
                           description: str,
                           tools: str,
                           files: List[str],
                           file_ids: str,
                           model: str,
                           ):
    """Create an assistant from the form values and return its id.

    Args:
        openai_api_key: API key used to build the client.
        name: Assistant name.
        instructions: Assistant instructions.
        description: Assistant description.
        tools: Tool definitions, one JSON object per line; blank lines are
            skipped. ``None`` or an empty string means no tools.
        files: Local paths to upload with purpose ``assistants``; may be
            ``None``.
        file_ids: Ids of already uploaded files, one per line; may be
            ``None`` or empty.
        model: Model name.

    Returns:
        ``(assistant_id, None)``. The second value is wired to the thread id
        box in the UI.

    Raises:
        json.JSONDecodeError: If a non-blank ``tools`` line is not valid JSON.
    """
    logger.info("click create assistant, name: {}".format(name))

    client = OpenAI(
        api_key=openai_api_key,
    )

    # A missing value must yield an empty list. Passing None through str()
    # would otherwise produce the literal text "None".
    tool_specs = [json.loads(line) for line in _split_nonblank_lines(tools)]
    existing_file_ids = _split_nonblank_lines(file_ids)
    uploaded_files = _upload_assistant_files(client, files)

    assistant = client.beta.assistants.create(
        name=name,
        instructions=instructions,
        description=description,
        tools=tool_specs,
        file_ids=existing_file_ids + [file.id for file in uploaded_files],
        model=model,
    )
    return assistant.id, None
def click_list_assistant(openai_api_key: str) -> str:
    """Return a text listing (id, name, description) of the assistants.

    Only the entries contained in the single list response are rendered.
    """
    client = OpenAI(
        api_key=openai_api_key,
    )
    listing = client.beta.assistants.list().model_dump(mode="json")

    entries = [
        "id: \n{}\nname: \n{}\ndescription: \n{}\n\n".format(
            item["id"], item["name"], item["description"]
        )
        for item in listing["data"]
    ]
    return "".join(entries)
def click_delete_assistant(openai_api_key: str,
                           assistant_id: str) -> str:
    """Delete one assistant and report the outcome as text.

    Returns:
        ``"success"`` or ``"failed"`` depending on the ``deleted`` flag of
        the response, or the error message when the API reports not found.
    """
    assistant_id = assistant_id.strip()
    logger.info("click delete assistant, assistant_id: {}".format(assistant_id))

    client = OpenAI(
        api_key=openai_api_key,
    )
    try:
        outcome = client.beta.assistants.delete(assistant_id=assistant_id)
    except openai.NotFoundError as e:
        return e.message

    if outcome.deleted:
        return "success"
    return "failed"
def click_delete_all_assistant(openai_api_key: str):
    """Delete every assistant visible to the given API key.

    Args:
        openai_api_key: API key used to build the client.

    Returns:
        Always ``None``.
    """
    client = OpenAI(
        api_key=openai_api_key,
    )

    # Iterating the list response itself follows pagination, whereas `.data`
    # holds only the first page. Ids are collected before any deletion so
    # that removing items cannot disturb the paging cursor.
    assistant_ids = [assistant.id for assistant in client.beta.assistants.list()]

    for assistant_id in assistant_ids:
        client.beta.assistants.delete(assistant_id)
    return None
def click_list_file(openai_api_key: str):
    """Return a text listing (id, filename, bytes, status) of uploaded files.

    Only the entries contained in the single list response are rendered.
    """
    client = OpenAI(
        api_key=openai_api_key,
    )
    listing = client.files.list().model_dump(mode="json")

    entries = [
        "id: \n{}\nfilename: \n{}\nbytes: \n{}\nstatus: \n{}\n\n".format(
            item["id"], item["filename"], item["bytes"], item["status"]
        )
        for item in listing["data"]
    ]
    return "".join(entries)
def click_delete_file(openai_api_key: str,
                      file_id: str) -> str:
    """Delete one uploaded file and report the outcome as text.

    Returns:
        ``"success"`` or ``"failed"`` depending on the ``deleted`` flag of
        the response; the API error message on not found; or the text of an
        ``httpx.InvalidURL`` error.
    """
    file_id = file_id.strip()
    logger.info("click delete file, file_id: {}".format(file_id))

    client = OpenAI(
        api_key=openai_api_key,
    )
    try:
        outcome = client.files.delete(file_id=file_id)
    except openai.NotFoundError as e:
        return e.message
    except httpx.InvalidURL as e:
        return str(e)

    if outcome.deleted:
        return "success"
    return "failed"
def click_upload_files(openai_api_key: str,
                       files: List[str],
                       ):
    """Upload local files with purpose ``assistants``.

    Args:
        openai_api_key: API key used to build the client.
        files: Local paths to upload; ``None`` or an empty list uploads
            nothing.

    Returns:
        The ids of the uploaded files, in upload order.
    """
    logger.info("click upload files, files: {}".format(files))

    client = OpenAI(
        api_key=openai_api_key,
    )

    uploaded_ids = []
    for path in files or []:
        # The context manager closes the handle once the upload returns.
        with open(path, "rb") as handle:
            uploaded = client.files.create(file=handle, purpose="assistants")
        uploaded_ids.append(uploaded.id)
    return uploaded_ids
def click_list_function_python_script():
    """List the ``*.py`` files of the ``functions`` directory, one per line.

    ``__init__.py`` is left out. Every listed name is followed by a newline.
    """
    scripts = (project_path / "functions").glob("*.py")
    return "".join(
        "{}\n".format(path.name)
        for path in scripts
        if path.name != "__init__.py"
    )
def click_upload_function_python_script(files: List[str]):
    """Copy the given files into the ``functions`` directory.

    Does nothing when ``files`` is ``None``. Always returns ``None``.
    """
    if files is None:
        return None

    destination = (project_path / "functions").as_posix()
    for source in files:
        shutil.copy(source, destination)
    return None
def click_delete_function_python_script(filename: str):
    """Delete one script from the ``functions`` directory.

    Args:
        filename: Bare file name of the script; surrounding whitespace is
            ignored.

    Returns:
        ``"success"`` when the file was removed, otherwise a message that
        describes why nothing was removed.
    """
    name = filename.strip()

    # The name comes from a UI text box. Only a bare file name is accepted,
    # so "../x" or an absolute path cannot reach outside the directory and
    # an empty name cannot target the directory itself.
    if name in ("", ".", "..") or os.path.basename(name) != name:
        return "invalid filename: {}".format(name)

    target = (project_path / "functions" / name).as_posix()
    try:
        os.remove(target)
    except (OSError, ValueError) as e:
        # OSError covers missing files and permission problems; ValueError
        # is raised for names with an embedded NUL byte.
        return str(e)
    return "success"
def click_download_function_python_script(name: str):
    """Locate a script in the ``functions`` directory for download.

    Args:
        name: Bare file name of the script; surrounding whitespace is
            ignored.

    Returns:
        ``(files, flag)``: ``files`` is a one-element list holding the
        script's posix path, or ``None`` when it is not available; ``flag``
        is the status text shown to the user.
    """
    requested = name.strip()
    not_found = (None, "File Not Found: {}".format(requested))

    # Only a bare file name is accepted, so the lookup cannot leave the
    # directory. An empty name would otherwise resolve to the directory
    # itself and count as existing.
    if requested in ("", ".", "..") or os.path.basename(requested) != requested:
        return not_found

    path = project_path / "functions" / requested
    if not path.is_file():
        return not_found
    return [path.as_posix()], "You can download it on `upload_python_script_files` now."
def convert_message_list_to_conversation(message_list: List[dict]) -> List[Tuple[str, str]]:
    """Turn thread messages into ``[user_text, assistant_text]`` pairs.

    Each text content part of each message becomes one pair. Assistant text
    goes in the second slot and any other role in the first; the unused slot
    is ``None``. Non-text content parts are skipped. Every ``file_citation``
    annotation appends its quote and file id to the text; other annotation
    types are ignored.

    Args:
        message_list: Messages as plain dicts with ``role`` and ``content``
            keys, already in display order.

    Returns:
        The list of two-element pairs.
    """
    conversation = []
    for message in message_list:
        is_assistant = message["role"] == "assistant"

        for part in message["content"]:
            if part["type"] != "text":
                continue

            text = part["text"]
            msg = text["value"]

            # A missing or null annotation list is treated as empty.
            for annotation in text.get("annotations") or []:
                if annotation["type"] != "file_citation":
                    continue
                citation = annotation["file_citation"]
                msg += "\n\n"
                msg += "\nquote: \n{}\nfile_id: \n{}".format(
                    citation.get("quote", ""),
                    citation["file_id"],
                )

            conversation.append([None, msg] if is_assistant else [msg, None])
    return conversation
def refresh(openai_api_key: str,
            thread_id: str,
            ):
    """Fetch a thread's messages and convert them for the chat display.

    The messages of a single list response are sorted by ``created_at``
    (ascending) and passed to ``convert_message_list_to_conversation``.

    Returns:
        The converted conversation.
    """
    client = OpenAI(
        api_key=openai_api_key,
    )
    response = client.beta.threads.messages.list(
        thread_id=thread_id
    )
    raw_messages = response.model_dump(mode="json")["data"]
    message_list = sorted(raw_messages, key=lambda item: item["created_at"])

    logger.debug("message_list: {}".format(message_list))
    return convert_message_list_to_conversation(message_list)
# Run statuses after which the API makes no further progress on a run.
_TERMINAL_RUN_STATUSES = frozenset(
    {"completed", "cancelled", "failed", "expired", "incomplete"}
)


def _run_tool_calls(tool_calls):
    """Execute the requested tool calls with the local function scripts.

    Returns:
        ``(tool_outputs, image)``: the payload for ``submit_tool_outputs``
        and the last image produced by a tool, or ``None`` if there was none.
    """
    tool_outputs = []
    image = None
    for tool_call in tool_calls:
        function_name = tool_call.function.name
        # Each script exposes the tool function under the module's own name
        # plus a `kwargs` callable that lists the accepted argument names.
        function_to_call = dynamic_import_function(function_name, function_name)
        kwargs_required: List[str] = dynamic_import_function(function_name, "kwargs")()

        try:
            function_args = json.loads(tool_call.function.arguments)
        except json.JSONDecodeError:
            logger.error("invalid tool call arguments: %s", tool_call.function.arguments)
            raise

        kwargs = {k: function_args.get(k) for k in kwargs_required}
        function_response = function_to_call(**kwargs)

        tool_outputs.append({
            "tool_call_id": tool_call.id,
            "output": function_response["text"],
        })

        image_filename = function_response.get("image", None)
        if image_filename is not None:
            # Load the pixels eagerly so the file handle can be closed.
            with open(image_filename, "rb") as f:
                image = Image.open(f)
                image.load()
    return tool_outputs, image


def add_and_run(openai_api_key: str,
                assistant_id: str,
                thread_id: str,
                name: str,
                instructions: str,
                description: str,
                tools: str,
                files: List[str],
                file_ids: str,
                model: str,
                query: str,
                ):
    """Post ``query`` to a thread, run the assistant and stream the result.

    An assistant is created from the form values when ``assistant_id`` is
    blank, and a thread is created when ``thread_id`` is blank. The run is
    then polled; tool calls requested by the run are executed locally and
    their outputs submitted back.

    Yields:
        ``[assistant_id, thread_id, conversation, None, chatting_image]``
        after every poll. The ``None`` clears the query box and
        ``chatting_image`` is the last image produced by a tool call, if any.

    Polling stops once the run has reached a terminal status and the
    conversation has been unchanged for ``max_no_updates_count`` polls.
    """
    chatting_image = None

    client = OpenAI(
        api_key=openai_api_key,
    )

    if assistant_id is None or len(assistant_id.strip()) == 0:
        assistant_id, _ = click_create_assistant(
            openai_api_key,
            name, instructions, description, tools, files, file_ids, model
        )

    if thread_id is None or len(thread_id.strip()) == 0:
        thread = client.beta.threads.create()
        thread_id = thread.id

    logger.info(f"assistant_id: {assistant_id}, thread_id: {thread_id}, query: {query}")

    client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=query
    )
    run = client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant_id,
    )

    delta_time = 0.1
    last_conversation = None
    no_updates_count = 0
    max_no_updates_count = 5
    while True:
        time.sleep(delta_time)

        run = client.beta.threads.runs.retrieve(
            thread_id=thread_id,
            run_id=run.id,
        )

        # required action
        required_action = run.required_action
        if required_action is not None and required_action.type == "submit_tool_outputs":
            tool_outputs, new_image = _run_tool_calls(
                required_action.submit_tool_outputs.tool_calls
            )
            if new_image is not None:
                chatting_image = new_image

            run = client.beta.threads.runs.submit_tool_outputs(
                thread_id=thread_id,
                run_id=run.id,
                tool_outputs=tool_outputs
            )

        # get message
        conversation = refresh(openai_api_key, thread_id)

        # Decide completion from the run status. `expires_at` is not a
        # usable signal because it is also set on a run that is still active.
        if conversation == last_conversation and run.status in _TERMINAL_RUN_STATUSES:
            no_updates_count += 1
            if no_updates_count >= max_no_updates_count:
                break
        last_conversation = conversation

        yield [
            assistant_id, thread_id,
            conversation,
            # query and chatting image
            None, chatting_image
        ]
def main():
    """Build the Gradio UI for the assistant demo and launch the server.

    Reads the description markdown, the examples JSON and the history JSON
    named by the command-line options, lays out the settings / chat / state
    columns, wires every button to its handler and starts the queued app on
    port 7860.
    """
    # NOTE(review): the nesting of the layout containers below was
    # reconstructed from a copy of this file without indentation; confirm
    # against the rendered UI.
    args = get_args()

    brief_description = """
## OpenAI Assistant
基于 [OpenAI platform](https://platform.openai.com/docs/introduction) 开发的 assistant 界面及示例。使用方法等详细介绍在最下面。
"""

    with open(args.description_md_file, "r", encoding="utf-8") as f:
        md_description = f.read()

    # example json
    with open(args.examples_json_file, "r", encoding="utf-8") as f:
        examples = json.load(f)
    for example in examples:
        # Index 4 lines up with `retrieval_files` in the Examples inputs;
        # its relative paths are resolved against the project root.
        files: List[str] = example[4]
        if files is None:
            continue
        example[4] = [(project_path / file).as_posix() for file in files]

    # history
    with open(args.history_json_file, "r", encoding="utf-8") as f:
        history = json.load(f)

    # ui
    with gr.Blocks() as blocks:
        gr.Markdown(value=brief_description)

        with gr.Row():
            # settings
            with gr.Column(scale=3):
                with gr.Tabs():
                    with gr.TabItem("create assistant"):
                        openai_api_key = gr.Text(
                            value=args.openai_api_key,
                            label="openai_api_key",
                            placeholder="Fill with your `openai_api_key`"
                        )

                        name = gr.Textbox(label="name")
                        instructions = gr.Textbox(label="instructions")
                        description = gr.Textbox(label="description")
                        model = gr.Dropdown(["gpt-4-1106-preview"], value="gpt-4-1106-preview", label="model")

                        # functions
                        tools = gr.TextArea(label="functions")

                        # upload files
                        retrieval_files = gr.Files(label="retrieval_files")
                        retrieval_file_ids = gr.TextArea(label="retrieval_file_ids")

                        # create assistant
                        create_assistant_button = gr.Button("create assistant", variant="secondary")

                    with gr.TabItem("assistants"):
                        list_assistant_button = gr.Button("list assistant")
                        assistant_list = gr.TextArea(label="assistant_list")

                        delete_assistant_id = gr.Textbox(max_lines=1, label="delete_assistant_id")
                        delete_assistant_button = gr.Button("delete assistant")
                        delete_all_assistant_button = gr.Button("delete all assistant")

                    with gr.TabItem("files"):
                        list_file_button = gr.Button("list file")
                        file_list = gr.TextArea(label="file_list")

                        upload_files = gr.Files(label="upload_files")
                        upload_files_button = gr.Button("upload file")

                        delete_file_id = gr.Textbox(max_lines=1, label="delete_file_id")
                        delete_file_button = gr.Button("delete file")

                    with gr.TabItem("function script"):
                        list_function_python_script_button = gr.Button("list python script")
                        list_function_python_script_list = gr.TextArea(label="python_script_list")

                        upload_function_python_script_files = gr.Files(label="upload_python_script_files")
                        upload_function_python_script_button = gr.Button("upload python script")

                        function_python_script_file = gr.Textbox(max_lines=1, label="python_script_file")
                        delete_function_python_script_button = gr.Button("delete python script")
                        download_function_python_script_button = gr.Button("download python script")

            # chat
            with gr.Column(scale=5):
                chat_bot = gr.Chatbot(label="conversation", height=600)

                with gr.Row():
                    with gr.Column(scale=3, min_width=100):
                        chatting_image = gr.Image(label="chatting_image", height=220, width=220, show_label=False)
                    with gr.Column(scale=7):
                        query = gr.Textbox(lines=1, label="query")

                        with gr.Row():
                            with gr.Column(scale=1, min_width=100):
                                add_and_run_button = gr.Button("Add and run", variant="primary")
                            with gr.Column(scale=1, min_width=100):
                                refresh_button = gr.Button("Refresh")

            # states
            with gr.Column(scale=2):
                assistant_id = gr.Textbox(value=None, label="assistant_id")
                thread_id = gr.Textbox(value=None, label="thread_id")
                tips = gr.TextArea(value=None, label="tips")

        # examples
        with gr.Row():
            gr.Examples(
                examples=examples,
                inputs=[
                    name, instructions, description, tools, retrieval_files, model,
                    query, tips
                ],
                examples_per_page=5
            )

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown(value=md_description)
            with gr.Column(scale=1):
                gr.Markdown("会话记录")
                with gr.Tabs():
                    for h in history:
                        with gr.TabItem(h["title"]):
                            _ = gr.Chatbot(value=h["history"], label=h["title"], height=600)

        # create assistant
        create_assistant_button.click(
            click_create_assistant,
            inputs=[
                openai_api_key,
                name, instructions, description, tools, retrieval_files, retrieval_file_ids, model,
            ],
            outputs=[
                assistant_id, thread_id
            ]
        )

        # list assistant
        list_assistant_button.click(
            click_list_assistant,
            inputs=[
                openai_api_key
            ],
            outputs=[
                assistant_list
            ]
        )

        # delete assistant button
        delete_assistant_button.click(
            click_delete_assistant,
            inputs=[
                openai_api_key,
                delete_assistant_id
            ],
            outputs=[
                delete_assistant_id
            ]
        )

        # delete all assistant
        # The handler returns None, which clears the assistant listing. This
        # output used to point at the unrelated `file_list` box.
        delete_all_assistant_button.click(
            click_delete_all_assistant,
            inputs=[
                openai_api_key
            ],
            outputs=[
                assistant_list
            ]
        )

        # list file
        list_file_button.click(
            click_list_file,
            inputs=[
                openai_api_key
            ],
            outputs=[
                file_list
            ],
        )

        # delete file
        delete_file_button.click(
            click_delete_file,
            inputs=[
                openai_api_key,
                delete_file_id
            ],
            outputs=[
                delete_file_id
            ]
        )

        # upload files
        # No output component: the ids returned by the handler are not shown.
        upload_files_button.click(
            click_upload_files,
            inputs=[
                openai_api_key,
                upload_files
            ],
            outputs=[
            ]
        )

        # list python script
        list_function_python_script_button.click(
            click_list_function_python_script,
            inputs=[],
            outputs=[
                list_function_python_script_list
            ]
        )

        # upload function python script
        upload_function_python_script_button.click(
            click_upload_function_python_script,
            inputs=[
                upload_function_python_script_files
            ],
            outputs=[
                upload_function_python_script_files
            ]
        )

        # delete function python script
        delete_function_python_script_button.click(
            click_delete_function_python_script,
            inputs=[
                function_python_script_file
            ],
            outputs=[
                function_python_script_file
            ]
        )

        # download function python script
        download_function_python_script_button.click(
            click_download_function_python_script,
            inputs=[
                function_python_script_file
            ],
            outputs=[
                upload_function_python_script_files, function_python_script_file
            ]
        )

        # Pressing Enter in the query box and clicking "Add and run" trigger
        # the same handler with the same inputs and outputs.
        run_inputs = [
            openai_api_key,
            assistant_id, thread_id,
            name, instructions, description, tools, retrieval_files, retrieval_file_ids, model,
            query,
        ]
        run_outputs = [
            assistant_id, thread_id,
            chat_bot,
            query,
            chatting_image,
        ]

        # query submit
        query.submit(
            add_and_run,
            inputs=run_inputs,
            outputs=run_outputs,
            api_name="query_submit",
        )

        # add and run
        add_and_run_button.click(
            add_and_run,
            inputs=run_inputs,
            outputs=run_outputs,
        )

        # refresh
        refresh_button.click(
            refresh,
            inputs=[
                openai_api_key,
                thread_id,
            ],
            outputs=[
                chat_bot
            ]
        )

    # Listen on loopback only on Windows and on all interfaces elsewhere.
    blocks.queue().launch(
        share=False,
        server_name="127.0.0.1" if platform.system() == "Windows" else "0.0.0.0",
        server_port=7860
    )
    return


if __name__ == '__main__':
    main()