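"""GPU-Poor LLM Arena: a Gradio app that pits two small language models
against each other and lets users vote on the better response.

Flow (as implemented below): model pairs are chosen by battle count,
responses come from an OpenAI-compatible endpoint serving Ollama models,
and votes feed a score-based leaderboard and an ELO leaderboard.
"""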
import gradio as gr
import random
import logging
import re
import config
import plotly.graph_objects as go
from leaderboard import (
    get_current_leaderboard,
    update_leaderboard,
    start_backup_thread,
    get_leaderboard,
    get_elo_leaderboard,
    ensure_elo_ratings_initialized
)
import openai
from collections import Counter
from release_notes import get_release_notes_html

# Configure logging; the filter defined below redacts URLs from records
logging.basicConfig(
    level=logging.WARNING,  # Only show warnings and errors
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Suppress verbose HTTP request logging
logging.getLogger("urllib3").setLevel(logging.CRITICAL)
logging.getLogger("httpx").setLevel(logging.CRITICAL)
logging.getLogger("openai").setLevel(logging.CRITICAL)

class RedactURLsFilter(logging.Filter):
    def filter(self, record):
        # Redact all URLs using a regex pattern
        url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
        record.msg = re.sub(url_pattern, '[REDACTED_URL]', str(record.msg))
        # Remove HTTP status lines
        record.msg = re.sub(r'HTTP/\d\.\d \d+ \w+', '', record.msg)
        # Remove sensitive API references
        record.msg = record.msg.replace(config.API_URL, '[API]')
        record.msg = record.msg.replace(config.NEXTCLOUD_URL, '[CLOUD]')
        # Clean up residual artifacts
        record.msg = re.sub(r'\s+', ' ', record.msg).strip()
        record.msg = re.sub(r'"\s*"', '', record.msg)  # Remove empty quote pairs
        return True
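# Illustrative effect of the filter (assuming config.API_URL is a URL like
# "https://api.example.com"): a record such as
#   'HTTP Request: POST https://api.example.com/chat "HTTP/1.1 200 OK"'
# comes out roughly as 'HTTP Request: POST [REDACTED_URL]/chat'.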
# Apply the filter to all handlers
logger = logging.getLogger(__name__)
for handler in logging.root.handlers:
    handler.addFilter(RedactURLsFilter())

# Start the backup thread
start_backup_thread()

# Function to get available models (using predefined list)
def get_available_models():
    leaderboard = get_current_leaderboard()
    approved_models = [model[0] for model in config.get_approved_models()]

    # Filter out models that have reached the battle limit
    active_models = []
    for model in approved_models:
        data = leaderboard.get(model, {})
        total_battles = data.get('wins', 0) + data.get('losses', 0)
        if total_battles < config.MAX_BATTLES_LIMIT:
            active_models.append(model)

    # If all models are over the limit, fall back to all approved models to avoid errors
    if not active_models:
        return approved_models
    return active_models

# Track each model's recent opponents to avoid repeat pairings
recent_opponents = {}

def update_recent_opponents(model_a, model_b):
    recent_opponents.setdefault(model_a, []).append(model_b)
    recent_opponents.setdefault(model_b, []).append(model_a)
    # Limit history to the last 5 opponents
    recent_opponents[model_a] = recent_opponents[model_a][-5:]
    recent_opponents[model_b] = recent_opponents[model_b][-5:]
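# Example of the rolling window: after six straight pairings A-B, A-C, A-D,
# A-E, A-F, A-G, recent_opponents["A"] holds only ["C", "D", "E", "F", "G"]
# (the oldest entry is dropped), so B becomes eligible to face A again.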
# Function to call the OpenAI-compatible API serving the Ollama models
def call_ollama_api(model, prompt):
    client = openai.OpenAI(
        api_key=config.API_KEY,
        base_url=config.API_URL
    )
    try:
        logger.info(f"Starting API call for model: {model}")
        response = client.chat.completions.create(
            model=model,
            messages=[
                {
                    "role": "system",
                    "content": "You are a helpful assistant. At no point should you reveal your name, identity or team affiliation to the user, especially if asked directly!"
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            timeout=config.API_TIMEOUT,
            max_tokens=config.MAX_TOKENS
        )
        logger.info(f"Received response for model: {model}")

        if not response or not response.choices:
            logger.error(f"Empty response received for model: {model}")
            return [
                {"role": "user", "content": prompt},
                {"role": "assistant", "content": "Error: Empty response from the model"}
            ]

        content = response.choices[0].message.content
        if not content:
            logger.error(f"Empty content received for model: {model}")
            return [
                {"role": "user", "content": prompt},
                {"role": "assistant", "content": "Error: Empty content from the model"}
            ]
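        # Reasoning models may wrap their chain of thought in tags, e.g.
        # "<think>Let me compare both options...</think>The answer is B."
        # The regex below splits that into the visible answer plus a
        # collapsible "thinking" section rendered via an HTML <details> block.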
        # Extract the thinking part and the main content using regex
        thinking_match = re.search(r'<(think|thought)>(.*?)</\1>', content, flags=re.DOTALL)
        if thinking_match:
            thinking_content = thinking_match.group(2).strip()
            main_content = re.sub(r'<(think|thought)>.*?</\1>', '', content, flags=re.DOTALL).strip()
            logger.info(f"Found thinking content for model: {model}")
            return [
                {"role": "user", "content": prompt},
                {"role": "assistant", "content": f"{main_content}\n\n<details><summary>🤔 View thinking process</summary>\n\n{thinking_content}\n\n</details>"}
            ]

        # If no thinking tags, return the content as-is
        logger.info(f"No thinking tags found for model: {model}")
        return [
            {"role": "user", "content": prompt},
            {"role": "assistant", "content": content.strip()}
        ]
    except openai.APITimeoutError:
        # The openai client raises its own timeout error, not requests'
        logger.error(f"Timeout error after {config.API_TIMEOUT} seconds for model: {model}")
        return [
            {"role": "user", "content": prompt},
            {"role": "assistant", "content": f"Error: Model response timed out after {config.API_TIMEOUT} seconds"}
        ]
    except openai.BadRequestError as e:
        error_msg = str(e)
        logger.error(f"Bad request error for model: {model}. Error: {error_msg}")
        return [
            {"role": "user", "content": prompt},
            {"role": "assistant", "content": "Error: Unable to get response from the model"}
        ]
    except Exception as e:
        logger.error(f"Error calling Ollama API for model: {model}. Error: {str(e)}", exc_info=True)
        return [
            {"role": "user", "content": prompt},
            {"role": "assistant", "content": "Error: Unable to get response from the model"}
        ]

def get_battle_counts():
    leaderboard = get_current_leaderboard()
    battle_counts = Counter()
    for model, data in leaderboard.items():
        battle_counts[model] = data['wins'] + data['losses']
    return battle_counts

# Generate responses using two randomly selected models
def generate_responses(prompt):
    available_models = get_available_models()
    if len(available_models) < 2:
        return [
            {"role": "user", "content": prompt},
            {"role": "assistant", "content": "Error: Not enough models available"}
        ], [
            {"role": "user", "content": prompt},
            {"role": "assistant", "content": "Error: Not enough models available"}
        ], None, None

    battle_counts = get_battle_counts()
    # Sort models by battle count (ascending)
    sorted_models = sorted(available_models, key=lambda m: battle_counts.get(m, 0))

    # Select the first model (fewest battles)
    model_a = sorted_models[0]

    # Filter out recent opponents for model_a
    potential_opponents = [m for m in sorted_models[1:] if m not in recent_opponents.get(model_a, [])]

    # If no potential opponents are left, reset recent opponents for model_a
    if not potential_opponents:
        recent_opponents[model_a] = []
        potential_opponents = sorted_models[1:]

    # For the second model, use weighted random selection
    weights = [1 / (battle_counts.get(m, 1) + 1) for m in potential_opponents]
    model_b = random.choices(potential_opponents, weights=weights, k=1)[0]
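    # Worked example of the weighting above: opponents with 0, 3 and 9 battles
    # get weights 1/1, 1/4 and 1/10 (1.0, 0.25, 0.1), so the least-battled
    # model is ten times more likely to be drawn than the most-battled one.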
    # Update recent opponents
    update_recent_opponents(model_a, model_b)

    # Get responses from both models
    response_a = call_ollama_api(model_a, prompt)
    response_b = call_ollama_api(model_b, prompt)

    # Return responses directly (already formatted as message lists)
    return response_a, response_b, model_a, model_b

def battle_arena(prompt):
    response_a, response_b, model_a, model_b = generate_responses(prompt)

    # Check for any error message in the responses (API failures, timeouts,
    # or too few models) and disable voting if one is found
    if any(msg["content"].startswith("Error:")
           for msg in response_a + response_b
           if msg["role"] == "assistant"):
        return (
            [], [], None, None,
            gr.update(value=[]),
            gr.update(value=[]),
            gr.update(interactive=False, value="Voting Disabled - API Error"),
            gr.update(interactive=False, value="Voting Disabled - API Error"),
            gr.update(interactive=False, visible=False),
            prompt,
            0,
            gr.update(visible=False),
            gr.update(value="Error: Unable to get response from the model", visible=True)
        )

    # Draw two distinct nicknames so both vote buttons never share a label
    nickname_a, nickname_b = random.sample(config.model_nicknames, 2)

    # The responses are already in the correct format; randomize the sides
    if random.choice([True, False]):
        return (
            response_a, response_b, model_a, model_b,
            gr.update(label=nickname_a, value=response_a),
            gr.update(label=nickname_b, value=response_b),
            gr.update(interactive=True, value=f"Vote for {nickname_a}"),
            gr.update(interactive=True, value=f"Vote for {nickname_b}"),
            gr.update(interactive=True, visible=True),
            prompt,
            0,
            gr.update(visible=False),
            gr.update(value="Ready for your vote! 🗳️", visible=True)
        )
    else:
        return (
            response_b, response_a, model_b, model_a,
            gr.update(label=nickname_a, value=response_b),
            gr.update(label=nickname_b, value=response_a),
            gr.update(interactive=True, value=f"Vote for {nickname_a}"),
            gr.update(interactive=True, value=f"Vote for {nickname_b}"),
            gr.update(interactive=True, visible=True),
            prompt,
            0,
            gr.update(visible=False),
            gr.update(value="Ready for your vote! 🗳️", visible=True)
        )

def record_vote(prompt, left_response, right_response, left_model, right_model, choice):
    # Check if outputs are generated
    if not left_response or not right_response or not left_model or not right_model:
        # Must return seven values to match the outputs wired to the vote buttons
        return (
            gr.update(value="Please generate responses before voting.", visible=True),
            gr.update(),
            gr.update(),
            gr.update(interactive=False),
            gr.update(interactive=False),
            gr.update(visible=False),
            gr.update()
        )

    winner = left_model if choice == "Left is better" else right_model
    loser = right_model if choice == "Left is better" else left_model

    # Update the leaderboard
    update_leaderboard(winner, loser)

    result_message = f"""
🎉 Vote recorded! You're awesome! 🎉
🔵 In the left corner: {get_human_readable_name(left_model)}
🔴 In the right corner: {get_human_readable_name(right_model)}
🏆 And the champion you picked is... {get_human_readable_name(winner)}! 🥇
"""

    return (
        gr.update(value=result_message, visible=True),  # Show result as Markdown
        get_leaderboard(),        # Update leaderboard
        get_elo_leaderboard(),    # Update ELO leaderboard
        gr.update(interactive=False),  # Disable left vote button
        gr.update(interactive=False),  # Disable right vote button
        gr.update(interactive=False),  # Disable tie button
        gr.update(visible=True)        # Show model names
    )

def get_leaderboard_chart():
    battle_results = get_current_leaderboard()

    # Calculate scores and sort results
    for model, results in battle_results.items():
        total_battles = results["wins"] + results["losses"]
        if total_battles > 0:
            win_rate = results["wins"] / total_battles
            results["score"] = win_rate * (1 - 1 / (total_battles + 1))
        else:
            results["score"] = 0
    sorted_results = sorted(
        battle_results.items(),
        key=lambda x: (x[1]["score"], x[1]["wins"] + x[1]["losses"]),
        reverse=True
    )

    models = [get_human_readable_name(model) for model, _ in sorted_results]
    wins = [results["wins"] for _, results in sorted_results]
    losses = [results["losses"] for _, results in sorted_results]
    scores = [results["score"] for _, results in sorted_results]

    fig = go.Figure()

    # Stacked bar chart for wins and losses
    fig.add_trace(go.Bar(
        x=models,
        y=wins,
        name='Wins',
        marker_color='#22577a'
    ))
    fig.add_trace(go.Bar(
        x=models,
        y=losses,
        name='Losses',
        marker_color='#38a3a5'
    ))

    # Line chart for scores
    fig.add_trace(go.Scatter(
        x=models,
        y=scores,
        name='Score',
        yaxis='y2',
        line=dict(color='#ff7f0e', width=2)
    ))

    # Update layout for full width, increased height, and a secondary y-axis
    fig.update_layout(
        title='Model Performance',
        xaxis_title='Models',
        yaxis_title='Number of Battles',
        yaxis2=dict(
            title='Score',
            overlaying='y',
            side='right'
        ),
        barmode='stack',
        height=800,
        width=1450,
        autosize=True,
        legend=dict(
            orientation='h',
            yanchor='bottom',
            y=1.02,
            xanchor='right',
            x=1
        )
    )

    return fig

def new_battle():
    nickname_a, nickname_b = random.sample(config.model_nicknames, 2)
    return (
        "",  # Reset prompt_input
        gr.update(value=[], label=nickname_a),  # Reset left Chatbot
        gr.update(value=[], label=nickname_b),  # Reset right Chatbot
        None,  # Reset left_model
        None,  # Reset right_model
        gr.update(interactive=False, value=f"Vote for {nickname_a}"),
        gr.update(interactive=False, value=f"Vote for {nickname_b}"),
        gr.update(interactive=False, visible=False),  # Reset tie button
        gr.update(value="", visible=False),  # Reset result box
        gr.update(),  # Leaderboard unchanged
        gr.update(visible=False),  # Hide model names row
        gr.update(),  # ELO leaderboard unchanged
        0  # Reset tie_count
    )

# Map an internal model id to its human-readable display name
def get_human_readable_name(model_name: str) -> str:
    model_dict = dict(config.get_approved_models())
    return model_dict.get(model_name, model_name)
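# Illustrative (hypothetical entries): if config.get_approved_models() yields
# [("llama3.2:3b", "Llama 3.2 3B"), ...], then
# get_human_readable_name("llama3.2:3b") returns "Llama 3.2 3B";
# ids missing from the list fall back to the raw model name.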
# Randomly select an example prompt
def random_prompt():
    return random.choice(config.example_prompts)

# Continue the battle with a fresh prompt after a tie
def continue_conversation(prompt, left_chat, right_chat, left_model, right_model, previous_prompt, tie_count):
    # If the prompt is empty or repeats the previous one, pick a random example
    if not prompt or prompt == previous_prompt:
        prompt = random.choice(config.example_prompts)

    # Get responses (each is a list of user/assistant messages)
    left_response = call_ollama_api(left_model, prompt)
    right_response = call_ollama_api(right_model, prompt)

    # Append the new exchange to each chat history
    left_chat.extend(left_response)
    right_chat.extend(right_response)

    tie_count += 1
    # Allow at most three ties before forcing a vote
    tie_button_state = gr.update(interactive=True) if tie_count < 3 else gr.update(interactive=False, value="Max ties reached. Please vote!")

    return (
        gr.update(value=left_chat),
        gr.update(value=right_chat),
        gr.update(value=""),  # Clear the prompt input
        tie_button_state,
        prompt,  # Store the new prompt as the previous one
        tie_count
    )

def normalize_parameter_size(param_size: str) -> str:
    """Convert a parameter-size string to a consistent billions (B) format."""
    try:
        # Remove any spaces and convert to uppercase for consistency
        param_size = param_size.replace(" ", "").upper()

        # Extract the number and unit
        if 'M' in param_size:
            # Convert millions to billions
            number = float(param_size.replace('M', '').replace(',', ''))
            return f"{number/1000:.2f}B"
        elif 'B' in param_size:
            # Already in billions; just format consistently
            number = float(param_size.replace('B', '').replace(',', ''))
            return f"{number:.2f}B"
        else:
            # No unit: treat the value as a raw parameter count and convert to
            # billions (the original branched on magnitude here, but every
            # branch applied this same division)
            number = float(param_size.replace(',', ''))
            return f"{number/1000000000:.2f}B"
    except (ValueError, AttributeError):
        return param_size  # Return the original string if conversion fails
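# Expected behavior, per the branches above:
#   normalize_parameter_size("8B")            -> "8.00B"
#   normalize_parameter_size("770M")          -> "0.77B"
#   normalize_parameter_size("3,000,000,000") -> "3.00B"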
# Initialize Gradio Blocks
with gr.Blocks(css="""
    #dice-button {
        min-height: 90px;
        font-size: 35px;
    }
    .sponsor-button {
        background-color: #30363D;
        color: white;
        border: none;
        padding: 10px 20px;
        border-radius: 6px;
        cursor: pointer;
        display: inline-flex;
        align-items: center;
        gap: 8px;
        font-weight: bold;
    }
    .sponsor-button:hover {
        background-color: #2D333B;
    }
""") as demo:
    gr.Markdown(config.ARENA_NAME)

    # Main description with sponsor button
    with gr.Row():
        with gr.Column(scale=8):
            gr.Markdown("""
            **Step right up to the arena where frugal meets fabulous in the world of AI!**
            Watch as our compact contenders duke it out in a battle of wits and words.

            As the arena continues to expand with more models, features, and battles, it requires computational resources to maintain and improve.
            If you find this project valuable and would like to support its development, consider sponsoring:
            """)
        with gr.Column(scale=2):
            gr.Button(
                "Sponsor on GitHub",
                link="https://github.com/sponsors/k-mktr",
                elem_classes="sponsor-button"
            )

    # Instructions in an accordion
    with gr.Accordion("📖 How to Use", open=False):
        gr.Markdown("""
        1. To start the battle, go to the 'Battle Arena' tab.
        2. Type your prompt into the text box. Alternatively, click the "🎲" button to receive a random prompt.
        3. Click the "Generate Responses" button to view the models' responses.
        4. Cast your vote for the model that provided the better response. In the event of a Tie, enter a new prompt before continuing the battle.
        5. Check out the Leaderboard to see how models rank against each other.

        More info: [README.md](https://huggingface.co/spaces/k-mktr/gpu-poor-llm-arena/blob/main/README.md)
        """)
    # Leaderboard Tab (first)
    with gr.Tab("Leaderboard"):
        gr.Markdown("""
        ### Main Leaderboard

        This leaderboard uses a scoring system that balances win rate and total battles. The score is calculated using the formula:

        **Score = Win Rate * (1 - 1 / (Total Battles + 1))**

        This formula rewards models with higher win rates and more battles. As the number of battles increases, the score approaches the win rate.
        """)
        leaderboard = gr.Dataframe(
            headers=["#", "Model", "Score", "Wins", "Losses", "Total Battles", "Win Rate"],
            row_count=10,
            col_count=7,
            interactive=True,
            label="Leaderboard"
        )
    # Battle Arena Tab (second)
    with gr.Tab("Battle Arena"):
        with gr.Row():
            prompt_input = gr.Textbox(
                label="Enter your prompt",
                placeholder="Type your prompt here...",
                scale=20
            )
            random_prompt_btn = gr.Button("🎲", scale=1, elem_id="dice-button")

        gr.Markdown("<br>")

        # Wire up the random prompt button
        random_prompt_btn.click(
            random_prompt,
            outputs=prompt_input
        )

        submit_btn = gr.Button("Generate Responses", variant="primary")

        with gr.Row():
            left_output = gr.Chatbot(label=random.choice(config.model_nicknames), type="messages")
            right_output = gr.Chatbot(label=random.choice(config.model_nicknames), type="messages")

        with gr.Row():
            left_vote_btn = gr.Button(f"Vote for {left_output.label}", interactive=False)
            tie_btn = gr.Button("Tie 🔄 Continue with a new prompt", interactive=False, visible=False)
            right_vote_btn = gr.Button(f"Vote for {right_output.label}", interactive=False)

        result = gr.Textbox(
            label="Status",
            interactive=False,
            value="Generate responses to start the battle! 🚀",
            visible=True,  # Always visible
            lines=4
        )

        with gr.Row(visible=False) as model_names_row:
            left_model = gr.Textbox(label="🔵 Left Model", interactive=False)
            right_model = gr.Textbox(label="🔴 Right Model", interactive=False)

        previous_prompt = gr.State("")  # Stores the previous prompt
        tie_count = gr.State(0)  # Tracks the number of consecutive ties

        new_battle_btn = gr.Button("New Battle")
    # ELO Leaderboard Tab
    with gr.Tab("ELO Leaderboard"):
        gr.Markdown("""
        ### ELO Rating System

        This leaderboard uses a modified ELO rating system that takes into account both the performance and size of the models.
        Initial ratings are based on model size, with larger models starting at higher ratings.
        The ELO rating is calculated based on wins and losses, with adjustments made based on the relative strengths of opponents.
        """)
        elo_leaderboard = gr.Dataframe(
            headers=["#", "Model", "ELO Rating", "Wins", "Losses", "Total Battles", "Win Rate"],
            row_count=10,
            col_count=7,
            interactive=True,
            label="ELO Leaderboard"
        )

    # Latest Updates Tab
    with gr.Tab("Latest Updates"):
        release_notes = gr.HTML(get_release_notes_html())
        refresh_notes_btn = gr.Button("Refresh Updates")
        refresh_notes_btn.click(
            get_release_notes_html,
            outputs=[release_notes]
        )
    # Define interactions
    submit_btn.click(
        battle_arena,
        inputs=prompt_input,
        outputs=[
            left_output, right_output, left_model, right_model,
            left_output, right_output, left_vote_btn, right_vote_btn,
            tie_btn, previous_prompt, tie_count, model_names_row, result
        ]
    )

    left_vote_btn.click(
        lambda *args: record_vote(*args, "Left is better"),
        inputs=[prompt_input, left_output, right_output, left_model, right_model],
        outputs=[result, leaderboard, elo_leaderboard, left_vote_btn,
                 right_vote_btn, tie_btn, model_names_row]
    )

    right_vote_btn.click(
        lambda *args: record_vote(*args, "Right is better"),
        inputs=[prompt_input, left_output, right_output, left_model, right_model],
        outputs=[result, leaderboard, elo_leaderboard, left_vote_btn,
                 right_vote_btn, tie_btn, model_names_row]
    )

    tie_btn.click(
        continue_conversation,
        inputs=[prompt_input, left_output, right_output, left_model, right_model, previous_prompt, tie_count],
        outputs=[left_output, right_output, prompt_input, tie_btn, previous_prompt, tie_count]
    )

    new_battle_btn.click(
        new_battle,
        outputs=[prompt_input, left_output, right_output, left_model,
                 right_model, left_vote_btn, right_vote_btn, tie_btn,
                 result, leaderboard, model_names_row, elo_leaderboard, tie_count]
    )

    # Populate both leaderboards on launch
    demo.load(get_leaderboard, outputs=leaderboard)
    demo.load(get_elo_leaderboard, outputs=elo_leaderboard)

| if __name__ == "__main__": | |
| # Initialize ELO ratings before launching the app | |
| ensure_elo_ratings_initialized() | |
| # Start the model refresh thread | |
| config.start_model_refresh_thread() | |
| demo.launch(show_api=False) |