Spaces:
Runtime error
Runtime error
| # import gradio as gr | |
| import gradio | |
| # import lmdb | |
| # import base64 | |
| # import io | |
| import random | |
| import time | |
| import os | |
| import re | |
| import json | |
| import copy | |
| # import sqlite3 | |
| import hashlib | |
| import uuid | |
| from urllib.parse import urljoin | |
| import openai | |
| def get_random_sleep(base_time, random_range): | |
| return (base_time + random.randint(-random_range, random_range))*0.001 | |
| def js_load(txt): | |
| try: | |
| return json.loads(txt) | |
| except Exception as error: | |
| print('') | |
| print('js_load:') | |
| print(str(error)) | |
| print('') | |
| return None | |
| def js_dump(thing): | |
| try: | |
| return json.dumps(thing) | |
| except Exception as error: | |
| print('') | |
| print('js_dump:') | |
| print(str(error)) | |
| print('') | |
| return None | |
| def filtered_history(history, num=0): | |
| if num > 0: | |
| filtered = list(filter(lambda it:(it['type'] in ['request', 'response']), history)) | |
| return filtered[-num:] | |
| return [] | |
| def filtered_history_messages(history, num=0): | |
| filtered = filtered_history(history, num) | |
| return list(map(lambda it:{'role': it.get('role'), 'content': it.get('content')}, filtered)) | |
| def make_md_line(role, content): | |
| return f"""\n##### `{role}`\n\n{content}\n""" | |
| def make_md_by_history(history): | |
| md = "" | |
| for item in history: | |
| md += make_md_line(item.get('role'), item.get('content')) | |
| return md | |
| def make_history_file_fn(history): | |
| uuid4 = str(uuid.uuid4()) | |
| json_file_path = None | |
| md_file_path = None | |
| try: | |
| # 如果目录不存在,则创建目录 | |
| os.makedirs('temp_files', exist_ok=True) | |
| json_file_content = json.dumps(history) | |
| json_file_path = os.path.join('temp_files', f'history[{uuid4}].json') | |
| with open(json_file_path, 'w') as f: | |
| f.write(json_file_content) | |
| md_file_content = make_md_by_history(history) | |
| md_file_path = os.path.join('temp_files', f'history[{uuid4}].md') | |
| with open(md_file_path, 'w') as f: | |
| f.write(md_file_content) | |
| return json_file_path, md_file_path, gradio.update(visible=True) | |
| except Exception as error: | |
| print(f"\n{error}\n") | |
| return json_file_path, md_file_path, gradio.update(visible=True) | |
| def make_history_file_fn__(history): | |
| uuid4 = str(uuid.uuid4()) | |
| try: | |
| json_file_content = json.dumps(history) | |
| json_file_path = os.path.join('temp_files', f'history[{uuid4}].json') | |
| with open(json_file_path, 'w') as f: | |
| f.write(json_file_content) | |
| except Exception as error: | |
| print(f"\n{error}\n") | |
| json_file_path = None | |
| try: | |
| md_file_content = make_md_by_history(history) | |
| md_file_path = os.path.join('temp_files', f'history[{uuid4}].md') | |
| with open(md_file_path, 'w') as f: | |
| f.write(md_file_content) | |
| except Exception as error: | |
| print(f"\n{error}\n") | |
| md_file_path = None | |
| return json_file_path, md_file_path, gradio.update(visible=True) | |
| def make_user_message_list_fn__( | |
| user_message_template, # 模板,套用到每一条消息上 | |
| user_message_template_mask, # 模板中要被替换的部分 | |
| user_message_template_mask_is_regex, # 决定如何构造用于替换的正则表达式 | |
| user_message_list_text, # 一段文本,包含了每一条用户消息 | |
| user_message_list_text_splitter, # 描述了应该以什么为线索来切分 user_message_list_text | |
| user_message_list_text_splitter_is_regex, # 决定如何进行切分 | |
| ) -> list: | |
| # 返回套用了模板的用户信息列表 | |
| # 这个实现首先根据是否使用正则表达式来切分用户消息列表文本,并将切分后的消息存储在一个列表中。 | |
| # 然后,针对每个消息,根据user_message_template_mask及user_message_template_mask_is_regex替换模板中的部分内容, | |
| # 并将替换后的结果添加到结果列表中。 | |
| # 最后,返回结果列表。 | |
| # 切分用户消息列表文本 | |
| if user_message_list_text_splitter_is_regex: | |
| user_messages = re.split(user_message_list_text_splitter, user_message_list_text) | |
| else: | |
| user_messages = user_message_list_text.split(user_message_list_text_splitter) | |
| # 生成套用模板的用户信息列表 | |
| user_message_result_list = [] | |
| for message in user_messages: | |
| # 替换模板内容 | |
| if user_message_template_mask_is_regex: | |
| transformed_message = re.sub(user_message_template_mask, message, user_message_template) | |
| else: | |
| transformed_message = user_message_template.replace(user_message_template_mask, message) | |
| user_message_result_list.append(transformed_message) | |
| return user_message_result_list | |
| def make_user_message_list_fn( | |
| user_message_template, | |
| user_message_template_mask, | |
| user_message_template_mask_is_regex, | |
| user_message_list_text, | |
| user_message_list_text_splitter, | |
| user_message_list_text_splitter_is_regex, | |
| ) -> list: | |
| # 实际上,只要保证在使用正则表达式进行替换或切分操作之前,已经将其编译为正则表达式对象即可。 | |
| # 在我的修改中,针对 xxx_is_regex 参数为 True 的情况,将这些参数编译成正则表达式。 | |
| # 对于替换操作和切分操作,只需检查是否已经编译为正则表达式,并使用相应的方法即可。 | |
| # 编译正则表达式 | |
| if user_message_template_mask_is_regex: | |
| user_message_template_mask = re.compile(user_message_template_mask) | |
| if user_message_list_text_splitter_is_regex: | |
| user_message_list_text_splitter = re.compile(user_message_list_text_splitter) | |
| # 切分用户消息列表文本 | |
| if user_message_list_text_splitter_is_regex: | |
| user_messages = user_message_list_text_splitter.split(user_message_list_text) | |
| else: | |
| user_messages = user_message_list_text.split(user_message_list_text_splitter) | |
| # 生成套用模板的用户信息列表 | |
| user_message_result_list = [] | |
| for message in user_messages: | |
| # 替换模板内容 | |
| if user_message_template_mask_is_regex: | |
| transformed_message = user_message_template_mask.sub(message, user_message_template) | |
| else: | |
| transformed_message = user_message_template.replace(user_message_template_mask, message) | |
| user_message_result_list.append(transformed_message) | |
| return user_message_result_list | |
| def sequential_chat_once_fn(payload, api_key_text, history, history_md_stable, history_md_stream, tips): | |
| print("\n\n") | |
| assistant_message = "" | |
| tips = "" | |
| try: | |
| openai.api_key = api_key_text | |
| completion = openai.ChatCompletion.create(**payload) | |
| if payload.get('stream'): | |
| is_first=True | |
| for chunk in completion: | |
| if is_first: | |
| is_first = False | |
| continue | |
| if chunk.choices[0].finish_reason is None: | |
| print(chunk.choices[0].delta.content or '', end="") | |
| assistant_message += chunk.choices[0].delta.content or '' | |
| history_md_stream = make_md_line('assistant', assistant_message) | |
| tips = 'streaming' | |
| yield assistant_message, history_md_stream, tips, history | |
| else: | |
| pass | |
| pass | |
| print('') | |
| pass | |
| else: | |
| assistant_message = completion.choices[0].message.content | |
| history_md_stream = make_md_line('assistant', assistant_message) | |
| tips = 'got' | |
| print(assistant_message) | |
| yield assistant_message, history_md_stream, tips, history | |
| pass | |
| except Exception as error: | |
| tips = str(error) | |
| history.append({"role": "app", "content": tips}) | |
| print(f"\n{tips}\n") | |
| yield assistant_message, history_md_stream, tips, history | |
| pass | |
| print("\n\n") | |
| def sequential_chat_fn( | |
| history, | |
| system_prompt_enabled, | |
| system_prompt, | |
| user_message_template, | |
| user_message_template_mask, | |
| user_message_template_mask_is_regex, | |
| user_message_list_text, | |
| user_message_list_text_splitter, | |
| user_message_list_text_splitter_is_regex, | |
| history_prompt_num, | |
| api_key_text, token_text, | |
| sleep_base, sleep_rand, | |
| prop_stream, prop_model, prop_temperature, prop_top_p, prop_choices_num, prop_max_tokens, prop_presence_penalty, prop_frequency_penalty, prop_logit_bias, | |
| ): | |
| # outputs=[ | |
| # history, | |
| # history_md_stable, | |
| # history_md_stream, | |
| # tips, | |
| # file_row, | |
| # ], | |
| history_md_stable = "" | |
| history_md_stream = "" | |
| tips = "" | |
| try: | |
| user_message_list = make_user_message_list_fn( | |
| user_message_template, | |
| user_message_template_mask, | |
| user_message_template_mask_is_regex, | |
| user_message_list_text, | |
| user_message_list_text_splitter, | |
| user_message_list_text_splitter_is_regex, | |
| ) | |
| payload = { | |
| 'model': prop_model, | |
| 'temperature': prop_temperature, | |
| 'top_p': prop_top_p, | |
| 'n': prop_choices_num, | |
| 'stream': prop_stream, | |
| 'presence_penalty': prop_presence_penalty, | |
| 'frequency_penalty': prop_frequency_penalty, | |
| 'user': token_text, | |
| } | |
| if prop_max_tokens>0: | |
| payload['max_tokens'] = prop_max_tokens | |
| # if prop_logit_bias is not None: | |
| # payload['logit_bias'] = prop_logit_bias | |
| # headers = { | |
| # "Content-Type": "application/json", | |
| # "Authorization": f"Bearer {api_key_text}" | |
| # } | |
| for user_message in user_message_list: | |
| print('') | |
| print(user_message) | |
| print('') | |
| # make the_messages to sent | |
| the_messages = [] | |
| if system_prompt_enabled: | |
| the_messages.append({"role": "system", "content": system_prompt}) | |
| for msg in filtered_history_messages(history, num=history_prompt_num): | |
| the_messages.append(msg) | |
| the_messages.append({"role": "user", "content": user_message}) | |
| payload['messages'] = the_messages | |
| history.append({"role": "user", "content": user_message, "type": "request", "payload": payload}) | |
| history_md_stable = make_md_by_history(history) | |
| history_md_stream = "" | |
| tips = "" | |
| yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False) | |
| try: | |
| for (assistant_message, history_md_stream, tips, history) in sequential_chat_once_fn(payload, api_key_text, history, history_md_stable, history_md_stream, tips): | |
| yield history, history_md_stable, history_md_stream, tips, gradio.update() | |
| history.append({"role": "assistant", "content": assistant_message, "type": "request"}) | |
| history_md_stable += history_md_stream | |
| history_md_stream = "" | |
| tips = "fine" | |
| yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False) | |
| except Exception as error: | |
| tips = f'error: {str(error)}' | |
| history.append({"role": "app", "content": tips}) | |
| print(f"\n{tips}\n") | |
| yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False) | |
| time.sleep(get_random_sleep(sleep_base, sleep_rand)) | |
| pass | |
| except Exception as error: | |
| tips = str(error) | |
| history.append({"role": "app", "content": tips}) | |
| print(f"\n{tips}\n") | |
| yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False) | |
| pass | |
| def on_click_send_btn( | |
| global_state_json, api_key_text, chat_input_role, chat_input, prompt_table, chat_use_prompt, chat_use_history, chat_log, | |
| chat_model, temperature, top_p, choices_num, stream, max_tokens, presence_penalty, frequency_penalty, logit_bias, | |
| ): | |
| old_state = json.loads(global_state_json or "{}") | |
| print('\n\n\n\n\n') | |
| print(prompt_table) | |
| prompt_table = prompt_table or [] | |
| chat_log = chat_log or [] | |
| chat_log_md = '' | |
| if chat_use_prompt: | |
| chat_log_md += '<center>(prompt)</center>\n\n' | |
| chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", prompt_table)]) | |
| chat_log_md += '\n---\n' | |
| if True: | |
| chat_log_md += '<center>(history)</center>\n\n' if chat_use_history else '<center>(not used history)</center>\n\n' | |
| chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", chat_log)]) | |
| chat_log_md += '\n---\n' | |
| # if chat_input=='': | |
| # return json.dumps(old_state), chat_log, chat_log_md, chat_log_md, None, None, chat_input | |
| print('\n') | |
| print(chat_input) | |
| print('') | |
| try: | |
| logit_bias_json = json.dumps(logit_bias) if logit_bias else None | |
| except: | |
| return json.dumps(old_state), chat_log, chat_log_md, chat_log_md, None, None, chat_input | |
| new_state = copy.deepcopy(old_state) or {} | |
| req_hist = copy.deepcopy(prompt_table) if chat_use_prompt else [] | |
| if chat_use_history: | |
| for hh in (chat_log or []): | |
| req_hist.append(hh) | |
| if chat_input and chat_input!="": | |
| req_hist.append([(chat_input_role or 'user'), chat_input]) | |
| openai.api_key = api_key_text | |
| props = { | |
| 'model': chat_model, | |
| 'messages': [xx for xx in map(lambda it: {'role':it[0], 'content':it[1]}, req_hist)], | |
| 'temperature': temperature, | |
| 'top_p': top_p, | |
| 'n': choices_num, | |
| 'stream': stream, | |
| 'presence_penalty': presence_penalty, | |
| 'frequency_penalty': frequency_penalty, | |
| } | |
| if max_tokens>0: | |
| props['max_tokens'] = max_tokens | |
| if logit_bias_json is not None: | |
| props['logit_bias'] = logit_bias_json | |
| props_json = json.dumps(props) | |
| try: | |
| completion = openai.ChatCompletion.create(**props) | |
| print('') | |
| # print(completion.choices) | |
| # the_response_role = completion.choices[0].message.role | |
| # the_response = completion.choices[0].message.content | |
| # print(the_response) | |
| # print('') | |
| # chat_last_resp = json.dumps(completion.__dict__) | |
| # chat_last_resp_dict = json.loads(chat_last_resp) | |
| # chat_last_resp_dict['api_key'] = "hidden by UI" | |
| # chat_last_resp_dict['organization'] = "hidden by UI" | |
| # chat_last_resp = json.dumps(chat_last_resp_dict) | |
| chat_log_md = '' | |
| if chat_use_prompt: | |
| chat_log_md += '<center>(prompt)</center>\n\n' | |
| chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", prompt_table)]) | |
| chat_log_md += '\n---\n' | |
| if True: | |
| chat_log_md += '<center>(history)</center>\n\n' if chat_use_history else '<center>(not used history)</center>\n\n' | |
| chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", chat_log)]) | |
| chat_log_md += '\n---\n' | |
| if chat_input and chat_input!="": | |
| chat_log.append([(chat_input_role or 'user'), chat_input]) | |
| chat_log_md += f"##### `{(chat_input_role or 'user')}`\n\n{chat_input}\n\n" | |
| partial_words = "" | |
| counter=0 | |
| if stream: | |
| the_response = '' | |
| the_response_role = '' | |
| for chunk in completion: | |
| #Skipping first chunk | |
| if counter == 0: | |
| the_response_role = chunk.choices[0].delta.role | |
| chat_log_md += f"##### `{the_response_role}`\n\n" | |
| counter += 1 | |
| continue | |
| # print(('chunk', chunk)) | |
| if chunk.choices[0].finish_reason is None: | |
| the_response_chunk = chunk.choices[0].delta.content | |
| the_response += the_response_chunk | |
| chat_log_md += f"{the_response_chunk}" | |
| yield json.dumps(new_state), chat_log, chat_log_md, chat_log_md, "{}", props_json, '' | |
| else: | |
| chat_log.append([the_response_role, the_response]) | |
| chat_log_md += f"\n\n" | |
| yield json.dumps(new_state), chat_log, chat_log_md, chat_log_md, '{"msg": "stream模式不支持显示"}', props_json, '' | |
| # chat_last_resp = json.dumps(completion.__dict__) | |
| # chat_last_resp_dict = json.loads(chat_last_resp) | |
| # chat_last_resp_dict['api_key'] = "hidden by UI" | |
| # chat_last_resp_dict['organization'] = "hidden by UI" | |
| # chat_last_resp = json.dumps(chat_last_resp_dict) | |
| else: | |
| the_response_role = completion.choices[0].message.role | |
| the_response = completion.choices[0].message.content | |
| print(the_response) | |
| print('') | |
| chat_log.append([the_response_role, the_response]) | |
| chat_log_md += f"##### `{the_response_role}`\n\n{the_response}\n\n" | |
| chat_last_resp = json.dumps(completion.__dict__) | |
| chat_last_resp_dict = json.loads(chat_last_resp) | |
| chat_last_resp_dict['api_key'] = "hidden by UI" | |
| chat_last_resp_dict['organization'] = "hidden by UI" | |
| chat_last_resp = json.dumps(chat_last_resp_dict) | |
| return json.dumps(new_state), chat_log, chat_log_md, chat_log_md, chat_last_resp, props_json, '' | |
| # chat_log.append([the_response_role, the_response]) | |
| # chat_log_md += f"##### `{the_response_role}`\n\n{the_response}\n\n" | |
| # return json.dumps(new_state), chat_log, chat_log_md, chat_log_md, chat_last_resp, props_json, '' | |
| except Exception as error: | |
| print(error) | |
| print('error!!!!!!') | |
| chat_log_md = '' | |
| if chat_use_prompt: | |
| chat_log_md += '<center>(prompt)</center>\n\n' | |
| chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", prompt_table)]) | |
| chat_log_md += '\n---\n' | |
| if True: | |
| chat_log_md += '<center>(history)</center>\n\n' if chat_use_history else '<center>(not used history)</center>\n\n' | |
| chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", chat_log)]) | |
| chat_log_md += '\n---\n' | |
| # chat_log_md = '' | |
| # chat_log_md = "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", prompt_table)]) if chat_use_prompt else '' | |
| # chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", hist)]) | |
| chat_log_md += "\n" | |
| chat_log_md += str(error) | |
| return json.dumps(new_state), chat_log, chat_log_md, chat_log_md, None, props_json, chat_input | |
| def clear_history(): | |
| return [], "" | |
| def copy_history(txt): | |
| # print('\n\n copying') | |
| # print(txt) | |
| # print('\n\n') | |
| pass | |
| def update_saved_prompt_titles(global_state_json, selected_saved_prompt_title): | |
| print('') | |
| global_state = json.loads(global_state_json or "{}") | |
| print(global_state) | |
| print(selected_saved_prompt_title) | |
| saved_prompts = global_state.get('saved_prompts') or [] | |
| print(saved_prompts) | |
| the_choices = [(it.get('title') or '[untitled]') for it in saved_prompts] | |
| print(the_choices) | |
| print('') | |
| return gradio.Dropdown.update(choices=the_choices) | |
| def save_prompt(global_state_json, saved_prompts, prompt_title, prompt_table): | |
| the_choices = [] | |
| global_state = json.loads(global_state_json or "{}") | |
| saved_prompts = global_state.get('saved_prompts') or [] | |
| if len(saved_prompts): | |
| the_choices = [it.get('title') or '[untitled]' for it in saved_prompts] | |
| pass | |
| return global_state_json, gradio.Dropdown.update(choices=the_choices, value=prompt_title), prompt_title, prompt_table | |
| def load_saved_prompt(title): | |
| pass | |