# app.py
import json
import re
import tempfile
from datetime import datetime, timedelta

from dateutil import tz
import gradio as gr
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.dates as mdates
import folium
from matplotlib import cm
import branca.colormap as bcm
from grafanalib.core import (
    Dashboard, Graph, Row, Target, YAxis, YAxes, Time
)

TAIPEI = tz.gettz("Asia/Taipei")

# -----------------------------
# Google Drive link handling
# -----------------------------
DRIVE_PRESETS = [
    "https://drive.google.com/file/d/15yZ4QicICKZCnX6vjcD9JNXjnJmMFJD4/view?usp=drivesdk",
    "https://drive.google.com/file/d/1dqazYh_YzNNMbkUpgLRKSE9Y3ioPhtFu/view?usp=drivesdk",
    "https://drive.google.com/file/d/1A23f4q8DXHpoRIN5UQsDd6eM8jJ_Ruf8/view?usp=drivesdk",
]

def normalize_drive_url(url: str) -> str:
    """
    Accept any Google Drive / Google Sheets share link and return a URL that
    pandas can read directly as CSV.
    - Sheets: .../spreadsheets/d/<ID>/edit -> .../export?format=csv
    - Drive file: .../file/d/<ID>/view -> https://drive.google.com/uc?export=download&id=<ID>
    """
    if not isinstance(url, str) or not url.strip():
        raise ValueError("Please provide a valid Google link")
    url = url.strip()

    # Sheets
    m = re.search(r"https://docs\.google\.com/spreadsheets/d/([a-zA-Z0-9-_]+)", url)
    if m:
        sheet_id = m.group(1)
        return f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv"

    # Drive file
    m = re.search(r"https://drive\.google\.com/file/d/([a-zA-Z0-9-_]+)/", url)
    if m:
        file_id = m.group(1)
        return f"https://drive.google.com/uc?export=download&id={file_id}"

    return url
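
# Minimal usage sketch (illustrative; FILE_ID / SHEET_ID are placeholders, not real IDs):
#   normalize_drive_url("https://drive.google.com/file/d/FILE_ID/view?usp=drivesdk")
#   -> "https://drive.google.com/uc?export=download&id=FILE_ID"
#   normalize_drive_url("https://docs.google.com/spreadsheets/d/SHEET_ID/edit#gid=0")
#   -> "https://docs.google.com/spreadsheets/d/SHEET_ID/export?format=csv"
# Any other URL is returned unchanged.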

# -----------------------------
# Demo / Data loading
# -----------------------------
def make_demo_dataframe() -> pd.DataFrame:
    """Random demo data, including lat/lon + pid."""
    t0 = datetime.now(tz=TAIPEI) - timedelta(minutes=60)
    times = [t0 + timedelta(minutes=i) for i in range(61)]
    amp = np.random.rand(len(times))
    cnt = np.random.randint(0, 11, size=len(times))
    lats = np.random.uniform(21.8, 25.3, size=len(times))
    lons = np.random.uniform(120.0, 122.0, size=len(times))
    df = pd.DataFrame({
        "time": times,
        "amplitude": amp,
        "count": cnt,
        "lat": lats,
        "lon": lons
    })
    df["pid"] = np.arange(len(df))
    return df

def _finalize_time(df: pd.DataFrame) -> pd.DataFrame:
    """Ensure the time column is timezone-aware and the frame is sorted by time."""
    time_col = next((c for c in ["time", "timestamp", "datetime", "date"] if c in df.columns), None)
    if time_col is None:
        raise ValueError("Data must contain a time column (one of time/timestamp/datetime/date)")
    df[time_col] = pd.to_datetime(df[time_col])
    df = df.rename(columns={time_col: "time"})
    if getattr(df["time"].dt, "tz", None) is None:
        df["time"] = df["time"].dt.tz_localize(TAIPEI)
    else:
        df["time"] = df["time"].dt.tz_convert(TAIPEI)
    return df.sort_values("time").reset_index(drop=True)

def load_csv(file: gr.File | None) -> pd.DataFrame:
    """Read an uploaded CSV file."""
    df = pd.read_csv(file.name)
    df = _finalize_time(df)
    # If lat/lon are missing, fill with random values so the map is not empty
    if "lat" not in df.columns or "lon" not in df.columns:
        n = len(df)
        df["lat"] = np.random.uniform(21.8, 25.3, size=n)
        df["lon"] = np.random.uniform(120.0, 122.0, size=n)
    if "pid" not in df.columns:
        df["pid"] = np.arange(len(df))
    return df

def load_drive_csv(sheet_or_file_url: str) -> pd.DataFrame:
    """Read a CSV from a Google Sheets or Google Drive file link."""
    url = normalize_drive_url(sheet_or_file_url)
    df = pd.read_csv(url)
    df = _finalize_time(df)
    if "lat" not in df.columns or "lon" not in df.columns:
        n = len(df)
        df["lat"] = np.random.uniform(21.8, 25.3, size=n)
        df["lon"] = np.random.uniform(120.0, 122.0, size=n)
    if "pid" not in df.columns:
        df["pid"] = np.arange(len(df))
    return df

def load_data(source: str, file: gr.File | None = None, sheet_url: str = "") -> pd.DataFrame:
    """Load data according to the source: demo / upload / drive."""
    if source == "drive":
        if not sheet_url:
            raise ValueError("Please select a Google link")
        return load_drive_csv(sheet_url)
    elif source == "upload":
        if file is None:
            raise ValueError("Please upload a CSV file")
        return load_csv(file)
    else:
        return make_demo_dataframe()
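
# Quick sanity-check sketch (the demo path needs no file or network access):
#   df = load_data("demo")
#   df[["time", "amplitude", "count"]].head()
# "upload" expects a Gradio file object exposing a .name path; "drive" expects a
# share link in one of the formats handled by normalize_drive_url().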

# -----------------------------
# grafanalib JSON builder
# -----------------------------
def build_grafanalib_dashboard(series_columns: list[str], dual_axis: bool, rolling_window: int) -> dict:
    panels = []
    panels.append(
        Graph(
            title=f"{series_columns[0]}",
            dataSource="(example)",
            targets=[Target(expr=f"{series_columns[0]}", legendFormat=series_columns[0])],
            lines=True, bars=False, points=False,
            yAxes=YAxes(left=YAxis(format="short"), right=YAxis(format="short")),
        )
    )
    if len(series_columns) > 1:
        targets = [Target(expr=f"{series_columns[1]}", legendFormat=series_columns[1])]
        lines, bars, title = False, True, f"{series_columns[1]} (bar)"
        if dual_axis:
            targets.append(Target(expr=f"{series_columns[0]}", legendFormat=f"{series_columns[0]} (line)"))
            lines, bars = True, True
            title = f"{series_columns[1]} (bar) + {series_columns[0]} (line)"
        panels.append(
            Graph(
                title=title,
                dataSource="(example)",
                targets=targets,
                lines=lines, bars=bars, points=False,
                yAxes=YAxes(left=YAxis(format="short"), right=YAxis(format="short")),
            )
        )
    panels.append(
        Graph(
            title=f"{series_columns[0]} rolling({rolling_window})",
            dataSource="(example)",
            targets=[Target(expr=f"{series_columns[0]}_rolling{rolling_window}",
                            legendFormat=f"{series_columns[0]}_rolling{rolling_window}")],
            lines=True, bars=False, points=False,
            yAxes=YAxes(left=YAxis(format="short"), right=YAxis(format="short")),
        )
    )
    return Dashboard(
        title="Grafana-like Demo (grafanalib + Gradio)",
        rows=[Row(panels=panels)],
        timezone="browser",
        time=Time("now-1h", "now"),
    ).to_json_data()
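
# Illustrative call (mirrors what pipeline() does below); the column names are just
# the demo columns, not a requirement of the builder:
#   dash = build_grafanalib_dashboard(["amplitude", "count"], dual_axis=True, rolling_window=5)
#   print(json.dumps(dash, indent=2, default=str))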

# -----------------------------
# Matplotlib helpers
# -----------------------------
def _style_time_axis(ax):
    locator = mdates.AutoDateLocator(minticks=3, maxticks=6)
    formatter = mdates.ConciseDateFormatter(locator)
    ax.xaxis.set_major_locator(locator)
    ax.xaxis.set_major_formatter(formatter)
    ax.tick_params(axis="x", labelrotation=20, labelsize=9)
    ax.tick_params(axis="y", labelsize=9)
    ax.grid(True, which="major", alpha=0.25)
    ax.margins(x=0.02, y=0.05)  # apply margins to the axis passed in, not the "current" axes

def _normalize_times(series: pd.Series) -> pd.Series:
    """Return tz-naive timestamps in Taipei wall-clock time for matplotlib."""
    s = series.copy()
    if getattr(s.dt, "tz", None) is not None:
        # Keep Taipei local time (matches the map popups) and drop the tz info for plotting
        s = s.dt.tz_convert(TAIPEI).dt.tz_localize(None)
    return s

def render_line(df, col):
    times = _normalize_times(df["time"])
    fig, ax = plt.subplots(figsize=(6.5, 3.6))
    ax.plot(times, df[col], linewidth=1.6)
    ax.set_title(col, fontsize=12, pad=8)
    ax.set_xlabel("Time")
    ax.set_ylabel(col)
    _style_time_axis(ax)
    fig.tight_layout()
    return fig

def render_bar_or_dual(df, second_col, first_col, dual_axis):
    times = _normalize_times(df["time"])
    x = mdates.date2num(times.dt.to_pydatetime().tolist())
    fig, ax = plt.subplots(figsize=(6.5, 3.6))
    # Bar width: ~80% of the median sampling interval (seconds), at least 10 s,
    # converted to matplotlib date units (days).
    med_ns = times.astype("int64").diff().median()
    step_sec = med_ns / 1e9 if pd.notna(med_ns) and med_ns > 0 else 60.0
    width = max(10.0, step_sec * 0.8) / 86400.0
    ax.bar(x, df[second_col], width=width, align="center", label=second_col)
    title = f"{second_col} (bar)"
    if dual_axis:
        ax2 = ax.twinx()
        ax2.plot(times, df[first_col], linewidth=1.6, label=f"{first_col} (line)")
        title = f"{second_col} (bar) + {first_col} (line)"
        h1, l1 = ax.get_legend_handles_labels()
        h2, l2 = ax2.get_legend_handles_labels()
        ax.legend(h1 + h2, l1 + l2, loc="upper left")
    else:
        ax.legend(loc="upper left")
    ax.set_title(title, fontsize=12, pad=8)
    _style_time_axis(ax)
    fig.tight_layout()
    return fig

def render_rolling(df, col, window=5):
    times = _normalize_times(df["time"])
    roll_col = f"{col}_rolling{window}"
    if roll_col not in df.columns:
        df[roll_col] = df[col].rolling(window=window, min_periods=1).mean()
    fig, ax = plt.subplots(figsize=(6.5, 3.6))
    ax.plot(times, df[roll_col], linewidth=1.6)
    ax.set_title(f"{col} rolling({window})", fontsize=12, pad=8)
    ax.set_xlabel("Time")
    ax.set_ylabel(roll_col)
    _style_time_axis(ax)
    fig.tight_layout()
    return fig, df

# -----------------------------
# Folium helpers (map + legend)
# -----------------------------
def _to_hex_color(value: float, cmap=cm.viridis) -> str:
    rgba = cmap(value)
    return "#{:02x}{:02x}{:02x}".format(int(rgba[0] * 255), int(rgba[1] * 255), int(rgba[2] * 255))

def render_map_folium(
    df: pd.DataFrame,
    value_col: str = "amplitude",
    size_col: str = "count",
    cmap_name: str = "viridis",
    tiles: str = "OpenStreetMap",
) -> str:
    center_lat, center_lon = df["lat"].mean(), df["lon"].mean()
    m = folium.Map(location=[center_lat, center_lon], zoom_start=7, tiles=tiles)
    vmin, vmax = df[value_col].min(), df[value_col].max()
    cmap = getattr(cm, cmap_name)
    colormap = bcm.LinearColormap(
        [_to_hex_color(i, cmap) for i in np.linspace(0, 1, 256)],
        vmin=vmin, vmax=vmax
    )
    colormap.caption = f"{value_col} (color scale)"
    colormap.add_to(m)
    for _, row in df.iterrows():
        norm_val = (row[value_col] - vmin) / (vmax - vmin + 1e-9)
        popup_html = (
            f"<b>#ID:</b> {int(row['pid'])}<br>"
            f"<b>time:</b> {pd.to_datetime(row['time']).strftime('%Y-%m-%d %H:%M:%S')}<br>"
            f"<b>{value_col}:</b> {row[value_col]:.4f}<br>"
            f"<b>{size_col}:</b> {row[size_col]}<br>"
            f"<b>lat/lon:</b> {row['lat']:.5f}, {row['lon']:.5f}"
        )
        folium.CircleMarker(
            location=[row["lat"], row["lon"]],
            radius=row[size_col] + 3,
            color="black",
            weight=1,
            fill=True,
            fill_opacity=0.7,
            fill_color=_to_hex_color(norm_val, cmap),
            popup=folium.Popup(popup_html, max_width=300),
        ).add_to(m)
    return m._repr_html_()
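
# Standalone sketch: the return value is a self-contained HTML fragment, so it can be
# written to disk and opened in a browser outside Gradio (the path is illustrative):
#   html = render_map_folium(make_demo_dataframe())
#   with open("map_preview.html", "w", encoding="utf-8") as fh:
#       fh.write(html)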

# -----------------------------
# Detail helpers
# -----------------------------
def make_point_choices(df: pd.DataFrame) -> list[str]:
    # Assumes the demo schema (amplitude / count columns are present)
    labels = []
    for _, r in df.iterrows():
        t = pd.to_datetime(r["time"]).strftime("%H:%M:%S")
        labels.append(f"#{int(r['pid'])} | {t} | amp={r['amplitude']:.3f} cnt={int(r['count'])}")
    return labels

def pick_detail(df: pd.DataFrame, choice: str) -> pd.DataFrame:
    if not choice:
        return pd.DataFrame()
    try:
        pid_str = choice.split("|")[0].strip().lstrip("#")
        pid = int(pid_str)
        row = df[df["pid"] == pid]
        return row.reset_index(drop=True)
    except Exception:
        return pd.DataFrame()
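
# Label format produced by make_point_choices() and parsed back here, e.g.
#   "#12 | 14:03:00 | amp=0.512 cnt=7"  ->  pid 12
# (the values shown are illustrative).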

# -----------------------------
# Main pipeline
# -----------------------------
def pipeline(source, file, sheet_url, series_choice, dual_axis, rolling_window, cmap_choice, tiles_choice):
    df = load_data(source, file, sheet_url)
    numeric_cols = [c for c in df.columns
                    if c not in ["time", "lat", "lon", "pid"] and pd.api.types.is_numeric_dtype(df[c])]
    chosen = [c for c in (series_choice or numeric_cols[:2]) if c in numeric_cols]
    if not chosen:
        chosen = numeric_cols[:2]
    if not chosen:
        # Fail early with a clear message instead of an IndexError further down
        raise ValueError("No numeric columns available to plot")
    dash_json = build_grafanalib_dashboard(chosen, bool(dual_axis), int(rolling_window))
    dash_json_str = json.dumps(dash_json, ensure_ascii=False, indent=2, default=str)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".json", mode="w", encoding="utf-8") as f:
        f.write(dash_json_str)
        json_path = f.name
    fig1 = render_line(df, chosen[0])
    fig2 = render_bar_or_dual(df, chosen[1], chosen[0], bool(dual_axis)) if len(chosen) > 1 else plt.figure()
    fig3, df_with_roll = render_rolling(df.copy(), chosen[0], int(rolling_window))
    map_html = render_map_folium(df, value_col=chosen[0], size_col="count",
                                 cmap_name=cmap_choice, tiles=tiles_choice)
    point_choices = make_point_choices(df)
    default_choice = point_choices[0] if point_choices else ""
    detail_df = pick_detail(df, default_choice)
    demo_df = make_demo_dataframe()
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv", mode="w", encoding="utf-8") as f:
        demo_df.to_csv(f, index=False)
        demo_csv_path = f.name
    return (
        fig1, fig2, fig3, map_html,
        dash_json_str, json_path, df_with_roll,
        demo_csv_path,
        gr.Dropdown(choices=point_choices, value=default_choice),
        detail_df,
    )

def regenerate_demo(series_choice, dual_axis, rolling_window, cmap_choice, tiles_choice, current_choice):
    return pipeline("demo", None, "", series_choice, dual_axis, rolling_window, cmap_choice, tiles_choice)


def update_detail(df: pd.DataFrame, choice: str):
    return pick_detail(df, choice)

# -----------------------------
# UI (the Google source is now selected from a dropdown only)
# -----------------------------
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("## Grafana-like Demo + Folium Map (supports Google Drive / Sheets, source picked from a dropdown)")
    source_radio = gr.Radio(["upload", "drive", "demo"], label="Data source", value="demo")
    file_in = gr.File(label="Upload CSV (used when source is 'upload')", file_types=[".csv"])

    # Keep only the dropdown; there is no editable text box anymore
    preset_dd = gr.Dropdown(
        label="Google preset sources (3 links)",
        choices=DRIVE_PRESETS,
        value=DRIVE_PRESETS[0]
    )
    series_multiselect = gr.CheckboxGroup(label="Numeric columns", choices=[])
    dual_axis_chk = gr.Checkbox(label="Enable dual axis on the second panel", value=False)
    rolling_dd = gr.Dropdown(label="Rolling window", choices=["3", "5", "10", "20"], value="5")
    cmap_dd = gr.Dropdown(label="Map colormap",
                          choices=["viridis", "plasma", "inferno", "magma", "cividis", "coolwarm"],
                          value="viridis")
    tiles_dd = gr.Dropdown(label="Map tiles",
                           choices=["OpenStreetMap", "Stamen Terrain", "Stamen Toner",
                                    "CartoDB positron", "CartoDB dark_matter"],
                           value="OpenStreetMap")

    with gr.Row():
        run_btn = gr.Button("Generate Dashboard", scale=1)
        regen_btn = gr.Button("🔁 Regenerate demo data", scale=1)

    plot1 = gr.Plot(label="1: Line")
    plot2 = gr.Plot(label="2: Bar / Dual Axis")
    plot3 = gr.Plot(label="3: Rolling Mean")
    map_out = gr.HTML(label="4: Geo Map (Interactive + Legend)")
    json_box = gr.Code(label="grafanalib Dashboard JSON", language="json")
    json_file = gr.File(label="Download dashboard.json")
    demo_csv_file = gr.File(label="Download demo data (demo.csv)")
    df_view = gr.Dataframe(label="Data preview (with rolling column)", wrap=True)

    gr.Markdown("### 🔎 Point details (matches the #ID in the map popups)")
    point_selector = gr.Dropdown(label="Select a point (#ID | time | values)", choices=[], value=None)
    detail_view = gr.Dataframe(label="Selected point details", wrap=True)

    # Probe the columns for the chosen source (for "drive", read the URL from the dropdown)
    def probe_columns(source, file, preset_url):
        sheet_url = preset_url if source == "drive" else ""
        df = load_data(source, file, sheet_url)
        numeric_cols = [c for c in df.columns
                        if c not in ["time", "lat", "lon", "pid"] and pd.api.types.is_numeric_dtype(df[c])]
        return gr.CheckboxGroup(choices=numeric_cols, value=numeric_cols[:2]), df

    source_radio.change(probe_columns, inputs=[source_radio, file_in, preset_dd], outputs=[series_multiselect, df_view])
    file_in.change(probe_columns, inputs=[source_radio, file_in, preset_dd], outputs=[series_multiselect, df_view])
    preset_dd.change(probe_columns, inputs=[source_radio, file_in, preset_dd], outputs=[series_multiselect, df_view])

    # Initial load: default to the first Google link
    demo.load(
        lambda: pipeline("drive", None, DRIVE_PRESETS[0], [], False, "5", "viridis", "OpenStreetMap"),
        inputs=None,
        outputs=[
            plot1, plot2, plot3, map_out,
            json_box, json_file, df_view,
            demo_csv_file,
            point_selector, detail_view
        ]
    )

    # Generate / regenerate
    run_btn.click(
        pipeline,
        inputs=[source_radio, file_in, preset_dd, series_multiselect, dual_axis_chk, rolling_dd, cmap_dd, tiles_dd],
        outputs=[
            plot1, plot2, plot3, map_out,
            json_box, json_file, df_view,
            demo_csv_file,
            point_selector, detail_view
        ]
    )
    regen_btn.click(
        regenerate_demo,
        inputs=[series_multiselect, dual_axis_chk, rolling_dd, cmap_dd, tiles_dd, point_selector],
        outputs=[
            plot1, plot2, plot3, map_out,
            json_box, json_file, df_view,
            demo_csv_file,
            point_selector, detail_view
        ]
    )
    point_selector.change(
        update_detail,
        inputs=[df_view, point_selector],
        outputs=[detail_view]
    )

if __name__ == "__main__":
    demo.launch()