import asyncio
import json
from abc import ABC, abstractmethod
from collections import defaultdict
from datetime import datetime
from typing import Dict, List

import backoff
import weave
from openai import AsyncOpenAI, APITimeoutError, APIError, RateLimitError
from pydantic import BaseModel
class FailureCategory(BaseModel):
    category_id: int
    category_name: str
    description: str


class FailureCategories(BaseModel):
    failure_categories: list[FailureCategory]


class TaskSummary(BaseModel):
    task_id: str
    summary: str


class TaskClassification(BaseModel):
    task_id: str
    category_id: str
    category_name: str
    explanation: str


class OverallAnalysis(BaseModel):
    failure_categories: List[Dict]
    task_classifications: Dict[str, Dict]
    summary: str
class AsyncLLMClient(ABC):
    @abstractmethod
    async def generate_text(self, prompt, system_message=None, response_format=None):
        pass
# class AsyncOpenAIClient(AsyncLLMClient):
#     def __init__(self, model="gpt-4o-mini"):
#         self.model = model
#         self.client = AsyncOpenAI()
#
#     async def generate_text(self, prompt, system_message=None, response_format=None):
#         messages = [
#             {"role": "system", "content": system_message or "You are a helpful AI assistant."},
#             {"role": "user", "content": prompt}
#         ]
#         if response_format:
#             response = await self.client.beta.chat.completions.parse(model=self.model, messages=messages, response_format=response_format)
#         else:
#             response = await self.client.chat.completions.create(model=self.model, messages=messages)
#         return response.choices[0].message.content
class AsyncOpenAIClient(AsyncLLMClient):
    """OpenAI-backed client that retries transient API errors with exponential backoff."""

    def __init__(self, model="gpt-4o-mini", max_tries=5, max_time=300):
        self.model = model
        self.client = AsyncOpenAI()
        self.max_tries = max_tries
        self.max_time = max_time
        # Wrap the request helper so timeouts, rate limits, and transient API
        # errors are retried with exponential backoff up to the configured limits.
        self._make_request = backoff.on_exception(
            backoff.expo,
            (APITimeoutError, APIError, RateLimitError),
            max_tries=self.max_tries,
            max_time=self.max_time,
        )(self._make_request)

    async def _make_request(self, messages, response_format=None):
        if response_format:
            return await self.client.beta.chat.completions.parse(
                model=self.model,
                messages=messages,
                response_format=response_format
            )
        return await self.client.chat.completions.create(
            model=self.model,
            messages=messages
        )

    async def generate_text(self, prompt, system_message=None, response_format=None):
        messages = [
            {"role": "system", "content": system_message or "You are a helpful AI assistant."},
            {"role": "user", "content": prompt}
        ]
        try:
            response = await self._make_request(messages, response_format)
            return response.choices[0].message.content
        except Exception as e:
            raise Exception(f"Failed after {self.max_tries} attempts or {self.max_time} seconds: {str(e)}")
def get_weave_calls(client):
    calls = client.calls()
    processed_calls = []
    for call in calls:
        ChatCompletion = weave.ref(call.output).get()
        choices = [choice.message.content for choice in ChatCompletion.choices]
        output = {
            'weave_task_id': call.attributes['weave_task_id'],
            'trace_id': call.trace_id,
            'project_id': call.project_id,
            'created_timestamp': ChatCompletion.created,
            'inputs': dict(call.inputs),
            'id': call.id,
            'outputs': {'choices': choices},
            'exception': call.exception,
            'summary': call.summary,
            'display_name': call.display_name,
            'attributes': dict(call.attributes),
            '_children': call._children,
            '_feedback': call._feedback,
        }
        processed_calls.append(output)
    return processed_calls
async def analyze_agent_performance(processed_calls, llm_client, failed_tasks=None) -> OverallAnalysis:
    # Group calls by task; if failed_tasks is given, restrict the analysis to those task IDs.
    task_calls = defaultdict(list)
    for call in processed_calls:
        if failed_tasks is None or call['weave_task_id'] in failed_tasks:
            task_calls[call['weave_task_id']].append(call)
    for task_id in task_calls:
        task_calls[task_id].sort(key=lambda x: x['created_timestamp'])

    task_summaries = await asyncio.gather(*[summarize_task(task_id, calls, llm_client) for task_id, calls in task_calls.items()])
    failure_categories = await identify_failure_categories(task_summaries, llm_client)
    task_classifications = await classify_tasks(task_summaries, failure_categories, llm_client)
    overall_summary = await generate_overall_summary(failure_categories, task_classifications, llm_client)
    task_classifications = {tc["task_id"]: tc for tc in task_classifications}
    return OverallAnalysis(
        failure_categories=failure_categories,
        task_classifications=task_classifications,
        summary=overall_summary
    )
async def summarize_task(task_id, calls, llm_client):
    calls_summary = ""
    for i, call in enumerate(calls, 1):
        calls_summary += f"""
        Step {i}:
        Input: {call['inputs']}
        Output: {call['outputs']}
        Timestamp: {datetime.fromtimestamp(call['created_timestamp'])}
        """

    prompt = f"""
    Summarize the AI agent's performance on the following task:
    Task ID: {task_id}
    Number of steps: {len(calls)}
    Detailed steps:
    {calls_summary}
    Provide a brief summary of:
    1. The main goal of the task (inferred from the inputs and outputs)
    2. The agent's approach, including key steps and decisions made
    3. Any significant challenges or errors encountered during the task
    4. The final outcome and why the task failed. Be detailed about the reason for failure.
    Keep the summary concise (around 200 words) but include specific details about the agent's performance and any notable aspects of its problem-solving process.
    """
    system_message = "You are an AI performance analyst tasked with summarizing an AI agent's performance on individual tasks. Focus on the most important aspects of the agent's approach and performance."
    summary = await llm_client.generate_text(prompt, system_message, response_format=TaskSummary)
    return json.loads(summary)
async def identify_failure_categories(task_summaries, llm_client):
    summaries_text = "\n\n".join([f"Task {s['task_id']}:\n{s['summary']}" for s in task_summaries])
    prompt = f"""
    Analyze the following summaries of an AI agent's performance across multiple tasks:
    {summaries_text}
    Identify recurring categories of failures that the agent faces across these tasks. For each category:
    1. Provide a short, descriptive name (max 5 words)
    2. Write a brief description explaining the nature of this failure or challenge category
    Focus on patterns that appear across multiple tasks and represent specific errors that impacted the agent's performance. Make sure that your categories are distinct and cover a range of recurring issues. The categories should not be too general.
    Examples of categories could include:
    Incorrect Implementation - The agent made a change to a reasonable area, but its solution did not correctly address the issue.
    Gave Up Prematurely - The agent decided to stop solving the task after encountering some difficulty.
    Failed Edit Recovery - The agent went into a loop, making recurrent failing edits without recovering.
    """
    system_message = "You are an expert in AI agent analysis, tasked with identifying recurring patterns in agent performance across multiple tasks."
    categories = await llm_client.generate_text(prompt, system_message, response_format=FailureCategories)
    return [dict(category) for category in json.loads(categories)['failure_categories']]
async def classify_tasks(task_summaries, failure_categories, llm_client):
    categories_text = "\n".join([f"{cat['category_id']}. {cat['category_name']}: {cat['description']}" for cat in failure_categories])
    classifications = []
    for task in task_summaries:
        prompt = f"""
        Failure Categories:
        {categories_text}
        Task Summary:
        {task['summary']}
        Classify this task into one of the failure categories listed above. Provide:
        1. The number of the chosen category
        2. A brief explanation of why this category best fits the task's outcome
        If the task doesn't clearly fit any category, you may classify it as "0. Other" and explain why.
        """
        system_message = "You are an AI performance analyst tasked with classifying task outcomes into predefined categories."
        classification = await llm_client.generate_text(prompt, system_message, response_format=TaskClassification)
        classification = json.loads(classification)

        category_number = classification['category_id']
        if str(category_number) == "0":
            category_name = "Other"
        else:
            # Look up the category name; fall back to "Other" if the model
            # returned an ID that is not in the list.
            for cat in failure_categories:
                if str(cat['category_id']) == str(category_number):
                    category_name = cat['category_name']
                    break
            else:
                category_name = "Other"

        classifications.append(dict(TaskClassification(
            task_id=task['task_id'],
            category_id=str(category_number),
            category_name=category_name,
            explanation=classification['explanation']
        )))
    return classifications
async def generate_overall_summary(failure_categories, task_classifications, llm_client):
    categories_text = "\n".join([f"{cat['category_name']}: {cat['description']}" for cat in failure_categories])
    classifications_text = "\n".join([f"Task {tc['task_id']}: {tc['category_name']}" for tc in task_classifications])
    prompt = f"""
    Failure Categories:
    {categories_text}
    Task Classifications:
    {classifications_text}
    Based on the failure categories identified and the classification of tasks, provide an overall summary of the AI agent's performance across all tasks. Include:
    1. The most common types of failures or challenges
    2. Any patterns in the agent's performance across different tasks
    3. Suggestions for areas of improvement in the agent's design or training
    Keep the summary concise but insightful, focusing on the most significant findings and their implications for AI agent development. Return only the summary itself, without any preceding context.
    """
    system_message = "You are a senior AI researcher tasked with providing a high-level analysis of an AI agent's performance across multiple tasks."
    return await llm_client.generate_text(prompt, system_message)
async def main():
    client = weave.init("citp_agent_eval/usaco_1723148990")
    processed_calls = get_weave_calls(client)
    weave.finish()

    openai_client = AsyncOpenAIClient(model="gpt-4o-mini")
    overall_analysis = await analyze_agent_performance(processed_calls, openai_client)

    with open("agent_performance_analysis.json", "w") as f:
        json.dump(overall_analysis.model_dump(), f, indent=4)
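
# --- Hypothetical usage sketch (not part of the original script) ---
# Shows how the analysis could be restricted to a known set of failed task IDs
# via the optional `failed_tasks` argument; the project name reuses the one above
# and the caller supplies the task IDs (placeholders, not real results).
async def example_analyze_failed_subset(failed_task_ids):
    client = weave.init("citp_agent_eval/usaco_1723148990")
    processed_calls = get_weave_calls(client)
    weave.finish()
    llm_client = AsyncOpenAIClient(model="gpt-4o-mini")
    analysis = await analyze_agent_performance(processed_calls, llm_client, failed_tasks=failed_task_ids)
    return analysis.model_dump()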
if __name__ == "__main__":
    asyncio.run(main())