""" Application Chainlit pour l'Agent Collaboratif LangGraph ======================================================== Intégration complète avec: - Chainlit 2.8.1 - Official Data Layer (PostgreSQL/Supabase) - LangSmith monitoring - Starters avec icônes - Chain of Thought visible - Style personnalisé (dark theme) """ import os import json import asyncio from typing import Dict, Any, List, Optional import chainlit as cl from chainlit.types import ThreadDict, Starter from langsmith import traceable # Import du module agent (votre code existant) # On suppose que le code est dans agent_collaboratif_avid.py from agent_collaboratif_avid import ( run_collaborative_agent, retriever_manager, PINECONE_INDEX_NAME, OPENAI_MODEL_NAME, SIMILARITY_TOP_K, MAX_VALIDATION_LOOPS ) import bcrypt from trace_agent_collaboratif import get_trace from dataviz_avid import display_barplotcorrelation, display_barplotpublication, display_barplotformation # ============================================================================= # CONFIGURATION DE L'AUTHENTIFICATION (Optionnel) # ============================================================================= @cl.password_auth_callback def auth_callback(username: str, password: str) -> Optional[cl.User]: """ Callback d'authentification (optionnel). À configurer selon vos besoins. """ # Exemple simple (à remplacer par votre logique) auth = json.loads(os.environ.get("CHAINLIT_AUTH_LOGIN")) auth_iter = iter(auth) while True: # item will be "end" if iteration is complete connexion = next(auth_iter, "end") if bcrypt.checkpw(username.encode('utf-8'), bcrypt.hashpw(connexion['ident'].encode('utf-8'), bcrypt.gensalt())) == True and bcrypt.checkpw(password.encode('utf-8'), bcrypt.hashpw(connexion['pwd'].encode('utf-8'), bcrypt.gensalt())) == True: print("OK") return cl.User( identifier=connexion['ident'], metadata={"role": connexion['role'], "provider": "credentials"} ) if connexion == "end": break return None # ============================================================================= # CONFIGURATION LANGSMITH # ============================================================================= LANGCHAIN_API_KEY = os.environ.get("LANGCHAIN_API_KEY") LANGSMITH_PROJECT = os.environ.get("LANGSMITH_PROJECT") if LANGCHAIN_API_KEY: os.environ["LANGCHAIN_TRACING_V2"] = "true" #os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com" os.environ["LANGCHAIN_API_KEY"] = LANGCHAIN_API_KEY os.environ["LANGCHAIN_PROJECT"] = LANGSMITH_PROJECT #langsmith_client = Client() print(f"✅ LangSmith activé - Projet: {LANGSMITH_PROJECT}") else: print("⚠️ LANGCHAIN_API_KEY non définie - Monitoring désactivé") #langsmith_client = None @cl.action_callback("display_trace") async def on_action(action: cl.Action): # Create the TaskList task_list = cl.TaskList() task_list.status = "Running..." task1 = cl.Task(title="Connexion au serveur d'historique des données d'actions des utilisateurs de l'agent collabroatif AVID.", status=cl.TaskStatus.RUNNING) await task_list.add_task(task1) await task_list.send() trace = await get_trace(LANGCHAIN_API_KEY, task_list) task1.status = cl.TaskStatus.DONE await task_list.send() await cl.ElementSidebar.set_elements([cl.Text(content=trace, name="trace")]) await cl.ElementSidebar.set_title(" ⌛ Historique de conversation :") #await cl.Message(content="Afficher l'historique des actions",elements=[cl.Text(content=trace, name="trace", display="side")]).send() # ============================================================================= # FONCTIONS AUXILIAIRES POUR L'AFFICHAGE # ============================================================================= async def send_cot_step(step_name: str, content: str, status: str = "running"): """Envoie une étape du Chain of Thought.""" step = cl.Step( name=step_name, type="tool", show_input=True ) step.output = content if status == "done": step.is_error = False elif status == "error": step.is_error = True await step.send() return step async def display_query_analysis(analysis: Dict[str, Any]): """Affiche l'analyse de la requête.""" content = f"""**Bases identifiées:** {', '.join(analysis.get('databases_to_query', []))} **Priorités:** {json.dumps(analysis.get('priorities', {}), indent=2, ensure_ascii=False)} **Résumé:** {analysis.get('analysis_summary', 'N/A')} """ await send_cot_step("🔍 Analyse de la requête", content, "done") async def display_collection(info_list: List[Dict[str, Any]]): """Affiche les informations collectées.""" content_parts = [] for info in info_list: content_parts.append(f""" **📦 Base:** {info['database']} **Catégorie:** {info['category']} **Priorité:** {info['priority']} **Résultats:** {info['results_count']} """) content = "\n".join(content_parts) await send_cot_step("📊 Collecte d'informations", content, "done") async def display_validation(validation: Dict[str, Any], iteration: int): """Affiche les résultats de validation.""" content = f"""**Itération:** {iteration} **Score de confiance:** {validation.get('confidence_score', 0)}% **Validé:** {'✅ Oui' if validation.get('is_valid') else '❌ Non'} **Hallucinations détectées:** {len(validation.get('hallucinations_detected', []))} """ if validation.get('hallucinations_detected'): content += "\n**Problèmes:**\n" for hall in validation['hallucinations_detected']: content += f"- {hall}\n" status = "done" if validation.get('is_valid') else "error" await send_cot_step(f"✅ Validation (#{iteration})", content, status) async def display_similar_info(similar_info: List[Dict[str, Any]]): """Affiche les informations similaires.""" if not similar_info: return # Regrouper par base grouped = {} for item in similar_info: db = item['database'] if db not in grouped: grouped[db] = [] grouped[db].append(item) elements = [] for db_name, items in grouped.items(): content_parts = [f"### 📚 {db_name.upper()}\n"] content_parts.append(f"**Catégorie:** {items[0]['category']}") content_parts.append(f"**Résultats:** {len(items)}\n") for idx, item in enumerate(items[:3], 1): # Limiter à 3 par base score = item.get('score', 'N/A') content_parts.append(f"**{idx}. Score:** {score}") content_preview = item['content'][:200] if len(item['content']) > 200: content_preview += "..." content_parts.append(f"**Contenu:** {content_preview}\n") # Créer un élément Chainlit element = cl.Text( content="\n".join(content_parts), display="side" ) elements.append(element) #if elements: # await cl.Message( # content="💡 **Informations similaires trouvées dans d'autres bases**", # elements=elements # ).send() async def display_web_search_results(web_search_results: List[Dict[str, Any]]): """Affiche les résultats de recherche web.""" if not web_search_results: return elements = [] content_parts = [] content_parts.append(f"### 🌐 Résultats de la recherche web\n") content_parts.append(f"**Nombre de résultats:** {len(web_search_results)}\n") for idx, item in enumerate(web_search_results[:5], 1): # Limiter à 5 résultats content_parts.append(f"**{idx}. Titre:** {item['title']}") content_parts.append(f"**Lien:** {item['markdown_link']}") content_parts.append(f"**Résumé:** {item['summary']}\n") element = cl.Text( content="\n".join(content_parts), display="side" ) elements.append(element) if elements: await cl.Message( content="🌐 **Informations trouvées sur le web**", elements=elements ).send() # ============================================================================= # FONCTIONS D'AFFICHAGE STREAMING PAR NŒUD # ============================================================================= async def stream_response(content: str, msg: cl.Message, chunk_size: int = 50): """Stream du contenu progressivement dans un message.""" for i in range(0, len(content), chunk_size): chunk = content[i:i + chunk_size] msg.content += chunk await msg.update() await asyncio.sleep(0.25) # Petit délai pour un effet visuel async def display_node_update(node_name: str, state: Dict[str, Any]): """Affiche les mises à jour d'état après l'exécution d'un nœud.""" if node_name == "analyze_query": if state.get("query_analysis"): await display_query_analysis(state["query_analysis"]) elif node_name == "collect_information": if state.get("collected_information"): await display_collection(state["collected_information"]) elif node_name == "generate_response": if state.get("final_response"): content = f"**Réponse générée** ({len(state['final_response'])} caractères)\n\nLa réponse complète sera affichée à la fin du workflow." await send_cot_step("✏️ Génération de la réponse", content, "done") elif node_name == "validate_response": if state.get("validation_results"): iteration = state.get("iteration_count", len(state["validation_results"])) last_validation = state["validation_results"][-1] await display_validation(last_validation, iteration) elif node_name == "refine_response": content = f"**Itération:** {state.get('iteration_count', 0)}\n**Correction en cours...**" await send_cot_step("⚙️ Refinement", content, "done") elif node_name == "collect_similar_information": if state.get("additional_information"): await display_similar_info(state["additional_information"]) # ============================================================================= # FONCTION PRINCIPALE TRACÉE AVEC LANGSMITH # ============================================================================= @traceable(name="agent_collaboratif_query", project_name=LANGSMITH_PROJECT) async def process_query_with_tracing(query: str, thread_id: str) -> Dict[str, Any]: """Traite la requête avec traçage LangSmith et streaming en temps réel.""" # Import du workflow from agent_collaboratif_avid import AgentState, create_agent_workflow from langchain_core.messages import HumanMessage app = create_agent_workflow() initial_state = { "messages": [HumanMessage(content=query)], "user_query": query, "query_analysis": {}, "collected_information": [], "validation_results": [], "final_response": "", "iteration_count": 0, "errors": [], "additional_information": [], "similar_info_response":"", "web_search_results": [] } # Message de démarrage await send_cot_step("🔄 Démarrage", "Initialisation du workflow LangGraph...", "done") # Variables pour suivre l'état final_state = None # STREAMING: Utilisation de app.astream() pour obtenir les mises à jour après chaque nœud try: #async for event in app.astream(initial_state, {"callbacks": [langfuse_handler]}): #app.invoke(intial_state, config={"callbacks": [langfuse_handler]}) async for event in app.astream(initial_state): # event est un dictionnaire avec les nœuds comme clés for node_name, node_state in event.items(): # Ignorer le nœud spécial __start__ task_list = cl.TaskList() task_list.status = "Running..." task1 = cl.Task(title="Traitement de la requête utilisateur de l'agent collaboratif AVID par une action conditionnelle.") task1.status = cl.TaskStatus.RUNNING await task_list.add_task(task1) task2 = cl.Task(title="Collecte d'informations dans les bases de données et caractérisation des informations par un score de pertinence par rapport à la requête utilisateur.") await task_list.add_task(task2) task3 = cl.Task(title="Génération d'une première réponse par l'agent collaboratif AVID et vérification de sa validité en corrélant les informations collectées.") await task_list.add_task(task3) task4 = cl.Task(title="Vérification de la validité de la réponse par l'agent collaboratif AVID et correction si nécessaire pour éviter les réponses erronées.") await task_list.add_task(task4) task5 = cl.Task(title="Génération d'une réponse finale par l'agent collaboratif AVID et vérification de sa validité grâce à un dernier traitement de vérification anti-hallucination.") await task_list.add_task(task5) task6 = cl.Task(title="Affichage de la réponse finale et collecte d'informations similaires dans les bases de données non prises en compte dans la réponse finale.") await task_list.add_task(task6) task7 = cl.Task(title="Recherche sur le web des informations complémentaires par rapport à la requête utilisateur.") await task_list.add_task(task7) await task_list.send() if node_name == "__start__": # Create the TaskList continue # Afficher un message de progression pour le nœud actuel node_display_names = { "analyze_query": "🔍 Analyse de la requête", "collect_information": "📊 Collecte d'informations", "generate_response": "✏️ Génération de la réponse", "validate_response": "✅ Validation anti-hallucination", "refine_response": "⚙️ Refinement de la réponse", "collect_similar_information": "🔗 Collecte d'informations similaires" } if node_name == "analyze_query": task1.status = cl.TaskStatus.DONE task2.status = cl.TaskStatus.RUNNING await task_list.send() if node_name == "collect_information": task2.status = cl.TaskStatus.DONE task3.status = cl.TaskStatus.RUNNING await task_list.send() if node_name == "generate_response": task3.status = cl.TaskStatus.DONE task4.status = cl.TaskStatus.RUNNING await task_list.send() if node_name == "validate_response": task4.status = cl.TaskStatus.DONE task5.status = cl.TaskStatus.RUNNING await task_list.send() if node_name == "refine_response": task5.status = cl.TaskStatus.DONE task6.status = cl.TaskStatus.RUNNING await task_list.send() if node_name == "collect_similar_information": task6.status = cl.TaskStatus.DONE task7.status = cl.TaskStatus.RUNNING await task_list.send() if node_name == "web_search": task7.status = cl.TaskStatus.DONE task7.status = cl.TaskStatus.READY await task_list.send() display_name = node_display_names.get(node_name, f"⚙️ {node_name}") # Message de progression await send_cot_step( f"🔄 {display_name}", f"Nœud exécuté avec succès", "done" ) # Afficher les détails spécifiques du nœud await display_node_update(node_name, node_state) # Sauvegarder l'état final final_state = node_state except Exception as e: error_msg = f"Erreur lors du streaming: {str(e)}" await send_cot_step("❌ Erreur", error_msg, "error") raise # Si le streaming n'a pas retourné d'état final, utiliser la méthode classique if final_state is None: final_state = initial_state result = { "query": query, "query_analysis": final_state.get("query_analysis", {}), "collected_information": final_state.get("collected_information", []), "validation_results": final_state.get("validation_results", []), "final_response": final_state.get("final_response", ""), "iteration_count": final_state.get("iteration_count", 0), "errors": final_state.get("errors", []), "additional_information": final_state.get("additional_information", []), "similar_info_response": final_state.get("similar_info_response", ""), "web_search_results": final_state.get("web_search_results", []), "sources_used": [ info["database"] for info in final_state.get("collected_information", []) ], "pinecone_index": PINECONE_INDEX_NAME } return result # ============================================================================= # CALLBACKS CHAINLIT # ============================================================================= @cl.set_chat_profiles async def chat_profile(current_user: cl.User): return [ cl.ChatProfile( name="Avid Agent", markdown_description="🎓 Avid Agent permet de converser avec un agent collaboratif entre 4 bases de données pour extraire les informations pertinentes afin de générer une réponse en réduisant les hallucations, par relecture et redéfinition des éléments.", icon="/public/sparkles-gustaveia.png", starters=[ cl.Starter( label= "🔬 Laboratoires & Mobilité", message= "Quels sont les laboratoires de l'université Gustave Eiffel travaillant sur la mobilité urbaine durable?", #icon= "/public/icons/lab.svg" ), cl.Starter( label= "🎓 Formations Master", message= "Je cherche des formations en master sur l'aménagement urbain et le développement durable", #icon= "/public/icons/education.svg" ), cl.Starter( label= "🤝 Collaborations Recherche", message= "Quels laboratoires ont des axes de recherche similaires en énergie et pourraient collaborer?", #icon= "/public/icons/collaboration.svg" ), cl.Starter( label= "⚙️ Équipements Lab", message= "Liste les équipements disponibles dans les laboratoires travaillant sur la qualité de l'air", #icon= "/public/icons/equipment.svg" ), cl.Starter( label= "📚 Publications Récentes", message= "Trouve des publications récentes sur la transition énergétique dans les villes", #icon= "/public/icons/publications.svg" ), cl.Starter( label= "👥 Auteurs & Labs", message= "Qui sont les auteurs qui publient sur la mobilité douce et dans quels laboratoires?", #icon= "/public/icons/authors.svg" ), cl.Starter( label= "📖 Urbanisme Durable", message= "Quelles publications traitent de l'urbanisme durable et quand ont-elles été publiées?", #icon= "/public/icons/urban.svg" ), cl.Starter( label= "🏙️ Ville Intelligente", message= "Compare les formations et les laboratoires sur le thème de la ville intelligente", #icon= "/public/icons/smart-city.svg" ), cl.Starter( label= "🌍 Résilience Urbaine", message= "Identifie les opportunités de partenariats entre laboratoires sur la résilience urbaine", #icon= "/public/icons/resilience.svg" ), cl.Starter( label= "♻️ Économie Circulaire", message= "Quelles sont les compétences enseignées dans les formations liées à l'économie circulaire?", #icon= "/public/icons/circular.svg" ) ] ),cl.ChatProfile( name="Avid Dataviz", markdown_description="💡 Avid Dataviz permet d'avoir recours à des éléments statistiques et de corrélation entre les données laboratoires et les thématiques Ville Durable", icon="/public/charts.png", ),cl.ChatProfile( name="Avid Historique", markdown_description="💡 RUNNING...\nAvid Historique permet d'avoir recours aux actions des utilisateurs sur l'agent collaboratif AVID", icon="/public/threads.png", ) ] @cl.on_chat_start async def start(): """Initialisation de la session chat.""" commands = [ {"id": "Search", "icon": "globe", "description": "Rechercher sur le web"}, ] await cl.context.emitter.set_commands(commands) user = cl.user_session.get("user") chat_profile = cl.user_session.get("chat_profile") if chat_profile == "Avid Dataviz": await cl.Message( content=f"Bienvenue {user.identifier}!\n\nL'environnement {chat_profile} vous restitue les données sous forme d'objets statistiques." ).send() actions = [cl.Action(name="display_trace",icon="mouse-pointer-click",payload={"value": "trace"},label="Afficher l'historique des actions")] await cl.Message(content="Fil des conversations", actions=actions).send() elements = [] figure_correlation = await display_barplotcorrelation() figure_publication = await display_barplotpublication() figure_formation = await display_barplotformation() elements.append(cl.Plotly(name="chart_correlation", figure=figure_correlation, size="large", display="inline")) elements.append(cl.Plotly(name="chart_publication", figure=figure_publication, size="large", display="inline")) elements.append(cl.Plotly(name="chart_formation", figure=figure_formation, size="large", display="inline")) await cl.Message(content="Datavisualisation des thématiques Ville Durable en fonction des laboratoires, en fonction des publications de recherche et en fonction des formations", elements=elements).send() elif chat_profile == "Avid Historique": # Create the TaskList task_list = cl.TaskList() task_list.status = "Running..." task1 = cl.Task(title="Connexion au serveur d'historique des données d'actions des utilisateurs de l'agent collabroatif AVID.", status=cl.TaskStatus.RUNNING) await task_list.add_task(task1) await task_list.send() trace = await get_trace(LANGCHAIN_API_KEY,task_list) task1.status = cl.TaskStatus.DONE await task_list.send() elements = [cl.Text(content=trace, display="inline")] await cl.Message(content="Fil des conversations", elements=elements).send() # Message de bienvenue avec style # welcome_msg = f"""# 🎓 Agent Collaboratif - Université Gustave Eiffel #Bienvenue ! Je suis votre assistant spécialisé en **Ville Durable**. ## 🔧 Configuration #- **Index Pinecone:** `{PINECONE_INDEX_NAME}` #- **Modèle:** `{OPENAI_MODEL_NAME}` #- **Top K résultats:** `{SIMILARITY_TOP_K}` #- **Max validations:** `{MAX_VALIDATION_LOOPS}` ## 💡 Fonctionnalités #✅ Recherche multi-bases vectorielles #✅ Validation anti-hallucination #✅ Suggestions d'informations connexes #✅ Traçage LangSmith actif #**Choisissez un starter ou posez votre question !** #""" # await cl.Message(content=welcome_msg).send() # Sauvegarder les métadonnées de session # cl.user_session.set("session_started", True) # cl.user_session.set("query_count", 0) @cl.on_message async def main(message: cl.Message): """Traitement du message utilisateur.""" query = message.content thread_id = cl.context.session.thread_id # Incrémenter le compteur query_count = cl.user_session.get("query_count", 0) + 1 cl.user_session.set("query_count", query_count) # Message de traitement processing_msg = cl.Message(content="") await processing_msg.send() try: # Traitement avec affichage du COT result = await process_query_with_tracing(query, thread_id) # Réponse finale en streaming final_response = result["final_response"] # Afficher un séparateur await send_cot_step("📝 Réponse finale", "Affichage de la réponse complète en streaming...", "done") # Créer un nouveau message pour la réponse finale response_msg = cl.Message(content="") await response_msg.send() # Streamer la réponse complète await stream_response(final_response, response_msg, chunk_size=50) # Afficher les informations similaires collectées par le nœud 6 if result.get("similar_info_response"): similar_msg = cl.Message(content="") await similar_msg.send() # Streamer la réponse similaire await stream_response(result["similar_info_response"], similar_msg, chunk_size=50) #await display_similar_info(result["similar_info_response"]) # Afficher les résultats de recherche web collectés par le nœud 7 web_msg = cl.Message(content="Résultats de recherche complémentaires sur le web : \n\n") await web_msg.send() for result_web in result["web_search_results"]: web_search = "- " + result_web['markdown_link'] + " : " + result_web['summary'] + "\n\n" await stream_response(web_search, web_msg, chunk_size=50) #await display_web_search_results(result["web_search_results"]) # Métadonnées metadata_parts = [ f"\n\n---\n### 📊 Métadonnées du traitement", f"**Sources consultées:** {', '.join(result['sources_used']) if result['sources_used'] else 'Aucune'}", f"**Itérations:** {result['iteration_count']}", ] if result['validation_results']: last_val = result['validation_results'][-1] metadata_parts.append(f"**Confiance finale:** {last_val.get('confidence_score', 0)}%") metadata_parts.append(f"**Requête n°:** {query_count}") # Ajouter les métadonnées en streaming metadata_text = "\n".join(metadata_parts) await stream_response(metadata_text, response_msg, chunk_size=100) # Supprimer le message de traitement initial vide processing_msg.content = "✅ Traitement terminé" await processing_msg.update() actions = [cl.Action(name="display_trace",icon="mouse-pointer-click",payload={"value": "trace"},label="Afficher l'historique des actions")] await cl.Message(content="Fil des conversations", actions=actions).send() # Sauvegarder dans l'historique de session cl.user_session.set(f"query_{query_count}", { "query": query, "response": final_response, "sources": result['sources_used'] }) except Exception as e: error_msg = f"❌ **Erreur lors du traitement:**\n\n```\n{str(e)}\n```" processing_msg.content = error_msg await processing_msg.update() # Log dans LangSmith si disponible #if langsmith_client: # langsmith_client.create_feedback( # run_id=thread_id, # key="error", # score=0, # comment=str(e) # ) @cl.on_shared_thread_view async def on_shared_thread_view(thread: ThreadDict, viewer: Optional[cl.User]) -> bool: return True @cl.on_chat_resume async def on_chat_resume(thread: ThreadDict): """Reprise d'une conversation existante.""" thread_id = thread["id"] resume_msg = f"""# 🔄 Conversation reprise **Thread ID:** `{thread_id}` Vous pouvez continuer votre conversation ou poser une nouvelle question. """ await cl.Message(content=resume_msg).send() @cl.on_stop async def on_stop(): """Callback à l'arrêt de l'exécution.""" await cl.Message(content="⏹️ Traitement interrompu par l'utilisateur.").send() @cl.on_chat_end async def on_chat_end(): """Callback à la fin de la session.""" query_count = cl.user_session.get("query_count", 0) end_msg = f"""# 👋 Session terminée Merci d'avoir utilisé l'agent collaboratif ! **Statistiques de session:** - **Requêtes traitées:** {query_count} - **Index Pinecone:** {PINECONE_INDEX_NAME} """ await cl.Message(content=end_msg).send() # ============================================================================= # CONFIGURATION DU DATA LAYER (Supabase/PostgreSQL) # ============================================================================= """ Pour activer le Data Layer avec Supabase, créez un fichier .env: CHAINLIT_AUTH_SECRET=your-secret-key LITERAL_API_KEY=your-literal-api-key LITERAL_API_URL=https://cloud.getliteral.ai Ou configurez PostgreSQL directement: DATABASE_URL=postgresql://user:password@host:port/dbname Le Data Layer sera automatiquement activé si ces variables sont définies. """