import importlib
import logging
import os
import sys
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

import chainlit as cl
import pandas as pd
from langsmith import Client
from tqdm.auto import tqdm

logger = logging.getLogger(__name__)

# LangSmith project queried by get_trace and how far back to look.
_PROJECT_NAME = "agent-collaboratif-avid"
_LOOKBACK_DAYS = 30
# Number of trace IDs sent per follow-up list_runs call.
_TRACE_BATCH_SIZE = 20
# Fields requested from LangSmith for every run query.
_RUN_FIELDS = ("inputs", "trace_id")
# Maximum number of runs rendered in the text report.
_REPORT_LIMIT = 20


def _format_runs(rows, limit=_REPORT_LIMIT):
    """Render collected run records as the user-facing text report.

    Args:
        rows: list of dicts with the keys ``inputs``, ``start_time`` and
            ``end_time``, as built by ``get_trace``.
        limit: maximum number of records included in the report.

    Returns:
        One paragraph per record (query, start date, duration in seconds),
        or an empty string when ``rows`` is empty.
    """
    if not rows:
        # An empty DataFrame has no start/end columns; bail out before the
        # column arithmetic below would raise a KeyError.
        return ""

    df_inputs = pd.DataFrame(rows)
    # A run whose inputs are missing or not a dict simply has no query.
    df_inputs["query"] = df_inputs["inputs"].map(
        lambda inputs: inputs.get("query") if isinstance(inputs, dict) else None
    )
    elapsed = df_inputs["end_time"] - df_inputs["start_time"]
    df_inputs["latency"] = elapsed.apply(lambda delta: delta.total_seconds())

    report = df_inputs[["query", "latency", "start_time"]].head(limit)
    return "".join(
        f"* Requête : {query}\nDate : {start_time}\n"
        f"Durée de la requête : {latency}\n\n"
        for query, latency, start_time in report.values.tolist()
    )


async def get_trace(apiKey, task_list):
    """Build a text report of recent root runs of the LangSmith project.

    Lists the root runs of the last ``_LOOKBACK_DAYS`` days, re-queries them
    by batches of trace IDs, and formats the first ``_REPORT_LIMIT`` records
    (query, start time, latency). Progress is reported by adding
    ``cl.Task`` entries to ``task_list`` and sending it.

    Args:
        apiKey: API key passed to the LangSmith ``Client``.
        task_list: object exposing awaitable ``add_task()`` and ``send()``
            (presumably a chainlit ``TaskList`` -- confirm against caller).

    Returns:
        The formatted report, or the string "Aucune connexion à LangSmith"
        when any step fails. This function never raises; failures are
        logged with their traceback.
    """
    try:
        client = Client(api_key=apiKey)

        # Only the listed fields are requested, to keep the query light.
        recent_runs = client.list_runs(
            project_name=_PROJECT_NAME,
            start_time=datetime.now() - timedelta(days=_LOOKBACK_DAYS),
            is_root=True,
            select=list(_RUN_FIELDS),
        )

        rows = []
        futures: list[Future] = []
        pending_ids = []  # trace IDs seen but not yet submitted in a batch
        tool_runs_by_parent = defaultdict(lambda: defaultdict(set))

        # Two workers only, so as not to exceed the rate limit.
        with ThreadPoolExecutor(max_workers=2) as executor:

            def submit_batch(run_ids):
                # NOTE(review): if list_runs returns a lazy iterator, the
                # HTTP requests actually happen while iterating the result
                # below, not in the worker thread -- confirm.
                futures.append(
                    executor.submit(
                        client.list_runs,
                        project_name=_PROJECT_NAME,
                        run_ids=run_ids,
                        select=list(_RUN_FIELDS),
                    )
                )

            task2 = cl.Task(title="Grouper les outils invoqués dans une trace et les organiser par parent run ID")
            await task_list.add_task(task2)

            for run in tqdm(recent_runs):
                if run.trace_id not in tool_runs_by_parent:
                    pending_ids.append(run.trace_id)
                # Collect all tools invoked within a given trace.
                tool_runs_by_parent[run.trace_id]["tools_involved"].add(run.name)

                # Submit full batches as soon as they are available so the
                # follow-up queries can start while listing continues.
                if len(pending_ids) >= _TRACE_BATCH_SIZE:
                    submit_batch(pending_ids)
                    pending_ids = []
            await task_list.send()

            # Flush the last, possibly partial, batch.
            if pending_ids:
                submit_batch(pending_ids)

            task2.status = cl.TaskStatus.DONE
            await task_list.send()

            task3 = cl.Task(title="Rechercher les données d'actions des utilisateurs de l'agent collabroatif AVID et les organiser par parent run ID dans un DataFrame")
            await task_list.add_task(task3)

            for future in tqdm(futures):
                for root_run in future.result():
                    rows.append(
                        {
                            "inputs": root_run.inputs,
                            "start_time": root_run.start_time,
                            "end_time": root_run.end_time,
                        }
                    )

        task3.status = cl.TaskStatus.DONE
        await task_list.send()

        task4 = cl.Task(title="Conversion des données d'actions des utilisateurs de l'agent collabroatif AVID et les afficher au format texte")
        await task_list.add_task(task4)
        str_inputs = _format_runs(rows)
        task4.status = cl.TaskStatus.DONE
        await task_list.send()

        return str_inputs
    except Exception:
        # Top-level boundary: callers expect a message, not an exception,
        # but the real cause must stay visible in the logs.
        logger.exception("Failed to build the LangSmith trace report")
        return "Aucune connexion à LangSmith"