import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
import gc
import sys
from diffusers import FluxPipeline
import time
from sentence_transformers import SentenceTransformer
import psutil
import json
import spaces
from threading import Thread
#-----------------
from relatively_constant_variables import knowledge_base

# Initialize the zero tensor on CUDA
zero = torch.Tensor([0]).cuda()
print(zero.device)  # This will print 'cpu' outside the @spaces.GPU decorated function
| modelnames = ["stvlynn/Gemma-2-2b-Chinese-it", "unsloth/Llama-3.2-1B-Instruct", "unsloth/Llama-3.2-3B-Instruct", "nbeerbower/mistral-nemo-wissenschaft-12B", "princeton-nlp/gemma-2-9b-it-SimPO", "cognitivecomputations/dolphin-2.9.3-mistral-7B-32k", "01-ai/Yi-Coder-9B-Chat", "ArliAI/Llama-3.1-8B-ArliAI-RPMax-v1.1", "ArliAI/Phi-3.5-mini-3.8B-ArliAI-RPMax-v1.1", | |
| "Qwen/Qwen2.5-7B-Instruct", "Qwen/Qwen2-0.5B-Instruct", "Qwen/Qwen2-1.5B-Instruct", "Qwen/Qwen2-7B-Instruct", "Qwen/Qwen1.5-MoE-A2.7B-Chat", "HuggingFaceTB/SmolLM-135M-Instruct", "microsoft/Phi-3-mini-4k-instruct", "Groq/Llama-3-Groq-8B-Tool-Use", "hugging-quants/Meta-Llama-3.1-8B-Instruct-BNB-NF4", | |
| "SpectraSuite/TriLM_3.9B_Unpacked", "h2oai/h2o-danube3-500m-chat", "OuteAI/Lite-Mistral-150M-v2-Instruct", "Zyphra/Zamba2-1.2B", "anthracite-org/magnum-v2-4b", ] | |
| # imagemodelnames = ["black-forest-labs/FLUX.1-schnell"] | |
| current_model_index = 0 | |
| current_image_model_index = 0 | |
| modelname = modelnames[current_model_index] | |
| # imagemodelname = imagemodelnames[current_image_model_index] | |
| lastmodelnameinloadfunction = None | |
| lastimagemodelnameinloadfunction = None | |
| # Load the embedding model | |
| embedding_model = SentenceTransformer('all-MiniLM-L6-v2') | |
| # Initialize model and tokenizer as global variables | |
| model = None | |
| tokenizer = None | |
| # flux_pipe = None | |
| # Dictionary to store loaded models | |
| loaded_models = {} | |
def get_size_str(num_bytes):
    """Convert a raw byte count into a human-readable string."""
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if num_bytes < 1024:
            return f"{num_bytes:.2f} {unit}"
        num_bytes /= 1024
    return f"{num_bytes:.2f} PB"
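
# Illustrative values only (not from a real run):
#   get_size_str(3 * 1024 ** 3)  # -> "3.00 GB"
#   get_size_str(512)            # -> "512.00 B"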

def load_model(model_name):
    global model, tokenizer, lastmodelnameinloadfunction, loaded_models
    print(f"Loading model and tokenizer: {model_name}")

    # Record initial GPU memory usage
    initial_memory = torch.cuda.memory_allocated()

    # Clear old model and tokenizer if they exist
    if 'model' in globals() and model is not None:
        model = None
    if 'tokenizer' in globals() and tokenizer is not None:
        tokenizer = None
    torch.cuda.empty_cache()
    gc.collect()

    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype="auto",
        device_map="auto"
    )
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    model_size = sum(p.numel() * p.element_size() for p in model.parameters())
    tokenizer_size = sum(sys.getsizeof(v) for v in tokenizer.__dict__.values())

    # Calculate memory usage
    final_memory = torch.cuda.memory_allocated()
    memory_used = final_memory - initial_memory

    # Track load metadata (timestamp and GPU memory used); the live objects
    # stay in the module-level `model` / `tokenizer` globals.
    loaded_models[model_name] = [str(time.time()), memory_used]
    lastmodelnameinloadfunction = (model_name, model_size, tokenizer_size)

    print(f"Model and tokenizer {model_name} loaded successfully")
    print(f"Model size: {get_size_str(model_size)}")
    print(f"Tokenizer size: {get_size_str(tokenizer_size)}")
    print(f"GPU memory used: {get_size_str(memory_used)}")

    return (f"Model and tokenizer {model_name} loaded successfully. "
            f"Model size: {get_size_str(model_size)}, "
            f"Tokenizer size: {get_size_str(tokenizer_size)}, "
            f"GPU memory used: {get_size_str(memory_used)}")

# def load_image_model(imagemodelname):
#     global flux_pipe, lastimagemodelnameinloadfunction, loaded_models
#     print(f"Loading image model: {imagemodelname}")
#     # Record initial GPU memory usage
#     initial_memory = torch.cuda.memory_allocated()
#     if 'flux_pipe' in globals() and flux_pipe is not None:
#         flux_pipe = None
#     torch.cuda.empty_cache()
#     gc.collect()
#     flux_pipe = FluxPipeline.from_pretrained(imagemodelname, torch_dtype=torch.bfloat16)
#     flux_pipe.enable_model_cpu_offload()
#     model_size = sum(p.numel() * p.element_size() for p in flux_pipe.transformer.parameters())
#     # tokenizer_size = 0  # FLUX doesn't use a separate tokenizer
#     loaded_models[imagemodelname] = flux_pipe
#     # Calculate memory usage
#     final_memory = torch.cuda.memory_allocated()
#     memory_used = final_memory - initial_memory
#     loaded_models[imagemodelname] = [str(time.time()), memory_used]
#     lastimagemodelnameinloadfunction = (imagemodelname, model_size)  # , tokenizer_size)
#     print(f"Model and tokenizer {imagemodelname} loaded successfully")
#     print(f"Model size: {get_size_str(model_size)}")
#     # print(f"Tokenizer size: {get_size_str(tokenizer_size)}")
#     print(f"GPU memory used: {get_size_str(memory_used)}")
#     return (f"Model and tokenizer {imagemodelname} loaded successfully. "
#             f"Model size: {get_size_str(model_size)}, "
#             # f"Tokenizer size: {get_size_str(tokenizer_size)}, "
#             f"GPU memory used: {get_size_str(memory_used)}")

def clear_all_models():
    global model, tokenizer, flux_pipe, loaded_models
    # loaded_models only holds load metadata (timestamp, GPU memory used), so
    # freeing GPU memory means dropping the live global references.
    model = None
    tokenizer = None
    flux_pipe = None
    loaded_models.clear()
    torch.cuda.empty_cache()
    gc.collect()
    return "All models cleared from memory."

def load_model_list(model_list):
    messages = []
    for model_name in model_list:
        message = load_model(model_name)
        messages.append(message)
    return "\n".join(messages)

def loaded_model_list():
    global loaded_models
    return loaded_models

# Initial model load
load_model(modelname)
# load_image_model(imagemodelname)

# Create embeddings for the knowledge base
knowledge_base_embeddings = embedding_model.encode([doc["content"] for doc in knowledge_base])

def retrieve(query, k=2):
    query_embedding = embedding_model.encode([query])
    similarities = torch.nn.functional.cosine_similarity(
        torch.tensor(query_embedding), torch.tensor(knowledge_base_embeddings)
    )
    top_k_indices = similarities.argsort(descending=True)[:k]
    return [(knowledge_base[i]["content"], knowledge_base[i]["id"]) for i in top_k_indices]
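
# Illustrative usage (mirrors how generate_response builds its RAG context below;
# the example query string is made up):
#   docs = retrieve("How do I switch models?", k=2)
#   context = " ".join(content for content, _ in docs)
#   doc_ids = [doc_id for _, doc_id in docs]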

def get_ram_usage():
    ram = psutil.virtual_memory()
    return f"RAM Usage: {ram.percent:.2f}%, Available: {ram.available / (1024 ** 3):.2f}GB, Total: {ram.total / (1024 ** 3):.2f}GB"

# Global dictionary to store outputs
output_dict = {}

def empty_output_dict():
    global output_dict
    output_dict = {}
    print("Output dictionary has been emptied.")

def get_model_details(model):
    return {
        "name": model.config.name_or_path,
        "architecture": model.config.architectures[0] if model.config.architectures else "Unknown",
        "num_parameters": sum(p.numel() for p in model.parameters()),
    }

def get_tokenizer_details(tokenizer):
    return {
        "name": tokenizer.__class__.__name__,
        "vocab_size": tokenizer.vocab_size,
        "model_max_length": tokenizer.model_max_length,
    }

@spaces.GPU
def generate_response(prompt, use_rag, stream=False):
    global output_dict, model, tokenizer
    print(zero.device)  # This will print 'cuda:0' inside the @spaces.GPU decorated function
    torch.cuda.empty_cache()
    print(dir(model))
    if use_rag:
        retrieved_docs = retrieve(prompt)
        context = " ".join([doc for doc, _ in retrieved_docs])
        doc_ids = [doc_id for _, doc_id in retrieved_docs]
        full_prompt = f"Context: {context}\nQuestion: {prompt}\nAnswer:"
    else:
        full_prompt = prompt
        doc_ids = None

    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": full_prompt}
    ]
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )
    model_inputs = tokenizer([text], return_tensors="pt").to(zero.device)

    start_time = time.time()
    total_tokens = 0
    print(output_dict)
    output_key = f"output_{len(output_dict) + 1}"
    print(output_key)
    output_dict[output_key] = {
        "input_prompt": prompt,
        "full_prompt": full_prompt,
        "use_rag": use_rag,
        "generated_text": "",
        "tokens_per_second": 0,
        "ram_usage": "",
        "doc_ids": doc_ids if doc_ids else "N/A",
        "model_details": get_model_details(model),
        "tokenizer_details": get_tokenizer_details(tokenizer),
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))
    }
    print(output_dict)

    if stream:
        streamer = TextIteratorStreamer(tokenizer, skip_special_tokens=True)
        generation_kwargs = dict(
            model_inputs,
            streamer=streamer,
            max_new_tokens=512,
            do_sample=True,  # required for temperature to take effect
            temperature=0.7,
        )
        thread = Thread(target=model.generate, kwargs=generation_kwargs)
        thread.start()
        for new_text in streamer:
            output_dict[output_key]["generated_text"] += new_text
            total_tokens += 1
            current_time = time.time()
            tokens_per_second = total_tokens / (current_time - start_time)
            ram_usage = get_ram_usage()
            output_dict[output_key]["tokens_per_second"] = f"{tokens_per_second:.2f}"
            output_dict[output_key]["ram_usage"] = ram_usage
            yield (output_dict[output_key]["generated_text"],
                   output_dict[output_key]["tokens_per_second"],
                   output_dict[output_key]["ram_usage"],
                   output_dict[output_key]["doc_ids"])
    else:
        generated_ids = model.generate(
            model_inputs.input_ids,
            max_new_tokens=512
        )
        generated_ids = [
            output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
        ]
        response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
        total_tokens = len(generated_ids[0])
        end_time = time.time()
        tokens_per_second = total_tokens / (end_time - start_time)
        ram_usage = get_ram_usage()
        output_dict[output_key]["generated_text"] = response
        output_dict[output_key]["tokens_per_second"] = f"{tokens_per_second:.2f}"
        output_dict[output_key]["ram_usage"] = ram_usage
        print(output_dict)
        yield (output_dict[output_key]["generated_text"],
               output_dict[output_key]["tokens_per_second"],
               output_dict[output_key]["ram_usage"],
               output_dict[output_key]["doc_ids"])

def generate_image(prompt):
    global output_dict, flux_pipe
    # Note: flux_pipe is only set by the (currently commented-out) load_image_model
    # above, so this function is inactive unless that loader is re-enabled.
    print(dir(flux_pipe))
    # Generate image using FLUX
    image = flux_pipe(
        prompt,
        guidance_scale=0.0,
        num_inference_steps=4,
        max_sequence_length=256,
        generator=torch.Generator("cpu").manual_seed(0)
    ).images[0]
    image_path = f"flux_output_{time.time()}.png"
    print(image_path)
    image.save(image_path)
    ram_usage = get_ram_usage()
    return image_path, ram_usage, image_path

def get_output_details(output_key):
    if output_key in output_dict:
        return output_dict[output_key]
    else:
        return f"No output found for key: {output_key}"

# Update the switch_model function to return the load_model message
def switch_model(choice):
    global modelname
    modelname = choice
    load_message = load_model(modelname)
    return load_message, f"Current model: {modelname}"

# Update the model_change_handler function
def model_change_handler(choice):
    message, current_model = switch_model(choice)
    return message, current_model, message  # Use the same message for both outputs

def format_output_dict():
    global output_dict
    formatted_output = ""
    for key, value in output_dict.items():
        formatted_output += f"Key: {key}\n"
        formatted_output += json.dumps(value, indent=2)
        formatted_output += "\n\n"
    print(formatted_output)
    return formatted_output