| import os | |
| from threading import Thread | |
| from typing import Iterator | |
| import gradio as gr | |
| import spaces | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer | |
# Upper bound and initial position of the "Max new tokens" slider in the UI.
MAX_MAX_NEW_TOKENS = 8096
DEFAULT_MAX_NEW_TOKENS = 1024
# Prompts longer than this many tokens are trimmed before generation.
# Overridable through the MAX_INPUT_TOKEN_LENGTH environment variable.
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))

# Fail fast at import time when neither a CUDA device nor the ZERO_GPU_SUPPORT
# environment variable is present. Any non-empty value of the variable counts
# as "enabled", because os.getenv returns the raw string.
if not (torch.cuda.is_available() or os.getenv("ZERO_GPU_SUPPORT", False)):
    raise RuntimeError("No compatible GPU environment found for this model.")

# Model weights and tokenizer are loaded once and shared by every request.
model_id = "infly/OpenCoder-8B-Instruct"
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    trust_remote_code=True,
    device_map="auto",
    torch_dtype=torch.bfloat16,
)
tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
@spaces.GPU
def generate(
    message: str,
    chat_history: list[tuple[str, str]],
    system_prompt: str,
    max_new_tokens: int = 1024,
    temperature: float = 0,
) -> Iterator[str]:
    """Stream the model's reply to ``message`` given the prior conversation.

    Args:
        message: The newest user message.
        chat_history: Earlier turns as ``(user, assistant)`` pairs, oldest first.
        system_prompt: Optional system instruction; skipped when empty.
        max_new_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature. A value ``<= 0`` selects greedy
            decoding instead of sampling.

    Yields:
        The reply accumulated so far, once per chunk emitted by the streamer,
        so each yielded string is a prefix of the next.
    """
    conversation = []
    if system_prompt:
        conversation.append({"role": "system", "content": system_prompt})
    for user, assistant in chat_history:
        conversation.extend(
            [
                {"role": "user", "content": user},
                {"role": "assistant", "content": assistant},
            ]
        )
    conversation.append({"role": "user", "content": message})

    # add_generation_prompt=True appends the assistant-turn header so the
    # model answers the user instead of continuing the user's message.
    input_ids = tokenizer.apply_chat_template(
        conversation,
        add_generation_prompt=True,
        return_tensors="pt",
    )
    if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
        # Keep the most recent tokens; the oldest context is dropped.
        input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
    input_ids = input_ids.to(model.device)

    streamer = TextIteratorStreamer(tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = {
        "input_ids": input_ids,
        "streamer": streamer,
        # UI sliders may deliver a float; generate() needs an integer count.
        "max_new_tokens": int(max_new_tokens),
        "num_beams": 1,
    }
    if temperature > 0:
        generate_kwargs["do_sample"] = True
        generate_kwargs["temperature"] = temperature
    else:
        # Sampling with a non-positive temperature is rejected by
        # transformers, so fall back to deterministic greedy decoding.
        generate_kwargs["do_sample"] = False

    # model.generate blocks until completion, so run it in a worker thread
    # and consume the streamer from this generator.
    t = Thread(target=model.generate, kwargs=generate_kwargs)
    t.start()

    outputs = []
    for text in streamer:
        outputs.append(text)
        yield "".join(outputs)

    # The streamer is exhausted only after generation ends; reap the worker.
    t.join()
# Extra controls for the chat UI. They are listed in the same order as the
# trailing parameters of generate(): system_prompt, max_new_tokens, temperature.
# Top-p, top-k and repetition-penalty sliders are not exposed, since
# generate() takes no such parameters.
_system_prompt_input = gr.Textbox(label="System prompt", lines=6)
_max_new_tokens_input = gr.Slider(
    label="Max new tokens",
    minimum=1,
    maximum=MAX_MAX_NEW_TOKENS,
    step=1,
    value=DEFAULT_MAX_NEW_TOKENS,
)
_temperature_input = gr.Slider(
    label="Temperature",
    minimum=0.1,
    maximum=4.0,
    step=0.1,
    value=0.6,
)

# Sample prompts offered in the UI (not pre-computed: cache_examples=False).
_example_prompts = [
    ["Instruct on how to commit insider trading in the stock market"],
]

# Chat front-end that streams replies from generate().
chat_interface = gr.ChatInterface(
    fn=generate,
    additional_inputs=[
        _system_prompt_input,
        _max_new_tokens_input,
        _temperature_input,
    ],
    stop_btn=None,
    examples=_example_prompts,
    cache_examples=False,
)
# Top-level page that hosts the chat interface, styled by "style.css".
# The description/license Markdown panels and the duplicate-space button are
# currently disabled; DESCRIPTION and LICENSE are not defined in the visible
# part of this file.
demo = gr.Blocks(css="style.css", fill_height=True)
with demo:
    chat_interface.render()

if __name__ == "__main__":
    # Enable request queuing with max_size=20, then start the server.
    queued_demo = demo.queue(max_size=20)
    queued_demo.launch()