Spaces:
Runtime error
Runtime error
| import io | |
| import os | |
| from datetime import datetime, date | |
| from typing import Dict, List, Optional, Tuple | |
| import smtplib | |
| import ssl | |
| from email.message import EmailMessage | |
| import pandas as pd | |
| import plotly.express as px | |
| import streamlit as st | |
| # ----------------------------- | |
| # App Configuration | |
| # ----------------------------- | |
# Global page settings: browser-tab title and icon, wide layout, sidebar open on load.
_PAGE_SETTINGS = {
    "page_title": "Tableau de bord des inscriptions",
    "page_icon": "🧭",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)
| # ----------------------------- | |
| # Utilities | |
| # ----------------------------- | |
def try_parse_datetime(series: pd.Series) -> pd.Series:
    """Return *series* converted to datetimes when enough values parse, else unchanged.

    A Series that already has a datetime dtype is returned as-is. Otherwise the
    values are parsed with unparseable entries coerced to NaT, and the parsed
    result is kept only if at least ``max(3, 20% of the length)`` entries are
    valid dates. Any parsing error falls back to the original Series.
    """
    already_datetime = pd.api.types.is_datetime64_any_dtype(series)
    if already_datetime:
        return series

    try:
        candidate = pd.to_datetime(series, errors="coerce")
        required_hits = max(3, int(0.2 * len(candidate)))
        enough_dates = candidate.notna().sum() >= required_hits
    except Exception:
        # Best effort: anything that cannot be parsed is simply left untouched.
        return series

    return candidate if enough_dates else series
def make_unique_columns(columns: List[str]) -> List[str]:
    """Return column names made unique by appending " (2)", " (3)", ... suffixes.

    The first occurrence of a name is kept verbatim. Later occurrences get the
    next free numbered suffix. A generated name is never allowed to collide
    with a name that is already in the result (e.g. ``["a", "a", "a (2)"]``),
    so the returned list is guaranteed to contain no duplicates.

    Args:
        columns: Column labels; each one is converted with ``str()``.

    Returns:
        A list of the same length holding unique string labels.
    """
    counters: Dict[str, int] = {}
    taken = set()
    unique_cols: List[str] = []
    for name in columns:
        base = str(name)
        candidate = base
        # Keep bumping the counter until the candidate is really unused; a
        # single increment is not enough when the suffixed form already exists.
        while candidate in taken:
            counters[base] = counters.get(base, 1) + 1
            candidate = f"{base} ({counters[base]})"
        taken.add(candidate)
        unique_cols.append(candidate)
    return unique_cols
def normalize_label(text: str) -> str:
    """Return *text* lower-cased with all whitespace runs collapsed to one space.

    ``str.split()`` without arguments splits on any Unicode whitespace,
    including non-breaking spaces, and discards leading/trailing whitespace,
    so no explicit ``strip``/``replace`` calls are needed.

    Args:
        text: Any value; it is converted with ``str()`` first.

    Returns:
        The normalized label, suitable for case- and spacing-insensitive lookup.
    """
    return " ".join(str(text).lower().split())
def find_column(df: pd.DataFrame, candidates: List[str]) -> Optional[str]:
    """Look up a DataFrame column by normalized name.

    Column names and candidates are both passed through ``normalize_label``.
    Candidates are tried in order and the actual column name of the first
    candidate that matches is returned; ``None`` when nothing matches.
    """
    by_normalized_name: Dict[str, str] = {}
    for column in df.columns:
        by_normalized_name[normalize_label(column)] = column

    wanted = (normalize_label(candidate) for candidate in candidates)
    return next(
        (by_normalized_name[key] for key in wanted if key in by_normalized_name),
        None,
    )
def infer_pandas_types(df: pd.DataFrame) -> Dict[str, str]:
    """Infer a logical type for every column of *df*.

    Rules, applied in order:
      * datetime dtype                     -> ``"date"``
      * boolean dtype                      -> ``"categorical"``
      * any other numeric dtype            -> ``"numeric"``
      * parseable by ``try_parse_datetime`` -> ``"date"``
      * otherwise ``"categorical"`` when the number of distinct non-missing
        values is at most ``max(50, 5% of the rows)``, else ``"text"``.

    Args:
        df: The frame to inspect.

    Returns:
        Mapping of column name to one of
        ``"categorical" | "numeric" | "date" | "text"``.
    """
    type_map: Dict[str, str] = {}
    for col in df.columns:
        s = df[col]
        if pd.api.types.is_datetime64_any_dtype(s):
            type_map[col] = "date"
        elif pd.api.types.is_bool_dtype(s):
            # Checked before the numeric test because booleans count as numeric.
            type_map[col] = "categorical"
        elif pd.api.types.is_numeric_dtype(s):
            type_map[col] = "numeric"
        elif pd.api.types.is_datetime64_any_dtype(try_parse_datetime(s)):
            type_map[col] = "date"
        else:
            # Drop missing values BEFORE casting to str: after the cast NaN
            # becomes the string "nan" and would be counted as a category.
            distinct = s.dropna().astype(str).nunique()
            low_cardinality_limit = max(50, len(s) * 0.05)
            type_map[col] = "categorical" if distinct <= low_cardinality_limit else "text"
    return type_map
def dynamic_filters(df: pd.DataFrame, type_map: Dict[str, str]) -> pd.DataFrame:
    """Render one sidebar filter per column and return the filtered copy of *df*.

    * numeric columns get a min/max range slider,
    * date columns get a start/end date picker,
    * everything else gets a multiselect over (at most 200) distinct values.

    A filter only restricts the data once the user actually narrows it. While
    a slider or date picker still covers the full range, no mask is applied,
    so rows whose value is missing (NaN/NaT) in that column are not silently
    discarded.

    Args:
        df: Source data; it is copied, never modified.
        type_map: Column name -> logical type (``"numeric"``, ``"date"``,
            ``"categorical"`` or ``"text"``). Unknown columns are treated as text.

    Returns:
        The rows of *df* that satisfy every active filter.
    """
    filtered = df.copy()
    st.sidebar.markdown("### 🔎 Filtres dynamiques")
    for col in filtered.columns:
        logical = type_map.get(col, "text")
        if logical == "numeric" and pd.api.types.is_numeric_dtype(filtered[col]):
            series_num = pd.to_numeric(filtered[col], errors="coerce")
            valid = series_num.dropna()
            if valid.empty:
                st.sidebar.caption(f"{col}: aucune valeur numérique exploitable")
                continue
            min_v = float(valid.min())
            max_v = float(valid.max())
            if min_v == max_v:
                # A slider over a single value would be pointless.
                st.sidebar.caption(f"{col}: valeur unique {min_v}")
                continue
            vmin, vmax = st.sidebar.slider(
                f"{col} (min-max)", min_value=min_v, max_value=max_v, value=(min_v, max_v)
            )
            # Full range selected means "no filter": keep rows with NaN too.
            if (vmin, vmax) != (min_v, max_v):
                filtered = filtered[series_num.between(vmin, vmax)]
        elif logical == "date":
            parsed = try_parse_datetime(filtered[col])
            if pd.api.types.is_datetime64_any_dtype(parsed):
                today = date.today()
                dmin = parsed.min()
                dmax = parsed.max()
                default_range = (
                    dmin.date() if pd.notna(dmin) else today,
                    dmax.date() if pd.notna(dmax) else today,
                )
                start_end = st.sidebar.date_input(f"{col} (période)", value=default_range)
                # While the user is still picking the second date the widget
                # returns fewer than two values; ignore that intermediate state.
                if isinstance(start_end, tuple) and len(start_end) == 2:
                    # Untouched range means "no filter": keep rows with NaT too.
                    if tuple(start_end) != default_range:
                        start, end = start_end
                        days = parsed.dt.date
                        filtered = filtered[(days >= start) & (days <= end)]
        else:
            # Categorical or text: multiselect over the distinct values (capped).
            uniques = filtered[col].dropna().astype(str).unique().tolist()
            uniques = sorted(uniques)[:200]
            selected = st.sidebar.multiselect(f"{col}", options=uniques, default=[])
            if selected:
                filtered = filtered[filtered[col].astype(str).isin(selected)]
    return filtered
def apply_search(df: pd.DataFrame, query: str) -> pd.DataFrame:
    """Return the rows of *df* in which any cell contains *query*.

    The match is a case-insensitive, literal substring search over the string
    form of every cell. The query is NOT interpreted as a regular expression,
    so user input such as ``"("`` or ``"."`` is safe and matches itself.

    Args:
        df: Data to search.
        query: Free-text search term; empty or whitespace-only means no search.

    Returns:
        *df* itself when there is nothing to search for, otherwise the
        matching rows.
    """
    needle = query.strip().lower() if query else ""
    if not needle:
        return df
    mask = pd.Series(False, index=df.index)
    for col in df.columns:
        cells = df[col].astype(str).str.lower()
        mask = mask | cells.str.contains(needle, regex=False, na=False)
    return df[mask]
def to_excel_bytes(df: pd.DataFrame) -> bytes:
    """Serialize *df* to an in-memory .xlsx workbook and return its raw bytes.

    The frame is written without its index to a single sheet named
    "inscriptions" using the xlsxwriter engine.
    """
    stream = io.BytesIO()
    workbook = pd.ExcelWriter(stream, engine="xlsxwriter")
    # Leaving the context finalizes the workbook into the stream.
    with workbook:
        df.to_excel(workbook, sheet_name="inscriptions", index=False)
    return stream.getvalue()
def kpi_card(label: str, value: str):
    """Render a KPI tile (label above value) as raw HTML using the card CSS classes.

    Both arguments are interpolated into the markup as-is.
    """
    markup = f"""
<div class="card kpi">
<div class="card-label">{label}</div>
<div class="card-value">{value}</div>
</div>
"""
    st.markdown(markup, unsafe_allow_html=True)
def chart_card(title: str, fig):
    """Emit a titled card: opening markup, the Plotly figure, then the closing tag.

    The figure is drawn at container width with Streamlit's own Plotly theme
    disabled (``theme=None``).
    """
    opening = f'<div class="card"><div class="card-title">{title}</div>'
    closing = "</div>"

    st.markdown(opening, unsafe_allow_html=True)
    st.plotly_chart(fig, use_container_width=True, theme=None)
    st.markdown(closing, unsafe_allow_html=True)
def inject_base_css():
    """Inject the card/KPI stylesheet into the page.

    The stylesheet is read from ``assets/styles.css``. When that file does not
    exist yet it is created with the built-in default rules, so it can be
    customized afterwards. If the filesystem cannot be read or written
    (for example a read-only deployment), the built-in rules are injected
    directly instead of failing.
    """
    default_css = """
.card {
background-color: var(--card);
border-radius: 0.5rem;
padding: 1rem;
margin-bottom: 1rem;
box-shadow: 0 1px 3px rgba(0,0,0,0.12), 0 1px 2px rgba(0,0,0,0.24);
}
.card-title {
font-weight: bold;
font-size: 1.2rem;
margin-bottom: 0.5rem;
color: var(--primary);
}
.kpi {
text-align: center;
padding: 1rem;
}
.card-label {
font-size: 1rem;
color: var(--muted);
}
.card-value {
font-size: 2rem;
font-weight: bold;
color: var(--primary);
}
"""
    css_file = os.path.join("assets", "styles.css")
    try:
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs("assets", exist_ok=True)
        if not os.path.exists(css_file):
            with open(css_file, "w", encoding="utf-8") as f:
                f.write(default_css)
        with open(css_file, "r", encoding="utf-8") as f:
            css = f.read()
    except OSError:
        # Unwritable or unreadable location: styling must not break the app.
        css = default_css
    st.markdown(f"<style>{css}</style>", unsafe_allow_html=True)
def safe_format_template(template: str, row: Dict[str, object]) -> str:
    """Fill ``{placeholder}`` fields of *template* from *row* without ever raising.

    * Placeholders with no matching key are replaced by an empty string.
    * Missing values (``None``, NaN, NaT, ``pd.NA``) are rendered as an empty
      string rather than the text "nan" / "NaT" / "<NA>".
    * If the template itself is malformed (e.g. an unbalanced brace), it is
      returned unchanged.

    Args:
        template: A ``str.format``-style template.
        row: Values keyed by placeholder name; keys and values are stringified.

    Returns:
        The rendered text, or *template* as-is when it cannot be formatted.
    """

    class SafeDict(dict):
        def __missing__(self, key):
            return ""

    def as_text(value: object) -> str:
        # is_scalar guards pd.isna against list/array values, for which it
        # would return an array instead of a single boolean.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return ""
        return str(value)

    flat = {str(k): as_text(v) for k, v in row.items()}
    try:
        return template.format_map(SafeDict(flat))
    except (ValueError, LookupError, AttributeError, TypeError):
        # Malformed template or unsupported field access: leave the text as typed.
        return template
def send_email_smtp(
    smtp_host: str,
    smtp_port: int,
    sender_email: str,
    sender_password: str,
    use_tls: bool,
    to_email: str,
    subject: str,
    body_text: str,
    reply_to: Optional[str] = None,
    timeout: float = 30.0,
) -> None:
    """Send one plain-text e-mail through an SMTP server.

    Args:
        smtp_host: Server host name.
        smtp_port: Server port.
        sender_email: Address used for the ``From`` header and as login name.
        sender_password: Login password; when empty, no authentication is attempted.
        use_tls: ``True`` opens a plain connection and upgrades it with
            STARTTLS; ``False`` connects with implicit TLS (``SMTP_SSL``).
        to_email: Recipient address.
        subject: Message subject.
        body_text: Plain-text message body.
        reply_to: Optional ``Reply-To`` address.
        timeout: Socket timeout in seconds, so an unreachable server cannot
            block the caller indefinitely.

    Raises:
        smtplib.SMTPException, OSError, ssl.SSLError: Propagated from
            ``smtplib`` / ``ssl`` on connection, TLS or delivery failure.
    """
    message = EmailMessage()
    message["From"] = sender_email
    message["To"] = to_email
    message["Subject"] = subject
    if reply_to:
        message["Reply-To"] = reply_to
    message.set_content(body_text)

    # One verifying context for both modes: without an explicit context
    # SMTP_SSL does not validate the server certificate.
    context = ssl.create_default_context()
    if use_tls:
        server = smtplib.SMTP(smtp_host, smtp_port, timeout=timeout)
    else:
        server = smtplib.SMTP_SSL(smtp_host, smtp_port, timeout=timeout, context=context)

    with server:
        if use_tls:
            server.starttls(context=context)
        if sender_password:
            server.login(sender_email, sender_password)
        server.send_message(message)
def set_theme_variables(mode: str):
    """Publish the colour palette for *mode* as CSS custom properties on ``:root``.

    ``"dark"`` selects the dark palette; any other value falls back to the
    light one. The Plotly figures are themed separately through their template.
    """
    light_palette = {
        "--bg": "#f7f9fc",
        "--card": "#ffffff",
        "--text": "#0f172a",
        "--muted": "#64748b",
        "--primary": "#0ea5e9",
        "--accent": "#10b981",
        "--border": "#e5e7eb",
    }
    dark_palette = {
        "--bg": "#0b1220",
        "--card": "#111827",
        "--text": "#e5e7eb",
        "--muted": "#94a3b8",
        "--primary": "#38bdf8",
        "--accent": "#34d399",
        "--border": "#1f2937",
    }
    palettes = {"light": light_palette, "dark": dark_palette}
    chosen = palettes.get(mode, light_palette)

    declarations = ";".join(f"{variable}:{colour}" for variable, colour in chosen.items())
    rule = ":root{" + declarations + "}"
    st.markdown(f"<style>{rule}</style>", unsafe_allow_html=True)
def get_plotly_template(mode: str) -> str:
    """Return the Plotly template name for *mode*: dark for "dark", white otherwise."""
    if mode == "dark":
        return "plotly_dark"
    return "plotly_white"
| # ----------------------------- | |
| # Sidebar: Logo, Upload, Theme, Column mapping | |
| # ----------------------------- | |
def sidebar_controls() -> Tuple[Optional[pd.DataFrame], Dict[str, str], str, Dict[str, str], List[str]]:
    """Render the sidebar (theme, logo, upload, cleaning, typing, unique keys).

    Side effects on ``st.session_state``:
      * ``theme_mode``     - "light" or "dark", read by the page functions to
                             choose the Plotly template,
      * ``df``             - the uploaded (then cleaned in place) DataFrame,
      * ``filtered_df``    - a copy of the cleaned DataFrame,
      * ``logical_types``  - column -> logical type chosen by the user,
      * ``unique_keys``    - columns that identify a unique person.

    Returns:
        ``(df, logical_types, theme_mode, coercions, unique_keys)`` where
        ``df`` is ``None`` when no data is available and ``coercions`` maps
        the columns that were coerced to ``"numeric"`` or ``"date"``.
    """
    st.sidebar.markdown("## ⚙️ Contrôles")

    # Theme
    mode = st.sidebar.radio("Thème", options=["clair", "sombre"], horizontal=True, index=0)
    theme_mode = "dark" if mode == "sombre" else "light"
    # The pages look this key up; without it every chart stays on the light template.
    st.session_state['theme_mode'] = theme_mode
    set_theme_variables(theme_mode)

    # Logo (optional)
    logo_path = os.path.join("assets", "logo.png")
    if os.path.exists(logo_path):
        st.sidebar.image(logo_path, use_column_width=True)

    uploaded = st.sidebar.file_uploader("Importer un fichier Excel (.xlsx)", type=["xlsx"])
    df: Optional[pd.DataFrame] = None
    if uploaded is not None:
        try:
            # Read first sheet by default
            df = pd.read_excel(uploaded, sheet_name=0)
            # Strip column names
            df.columns = [str(c).strip() for c in df.columns]
            # Ensure unique column names
            if pd.Index(df.columns).has_duplicates:
                df.columns = make_unique_columns(list(df.columns))
            # Shared with the other pages through session state
            st.session_state['df'] = df
            st.session_state['filtered_df'] = df.copy()
        except Exception as e:
            st.sidebar.error(f"Erreur de lecture du fichier: {e}")
    elif 'df' in st.session_state:
        # No upload in this run: reuse the data stored by a previous one.
        df = st.session_state['df']

    logical_types: Dict[str, str] = {}
    coercions: Dict[str, str] = {}
    unique_keys: List[str] = []

    if df is not None and not df.empty:
        st.sidebar.markdown("---")
        st.sidebar.markdown("### 🧹 Nettoyage & types")

        # Global cleaning options
        trim_spaces = st.sidebar.checkbox("Supprimer les espaces autour du texte", value=True)
        lower_case = st.sidebar.checkbox("Mettre le texte en minuscules", value=False)
        drop_dupes = st.sidebar.checkbox("Supprimer les doublons", value=False)
        dedup_subset_cols: List[str] = []
        dedup_keep_choice = "first"
        if drop_dupes:
            dedup_subset_cols = st.sidebar.multiselect(
                "Colonnes à considérer (vide = toutes)",
                options=list(df.columns),
                help="Sélectionnez les colonnes sur lesquelles détecter les doublons.",
            )
            dedup_keep_choice = st.sidebar.selectbox(
                "Conserver",
                options=["first", "last", "none"],
                index=0,
                help="Quelle occurrence conserver pour chaque doublon détecté",
            )
        fillna_blank = st.sidebar.checkbox("Remplacer NaN texte par vide", value=True)

        # Remove selected columns
        drop_columns = st.sidebar.multiselect(
            "Enlever des colonnes",
            options=list(df.columns),
            default=[],
            help="Supprimer des champs du jeu de données avant l'analyse",
            key="clean_drop_cols",
        )
        if drop_columns:
            df.drop(columns=drop_columns, inplace=True, errors="ignore")

        # Infer and allow override per column
        type_options = ["categorical", "numeric", "date", "text"]
        inferred = infer_pandas_types(df)
        for col in df.columns:
            logical_types[col] = st.sidebar.selectbox(
                f"Type pour {col}",
                options=type_options,
                index=type_options.index(inferred.get(col, "text")),
            )
            # Optional coercion
            if logical_types[col] in ("numeric", "date"):
                coercions[col] = logical_types[col]

        # Apply cleaning
        for col in df.columns:
            if df[col].dtype == object:
                # Only real values are rewritten as text. Casting the whole
                # column would turn missing cells into the literal string
                # "nan", which the blank-filling option could no longer catch.
                present = df[col].notna()
                if trim_spaces:
                    df[col] = df[col].astype(str).str.strip().where(present, df[col])
                if lower_case:
                    df[col] = df[col].astype(str).str.lower().where(present, df[col])
                if fillna_blank:
                    df[col] = df[col].where(df[col].notna(), "")
            # Coerce types
            if coercions.get(col) == "numeric":
                df[col] = pd.to_numeric(df[col], errors="coerce")
            elif coercions.get(col) == "date":
                df[col] = try_parse_datetime(df[col])

        if drop_dupes:
            # pandas expects keep=False (not None) to drop every duplicated row.
            keep_arg = False if dedup_keep_choice == "none" else dedup_keep_choice
            # Columns removed above can no longer take part in the comparison.
            subset = [c for c in dedup_subset_cols if c in df.columns] or None
            df.drop_duplicates(subset=subset, keep=keep_arg, inplace=True)

        # Unique person keys
        st.sidebar.markdown("---")
        st.sidebar.markdown("### 👤 Personne unique")
        # Heuristic suggestions
        hints = ["email", "e-mail", "mail", "id", "identifiant", "cin", "passport", "matricule", "phone", "téléphone", "telephone", "tel"]
        suggested = [c for c in df.columns if any(h in c.lower() for h in hints)]
        unique_keys = st.sidebar.multiselect(
            "Champs d'unicité (sélection multiple)",
            options=list(df.columns),
            default=suggested,
            help="Sélectionnez les champs qui identifient de façon unique une personne.",
        )

        # Share the types, keys and cleaned data with the pages
        st.session_state['logical_types'] = logical_types
        st.session_state['unique_keys'] = unique_keys
        st.session_state['filtered_df'] = df.copy()

    return df, logical_types, theme_mode, coercions, unique_keys
| # ----------------------------- | |
| # Page: Tableau de bord | |
| # ----------------------------- | |
def page_tableau_de_bord():
    """Render the dashboard page.

    Without data in session state a welcome card is shown. Otherwise the page
    applies the sidebar filters (and the optional unique-person filter),
    stores the result under ``st.session_state['filtered_df']`` and displays
    KPI tiles, three distribution charts, a searchable table and CSV/Excel
    download buttons.
    """
    st.markdown("<h2>📊 Tableau de bord</h2>", unsafe_allow_html=True)

    if st.session_state.get('df') is None:
        welcome = """
<div class="card">
<div class="card-title">Bienvenue 👋</div>
<p>Importez un fichier <b>.xlsx</b> contenant vos inscriptions pour commencer l'analyse.</p>
<ul>
<li>Assurez-vous que les colonnes principales (pays, formation, statut, date) sont présentes.</li>
<li>Vous pourrez mapper les colonnes dans la barre latérale.</li>
</ul>
</div>
"""
        st.markdown(welcome, unsafe_allow_html=True)
        return

    source_df = st.session_state['df']
    type_map = st.session_state.get('logical_types', {})
    unique_keys = st.session_state.get('unique_keys', [])
    theme_mode = "dark" if st.session_state.get('theme_mode') == "dark" else "light"
    plotly_template = get_plotly_template(theme_mode)

    # Filters (one per column)
    st.sidebar.markdown("---")
    filtered_df = dynamic_filters(source_df, type_map)

    # Optional de-duplication on the person keys chosen in the sidebar
    st.sidebar.markdown("### 👤 Filtrer par personne unique")
    if unique_keys:
        dedup_enabled = st.sidebar.checkbox(
            "Activer le filtre d'unicité (drop_duplicates)", value=False, key="unique_filter_toggle"
        )
        dedup_keep = st.sidebar.selectbox(
            "Conserver", options=["first", "last"], index=0, key="unique_filter_keep"
        )
        if dedup_enabled:
            try:
                filtered_df = filtered_df.drop_duplicates(subset=unique_keys, keep=dedup_keep)
            except Exception:
                st.sidebar.warning("Impossible d'appliquer le filtre d'unicité. Vérifiez les champs choisis.")

    # Publish the filtered frame for the other pages
    st.session_state['filtered_df'] = filtered_df

    def _counts(dimension, ascending, limit=None):
        """Rows per value of *dimension*, sorted by count, optionally truncated."""
        table = filtered_df.groupby(dimension).size().reset_index(name="count")
        table = table.sort_values("count", ascending=ascending)
        return table if limit is None else table.head(limit)

    tight_margins = dict(l=10, r=10, t=10, b=10)

    # KPIs
    headline_kpis = [
        ("Lignes", len(filtered_df)),
        ("Colonnes", filtered_df.shape[1]),
        ("Valeurs manquantes", int(filtered_df.isna().sum().sum())),
        ("Doublons (approx)", int(filtered_df.duplicated().sum())),
    ]
    for slot, (kpi_label, kpi_value) in zip(st.columns(4), headline_kpis):
        with slot:
            kpi_card(kpi_label, f"{kpi_value:,}")

    # Unique persons KPI (based on the selected keys)
    if unique_keys:
        try:
            keyed_rows = filtered_df.dropna(subset=unique_keys)[unique_keys]
            uniq = keyed_rows.astype(str).drop_duplicates().shape[0]
        except Exception:
            uniq = 0
        persons_slot, _ = st.columns([1, 3])
        with persons_slot:
            kpi_card("Personnes uniques", f"{uniq:,}")

    # Charts row 1: two distributions sharing the Top N / sort controls
    st.markdown("<div class=\"card\"><div class=\"card-title\">Répartitions clés</div>", unsafe_allow_html=True)
    topn_slot, sort_slot, note_slot = st.columns([1, 1, 2])
    with topn_slot:
        topn = st.slider("Top N", min_value=3, max_value=50, value=10, step=1)
    with sort_slot:
        sort_dir = st.selectbox("Tri", options=["desc", "asc"], index=0)
    with note_slot:
        st.caption("Appliqué aux graphiques de répartition ci-dessous")
    sort_ascending = sort_dir == "asc"

    left_slot, right_slot = st.columns(2)
    cat_cols_all = [c for c in filtered_df.columns if type_map.get(c) in ("categorical", "text")]
    can_plot = bool(cat_cols_all) and not filtered_df.empty

    if can_plot:
        dim1 = st.selectbox("Dimension 1 (répartition)", options=cat_cols_all, key="rep_dim1")
        fig_dim1 = px.bar(
            _counts(dim1, sort_ascending, topn),
            x=dim1,
            y="count",
            template=plotly_template,
            color_continuous_scale="Blues",
        )
        fig_dim1.update_layout(margin=tight_margins)
        with left_slot:
            chart_card("Répartition (dimension 1)", fig_dim1)

        dim2 = st.selectbox(
            "Dimension 2 (répartition)",
            options=list(cat_cols_all),
            index=min(1, len(cat_cols_all) - 1),
            key="rep_dim2",
        )
        fig_dim2 = px.pie(
            _counts(dim2, sort_ascending, topn),
            names=dim2,
            values="count",
            template=plotly_template,
            hole=0.35,
        )
        fig_dim2.update_layout(margin=tight_margins)
        with right_slot:
            chart_card("Répartition (dimension 2)", fig_dim2)
    st.markdown("</div>", unsafe_allow_html=True)

    # Charts row 2: full (untruncated) distribution of a third dimension
    second_row = st.columns(2)
    if can_plot:
        dim3 = st.selectbox("Dimension 3", options=cat_cols_all, key="rep_dim3")
        fig_dim3 = px.bar(
            _counts(dim3, False),
            x=dim3,
            y="count",
            template=plotly_template,
            color=dim3,
        )
        fig_dim3.update_layout(showlegend=False, margin=tight_margins)
        with second_row[0]:
            chart_card("Répartition (dimension 3)", fig_dim3)

    # Data table
    search_query = st.text_input("Recherche globale", key="search_dashboard")
    visible_rows = apply_search(filtered_df, search_query)
    st.dataframe(visible_rows, use_container_width=True, hide_index=True)

    # Downloads of exactly what the table shows
    csv_bytes = visible_rows.to_csv(index=False).encode("utf-8-sig")
    xlsx_bytes = to_excel_bytes(visible_rows)
    csv_slot, xlsx_slot = st.columns(2)
    with csv_slot:
        st.download_button(
            "Télécharger CSV",
            data=csv_bytes,
            file_name="inscriptions_filtrees.csv",
            mime="text/csv",
            use_container_width=True,
        )
    with xlsx_slot:
        st.download_button(
            "Télécharger Excel",
            data=xlsx_bytes,
            file_name="inscriptions_filtrees.xlsx",
            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            use_container_width=True,
        )
| # ----------------------------- | |
| # Page: Zone d'analyse | |
| # ----------------------------- | |
def page_analyses():
    """Render the advanced-analysis page from ``st.session_state['filtered_df']``.

    Sections, in order:
      1. an ad-hoc "dimension -> bar/pie" chart,
      2. a drilldown table restricted to one value of one dimension,
      3. a decision-maker view (KPIs and charts) that only shows the parts
         whose columns can be located by name with ``find_column``.

    Distinct-value KPIs ignore missing and blank cells, so an empty e-mail or
    country is never counted as a person or a country.
    """
    st.markdown("<h2>📋 Analyses avancées</h2>", unsafe_allow_html=True)
    if 'filtered_df' not in st.session_state or st.session_state['filtered_df'] is None:
        st.warning("Veuillez d'abord importer et configurer des données dans l'onglet Tableau de bord.")
        return

    filtered_df = st.session_state['filtered_df']
    type_map = st.session_state.get('logical_types', {})
    theme_mode = "dark" if st.session_state.get('theme_mode') == "dark" else "light"
    plotly_template = get_plotly_template(theme_mode)

    def _distinct_count(column: str, fold_case: bool = False) -> int:
        """Number of distinct non-missing, non-blank values in *column*."""
        # dropna must come before the str cast, otherwise NaN becomes "nan".
        values = filtered_df[column].dropna().astype(str).str.strip()
        if fold_case:
            values = values.str.lower()
        return int(values[values != ""].nunique())

    def _counts(column: str, limit: Optional[int] = None, by_count: bool = True) -> pd.DataFrame:
        """Rows per value of *column*, optionally sorted descending and truncated."""
        table = filtered_df.groupby(column).size().reset_index(name="count")
        if by_count:
            table = table.sort_values("count", ascending=False)
        return table if limit is None else table.head(limit)

    # Ad-hoc analysis builder
    st.markdown("<div class=\"card\"><div class=\"card-title\">Zone d'analyse</div>", unsafe_allow_html=True)
    cat_cols = [c for c in filtered_df.columns if type_map.get(c) in ("categorical", "text")]
    if cat_cols:
        ac1, ac2, ac3 = st.columns([2, 1, 1])
        with ac1:
            dim_col = st.selectbox("Dimension", options=cat_cols)
        with ac2:
            chart_type = st.selectbox("Type de graphique", options=["Barres", "Camembert"], index=0)
        with ac3:
            topn_dim = st.slider("Top N (dimension)", 3, 50, 10)
        agg = _counts(dim_col, limit=topn_dim)
        if chart_type == "Barres":
            fig = px.bar(agg, x=dim_col, y="count", template=plotly_template)
        else:
            fig = px.pie(agg, names=dim_col, values="count", template=plotly_template, hole=0.35)
        st.plotly_chart(fig, use_container_width=True, theme=None)
    st.markdown("</div>", unsafe_allow_html=True)

    # Drilldown (simple): restrict the table to one value of one dimension
    st.markdown("<div class=\"card\"><div class=\"card-title\">Drilldown</div>", unsafe_allow_html=True)
    dd1, dd2 = st.columns([1, 2])
    with dd1:
        dd_dim = st.selectbox("Drilldown - dimension", options=[None] + cat_cols)
    drill_df = filtered_df.copy()
    if dd_dim:
        values = list(filtered_df[dd_dim].dropna().astype(str).unique())
        with dd2:
            dd_val = st.selectbox("Valeur", options=[None] + values)
        if dd_val:
            drill_df = filtered_df[filtered_df[dd_dim].astype(str) == dd_val]
    search_query = st.text_input("Recherche globale", key="search_analysis")
    df_searched = apply_search(drill_df, search_query)
    st.dataframe(df_searched, use_container_width=True, hide_index=True)
    st.markdown("</div>", unsafe_allow_html=True)

    # Decision-maker view (field-aware, optional)
    st.markdown("<div class=\"card\"><div class=\"card-title\">Vue Décideur (si champs disponibles)</div>", unsafe_allow_html=True)

    # Columns are located by their expected questionnaire labels; each one may be absent.
    col_email = find_column(filtered_df, ["Email", "E-mail"])
    col_gender = find_column(filtered_df, ["Genre", "Autre genre (Veuillez préciser) : "])
    col_country = find_column(
        filtered_df,
        ["Pays de résidence", "D'où préférez-vous participer à l'événement ?"],
    )
    col_role = find_column(filtered_df, ["Votre profession / statut", "Autre profession (veuillez préciser)"])
    col_particip = find_column(filtered_df, ["Avez-vous déjà participé à un événement Indaba X Togo ?"])

    # KPIs for the decision maker
    kcols = st.columns(4)
    with kcols[0]:
        kpi_card("Inscriptions", f"{len(filtered_df):,}")
    with kcols[1]:
        if col_email:
            # E-mail addresses are compared case-insensitively.
            kpi_card("Personnes uniques (email)", f"{_distinct_count(col_email, fold_case=True):,}")
        else:
            kpi_card("Personnes uniques", "-")
    with kcols[2]:
        if col_country:
            kpi_card("Pays (distincts)", f"{_distinct_count(col_country):,}")
        else:
            kpi_card("Pays (distincts)", "-")
    with kcols[3]:
        if col_role:
            kpi_card("Profils (distincts)", f"{_distinct_count(col_role):,}")
        else:
            kpi_card("Profils (distincts)", "-")

    has_rows = not filtered_df.empty

    # Row 1 charts: gender, country
    dm1 = st.columns(2)
    if col_gender and has_rows:
        fig_g = px.pie(_counts(col_gender), names=col_gender, values="count", template=plotly_template, hole=0.35)
        with dm1[0]:
            chart_card("Répartition par genre", fig_g)
    if col_country and has_rows:
        fig_c = px.bar(_counts(col_country, limit=15), x=col_country, y="count", template=plotly_template)
        with dm1[1]:
            chart_card("Top 15 pays de résidence", fig_c)

    # Row 2 charts: participation history, roles
    dm2 = st.columns(2)
    if col_particip and has_rows:
        # Kept in group order (not sorted by count), as answers are few.
        fig_p = px.bar(_counts(col_particip, by_count=False), x=col_particip, y="count", template=plotly_template)
        with dm2[0]:
            chart_card("A déjà participé ?", fig_p)
    if col_role and has_rows:
        fig_r = px.bar(_counts(col_role, limit=15), x=col_role, y="count", template=plotly_template)
        with dm2[1]:
            chart_card("Professions / Statuts (Top 15)", fig_r)
    st.markdown("</div>", unsafe_allow_html=True)
| # ----------------------------- | |
| # Page: Constructeur de graphiques | |
| # ----------------------------- | |
| def page_constructeur_graphiques(): | |
| st.markdown("<h2>📈 Constructeur de graphiques</h2>", unsafe_allow_html=True) | |
| if 'filtered_df' not in st.session_state or st.session_state['filtered_df'] is None: | |
| st.warning("Veuillez d'abord importer et configurer des données dans l'onglet Tableau de bord.") | |
| return | |
| filtered_df = st.session_state['filtered_df'] | |
| type_map = st.session_state.get('logical_types', {}) | |
| theme_mode = "dark" if st.session_state.get('theme_mode') == "dark" else "light" | |
| plotly_template = get_plotly_template(theme_mode) | |
| # Universal Chart Builder | |
| st.markdown("<div class=\"card\"><div class=\"card-title\">Constructeur de graphiques</div>", unsafe_allow_html=True) | |
| chart_types = [ | |
| "Barres", | |
| "Barres empilées", | |
| "Lignes", | |
| "Aires", | |
| "Camembert", | |
| "Histogramme", | |
| "Nuage de points", | |
| "Boîte (Box)", | |
| "Violon", | |
| ] | |
| cA, cB, cC = st.columns([1.2, 1, 1]) | |
| with cA: | |
| chosen_chart = st.selectbox("Type de graphique", options=chart_types, key="ub_chart_type") | |
| with cB: | |
| agg_choice = st.selectbox("Agrégat", options=["count", "sum", "mean", "median", "min", "max"], index=0, key="ub_agg") | |
| with cC: | |
| topn_builder = st.number_input("Top N (optionnel)", min_value=0, value=0, step=1, help="0 pour désactiver") | |
| all_cols = list(filtered_df.columns) | |
| num_cols = [c for c in all_cols if pd.api.types.is_numeric_dtype(filtered_df[c])] | |
| date_cols_any = [c for c in all_cols if pd.api.types.is_datetime64_any_dtype(try_parse_datetime(filtered_df[c]))] | |
| cat_cols_any = [c for c in all_cols if c not in num_cols] | |
| def aggregate_df(df_src: pd.DataFrame, x_col: Optional[str], y_col: Optional[str], color_col: Optional[str]) -> pd.DataFrame: | |
| if agg_choice == "count": | |
| if x_col is not None and y_col is None: | |
| return df_src.groupby([x_col, color_col] if color_col else [x_col]).size().reset_index(name="value") | |
| elif x_col is None and y_col is not None: | |
| return df_src.groupby([y_col, color_col] if color_col else [y_col]).size().reset_index(name="value") | |
| elif x_col is not None and y_col is not None: | |
| return df_src.groupby([x_col, y_col]).size().reset_index(name="value") | |
| else: | |
| return pd.DataFrame({"value": [len(df_src)]}) | |
| else: | |
| agg_func = agg_choice | |
| measure = y_col if (y_col in num_cols) else (x_col if (x_col in num_cols) else (num_cols[0] if num_cols else None)) | |
| if measure is None: | |
| return df_src.groupby([x_col, color_col] if color_col else [x_col]).size().reset_index(name="value") if x_col else pd.DataFrame({"value": [len(df_src)]}) | |
| group_keys = [k for k in [x_col, color_col] if k] | |
| out = df_src.groupby(group_keys, dropna=False)[measure].agg(agg_func).reset_index(name="value") | |
| return out | |
| if chosen_chart in ("Barres", "Barres empilées"): | |
| x = st.selectbox("Axe X (cat/date)", options=cat_cols_any, key="ub_bar_x") | |
| color = st.selectbox("Couleur (optionnel)", options=[None] + cat_cols_any, key="ub_bar_color") | |
| measure = st.selectbox("Mesure (numérique ou count)", options=["(count)"] + num_cols, key="ub_bar_measure") | |
| data = aggregate_df(filtered_df, x, None if measure == "(count)" else measure, color) | |
| if topn_builder and topn_builder > 0 and x in data.columns: | |
| data = data.sort_values("value", ascending=False).groupby(x).head(1).head(int(topn_builder)) | |
| if chosen_chart == "Barres": | |
| fig = px.bar(data, x=x, y="value", color=color, template=plotly_template, barmode="group") | |
| else: | |
| fig = px.bar(data, x=x, y="value", color=color, template=plotly_template, barmode="relative") | |
| st.plotly_chart(fig, use_container_width=True, theme=None) | |
| elif chosen_chart in ("Lignes", "Aires"): | |
| x = st.selectbox("Axe X (date recommandé)", options=date_cols_any or cat_cols_any, key="ub_line_x") | |
| color = st.selectbox("Couleur (optionnel)", options=[None] + cat_cols_any, key="ub_line_color") | |
| measure = st.selectbox("Mesure (numérique ou count)", options=["(count)"] + num_cols, key="ub_line_measure") | |
| data = aggregate_df(filtered_df, x, None if measure == "(count)" else measure, color) | |
| if chosen_chart == "Lignes": | |
| fig = px.line(data, x=x, y="value", color=color, template=plotly_template) | |
| else: | |
| fig = px.area(data, x=x, y="value", color=color, template=plotly_template) | |
| st.plotly_chart(fig, use_container_width=True, theme=None) | |
| elif chosen_chart == "Camembert": | |
| names = st.selectbox("Noms (catégorie)", options=cat_cols_any, key="ub_pie_names") | |
| measure = st.selectbox("Mesure (numérique ou count)", options=["(count)"] + num_cols, key="ub_pie_measure") | |
| if measure == "(count)": | |
| data = filtered_df.groupby(names).size().reset_index(name="value") | |
| else: | |
| data = filtered_df.groupby(names)[measure].sum().reset_index(name="value") | |
| fig = px.pie(data, names=names, values="value", template=plotly_template, hole=0.35) | |
| st.plotly_chart(fig, use_container_width=True, theme=None) | |
| elif chosen_chart == "Histogramme": | |
| x = st.selectbox("Colonne numérique", options=num_cols, key="ub_hist_x") | |
| bins = st.slider("Nb de bacs (bins)", 5, 100, 30) | |
| fig = px.histogram(filtered_df, x=x, nbins=bins, template=plotly_template) | |
| st.plotly_chart(fig, use_container_width=True, theme=None) | |
| elif chosen_chart == "Nuage de points": | |
| x = st.selectbox("X (numérique)", options=num_cols, key="ub_scatter_x") | |
| y = st.selectbox("Y (numérique)", options=[c for c in num_cols if c != x], key="ub_scatter_y") | |
| color = st.selectbox("Couleur (optionnel)", options=[None] + cat_cols_any, key="ub_scatter_color") | |
| fig = px.scatter(filtered_df, x=x, y=y, color=color, template=plotly_template) | |
| st.plotly_chart(fig, use_container_width=True, theme=None) | |
| elif chosen_chart == "Boîte (Box)": | |
| y = st.selectbox("Y (numérique)", options=num_cols, key="ub_box_y") | |
| x = st.selectbox("X (catégorie optionnel)", options=[None] + cat_cols_any, key="ub_box_x") | |
| fig = px.box(filtered_df, x=x, y=y, template=plotly_template) | |
| st.plotly_chart(fig, use_container_width=True, theme=None) | |
| elif chosen_chart == "Violon": | |
| y = st.selectbox("Y (numérique)", options=num_cols, key="ub_violin_y") | |
| x = st.selectbox("X (catégorie optionnel)", options=[None] + cat_cols_any, key="ub_violin_x") | |
| fig = px.violin(filtered_df, x=x, y=y, template=plotly_template, box=True, points="outliers") | |
| st.plotly_chart(fig, use_container_width=True, theme=None) | |
| st.markdown("</div>", unsafe_allow_html=True) | |
| # ----------------------------- | |
| # Page: Envoi d'emails | |
| # ----------------------------- | |
def page_emails():
    """Render the email tab: choose recipients, configure SMTP, compose and send.

    Reads ``st.session_state['filtered_df']`` and returns early with a warning
    when it is missing or ``None``. Recipients come either from that DataFrame
    or from an uploaded CSV/XLSX file. For every selected row the subject and
    body are passed through ``safe_format_template`` with the row's values and
    the result is handed to ``send_email_smtp``. A failure on one row is
    reported in the log area and does not stop the remaining rows.

    Security note: the password field is deliberately NOT pre-filled from the
    ``SMTP_PASSWORD`` environment variable, because a widget's default value is
    shipped to the browser of whoever opens the page. When the field is left
    blank, the environment value is used server-side at send time instead.
    """
    st.markdown("<h2>✉️ Envoi d'emails</h2>", unsafe_allow_html=True)

    if 'filtered_df' not in st.session_state or st.session_state['filtered_df'] is None:
        st.warning("Veuillez d'abord importer et configurer des données dans l'onglet Tableau de bord.")
        return

    filtered_df = st.session_state['filtered_df']

    # Email Sender Section
    st.markdown("<div class=\"card\"><div class=\"card-title\">✉️ Envoi d'emails (CSV ou données filtrées)</div>", unsafe_allow_html=True)
    ecols1 = st.columns([1, 1])
    with ecols1[0]:
        st.caption("Source des destinataires")
        use_current = st.radio(
            "Choisir la source",
            options=["Données filtrées actuelles", "Importer un CSV/XLSX"],
            horizontal=False,
            index=0,
            key="email_source_choice",
        )
    with ecols1[1]:
        st.caption("Fichier (si import)")
        upload_mail = st.file_uploader("Importer un fichier", type=["csv", "xlsx"], key="email_upload_file")

    recipients_df: Optional[pd.DataFrame] = None
    if use_current == "Données filtrées actuelles":
        recipients_df = filtered_df.copy()
    elif upload_mail is not None:
        try:
            if upload_mail.name.lower().endswith(".csv"):
                recipients_df = pd.read_csv(upload_mail)
            else:
                recipients_df = pd.read_excel(upload_mail)
            # Stripping whitespace can make two headers collide (" Email" vs
            # "Email"); keep names unique so row[email_col] stays a scalar.
            recipients_df.columns = make_unique_columns(
                [str(c).strip() for c in recipients_df.columns]
            )
        except Exception as e:
            st.error(f"Erreur de lecture du fichier: {e}")

    if recipients_df is None or recipients_df.empty:
        st.info("Importez un fichier ou utilisez les données filtrées pour continuer.")
        st.markdown("</div>", unsafe_allow_html=True)
        return

    # Mapping email column
    column_names = list(recipients_df.columns)
    email_col_guess = find_column(recipients_df, ["email", "e-mail", "mail"]) or ("Email" if "Email" in recipients_df.columns else None)
    email_col = st.selectbox(
        "Colonne email",
        options=column_names,
        index=(column_names.index(email_col_guess) if email_col_guess in column_names else 0),
        help="Sélectionnez la colonne contenant les adresses email",
        key="email_col_select",
    )

    # SMTP settings
    # A malformed or out-of-range SMTP_PORT must not crash the whole page.
    try:
        default_port = int(os.environ.get("SMTP_PORT", "587"))
    except ValueError:
        default_port = 587
    if not 1 <= default_port <= 65535:
        default_port = 587

    st.markdown("<div class=\"card\" style=\"margin-top: 0.75rem;\"><div class=\"card-title\">Paramètres SMTP</div>", unsafe_allow_html=True)
    s1, s2, s3, s4 = st.columns([1.2, 0.8, 1, 1])
    with s1:
        smtp_host = st.text_input("Hôte SMTP", value=os.environ.get("SMTP_HOST", "smtp.gmail.com"))
    with s2:
        smtp_port = st.number_input("Port", min_value=1, max_value=65535, value=default_port)
    with s3:
        use_tls = st.selectbox("Sécurité", options=["STARTTLS", "SSL"], index=0) == "STARTTLS"
    with s4:
        reply_to = st.text_input("Reply-To (optionnel)", value=os.environ.get("SMTP_REPLY_TO", ""))
    s5, s6 = st.columns([1, 1])
    with s5:
        sender_email = st.text_input("Adresse expéditrice", value=os.environ.get("SMTP_SENDER", ""))
    with s6:
        # Never pre-fill this field with the server-side secret (see docstring).
        sender_password = st.text_input(
            "Mot de passe/clé appli",
            type="password",
            value="",
            help="Laisser vide pour utiliser la variable d'environnement SMTP_PASSWORD.",
        )
    st.markdown("</div>", unsafe_allow_html=True)

    # Composition
    st.markdown("<div class=\"card\" style=\"margin-top: 0.75rem;\"><div class=\"card-title\">Composer le message</div>", unsafe_allow_html=True)
    placeholders = ", ".join(f"{{{c}}}" for c in recipients_df.columns)
    subj = st.text_input("Objet", placeholder="Objet de l'email. Vous pouvez utiliser des variables comme {Nom}")
    body = st.text_area(
        "Corps (texte)",
        height=180,
        placeholder="Bonjour {Prenom} {Nom},\n\nVotre statut: {Statut}\n...",
        help=f"Variables disponibles: {placeholders}",
    )
    st.caption("Astuce: utilisez {NomColonne} pour insérer des champs du CSV.")

    # Preview first recipient
    pv1, pv2 = st.columns([1, 1])
    with pv1:
        st.subheader("Aperçu des données (5)")
        st.dataframe(recipients_df.head(5), use_container_width=True, hide_index=True)
    with pv2:
        st.subheader("Aperçu email (1er destinataire)")
        # Best-effort preview: a template error must not block the rest of the page.
        try:
            if not recipients_df.empty:
                row0 = recipients_df.iloc[0].to_dict()
                st.write("À:", recipients_df[email_col].iloc[0])
                st.write("Objet:", safe_format_template(subj, row0))
                st.code(safe_format_template(body, row0))
        except Exception:
            st.caption("Impossible de générer l'aperçu.")
    st.markdown("</div>", unsafe_allow_html=True)

    # Sending controls
    st.markdown("<div class=\"card\" style=\"margin-top: 0.75rem;\"><div class=\"card-title\">Envoi</div>", unsafe_allow_html=True)
    c_left, c_mid, c_right = st.columns([1, 1, 1])
    with c_left:
        limit_send = st.number_input("Limiter (0 = tout)", min_value=0, value=0, help="Pour tester, limiter le nombre d'emails envoyés")
    with c_mid:
        start_at = st.number_input("Début à l'index", min_value=0, value=0)
    with c_right:
        confirm = st.checkbox("Je confirme vouloir envoyer ces emails", value=False)

    do_send = st.button("Envoyer", type="primary", use_container_width=True, disabled=not confirm)
    if do_send:
        if not sender_email or not smtp_host or not subj or not body:
            st.error("Veuillez remplir l'hôte SMTP, l'adresse expéditrice, l'objet et le corps.")
        else:
            # Blank field => use the secret configured on the server.
            effective_password = sender_password or os.environ.get("SMTP_PASSWORD", "")
            total = len(recipients_df)
            indices = list(range(int(start_at), total))
            if limit_send and limit_send > 0:
                indices = indices[: int(limit_send)]
            progress = st.progress(0)
            sent_ok = 0
            log_container = st.container()
            for idx_i, i in enumerate(indices, start=1):
                # Per-recipient boundary: report the failure and keep going.
                try:
                    row = recipients_df.iloc[i]
                    to_addr = str(row[email_col]).strip()
                    if not to_addr or "@" not in to_addr:
                        raise ValueError("Adresse email invalide")
                    row_dict = row.to_dict()
                    subject_i = safe_format_template(subj, row_dict)
                    body_i = safe_format_template(body, row_dict)
                    send_email_smtp(
                        smtp_host=smtp_host,
                        smtp_port=int(smtp_port),
                        sender_email=sender_email,
                        sender_password=effective_password,
                        use_tls=use_tls,
                        to_email=to_addr,
                        subject=subject_i,
                        body_text=body_i,
                        reply_to=(reply_to or None),
                    )
                    sent_ok += 1
                    log_container.success(f"Envoyé à {to_addr}")
                except Exception as e:
                    log_container.error(f"Échec pour index {i}: {e}")
                progress.progress(int(idx_i * 100 / max(1, len(indices))))
            st.info(f"Terminé. Succès: {sent_ok}/{len(indices)}")
    st.markdown("</div>", unsafe_allow_html=True)
| # ----------------------------- | |
| # Main App | |
| # ----------------------------- | |
def main():
    """Build the page: base CSS, header row, sidebar controls, then the four tabs."""
    inject_base_css()

    # Header: optional logo on the left, centered title, empty spacer on the right.
    header_cols = st.columns([1, 3, 1])
    with header_cols[0]:
        logo_file = os.path.join("assets", "logo.png")
        if os.path.exists(logo_file):
            st.image(logo_file, width=72)
    with header_cols[1]:
        st.markdown("<h1 style='text-align:center; margin-top: 0;'>Tableau de bord des inscriptions</h1>", unsafe_allow_html=True)
    with header_cols[2]:
        st.write("")

    # Sidebar controls are shared by every tab.
    df, type_map, theme_mode, _, unique_keys = sidebar_controls()

    # Publish the sidebar results through session_state so the other tabs can read them.
    if df is not None:
        shared_state = (
            ('logical_types', type_map),
            ('unique_keys', unique_keys),
            ('theme_mode', theme_mode),
        )
        for state_key, state_value in shared_state:
            st.session_state[state_key] = state_value

    # Application tabs: each label is paired with the function that renders it.
    tab_pages = (
        ("📊 Tableau de bord", page_tableau_de_bord),
        ("📋 Analyses avancées", page_analyses),
        ("📈 Constructeur graphiques", page_constructeur_graphiques),
        ("✉️ Envoi emails", page_emails),
    )
    tab_handles = st.tabs([label for label, _ in tab_pages])
    for handle, (_, render_page) in zip(tab_handles, tab_pages):
        with handle:
            render_page()
# Script entry point: call main() only when this file is executed as the
# main module, not when it is imported.
if __name__ == "__main__":
    main()