# NOTE: Hugging Face Spaces page-header residue from scraping ("Spaces: Running on CPU Upgrade").
| #!/usr/bin/env python | |
| import gradio as gr | |
| import os | |
| import json | |
| import requests | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor | |
| from PIL import Image | |
| import base64 | |
| import hashlib | |
# HTML heading rendered at the top of the demo page.
DESCRIPTION = '''<h2 style='text-align: center'> <a href="https://github.com/THUDM/CogVLM2"> CogVLM2 </a></h2>'''
# Attribution / usage notes (English + Chinese) rendered under the heading.
NOTES = 'This app is adapted from <a href="https://github.com/THUDM/CogVLM">https://github.com/THUDM/CogVLM2</a> . It would be recommended to check out the repo if you want to see the detail of our model.\n\n该demo仅作为测试使用,不支持批量请求。如有大批量需求,欢迎联系[智谱AI](mailto:business@zhipuai.cn)。\n<a href="http://36.103.203.44:7861/">备用链接</a>'
# Troubleshooting hints shown at the bottom of the page.
MAINTENANCE_NOTICE1 = 'Hint 1: If the app report "Something went wrong, connection error out", please turn off your proxy and retry.<br>Hint 2: If you upload a large size of image like 10MB, it may take some time to upload and process. Please be patient and wait.'
# Initial chatbot history: a single greeting with an empty user turn
# (rows with an empty user turn are filtered out before being sent to the model).
default_chatbox = [("", "Hi, What do you want to know about this image?")]
# Backend inference endpoint, injected via environment variable.
URL = os.environ.get("URL")
def process_image_without_resize(image_prompt):
    """Load an uploaded image and return it with a base64 payload and hash.

    The image is re-saved via PIL to a temporary file under ``examples/``
    (deleted before returning) so the base64 payload reflects a fresh encode.

    Args:
        image_prompt: Filesystem path of the uploaded image.

    Returns:
        Tuple ``(image, encoded_img, image_hash, filename_grounding)``:
        the opened PIL image, the base64-encoded file content as a UTF-8
        string, the SHA-256 hex digest of that base64 payload, and a path
        (not created here) where a grounding visualization could be saved.
    """
    image = Image.open(image_prompt)
    print(f"height:{image.height}, width:{image.width}")
    # Timestamp-based names keep near-simultaneous requests from clobbering
    # each other's temp files (collisions still possible at identical instants).
    timestamp = time.time()
    file_ext = os.path.splitext(image_prompt)[1]
    filename = f"examples/{timestamp}{file_ext}"
    filename_grounding = f"examples/{timestamp}_grounding{file_ext}"
    image.save(filename)
    # Fixed: f-string had lost its placeholder and printed a literal.
    print(f"temporal filename {filename}")
    with open(filename, "rb") as image_file:
        # Renamed to avoid shadowing the builtin `bytes`.
        b64_bytes = base64.b64encode(image_file.read())
    encoded_img = str(b64_bytes, encoding='utf-8')
    # NOTE: hash is computed over the base64 payload, not the raw file bytes.
    image_hash = hashlib.sha256(b64_bytes).hexdigest()
    os.remove(filename)
    return image, encoded_img, image_hash, filename_grounding
def make_request(URL, headers, data):
    """POST `data` to `URL` with `headers` and return the decoded JSON body.

    Uses a 60s connect timeout and a 100s read timeout.
    """
    resp = requests.post(URL, headers=headers, data=data, timeout=(60, 100))
    return resp.json()
def post(
    input_text,
    temperature,
    top_p,
    top_k,
    image_prompt,
    result_previous,
    hidden_image,
    is_english,
):
    """Handle one chat turn: validate inputs, call the backend, update history.

    Args:
        input_text: The user's text prompt for this turn.
        temperature: Sampling temperature forwarded to the backend.
        top_p: Nucleus-sampling parameter forwarded to the backend.
        top_k: Top-k sampling parameter forwarded to the backend.
        image_prompt: Filepath of the uploaded image, or None.
        result_previous: Chatbot history as a list of (user, assistant) pairs.
        hidden_image: SHA-256 hash of the previously processed image; used to
            detect an image switch, which resets the conversation history.
        is_english: True selects the "vlm_chat" model, False "vlm_chat_zh".

    Returns:
        Tuple ``(new_input_text, updated_history, updated_hidden_image_hash)``.
        The input box is cleared (empty string) except when the image is
        missing, in which case the user's text is kept for a retry.
    """
    # Copy the history, dropping placeholder rows whose user turn is empty
    # (e.g. the initial greeting in `default_chatbox`).
    result_text = [(pair[0], pair[1]) for pair in result_previous]
    for i in range(len(result_text) - 1, -1, -1):
        # Fixed: `== None` -> `is None`.
        if result_text[i][0] == "" or result_text[i][0] is None:
            del result_text[i]
    print(f"history {result_text}")

    is_zh = not is_english
    if image_prompt is None:
        print("Image empty")
        if is_zh:
            result_text.append((input_text, '图片为空!请上传图片并重试。'))
        else:
            # Fixed grammar: "a image" -> "an image".
            result_text.append((input_text, 'Image empty! Please upload an image and retry.'))
        return input_text, result_text, hidden_image
    elif input_text == "":
        print("Text empty")
        # Consistency fix: localize this branch like the empty-image branch.
        if is_zh:
            result_text.append((input_text, '文本为空!请输入文本并重试。'))
        else:
            result_text.append((input_text, 'Text empty! Please enter text and retry.'))
        return "", result_text, hidden_image

    headers = {
        "Content-Type": "application/json; charset=UTF-8",
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36",
    }
    if image_prompt:
        pil_img, encoded_img, image_hash, image_path_grounding = process_image_without_resize(image_prompt)
        print(f"image_hash:{image_hash}, hidden_image_hash:{hidden_image}")
        if hidden_image is not None and image_hash != hidden_image:
            # The user switched to a different image: start a fresh conversation.
            print("image has been update")
            result_text = []
        hidden_image = image_hash
    else:
        encoded_img = None

    model_use = "vlm_chat" if is_english else "vlm_chat_zh"
    prompt = input_text
    print(f'request {model_use} model... with prompt {prompt}')
    data = json.dumps({
        'model_use': model_use,
        'text': prompt,
        'history': result_text,
        'image': encoded_img,
        'temperature': temperature,
        'top_p': top_p,
        'top_k': top_k,
        'do_sample': True,
        'max_new_tokens': 2048
    })
    try:
        # Direct call: the original single-worker ThreadPoolExecutor added no
        # concurrency (it blocked on .result() immediately); the HTTP timeout
        # lives inside make_request.
        response = make_request(URL, headers, data)
    except Exception as e:
        print("error message", e)
        if is_zh:
            result_text.append((input_text, '超时!请稍等几分钟再重试。'))
        else:
            result_text.append((input_text, 'Timeout! Please wait a few minutes and retry.'))
        return "", result_text, hidden_image
    print('request done...')
    answer = str(response['result'])
    result_text.append((input_text, answer))
    print(result_text)
    print('finished')
    return "", result_text, hidden_image
def clear_fn(value):
    """Reset the UI: clear the text box, restore the default greeting, drop the image."""
    return ("", default_chatbox, None)
def clear_fn2(value):
    """Restore the default chat history (used when the image is uploaded or cleared)."""
    return default_chatbox
def main():
    """Build the Gradio Blocks UI, wire its events, and launch the demo."""
    gr.close_all()
    # Load example prompts: one JSON object per line with keys
    # "text", "image", "is_english" (schema inferred from usage below).
    examples = []
    with open("./examples/example_inputs.jsonl") as f:
        for line in f:
            examples.append(json.loads(line))

    with gr.Blocks(css='style.css') as demo:
        gr.Markdown(DESCRIPTION)
        gr.Markdown(NOTES)
        with gr.Row():
            with gr.Column(scale=4.5):
                with gr.Group():
                    input_text = gr.Textbox(label='Input Text', placeholder='Please enter text prompt below and press ENTER.')
                    with gr.Row():
                        run_button = gr.Button('Generate')
                        clear_button = gr.Button('Clear')
                    image_prompt = gr.Image(type="filepath", label="Image Prompt", value=None)
                with gr.Row():
                    is_english = gr.Checkbox(label="Use English Model")
                with gr.Row():
                    temperature = gr.Slider(maximum=1, value=0.8, minimum=0, label='Temperature')
                    top_p = gr.Slider(maximum=1, value=0.4, minimum=0, label='Top P')
                    top_k = gr.Slider(maximum=50, value=1, minimum=1, step=1, label='Top K')
            with gr.Column(scale=5.5):
                result_text = gr.components.Chatbot(label='Multi-round conversation History', value=[("", "Hi, What do you want to know about this image?")], height=550)
                hidden_image_hash = gr.Textbox(visible=False)

        gr_examples = gr.Examples(examples=[[example["text"], example["image"], example["is_english"]] for example in examples],
                                  inputs=[input_text, image_prompt, is_english],
                                  # Fixed typo: "examplet" -> "example".
                                  label="Example Inputs (Click to insert an example into the input box)",
                                  examples_per_page=6)
        gr.Markdown(MAINTENANCE_NOTICE1)
        # Log the gradio version once (the original printed it twice).
        print(gr.__version__)

        # Both the Generate button and pressing ENTER submit a chat turn.
        post_inputs = [input_text, temperature, top_p, top_k, image_prompt, result_text, hidden_image_hash, is_english]
        post_outputs = [input_text, result_text, hidden_image_hash]
        run_button.click(fn=post, inputs=post_inputs, outputs=post_outputs)
        input_text.submit(fn=post, inputs=post_inputs, outputs=post_outputs)
        clear_button.click(fn=clear_fn, inputs=clear_button, outputs=[input_text, result_text, image_prompt])
        # A new or cleared image resets the conversation history.
        image_prompt.upload(fn=clear_fn2, inputs=clear_button, outputs=[result_text])
        image_prompt.clear(fn=clear_fn2, inputs=clear_button, outputs=[result_text])

    demo.launch(max_threads=10)
# Script entry point.
if __name__ == '__main__':
    main()