# datavid / trace_agent_collaboratif.py
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

import pandas as pd
from langsmith import Client
from tqdm.auto import tqdm

import chainlit as cl


async def get_trace(apiKey, task_list):
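    """Fetch the last 30 days of root runs from the LangSmith project
    "agent-collaboratif-avid", group the run names observed per trace, and return a
    plain-text summary (query, date, latency) while updating the Chainlit task list.
    """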
    try:
        client = Client(api_key=apiKey)
        project_name = "agent-collaboratif-avid"
        num_days = 30
        # List the project's root runs over the last `num_days` days.
        # Only fetch the fields we need (inputs and trace_id) to keep the query fast.
        tool_runs = client.list_runs(
            project_name=project_name,
            start_time=datetime.now() - timedelta(days=num_days),
            is_root=True,
            select=["inputs", "trace_id"],
        )
        data = []
        futures: list[Future] = []
        trace_cursor = 0
        trace_batch_size = 20
        tool_runs_by_parent = defaultdict(lambda: defaultdict(set))
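        # tool_runs_by_parent maps each trace_id to {"tools_involved": set of run
        # names seen in that trace}; its keys are later sent back to LangSmith in
        # batches to fetch the corresponding root runs.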
        # Do not exceed rate limit
        with ThreadPoolExecutor(max_workers=2) as executor:
            # Group tool runs by parent run ID
            task2 = cl.Task(title="Grouper les outils invoqués dans une trace et les organiser par parent run ID")
            await task_list.add_task(task2)
            for run in tqdm(tool_runs):
                # Collect all tools invoked within a given trace
                tool_runs_by_parent[run.trace_id]["tools_involved"].add(run.name)
                # Once a full batch of parent run IDs has accumulated, send it to the
                # server: this lets us query for the root runs in batches while still
                # processing the tool runs.
                if len(tool_runs_by_parent) % trace_batch_size == 0:
                    if this_batch := list(tool_runs_by_parent.keys())[
                        trace_cursor : trace_cursor + trace_batch_size
                    ]:
                        trace_cursor += trace_batch_size
                        futures.append(
                            executor.submit(
                                client.list_runs,
                                project_name=project_name,
                                run_ids=this_batch,
                                select=["inputs", "trace_id"],
                            )
                        )
            await task_list.send()
            # Submit whatever parent run IDs remain after the last full batch.
            if this_batch := list(tool_runs_by_parent.keys())[trace_cursor:]:
                futures.append(
                    executor.submit(
                        client.list_runs,
                        project_name=project_name,
                        run_ids=this_batch,
                        select=["inputs", "trace_id"],
                    )
                )
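        # Leaving the `with` block calls ThreadPoolExecutor.shutdown(wait=True), so
        # every submitted query has finished before the results are read below.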
        task2.status = cl.TaskStatus.DONE
        await task_list.send()
        task3 = cl.Task(title="Rechercher les données d'actions des utilisateurs de l'agent collaboratif AVID et les organiser par parent run ID dans un DataFrame")
        await task_list.add_task(task3)
        for future in tqdm(futures):
            root_runs = future.result()
            for root_run in root_runs:
                root_data = tool_runs_by_parent[root_run.id]
                data.append(
                    {
                        "inputs": root_run.inputs,
                        "start_time": root_run.start_time,
                        "end_time": root_run.end_time,
                        # Keep the tools seen in this trace alongside the run metadata.
                        "tools_involved": root_data["tools_involved"],
                    }
                )
        task3.status = cl.TaskStatus.DONE
        await task_list.send()
        # Convert the collected records to a pandas DataFrame.
        df_inputs = pd.DataFrame(data)
        df_inputs['query'] = df_inputs['inputs'].apply(lambda d: (d or {}).get('query'))
        df_inputs['latency'] = df_inputs['end_time'] - df_inputs['start_time']
        df_inputs['latency'] = df_inputs['latency'].apply(lambda x: x.total_seconds())
        df_inputs = df_inputs[["query", "latency", "start_time"]].copy()
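        # df_inputs now holds one row per root run: the user query, its latency in
        # seconds, and its start time.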
        task4 = cl.Task(title="Conversion des données d'actions des utilisateurs de l'agent collaboratif AVID et affichage au format texte")
        await task_list.add_task(task4)
        # Render the first 20 rows as a plain-text list.
        list_inputs = df_inputs.head(20).values.tolist()
        str_inputs = "".join(
            f"* Requête : {query}\nDate : {start_time}\nDurée de la requête : {latency}\n\n"
            for query, latency, start_time in list_inputs
        )
        task4.status = cl.TaskStatus.DONE
        await task_list.send()
        return str_inputs
    except Exception:
        return "Aucune connexion à LangSmith"