datavid / app.py
datacipen's picture
Update app.py
16c5cf6 verified
"""
Application Chainlit pour l'Agent Collaboratif LangGraph
========================================================
Intégration complète avec:
- Chainlit 2.8.1
- Official Data Layer (PostgreSQL/Supabase)
- LangSmith monitoring
- Starters avec icônes
- Chain of Thought visible
- Style personnalisé (dark theme)
"""
import os
import json
import asyncio
from typing import Dict, Any, List, Optional
import chainlit as cl
from chainlit.types import ThreadDict, Starter
from langsmith import traceable
# Import du module agent (votre code existant)
# On suppose que le code est dans agent_collaboratif_avid.py
from agent_collaboratif_avid import (
run_collaborative_agent,
retriever_manager,
PINECONE_INDEX_NAME,
OPENAI_MODEL_NAME,
SIMILARITY_TOP_K,
MAX_VALIDATION_LOOPS
)
import bcrypt
from trace_agent_collaboratif import get_trace
from dataviz_avid import display_barplotcorrelation, display_barplotpublication, display_barplotformation
# =============================================================================
# CONFIGURATION DE L'AUTHENTIFICATION (Optionnel)
# =============================================================================
@cl.password_auth_callback
def auth_callback(username: str, password: str) -> Optional[cl.User]:
"""
Callback d'authentification (optionnel).
À configurer selon vos besoins.
"""
# Exemple simple (à remplacer par votre logique)
auth = json.loads(os.environ.get("CHAINLIT_AUTH_LOGIN"))
auth_iter = iter(auth)
while True:
# item will be "end" if iteration is complete
connexion = next(auth_iter, "end")
if bcrypt.checkpw(username.encode('utf-8'), bcrypt.hashpw(connexion['ident'].encode('utf-8'), bcrypt.gensalt())) == True and bcrypt.checkpw(password.encode('utf-8'), bcrypt.hashpw(connexion['pwd'].encode('utf-8'), bcrypt.gensalt())) == True:
print("OK")
return cl.User(
identifier=connexion['ident'],
metadata={"role": connexion['role'], "provider": "credentials"}
)
if connexion == "end":
break
return None
# =============================================================================
# CONFIGURATION LANGSMITH
# =============================================================================
LANGCHAIN_API_KEY = os.environ.get("LANGCHAIN_API_KEY")
LANGSMITH_PROJECT = os.environ.get("LANGSMITH_PROJECT")
if LANGCHAIN_API_KEY:
os.environ["LANGCHAIN_TRACING_V2"] = "true"
#os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"] = LANGCHAIN_API_KEY
os.environ["LANGCHAIN_PROJECT"] = LANGSMITH_PROJECT
#langsmith_client = Client()
print(f"✅ LangSmith activé - Projet: {LANGSMITH_PROJECT}")
else:
print("⚠️ LANGCHAIN_API_KEY non définie - Monitoring désactivé")
#langsmith_client = None
@cl.action_callback("display_trace")
async def on_action(action: cl.Action):
# Create the TaskList
task_list = cl.TaskList()
task_list.status = "Running..."
task1 = cl.Task(title="Connexion au serveur d'historique des données d'actions des utilisateurs de l'agent collabroatif AVID.", status=cl.TaskStatus.RUNNING)
await task_list.add_task(task1)
await task_list.send()
trace = await get_trace(LANGCHAIN_API_KEY, task_list)
task1.status = cl.TaskStatus.DONE
await task_list.send()
await cl.ElementSidebar.set_elements([cl.Text(content=trace, name="trace")])
await cl.ElementSidebar.set_title(" ⌛ Historique de conversation :")
#await cl.Message(content="Afficher l'historique des actions",elements=[cl.Text(content=trace, name="trace", display="side")]).send()
# =============================================================================
# FONCTIONS AUXILIAIRES POUR L'AFFICHAGE
# =============================================================================
async def send_cot_step(step_name: str, content: str, status: str = "running"):
"""Envoie une étape du Chain of Thought."""
step = cl.Step(
name=step_name,
type="tool",
show_input=True
)
step.output = content
if status == "done":
step.is_error = False
elif status == "error":
step.is_error = True
await step.send()
return step
async def display_query_analysis(analysis: Dict[str, Any]):
"""Affiche l'analyse de la requête."""
content = f"""**Bases identifiées:** {', '.join(analysis.get('databases_to_query', []))}
**Priorités:**
{json.dumps(analysis.get('priorities', {}), indent=2, ensure_ascii=False)}
**Résumé:** {analysis.get('analysis_summary', 'N/A')}
"""
await send_cot_step("🔍 Analyse de la requête", content, "done")
async def display_collection(info_list: List[Dict[str, Any]]):
"""Affiche les informations collectées."""
content_parts = []
for info in info_list:
content_parts.append(f"""
**📦 Base:** {info['database']}
**Catégorie:** {info['category']}
**Priorité:** {info['priority']}
**Résultats:** {info['results_count']}
""")
content = "\n".join(content_parts)
await send_cot_step("📊 Collecte d'informations", content, "done")
async def display_validation(validation: Dict[str, Any], iteration: int):
"""Affiche les résultats de validation."""
content = f"""**Itération:** {iteration}
**Score de confiance:** {validation.get('confidence_score', 0)}%
**Validé:** {'✅ Oui' if validation.get('is_valid') else '❌ Non'}
**Hallucinations détectées:** {len(validation.get('hallucinations_detected', []))}
"""
if validation.get('hallucinations_detected'):
content += "\n**Problèmes:**\n"
for hall in validation['hallucinations_detected']:
content += f"- {hall}\n"
status = "done" if validation.get('is_valid') else "error"
await send_cot_step(f"✅ Validation (#{iteration})", content, status)
async def display_similar_info(similar_info: List[Dict[str, Any]]):
"""Affiche les informations similaires."""
if not similar_info:
return
# Regrouper par base
grouped = {}
for item in similar_info:
db = item['database']
if db not in grouped:
grouped[db] = []
grouped[db].append(item)
elements = []
for db_name, items in grouped.items():
content_parts = [f"### 📚 {db_name.upper()}\n"]
content_parts.append(f"**Catégorie:** {items[0]['category']}")
content_parts.append(f"**Résultats:** {len(items)}\n")
for idx, item in enumerate(items[:3], 1): # Limiter à 3 par base
score = item.get('score', 'N/A')
content_parts.append(f"**{idx}. Score:** {score}")
content_preview = item['content'][:200]
if len(item['content']) > 200:
content_preview += "..."
content_parts.append(f"**Contenu:** {content_preview}\n")
# Créer un élément Chainlit
element = cl.Text(
content="\n".join(content_parts),
display="side"
)
elements.append(element)
#if elements:
# await cl.Message(
# content="💡 **Informations similaires trouvées dans d'autres bases**",
# elements=elements
# ).send()
async def display_web_search_results(web_search_results: List[Dict[str, Any]]):
"""Affiche les résultats de recherche web."""
if not web_search_results:
return
elements = []
content_parts = []
content_parts.append(f"### 🌐 Résultats de la recherche web\n")
content_parts.append(f"**Nombre de résultats:** {len(web_search_results)}\n")
for idx, item in enumerate(web_search_results[:5], 1): # Limiter à 5 résultats
content_parts.append(f"**{idx}. Titre:** {item['title']}")
content_parts.append(f"**Lien:** {item['markdown_link']}")
content_parts.append(f"**Résumé:** {item['summary']}\n")
element = cl.Text(
content="\n".join(content_parts),
display="side"
)
elements.append(element)
if elements:
await cl.Message(
content="🌐 **Informations trouvées sur le web**",
elements=elements
).send()
# =============================================================================
# FONCTIONS D'AFFICHAGE STREAMING PAR NŒUD
# =============================================================================
async def stream_response(content: str, msg: cl.Message, chunk_size: int = 50):
"""Stream du contenu progressivement dans un message."""
for i in range(0, len(content), chunk_size):
chunk = content[i:i + chunk_size]
msg.content += chunk
await msg.update()
await asyncio.sleep(0.25) # Petit délai pour un effet visuel
async def display_node_update(node_name: str, state: Dict[str, Any]):
"""Affiche les mises à jour d'état après l'exécution d'un nœud."""
if node_name == "analyze_query":
if state.get("query_analysis"):
await display_query_analysis(state["query_analysis"])
elif node_name == "collect_information":
if state.get("collected_information"):
await display_collection(state["collected_information"])
elif node_name == "generate_response":
if state.get("final_response"):
content = f"**Réponse générée** ({len(state['final_response'])} caractères)\n\nLa réponse complète sera affichée à la fin du workflow."
await send_cot_step("✏️ Génération de la réponse", content, "done")
elif node_name == "validate_response":
if state.get("validation_results"):
iteration = state.get("iteration_count", len(state["validation_results"]))
last_validation = state["validation_results"][-1]
await display_validation(last_validation, iteration)
elif node_name == "refine_response":
content = f"**Itération:** {state.get('iteration_count', 0)}\n**Correction en cours...**"
await send_cot_step("⚙️ Refinement", content, "done")
elif node_name == "collect_similar_information":
if state.get("additional_information"):
await display_similar_info(state["additional_information"])
# =============================================================================
# FONCTION PRINCIPALE TRACÉE AVEC LANGSMITH
# =============================================================================
@traceable(name="agent_collaboratif_query", project_name=LANGSMITH_PROJECT)
async def process_query_with_tracing(query: str, thread_id: str) -> Dict[str, Any]:
"""Traite la requête avec traçage LangSmith et streaming en temps réel."""
# Import du workflow
from agent_collaboratif_avid import AgentState, create_agent_workflow
from langchain_core.messages import HumanMessage
app = create_agent_workflow()
initial_state = {
"messages": [HumanMessage(content=query)],
"user_query": query,
"query_analysis": {},
"collected_information": [],
"validation_results": [],
"final_response": "",
"iteration_count": 0,
"errors": [],
"additional_information": [],
"similar_info_response":"",
"web_search_results": []
}
# Message de démarrage
await send_cot_step("🔄 Démarrage", "Initialisation du workflow LangGraph...", "done")
# Variables pour suivre l'état
final_state = None
# STREAMING: Utilisation de app.astream() pour obtenir les mises à jour après chaque nœud
try:
#async for event in app.astream(initial_state, {"callbacks": [langfuse_handler]}):
#app.invoke(intial_state, config={"callbacks": [langfuse_handler]})
async for event in app.astream(initial_state):
# event est un dictionnaire avec les nœuds comme clés
for node_name, node_state in event.items():
# Ignorer le nœud spécial __start__
task_list = cl.TaskList()
task_list.status = "Running..."
task1 = cl.Task(title="Traitement de la requête utilisateur de l'agent collaboratif AVID par une action conditionnelle.")
task1.status = cl.TaskStatus.RUNNING
await task_list.add_task(task1)
task2 = cl.Task(title="Collecte d'informations dans les bases de données et caractérisation des informations par un score de pertinence par rapport à la requête utilisateur.")
await task_list.add_task(task2)
task3 = cl.Task(title="Génération d'une première réponse par l'agent collaboratif AVID et vérification de sa validité en corrélant les informations collectées.")
await task_list.add_task(task3)
task4 = cl.Task(title="Vérification de la validité de la réponse par l'agent collaboratif AVID et correction si nécessaire pour éviter les réponses erronées.")
await task_list.add_task(task4)
task5 = cl.Task(title="Génération d'une réponse finale par l'agent collaboratif AVID et vérification de sa validité grâce à un dernier traitement de vérification anti-hallucination.")
await task_list.add_task(task5)
task6 = cl.Task(title="Affichage de la réponse finale et collecte d'informations similaires dans les bases de données non prises en compte dans la réponse finale.")
await task_list.add_task(task6)
task7 = cl.Task(title="Recherche sur le web des informations complémentaires par rapport à la requête utilisateur.")
await task_list.add_task(task7)
await task_list.send()
if node_name == "__start__":
# Create the TaskList
continue
# Afficher un message de progression pour le nœud actuel
node_display_names = {
"analyze_query": "🔍 Analyse de la requête",
"collect_information": "📊 Collecte d'informations",
"generate_response": "✏️ Génération de la réponse",
"validate_response": "✅ Validation anti-hallucination",
"refine_response": "⚙️ Refinement de la réponse",
"collect_similar_information": "🔗 Collecte d'informations similaires"
}
if node_name == "analyze_query":
task1.status = cl.TaskStatus.DONE
task2.status = cl.TaskStatus.RUNNING
await task_list.send()
if node_name == "collect_information":
task2.status = cl.TaskStatus.DONE
task3.status = cl.TaskStatus.RUNNING
await task_list.send()
if node_name == "generate_response":
task3.status = cl.TaskStatus.DONE
task4.status = cl.TaskStatus.RUNNING
await task_list.send()
if node_name == "validate_response":
task4.status = cl.TaskStatus.DONE
task5.status = cl.TaskStatus.RUNNING
await task_list.send()
if node_name == "refine_response":
task5.status = cl.TaskStatus.DONE
task6.status = cl.TaskStatus.RUNNING
await task_list.send()
if node_name == "collect_similar_information":
task6.status = cl.TaskStatus.DONE
task7.status = cl.TaskStatus.RUNNING
await task_list.send()
if node_name == "web_search":
task7.status = cl.TaskStatus.DONE
task7.status = cl.TaskStatus.READY
await task_list.send()
display_name = node_display_names.get(node_name, f"⚙️ {node_name}")
# Message de progression
await send_cot_step(
f"🔄 {display_name}",
f"Nœud exécuté avec succès",
"done"
)
# Afficher les détails spécifiques du nœud
await display_node_update(node_name, node_state)
# Sauvegarder l'état final
final_state = node_state
except Exception as e:
error_msg = f"Erreur lors du streaming: {str(e)}"
await send_cot_step("❌ Erreur", error_msg, "error")
raise
# Si le streaming n'a pas retourné d'état final, utiliser la méthode classique
if final_state is None:
final_state = initial_state
result = {
"query": query,
"query_analysis": final_state.get("query_analysis", {}),
"collected_information": final_state.get("collected_information", []),
"validation_results": final_state.get("validation_results", []),
"final_response": final_state.get("final_response", ""),
"iteration_count": final_state.get("iteration_count", 0),
"errors": final_state.get("errors", []),
"additional_information": final_state.get("additional_information", []),
"similar_info_response": final_state.get("similar_info_response", ""),
"web_search_results": final_state.get("web_search_results", []),
"sources_used": [
info["database"]
for info in final_state.get("collected_information", [])
],
"pinecone_index": PINECONE_INDEX_NAME
}
return result
# =============================================================================
# CALLBACKS CHAINLIT
# =============================================================================
@cl.set_chat_profiles
async def chat_profile(current_user: cl.User):
return [
cl.ChatProfile(
name="Avid Agent",
markdown_description="🎓 Avid Agent permet de converser avec un agent collaboratif entre 4 bases de données pour extraire les informations pertinentes afin de générer une réponse en réduisant les hallucations, par relecture et redéfinition des éléments.",
icon="/public/sparkles-gustaveia.png",
starters=[
cl.Starter(
label= "🔬 Laboratoires & Mobilité",
message= "Quels sont les laboratoires de l'université Gustave Eiffel travaillant sur la mobilité urbaine durable?",
#icon= "/public/icons/lab.svg"
),
cl.Starter(
label= "🎓 Formations Master",
message= "Je cherche des formations en master sur l'aménagement urbain et le développement durable",
#icon= "/public/icons/education.svg"
),
cl.Starter(
label= "🤝 Collaborations Recherche",
message= "Quels laboratoires ont des axes de recherche similaires en énergie et pourraient collaborer?",
#icon= "/public/icons/collaboration.svg"
),
cl.Starter(
label= "⚙️ Équipements Lab",
message= "Liste les équipements disponibles dans les laboratoires travaillant sur la qualité de l'air",
#icon= "/public/icons/equipment.svg"
),
cl.Starter(
label= "📚 Publications Récentes",
message= "Trouve des publications récentes sur la transition énergétique dans les villes",
#icon= "/public/icons/publications.svg"
),
cl.Starter(
label= "👥 Auteurs & Labs",
message= "Qui sont les auteurs qui publient sur la mobilité douce et dans quels laboratoires?",
#icon= "/public/icons/authors.svg"
),
cl.Starter(
label= "📖 Urbanisme Durable",
message= "Quelles publications traitent de l'urbanisme durable et quand ont-elles été publiées?",
#icon= "/public/icons/urban.svg"
),
cl.Starter(
label= "🏙️ Ville Intelligente",
message= "Compare les formations et les laboratoires sur le thème de la ville intelligente",
#icon= "/public/icons/smart-city.svg"
),
cl.Starter(
label= "🌍 Résilience Urbaine",
message= "Identifie les opportunités de partenariats entre laboratoires sur la résilience urbaine",
#icon= "/public/icons/resilience.svg"
),
cl.Starter(
label= "♻️ Économie Circulaire",
message= "Quelles sont les compétences enseignées dans les formations liées à l'économie circulaire?",
#icon= "/public/icons/circular.svg"
)
]
),cl.ChatProfile(
name="Avid Dataviz",
markdown_description="💡 Avid Dataviz permet d'avoir recours à des éléments statistiques et de corrélation entre les données laboratoires et les thématiques Ville Durable",
icon="/public/charts.png",
),cl.ChatProfile(
name="Avid Historique",
markdown_description="💡 RUNNING...\nAvid Historique permet d'avoir recours aux actions des utilisateurs sur l'agent collaboratif AVID",
icon="/public/threads.png",
)
]
@cl.on_chat_start
async def start():
"""Initialisation de la session chat."""
commands = [
{"id": "Search", "icon": "globe", "description": "Rechercher sur le web"},
]
await cl.context.emitter.set_commands(commands)
user = cl.user_session.get("user")
chat_profile = cl.user_session.get("chat_profile")
if chat_profile == "Avid Dataviz":
await cl.Message(
content=f"Bienvenue {user.identifier}!\n\nL'environnement {chat_profile} vous restitue les données sous forme d'objets statistiques."
).send()
actions = [cl.Action(name="display_trace",icon="mouse-pointer-click",payload={"value": "trace"},label="Afficher l'historique des actions")]
await cl.Message(content="Fil des conversations", actions=actions).send()
elements = []
figure_correlation = await display_barplotcorrelation()
figure_publication = await display_barplotpublication()
figure_formation = await display_barplotformation()
elements.append(cl.Plotly(name="chart_correlation", figure=figure_correlation, size="large", display="inline"))
elements.append(cl.Plotly(name="chart_publication", figure=figure_publication, size="large", display="inline"))
elements.append(cl.Plotly(name="chart_formation", figure=figure_formation, size="large", display="inline"))
await cl.Message(content="Datavisualisation des thématiques Ville Durable en fonction des laboratoires, en fonction des publications de recherche et en fonction des formations", elements=elements).send()
elif chat_profile == "Avid Historique":
# Create the TaskList
task_list = cl.TaskList()
task_list.status = "Running..."
task1 = cl.Task(title="Connexion au serveur d'historique des données d'actions des utilisateurs de l'agent collabroatif AVID.", status=cl.TaskStatus.RUNNING)
await task_list.add_task(task1)
await task_list.send()
trace = await get_trace(LANGCHAIN_API_KEY,task_list)
task1.status = cl.TaskStatus.DONE
await task_list.send()
elements = [cl.Text(content=trace, display="inline")]
await cl.Message(content="Fil des conversations", elements=elements).send()
# Message de bienvenue avec style
# welcome_msg = f"""# 🎓 Agent Collaboratif - Université Gustave Eiffel
#Bienvenue ! Je suis votre assistant spécialisé en **Ville Durable**.
## 🔧 Configuration
#- **Index Pinecone:** `{PINECONE_INDEX_NAME}`
#- **Modèle:** `{OPENAI_MODEL_NAME}`
#- **Top K résultats:** `{SIMILARITY_TOP_K}`
#- **Max validations:** `{MAX_VALIDATION_LOOPS}`
## 💡 Fonctionnalités
#✅ Recherche multi-bases vectorielles
#✅ Validation anti-hallucination
#✅ Suggestions d'informations connexes
#✅ Traçage LangSmith actif
#**Choisissez un starter ou posez votre question !**
#"""
# await cl.Message(content=welcome_msg).send()
# Sauvegarder les métadonnées de session
# cl.user_session.set("session_started", True)
# cl.user_session.set("query_count", 0)
@cl.on_message
async def main(message: cl.Message):
"""Traitement du message utilisateur."""
query = message.content
thread_id = cl.context.session.thread_id
# Incrémenter le compteur
query_count = cl.user_session.get("query_count", 0) + 1
cl.user_session.set("query_count", query_count)
# Message de traitement
processing_msg = cl.Message(content="")
await processing_msg.send()
try:
# Traitement avec affichage du COT
result = await process_query_with_tracing(query, thread_id)
# Réponse finale en streaming
final_response = result["final_response"]
# Afficher un séparateur
await send_cot_step("📝 Réponse finale", "Affichage de la réponse complète en streaming...", "done")
# Créer un nouveau message pour la réponse finale
response_msg = cl.Message(content="")
await response_msg.send()
# Streamer la réponse complète
await stream_response(final_response, response_msg, chunk_size=50)
# Afficher les informations similaires collectées par le nœud 6
if result.get("similar_info_response"):
similar_msg = cl.Message(content="")
await similar_msg.send()
# Streamer la réponse similaire
await stream_response(result["similar_info_response"], similar_msg, chunk_size=50)
#await display_similar_info(result["similar_info_response"])
# Afficher les résultats de recherche web collectés par le nœud 7
web_msg = cl.Message(content="Résultats de recherche complémentaires sur le web : \n\n")
await web_msg.send()
for result_web in result["web_search_results"]:
web_search = "- " + result_web['markdown_link'] + " : " + result_web['summary'] + "\n\n"
await stream_response(web_search, web_msg, chunk_size=50)
#await display_web_search_results(result["web_search_results"])
# Métadonnées
metadata_parts = [
f"\n\n---\n### 📊 Métadonnées du traitement",
f"**Sources consultées:** {', '.join(result['sources_used']) if result['sources_used'] else 'Aucune'}",
f"**Itérations:** {result['iteration_count']}",
]
if result['validation_results']:
last_val = result['validation_results'][-1]
metadata_parts.append(f"**Confiance finale:** {last_val.get('confidence_score', 0)}%")
metadata_parts.append(f"**Requête n°:** {query_count}")
# Ajouter les métadonnées en streaming
metadata_text = "\n".join(metadata_parts)
await stream_response(metadata_text, response_msg, chunk_size=100)
# Supprimer le message de traitement initial vide
processing_msg.content = "✅ Traitement terminé"
await processing_msg.update()
actions = [cl.Action(name="display_trace",icon="mouse-pointer-click",payload={"value": "trace"},label="Afficher l'historique des actions")]
await cl.Message(content="Fil des conversations", actions=actions).send()
# Sauvegarder dans l'historique de session
cl.user_session.set(f"query_{query_count}", {
"query": query,
"response": final_response,
"sources": result['sources_used']
})
except Exception as e:
error_msg = f"❌ **Erreur lors du traitement:**\n\n```\n{str(e)}\n```"
processing_msg.content = error_msg
await processing_msg.update()
# Log dans LangSmith si disponible
#if langsmith_client:
# langsmith_client.create_feedback(
# run_id=thread_id,
# key="error",
# score=0,
# comment=str(e)
# )
@cl.on_shared_thread_view
async def on_shared_thread_view(thread: ThreadDict, viewer: Optional[cl.User]) -> bool:
return True
@cl.on_chat_resume
async def on_chat_resume(thread: ThreadDict):
"""Reprise d'une conversation existante."""
thread_id = thread["id"]
resume_msg = f"""# 🔄 Conversation reprise
**Thread ID:** `{thread_id}`
Vous pouvez continuer votre conversation ou poser une nouvelle question.
"""
await cl.Message(content=resume_msg).send()
@cl.on_stop
async def on_stop():
"""Callback à l'arrêt de l'exécution."""
await cl.Message(content="⏹️ Traitement interrompu par l'utilisateur.").send()
@cl.on_chat_end
async def on_chat_end():
"""Callback à la fin de la session."""
query_count = cl.user_session.get("query_count", 0)
end_msg = f"""# 👋 Session terminée
Merci d'avoir utilisé l'agent collaboratif !
**Statistiques de session:**
- **Requêtes traitées:** {query_count}
- **Index Pinecone:** {PINECONE_INDEX_NAME}
"""
await cl.Message(content=end_msg).send()
# =============================================================================
# CONFIGURATION DU DATA LAYER (Supabase/PostgreSQL)
# =============================================================================
"""
Pour activer le Data Layer avec Supabase, créez un fichier .env:
CHAINLIT_AUTH_SECRET=your-secret-key
LITERAL_API_KEY=your-literal-api-key
LITERAL_API_URL=https://cloud.getliteral.ai
Ou configurez PostgreSQL directement:
DATABASE_URL=postgresql://user:password@host:port/dbname
Le Data Layer sera automatiquement activé si ces variables sont définies.
"""