import os
import pprint
import re
from typing import List, Optional, Union

from qwen_agent import Agent, MultiAgentHub
from qwen_agent.agents.user_agent import PENDING_USER_INPUT
from qwen_agent.gui.gradio_utils import format_cover_html
from qwen_agent.gui.utils import convert_fncall_to_text, convert_history_to_chatbot, get_avatar_image
from qwen_agent.llm.schema import CONTENT, FILE, IMAGE, NAME, ROLE, USER, Message
from qwen_agent.log import logger
from qwen_agent.utils.utils import print_traceback

from patching import common_programming_language_extensions

class WebUI:
    """A common chatbot application for agents."""

    def __init__(self, agent: Union[Agent, MultiAgentHub, List[Agent]], chatbot_config: Optional[dict] = None):
        """
        Initialize the chatbot.

        Args:
            agent: The agent or a list of agents,
                supports various types of agents such as Assistant, GroupChat, Router, etc.
            chatbot_config: The chatbot configuration, with keys such as
                {'user.name': '', 'user.avatar': '', 'agent.avatar': '', 'input.placeholder': '', 'prompt.suggestions': []}.
        """
        chatbot_config = chatbot_config or {}

        if isinstance(agent, MultiAgentHub):
            self.agent_list = [agent for agent in agent.nonuser_agents]
            self.agent_hub = agent
        elif isinstance(agent, list):
            self.agent_list = agent
            self.agent_hub = None
        else:
            self.agent_list = [agent]
            self.agent_hub = None

        user_name = chatbot_config.get('user.name', 'user')
        self.user_config = {
            'name': user_name,
            'avatar': chatbot_config.get(
                'user.avatar',
                get_avatar_image(user_name),
            ),
        }

        self.agent_config_list = [{
            'name': agent.name,
            'avatar': chatbot_config.get(
                'agent.avatar',
                os.path.join(os.path.dirname(__file__), 'assets/logo.jpeg'),
            ),
            'description': agent.description or "I'm a helpful assistant.",
        } for agent in self.agent_list]
        self.input_placeholder = chatbot_config.get('input.placeholder', 'Chat with me~')
        self.prompt_suggestions = chatbot_config.get('prompt.suggestions', [])
        self.verbose = chatbot_config.get('verbose', False)
| """ | |
| Run the chatbot. | |
| Args: | |
| messages: The chat history. | |
| """ | |
| def run(self, | |
| messages: List[Message] = None, | |
| share: bool = False, | |
| server_name: str = None, | |
| server_port: int = None, | |
| concurrency_limit: int = 10, | |
| enable_mention: bool = False, | |
| **kwargs): | |
        self.run_kwargs = kwargs

        from qwen_agent.gui.gradio import gr, mgr

        customTheme = gr.themes.Default(
            primary_hue=gr.themes.utils.colors.blue,
            radius_size=gr.themes.utils.sizes.radius_none,
        )

        with gr.Blocks(
                css=os.path.join(os.path.dirname(__file__), 'assets/appBot.css'),
                theme=customTheme,
        ) as demo:
            history = gr.State([])

            with gr.Row(elem_classes='container'):
                with gr.Column(scale=4):
                    chatbot = mgr.Chatbot(value=convert_history_to_chatbot(messages=messages),
                                          avatar_images=[
                                              self.user_config,
                                              self.agent_config_list,
                                          ],
                                          height=800,
                                          avatar_image_width=80,
                                          flushing=False,
                                          show_copy_button=True,
                                          latex_delimiters=[{
                                              'left': '\\(',
                                              'right': '\\)',
                                              'display': True
                                          }, {
                                              'left': '\\begin{equation}',
                                              'right': '\\end{equation}',
                                              'display': True
                                          }, {
                                              'left': '\\begin{align}',
                                              'right': '\\end{align}',
                                              'display': True
                                          }, {
                                              'left': '\\begin{alignat}',
                                              'right': '\\end{alignat}',
                                              'display': True
                                          }, {
                                              'left': '\\begin{gather}',
                                              'right': '\\end{gather}',
                                              'display': True
                                          }, {
                                              'left': '\\begin{CD}',
                                              'right': '\\end{CD}',
                                              'display': True
                                          }, {
                                              'left': '\\[',
                                              'right': '\\]',
                                              'display': True
                                          }])
                    input = mgr.MultimodalInput(
                        placeholder=self.input_placeholder,
                        upload_button_props=dict(file_types=[
                            ".pdf", ".docx", ".pptx", ".txt", ".html", ".csv", ".tsv", ".xlsx", ".xls"
                        ] + ["." + file_type for file_type in common_programming_language_extensions]))
                with gr.Column(scale=1):
                    if len(self.agent_list) > 1:
                        agent_selector = gr.Radio(
                            [(agent.name, i) for i, agent in enumerate(self.agent_list)],
                            label='Models',
                            info='Select Your Model Here',
                            value=0,
                            interactive=True,
                        )
                        # gr.Dropdown(
                        #     [(agent.name, i) for i, agent in enumerate(self.agent_list)],
                        #     label='Models',
                        #     info='Select Your Model Here',
                        #     value=0,
                        #     interactive=True,
                        # )

                    agent_info_block = self._create_agent_info_block()

                    # agent_plugins_block = self._create_agent_plugins_block()

                    if self.prompt_suggestions:
                        gr.Examples(
                            label='Suggested Prompts',
                            examples=self.prompt_suggestions,
                            inputs=[input],
                        )
            if len(self.agent_list) > 1:
                agent_selector.change(
                    fn=self.change_agent,
                    inputs=[agent_selector, chatbot, history],
                    outputs=[agent_selector, agent_info_block, chatbot, history],  # , agent_plugins_block],
                    queue=False,
                )

            input_promise = input.submit(
                fn=self.add_text,
                inputs=[input, chatbot, history],
                outputs=[input, chatbot, history, agent_selector],
                queue=False,
            )

            if len(self.agent_list) > 1 and enable_mention:
                input_promise = input_promise.then(
                    self.add_mention,
                    [chatbot, agent_selector],
                    [chatbot, agent_selector],
                ).then(
                    self.agent_run,
                    [chatbot, history, agent_selector],
                    [chatbot, history, agent_selector],
                )
            else:
                input_promise = input_promise.then(
                    self.agent_run,
                    [chatbot, history, agent_selector],
                    [chatbot, history, agent_selector],
                )

            input_promise.then(self.flushed, None, [input, agent_selector])

            demo.load(None)

        demo.queue(default_concurrency_limit=concurrency_limit).launch(share=share,
                                                                       server_name=server_name,
                                                                       server_port=server_port)

    def change_agent(self, agent_selector, _chatbot, _history):
        _chatbot = []
        _history.clear()
        yield agent_selector, self._create_agent_info_block(agent_selector), _chatbot, _history  # , self._create_agent_plugins_block(agent_selector)

    def add_text(self, _input, _chatbot, _history):
        from qwen_agent.gui.gradio import gr

        if _input.text == "/clear":
            _chatbot = []
            _history.clear()
            yield gr.update(interactive=False, value=""), _chatbot, _history, gr.update(interactive=False)
            return

        _history.append({
            ROLE: USER,
            CONTENT: [{
                'text': _input.text
            }],
        })

        if self.user_config[NAME]:
            _history[-1][NAME] = self.user_config[NAME]

        if _input.files:
            for file in _input.files:
                if file.mime_type.startswith('image/'):
                    _history[-1][CONTENT].append({IMAGE: 'file://' + file.path})
                else:
                    _history[-1][CONTENT].append({FILE: file.path})

        _chatbot.append([_input, None])

        yield gr.update(interactive=False, value=None), _chatbot, _history, gr.update(interactive=False)

    def add_mention(self, _chatbot, _agent_selector):
        if len(self.agent_list) == 1:
            yield _chatbot, _agent_selector

        query = _chatbot[-1][0].text
        match = re.search(r'@\w+\b', query)
        if match:
            _agent_selector = self._get_agent_index_by_name(match.group()[1:])

        agent_name = self.agent_list[_agent_selector].name

        if ('@' + agent_name) not in query and self.agent_hub is None:
            _chatbot[-1][0].text = '@' + agent_name + ' ' + query

        yield _chatbot, _agent_selector

    def agent_run(self, _chatbot, _history, _agent_selector=None):
        if not _history:
            if _agent_selector is not None:
                yield _chatbot, _history, _agent_selector
            else:
                yield _chatbot, _history
            return

        if self.verbose:
            logger.info('agent_run input:\n' + pprint.pformat(_history, indent=2))

        # The last chatbot row is the user's new message; its response slot holds one
        # entry per agent so that different agents can fill the same output bubble.
        num_input_bubbles = len(_chatbot) - 1
        num_output_bubbles = 1
        _chatbot[-1][1] = [None for _ in range(len(self.agent_list))]

        agent_runner = self.agent_list[_agent_selector or 0]
        if self.agent_hub:
            agent_runner = self.agent_hub

        responses = []
        for responses in agent_runner.run(_history, **self.run_kwargs):
            # usage = responses.usage
            # responses = [Message(ASSISTANT, responses.output.choices[0].message.content)]
            if not responses:
                continue
            if responses[-1][CONTENT] == PENDING_USER_INPUT:
                logger.info('Interrupted. Waiting for user input!')
                break

            display_responses = convert_fncall_to_text(responses)
            # display_responses[-1][CONTENT] += "\n<summary>" + repr({"usage": usage}) + "</summary>"
            if not display_responses:
                continue
            if display_responses[-1][CONTENT] is None:
                continue

            while len(display_responses) > num_output_bubbles:
                # Create a new chat bubble
                _chatbot.append([None, None])
                _chatbot[-1][1] = [None for _ in range(len(self.agent_list))]
                num_output_bubbles += 1

            assert num_output_bubbles == len(display_responses)
            assert num_input_bubbles + num_output_bubbles == len(_chatbot)

            for i, rsp in enumerate(display_responses):
                agent_index = self._get_agent_index_by_name(rsp[NAME])
                _chatbot[num_input_bubbles + i][1][agent_index] = rsp[CONTENT]

            if len(self.agent_list) > 1:
                _agent_selector = agent_index

            if _agent_selector is not None:
                yield _chatbot, _history, _agent_selector
            else:
                yield _chatbot, _history

        if responses:
            # Strip any appended token-usage <summary> block before saving responses to history.
            for res in responses:
                res['content'] = re.sub(r"\n<summary>input tokens.*</summary>", "", res['content'])
            _history.extend([res for res in responses if res[CONTENT] != PENDING_USER_INPUT])

        if _agent_selector is not None:
            yield _chatbot, _history, _agent_selector
        else:
            yield _chatbot, _history

        if self.verbose:
            logger.info('agent_run response:\n' + pprint.pformat(responses, indent=2))

    def flushed(self):
        from qwen_agent.gui.gradio import gr

        return gr.update(interactive=True), gr.update(interactive=True)

    def _get_agent_index_by_name(self, agent_name):
        if agent_name is None:
            return 0

        try:
            agent_name = agent_name.strip()
            for i, agent in enumerate(self.agent_list):
                if agent.name == agent_name:
                    return i
            return 0
        except Exception:
            print_traceback()
            return 0

    def _create_agent_info_block(self, agent_index=0):
        from qwen_agent.gui.gradio import gr

        agent_config_interactive = self.agent_config_list[agent_index]

        return gr.HTML(
            format_cover_html(
                bot_name=agent_config_interactive['name'],
                bot_description=agent_config_interactive['description'],
                bot_avatar=agent_config_interactive['avatar'],
            ))

    def _create_agent_plugins_block(self, agent_index=0):
        from qwen_agent.gui.gradio import gr

        agent_interactive = self.agent_list[agent_index]

        if agent_interactive.function_map:
            capabilities = [key for key in agent_interactive.function_map.keys()]
            return gr.CheckboxGroup(
                label='Plugins',
                value=capabilities,
                choices=capabilities,
                interactive=False,
            )
        else:
            return gr.CheckboxGroup(
                label='Plugins',
                value=[],
                choices=[],
                interactive=False,
            )
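

# Minimal usage sketch (illustrative, not part of the original module). It assumes a
# working qwen_agent installation with an LLM backend configured (e.g. DashScope
# credentials for the 'qwen-max' model); the agent name and placeholder are made up.
if __name__ == '__main__':
    from qwen_agent.agents import Assistant

    # Build a single assistant agent and serve it through this WebUI.
    bot = Assistant(llm={'model': 'qwen-max'},
                    name='Demo Assistant',
                    description='Answers general questions about uploaded files.')
    WebUI(bot, chatbot_config={'input.placeholder': 'Ask me anything'}).run(
        server_name='0.0.0.0',
        server_port=7860,
    )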