import streamlit as st from langchain_mistralai.chat_models import ChatMistralAI from langchain_community.utilities import SQLDatabase from langgraph.graph import StateGraph from typing import TypedDict, Annotated, Literal, List import os from dotenv import load_dotenv from sqlalchemy import create_engine, text import pandas as pd from typing import Dict import mysql.connector import os from dotenv import load_dotenv from langchain_mistralai.chat_models import ChatMistralAI from langchain_community.utilities import SQLDatabase from sqlalchemy import create_engine import mysql.connector # đŸ”č Charger les variables d'environnement load_dotenv() MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY") server = os.getenv("DB_HOST") database = os.getenv("DB_NAME") user = os.getenv("DB_USER") password = os.getenv("DB_PASSWORD") # VĂ©rifier que les variables d'environnement sont chargĂ©es if not all([MISTRAL_API_KEY, server, database, user, password]): raise ValueError("Veuillez vĂ©rifier les variables d'environnement dans les paramĂštres Hugging Face.") # đŸ”č Configuration du modĂšle Mistral model_name = "mistral-large-latest" llm = ChatMistralAI( model=model_name, api_key=MISTRAL_API_KEY, temperature=0, stream=True, verbose=True ) # đŸ”č Configuration de la base de donnĂ©es MySQL # Utiliser mysql-connector-python avec SQLAlchemy db_url = f"mysql+mysqlconnector://{user}:{password}@{server}/{database}" try: engine = create_engine(db_url) db = SQLDatabase.from_uri(db_url) print("Connexion Ă  la base de donnĂ©es rĂ©ussie !") except Exception as e: print(f"Erreur lors de la connexion Ă  la base de donnĂ©es : {e}") # Si tu veux tester une connexion directe sans SQLAlchemy try: conn = mysql.connector.connect( host=server, user=user, password=password, database=database ) print("Connexion rĂ©ussie Ă  la base de donnĂ©es MySQL !") except mysql.connector.Error as err: print(f"Erreur de connexion : {err}") # đŸ”č DĂ©finition du graphe LangGraph class QueryState(TypedDict): query: Annotated[str, "RequĂȘte en langage naturel"] sql_query: Annotated[str, "RequĂȘte SQL gĂ©nĂ©rĂ©e"] result: Annotated[List[dict], "RĂ©sultat SQL"] user_intent: Literal["calculation", "visualization", "other"] visualization: Annotated[str, "Type de visualisation recommandĂ©e"] graph = StateGraph(QueryState) # đŸ”č Étape 1 : Analyser l'intention de l'utilisateur def analyze_intent(state: QueryState) -> QueryState: prompt = f""" Tu es un assistant SQL expert. Analyse l'intention de l'utilisateur Ă  partir de sa question. L'intention peut ĂȘtre : - "calculation" : Si la question nĂ©cessite un calcul ou une agrĂ©gation (ex : somme, moyenne, etc.). - "visualization" : Si la question demande une visualisation (ex : graphique, tableau, etc.). - "other" : Pour toute autre intention. **Question de l'utilisateur :** {state["query"]} **Intention dĂ©tectĂ©e :** """ intent = llm.invoke(prompt).content.strip().lower() return {**state, "user_intent": intent} # đŸ”č Étape 2 : GĂ©nĂ©rer la requĂȘte SQL avec cache et affichage en temps rĂ©el def generate_sql(state: QueryState) -> QueryState: if isinstance(state, dict) and state.get("sql_query"): # Si la requĂȘte SQL existe dĂ©jĂ , ne pas la regĂ©nĂ©rer return state prompt = f""" Tu es un assistant SQL expert. L'utilisateur te pose une question en langage naturel sur une base de donnĂ©es. **Tables disponibles :** - 'user' (colonnes : firstname, name, role, created_date, last_login, first_login, instagram_url, nb_instagram_followers) **Consignes :** - GĂ©nĂšre uniquement une requĂȘte SQL **valide**. - N'ajoute **aucun texte explicatif**, retourne seulement la requĂȘte SQL. - Assure-toi que la requĂȘte ne contient **aucune erreur de syntaxe**. - **Ne mets pas de point-virgule Ă  la fin de la requĂȘte**. - Utilise des noms de tables et de colonnes sans caractĂšres spĂ©ciaux, et utilise des crochets `[]` si nĂ©cessaire pour SQL Server. **Question de l'utilisateur :** {state["query"]} **RequĂȘte SQL :** """ response = llm.stream(prompt) sql_query = "".join(chunk.content for chunk in response).strip() return {**state, "sql_query": sql_query} # đŸ”č Étape 3 : Valider et corriger la requĂȘte SQL avec cache et affichage en temps rĂ©el def validate_and_fix_sql(state: QueryState) -> QueryState: if "sql_query" in state and state["sql_query"]: return state prompt = f""" Tu es un assistant SQL expert. Valide et corrige la requĂȘte SQL suivante si nĂ©cessaire. **RequĂȘte SQL :** {state["sql_query"]} **SchĂ©ma de la base de donnĂ©es :** - 'user' (colonnes : firstname, name, role, created_date, last_login, first_login, instagram_url, nb_instagram_followers) **Consignes :** - Si la requĂȘte est valide, retourne-la telle quelle. - Si la requĂȘte contient des erreurs, corrige-la et retourne la version corrigĂ©e. - N'ajoute **aucun texte explicatif**, retourne seulement la requĂȘte SQL. - Utilise des crochets `[]` pour dĂ©limiter les noms de tables ou de colonnes si nĂ©cessaire, car la base de donnĂ©es est SQL Server. - Remplace `LIMIT` par `TOP` pour SQL Server. - Assure-toi que la requĂȘte ne contient **aucune erreur de syntaxe**. **RequĂȘte SQL corrigĂ©e :** """ response = llm.stream(prompt) corrected_query = "".join(chunk.content for chunk in response).strip() return {**state, "sql_query": corrected_query} # đŸ”č Étape 4 : ExĂ©cuter la requĂȘte SQL def execute_query(state: Dict) -> Dict: try: with engine.connect() as conn: result = conn.execute(text(state["sql_query"])).fetchall() result_dict = [dict(row._mapping) for row in result] if result_dict: df = pd.DataFrame(result_dict) st.dataframe(df) # Afficher les rĂ©sultats sous forme de tableau else: st.write("⚠ Aucun rĂ©sultat trouvĂ©.") return {**state, "result": result_dict} except Exception as e: return {**state, "result": [{"error": str(e)}]} # đŸ”č Configuration du graphe LangGraph graph.add_node("analyze_intent", analyze_intent) graph.add_node("generate_sql", generate_sql) graph.add_node("validate_and_fix_sql", validate_and_fix_sql) graph.add_node("execute_query", execute_query) graph.set_entry_point("analyze_intent") graph.add_edge("analyze_intent", "generate_sql") graph.add_edge("generate_sql", "validate_and_fix_sql") graph.add_edge("validate_and_fix_sql", "execute_query") agent = graph.compile() # đŸ”č Initialisation de l'historique des interactions if "chat_history" not in st.session_state: st.session_state.chat_history = [] # Fonction pour afficher les messages avec le bon style (droite pour utilisateur, gauche pour assistant) def display_chat_history(): for message in st.session_state.chat_history: if message["role"] == "user": st.markdown( f'
' f'{message["content"]}
', unsafe_allow_html=True ) else: st.markdown( f'
' f'{message["content"]}
', unsafe_allow_html=True ) # đŸ”č Interface Streamlit st.title("🧠 Assistant SQL") # Affichage de l'historique display_chat_history() # Barre latĂ©rale pour ajouter des descriptions with st.sidebar: st.title("â„č À propos") st.write("Posez vos questions SQL en langage naturel") # Champ de saisie EN BAS (comme ChatGPT) query = st.chat_input("Comment puis-je vous aider ?") if query: # Ajouter la question de l'utilisateur Ă  l'historique st.session_state.chat_history.append({"role": "user", "content": query}) # ExĂ©cution de l'agent SQL initial_state = {"query": query, "sql_query": "", "result": [], "user_intent": "other"} output = agent.invoke(initial_state) # Ajouter les rĂ©sultats Ă  l'historique sous forme de Markdown if "result" in output and output["result"]: if isinstance(output["result"], list) and output["result"]: # VĂ©rifie que les rĂ©sultats sont une liste non vide result_markdown = " " for row in output["result"]: if isinstance(row, dict): # Si les rĂ©sultats sont des dictionnaires result_markdown += "\n".join(f"{value}" for value in row.values()) + "\n" # Afficher seulement les valeurs sans parenthĂšses else: # Si les rĂ©sultats sont des chaĂźnes ou d'autres types result_markdown += f"- {row},\n" st.session_state.chat_history.append({"role": "assistant", "content": result_markdown}) else: st.session_state.chat_history.append({"role": "assistant", "content": "⚠ Aucun rĂ©sultat trouvĂ©."}) # RafraĂźchir la page pour afficher immĂ©diatement les nouvelles rĂ©ponses st.rerun()