Grafana_like / app.py
cwadayi's picture
Update app.py
da7c345 verified
# app.py
import json
import re
import tempfile
from datetime import datetime, timedelta
from dateutil import tz
import gradio as gr
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.dates as mdates
import folium
from matplotlib import cm
import branca.colormap as bcm
from grafanalib.core import (
Dashboard, Graph, Row, Target, YAxis, YAxes, Time
)
# Time zone used for the demo timestamps and for localizing / converting the
# "time" column of loaded data.
TAIPEI = tz.gettz("Asia/Taipei")
# -----------------------------
# Google Drive link handling
# -----------------------------
# Preset Google Drive share links; they populate the "drive" source dropdown
# and the first entry is the one loaded when the page opens.
DRIVE_PRESETS = [
    "https://drive.google.com/file/d/15yZ4QicICKZCnX6vjcD9JNXjnJmMFJD4/view?usp=drivesdk",
    "https://drive.google.com/file/d/1dqazYh_YzNNMbkUpgLRKSE9Y3ioPhtFu/view?usp=drivesdk",
    "https://drive.google.com/file/d/1A23f4q8DXHpoRIN5UQsDd6eM8jJ_Ruf8/view?usp=drivesdk",
]
def normalize_drive_url(url: str) -> str:
    """Turn a Google Sheets / Drive share link into a URL pandas can read as CSV.

    Recognized forms:

    - Sheets: ``.../spreadsheets/d/<ID>/...`` becomes
      ``.../spreadsheets/d/<ID>/export?format=csv``. A ``gid=<N>`` present in
      the link is carried over so the shared tab (not always the first one)
      is exported.
    - Drive file: ``.../file/d/<ID>`` (with or without a trailing ``/view``)
      or ``.../open?id=<ID>`` becomes
      ``https://drive.google.com/uc?export=download&id=<ID>``.

    Any other URL is returned stripped of surrounding whitespace but otherwise
    unchanged.

    Raises:
        ValueError: If ``url`` is not a string or is blank.
    """
    if not isinstance(url, str) or not url.strip():
        raise ValueError("請提供有效的 Google 連結")
    url = url.strip()

    # Sheets
    sheet = re.search(r"https://docs\.google\.com/spreadsheets/d/([a-zA-Z0-9_-]+)", url)
    if sheet:
        export_url = f"https://docs.google.com/spreadsheets/d/{sheet.group(1)}/export?format=csv"
        # The tab id may appear as "#gid=", "?gid=" or "&gid=" in share links.
        gid = re.search(r"[#?&]gid=(\d+)", url)
        if gid:
            export_url += f"&gid={gid.group(1)}"
        return export_url

    # Drive file: do not require anything after the ID, so links copied
    # without the "/view" suffix are still recognized.
    drive = re.search(
        r"https://drive\.google\.com/(?:file/d/|open\?id=)([a-zA-Z0-9_-]+)", url
    )
    if drive:
        return f"https://drive.google.com/uc?export=download&id={drive.group(1)}"

    return url
# -----------------------------
# Demo / Data loading
# -----------------------------
def make_demo_dataframe() -> pd.DataFrame:
    """Build a random demonstration frame with coordinates and a ``pid`` column.

    The frame holds 61 rows, one per minute, starting 60 minutes before the
    current Taipei time. ``amplitude`` is uniform in [0, 1), ``count`` is an
    integer in [0, 10], and ``lat`` / ``lon`` are uniform in
    [21.8, 25.3) / [120.0, 122.0). ``pid`` is the 0-based row number.
    """
    n_points = 61
    start = datetime.now(tz=TAIPEI) - timedelta(minutes=60)
    # Dict values are evaluated top to bottom, so the random draws happen in
    # the order amplitude, count, lat, lon.
    frame = pd.DataFrame(
        {
            "time": [start + timedelta(minutes=step) for step in range(n_points)],
            "amplitude": np.random.rand(n_points),
            "count": np.random.randint(0, 11, size=n_points),
            "lat": np.random.uniform(21.8, 25.3, size=n_points),
            "lon": np.random.uniform(120.0, 122.0, size=n_points),
        }
    )
    frame["pid"] = np.arange(len(frame))
    return frame
def _finalize_time(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with a Taipei-aware, sorted ``time`` column.

    The first column found among ``time``, ``timestamp``, ``datetime`` and
    ``date`` is renamed to ``time`` and parsed with ``pd.to_datetime``. Naive
    timestamps are localized to ``TAIPEI``; aware ones are converted to it.
    Rows are sorted by ``time`` and the index is reset.

    The input frame is left untouched: the rename produces a new frame and the
    parsed column is written to that one.

    Raises:
        ValueError: If none of the accepted time column names is present.
    """
    time_col = next(
        (c for c in ("time", "timestamp", "datetime", "date") if c in df.columns),
        None,
    )
    if time_col is None:
        raise ValueError("資料需包含時間欄位(time/timestamp/datetime/date 其一)")

    # Rename first so every later write lands on the new frame, not the caller's.
    out = df.rename(columns={time_col: "time"})
    out["time"] = pd.to_datetime(out["time"])
    if out["time"].dt.tz is None:
        out["time"] = out["time"].dt.tz_localize(TAIPEI)
    else:
        out["time"] = out["time"].dt.tz_convert(TAIPEI)
    return out.sort_values("time").reset_index(drop=True)
def load_csv(file: gr.File | None) -> pd.DataFrame:
    """Read an uploaded CSV and make it ready for plotting and mapping.

    Args:
        file: The uploaded file. Either an object exposing its path as
            ``.name`` or a plain path string is accepted.

    Returns:
        The frame after ``_finalize_time``, with ``lat`` / ``lon`` and ``pid``
        columns guaranteed to exist.

    Raises:
        ValueError: If ``file`` is ``None``.
    """
    if file is None:
        raise ValueError("請上傳 CSV 檔")
    # Upload objects carry the temp path in `.name`; fall back to the value
    # itself so a bare path string works too.
    path = getattr(file, "name", file)
    df = _finalize_time(pd.read_csv(path))
    # Without both coordinates, fill in random ones so the map is not blank.
    if "lat" not in df.columns or "lon" not in df.columns:
        n = len(df)
        df["lat"] = np.random.uniform(21.8, 25.3, size=n)
        df["lon"] = np.random.uniform(120.0, 122.0, size=n)
    if "pid" not in df.columns:
        df["pid"] = np.arange(len(df))
    return df
def load_drive_csv(sheet_or_file_url: str) -> pd.DataFrame:
    """Read a CSV from a Google Sheets or Google Drive file link.

    The link is passed through ``normalize_drive_url``, read with pandas and
    run through ``_finalize_time``. When ``lat`` or ``lon`` is missing, both
    are filled with random coordinates; a missing ``pid`` becomes the 0-based
    row number.
    """
    csv_url = normalize_drive_url(sheet_or_file_url)
    frame = _finalize_time(pd.read_csv(csv_url))

    has_coordinates = "lat" in frame.columns and "lon" in frame.columns
    if not has_coordinates:
        row_count = len(frame)
        frame["lat"] = np.random.uniform(21.8, 25.3, size=row_count)
        frame["lon"] = np.random.uniform(120.0, 122.0, size=row_count)

    if "pid" not in frame.columns:
        frame["pid"] = np.arange(len(frame))
    return frame
def load_data(source: str, file: gr.File | None = None, sheet_url: str = "") -> pd.DataFrame:
    """Load data from the selected source.

    ``"drive"`` reads ``sheet_url``, ``"upload"`` reads ``file``, and any
    other value produces the random demo frame.

    Raises:
        ValueError: If ``"drive"`` is selected without a link, or
            ``"upload"`` without a file.
    """
    if source == "drive":
        if sheet_url:
            return load_drive_csv(sheet_url)
        raise ValueError("請選擇 Google 連結")

    if source == "upload":
        if file is not None:
            return load_csv(file)
        raise ValueError("請上傳 CSV 檔")

    return make_demo_dataframe()
# -----------------------------
# grafanalib JSON builder
# -----------------------------
def build_grafanalib_dashboard(series_columns: list[str], dual_axis: bool, rolling_window: int) -> dict:
    """Describe the demo panels as a grafanalib dashboard and return its JSON data.

    Panels, in order:

    1. A line graph of the first series.
    2. Only when a second series exists: a bar graph of it, which also
       carries the first series as a line when ``dual_axis`` is truthy.
    3. A line graph of the rolling column ``<first>_rolling<rolling_window>``.

    All panels sit in a single row and use the placeholder data source
    ``"(example)"``.
    """

    def _graph(title, targets, *, lines, bars):
        # Every panel shares the data source, point style and "short" axes.
        return Graph(
            title=title,
            dataSource="(example)",
            targets=targets,
            lines=lines, bars=bars, points=False,
            yAxes=YAxes(left=YAxis(format="short"), right=YAxis(format="short")),
        )

    primary = series_columns[0]
    primary_expr = f"{primary}"

    panels = [
        _graph(
            primary_expr,
            [Target(expr=primary_expr, legendFormat=primary)],
            lines=True, bars=False,
        )
    ]

    if len(series_columns) > 1:
        secondary = series_columns[1]
        second_targets = [Target(expr=f"{secondary}", legendFormat=secondary)]
        if dual_axis:
            second_targets.append(
                Target(expr=primary_expr, legendFormat=f"{primary} (line)")
            )
            second_title = f"{secondary} (bar) + {primary} (line)"
            draw_lines = True
        else:
            second_title = f"{secondary} (bar)"
            draw_lines = False
        panels.append(_graph(second_title, second_targets, lines=draw_lines, bars=True))

    rolling_name = f"{primary}_rolling{rolling_window}"
    panels.append(
        _graph(
            f"{primary} rolling({rolling_window})",
            [Target(expr=rolling_name, legendFormat=rolling_name)],
            lines=True, bars=False,
        )
    )

    dashboard = Dashboard(
        title="Grafana-like Demo (grafanalib + Gradio)",
        rows=[Row(panels=panels)],
        timezone="browser",
        time=Time("now-1h", "now"),
    )
    return dashboard.to_json_data()
# -----------------------------
# Matplotlib helpers
# -----------------------------
def _style_time_axis(ax):
    """Apply the shared date-axis styling to ``ax``.

    Installs an automatic date locator (3 to 6 ticks) with a concise date
    formatter on the x axis, rotates and shrinks the tick labels, enables a
    faint major grid and sets small data margins.

    Args:
        ax: The Matplotlib axes to style.
    """
    locator = mdates.AutoDateLocator(minticks=3, maxticks=6)
    ax.xaxis.set_major_locator(locator)
    ax.xaxis.set_major_formatter(mdates.ConciseDateFormatter(locator))
    ax.tick_params(axis="x", labelrotation=20, labelsize=9)
    ax.tick_params(axis="y", labelsize=9)
    ax.grid(True, which="major", alpha=0.25)
    # Set the margins on the axes we were handed. `plt.margins` targets
    # pyplot's *current* axes, which need not be `ax` (for example when the
    # caller created a twin axes after `ax`).
    ax.margins(x=0.02, y=0.05)
def _normalize_times(series: pd.Series) -> pd.Series:
s = series.copy()
if getattr(s.dt, "tz", None) is not None:
s = s.dt.tz_convert("UTC").dt.tz_localize(None)
return s
def render_line(df, col):
    """Plot column ``col`` of ``df`` against its ``time`` column as a line.

    Returns the Matplotlib figure; the axes are titled and y-labelled with
    ``col`` and styled by ``_style_time_axis``.
    """
    x_values = _normalize_times(df["time"])
    figure, axes = plt.subplots(figsize=(6.5, 3.6))
    axes.plot(x_values, df[col], linewidth=1.6)
    axes.set_title(col, fontsize=12, pad=8)
    axes.set(xlabel="Time", ylabel=col)
    _style_time_axis(axes)
    figure.tight_layout()
    return figure
def render_bar_or_dual(df, second_col, first_col, dual_axis):
    """Draw ``second_col`` as bars over time, optionally with ``first_col`` as a line.

    Args:
        df: Frame with a ``time`` column plus the two value columns.
        second_col: Column drawn as bars on the left axis.
        first_col: Column drawn as a line on a twin right axis when
            ``dual_axis`` is truthy.
        dual_axis: Whether to add the line on a second y axis.

    Returns:
        The Matplotlib figure.
    """
    times = _normalize_times(df["time"])
    # date2num accepts datetime64 arrays directly, which avoids the
    # deprecated `.dt.to_pydatetime()` round trip.
    x = mdates.date2num(times.to_numpy())

    fig, ax = plt.subplots(figsize=(6.5, 3.6))

    # Bar width: 80% of the median sampling step, at least 10 seconds,
    # expressed in days (Matplotlib date units). Working in seconds keeps the
    # result independent of the datetime64 resolution of the column.
    step_seconds = times.diff().dt.total_seconds().median()
    if pd.isna(step_seconds) or step_seconds <= 0:
        # No usable spacing (single row or duplicated timestamps). The
        # original fell back to 60 here, presumably meaning 60 seconds.
        step_seconds = 60.0
    width = max(10.0, step_seconds * 0.8) / 86400

    ax.bar(x, df[second_col], width=width, align="center", label=second_col)
    title = f"{second_col} (bar)"
    if dual_axis:
        ax2 = ax.twinx()
        ax2.plot(times, df[first_col], linewidth=1.6, label=f"{first_col} (line)")
        title = f"{second_col} (bar) + {first_col} (line)"
        # Merge both axes' entries into one legend on the bar axes.
        h1, l1 = ax.get_legend_handles_labels()
        h2, l2 = ax2.get_legend_handles_labels()
        ax.legend(h1 + h2, l1 + l2, loc="upper left")
    else:
        ax.legend(loc="upper left")
    ax.set_title(title, fontsize=12, pad=8)
    _style_time_axis(ax)
    fig.tight_layout()
    return fig
def render_rolling(df, col, window=5):
    """Plot the rolling mean of ``col`` and return ``(figure, df)``.

    The rolling column is named ``<col>_rolling<window>``. If ``df`` does not
    already contain it, it is computed with ``min_periods=1`` and added to
    ``df`` in place; the same frame is returned alongside the figure.
    """
    x_values = _normalize_times(df["time"])
    rolling_name = f"{col}_rolling{window}"
    if rolling_name not in df.columns:
        smoothed = df[col].rolling(window=window, min_periods=1).mean()
        df[rolling_name] = smoothed

    figure, axes = plt.subplots(figsize=(6.5, 3.6))
    axes.plot(x_values, df[rolling_name], linewidth=1.6)
    axes.set_title(f"{col} rolling({window})", fontsize=12, pad=8)
    axes.set(xlabel="Time", ylabel=rolling_name)
    _style_time_axis(axes)
    figure.tight_layout()
    return figure, df
# -----------------------------
# Folium helpers (map + legend)
# -----------------------------
def _to_hex_color(value: float, cmap=cm.viridis) -> str:
    """Map ``value`` through ``cmap`` and return the color as ``#rrggbb``.

    Only the first three channels of the colormap result are used; each is
    scaled by 255 and truncated to an integer.
    """
    red, green, blue = (int(channel * 255) for channel in cmap(value)[:3])
    return f"#{red:02x}{green:02x}{blue:02x}"
def render_map_folium(
    df: pd.DataFrame,
    value_col: str = "amplitude",
    size_col: str = "count",
    cmap_name: str = "viridis",
    tiles: str = "OpenStreetMap",
) -> str:
    """Render the points of ``df`` on a Folium map and return its HTML.

    Each row becomes a circle marker at (``lat``, ``lon``) whose fill color
    encodes ``value_col`` (scaled between the column's min and max) and whose
    radius is ``size_col + 3``. A color-scale legend is added to the map.

    Args:
        df: Frame with ``lat``, ``lon``, ``pid``, ``time`` and ``value_col``.
        value_col: Column mapped to marker color.
        size_col: Column mapped to marker radius. If the column is absent, or
            a row's value is missing, the base radius of 3 is used and the
            popup omits / still shows the raw value respectively.
        cmap_name: Name of a colormap attribute on ``matplotlib.cm``.
        tiles: Tile layer name passed to ``folium.Map``.

    Returns:
        The map's HTML representation.
    """
    from html import escape  # stdlib; imported locally to keep the block self-contained

    center_lat, center_lon = df["lat"].mean(), df["lon"].mean()
    m = folium.Map(location=[center_lat, center_lon], zoom_start=7, tiles=tiles)

    vmin, vmax = df[value_col].min(), df[value_col].max()
    cmap = getattr(cm, cmap_name)
    colormap = bcm.LinearColormap(
        [_to_hex_color(i, cmap) for i in np.linspace(0, 1, 256)],
        vmin=vmin, vmax=vmax,
    )
    colormap.caption = f"{value_col} (color scale)"
    colormap.add_to(m)

    has_size = size_col in df.columns
    # Column names come from the loaded file, so escape them before they are
    # placed into popup HTML.
    value_label = escape(str(value_col))
    size_label = escape(str(size_col))
    # The small epsilon avoids dividing by zero when all values are equal.
    span = vmax - vmin + 1e-9

    for _, row in df.iterrows():
        norm_val = (row[value_col] - vmin) / span

        popup_parts = [
            f"<b>#ID:</b> {int(row['pid'])}<br>",
            f"<b>time:</b> {pd.to_datetime(row['time']).strftime('%Y-%m-%d %H:%M:%S')}<br>",
            f"<b>{value_label}:</b> {row[value_col]:.4f}<br>",
        ]
        size = 0
        if has_size:
            popup_parts.append(f"<b>{size_label}:</b> {row[size_col]}<br>")
            if not pd.isna(row[size_col]):
                size = row[size_col]
        popup_parts.append(f"<b>lat/lon:</b> {row['lat']:.5f}, {row['lon']:.5f}")
        popup_html = "".join(popup_parts)

        folium.CircleMarker(
            location=[row["lat"], row["lon"]],
            radius=size + 3,
            color="black",
            weight=1,
            fill=True,
            fill_opacity=0.7,
            fill_color=_to_hex_color(norm_val, cmap),
            popup=folium.Popup(popup_html, max_width=300),
        ).add_to(m)
    return m._repr_html_()
# -----------------------------
# Detail helpers
# -----------------------------
def make_point_choices(df: pd.DataFrame) -> list[str]:
    """Build one dropdown label per row of ``df``.

    A label has the form ``#<pid> | <HH:MM:SS> | amp=<amplitude> cnt=<count>``.
    The ``amp=`` / ``cnt=`` parts are included only when the corresponding
    column exists and the row's value is not missing, so frames without the
    demo columns still produce usable labels. The leading ``#<pid>`` is always
    present.

    Args:
        df: Frame with at least ``pid`` and ``time`` columns.

    Returns:
        The labels, in row order.
    """
    has_amplitude = "amplitude" in df.columns
    has_count = "count" in df.columns

    labels = []
    for _, row in df.iterrows():
        stamp = pd.to_datetime(row["time"]).strftime("%H:%M:%S")
        label = f"#{int(row['pid'])} | {stamp}"

        extras = []
        if has_amplitude and pd.notna(row["amplitude"]):
            extras.append(f"amp={row['amplitude']:.3f}")
        if has_count and pd.notna(row["count"]):
            extras.append(f"cnt={int(row['count'])}")
        if extras:
            label += " | " + " ".join(extras)

        labels.append(label)
    return labels
def pick_detail(df: pd.DataFrame, choice: str) -> pd.DataFrame:
    """Return the rows of ``df`` whose ``pid`` matches the one in ``choice``.

    ``choice`` is a label starting with ``#<pid>`` followed by ``|``. An empty
    choice, or any failure while parsing it or filtering the frame, yields an
    empty frame (deliberate best-effort behavior).
    """
    if not choice:
        return pd.DataFrame()
    try:
        head = choice.partition("|")[0]
        wanted_pid = int(head.strip().lstrip("#"))
        matches = df.loc[df["pid"] == wanted_pid]
        return matches.reset_index(drop=True)
    except Exception:
        return pd.DataFrame()
# -----------------------------
# Main pipeline
# -----------------------------
def pipeline(source, file, sheet_url, series_choice, dual_axis, rolling_window, cmap_choice, tiles_choice):
    """Load the data and build every output shown in the UI.

    Args:
        source: ``"drive"``, ``"upload"`` or anything else for demo data.
        file: Uploaded file (used for ``"upload"``).
        sheet_url: Google link (used for ``"drive"``).
        series_choice: Selected numeric columns; when empty or invalid the
            first two numeric columns are used.
        dual_axis: Whether the second panel overlays the first series.
        rolling_window: Rolling-mean window (anything ``int()`` accepts).
        cmap_choice: Colormap name for the map.
        tiles_choice: Tile layer name for the map.

    Returns:
        A 10-tuple: line figure, bar/dual figure, rolling figure, map HTML,
        dashboard JSON string, path of the JSON file, frame including the
        rolling column, path of a freshly generated demo CSV, a dropdown
        update with the point labels, and the detail frame of the first point.

    Raises:
        ValueError: If the loaded data has no numeric column to plot.
    """
    df = load_data(source, file, sheet_url)

    reserved = ("time", "lat", "lon", "pid")
    numeric_cols = [
        c for c in df.columns
        if c not in reserved and pd.api.types.is_numeric_dtype(df[c])
    ]
    chosen = [c for c in (series_choice or numeric_cols[:2]) if c in numeric_cols]
    if not chosen:
        chosen = numeric_cols[:2]
    if not chosen:
        # Fail with a readable message instead of an IndexError further down.
        raise ValueError("資料中找不到可繪圖的數值欄位")

    use_dual_axis = bool(dual_axis)
    window = int(rolling_window)

    dash_json = build_grafanalib_dashboard(chosen, use_dual_axis, window)
    dash_json_str = json.dumps(dash_json, ensure_ascii=False, indent=2, default=str)
    # delete=False: the file must outlive this call so it can be downloaded.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".json", mode="w", encoding="utf-8") as f:
        f.write(dash_json_str)
        json_path = f.name

    # NOTE(review): the figures are created through pyplot and never closed
    # here — confirm whether they should be released once rendered.
    fig1 = render_line(df, chosen[0])
    if len(chosen) > 1:
        fig2 = render_bar_or_dual(df, chosen[1], chosen[0], use_dual_axis)
    else:
        fig2 = plt.figure()
    # Pass a copy so the rolling column does not leak into `df`.
    fig3, df_with_roll = render_rolling(df.copy(), chosen[0], window)

    map_html = render_map_folium(df, value_col=chosen[0], size_col="count",
                                 cmap_name=cmap_choice, tiles=tiles_choice)

    point_choices = make_point_choices(df)
    default_choice = point_choices[0] if point_choices else ""
    detail_df = pick_detail(df, default_choice)

    # A new random demo CSV is offered for download on every run.
    demo_df = make_demo_dataframe()
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv", mode="w", encoding="utf-8") as f:
        demo_df.to_csv(f, index=False)
        demo_csv_path = f.name

    return (
        fig1, fig2, fig3, map_html,
        dash_json_str, json_path, df_with_roll,
        demo_csv_path,
        gr.Dropdown(choices=point_choices, value=default_choice),
        detail_df,
    )
def regenerate_demo(series_choice, dual_axis, rolling_window, cmap_choice, tiles_choice, current_choice):
    """Re-run the pipeline on fresh demo data with the current display settings.

    ``current_choice`` is accepted to match the click handler's inputs but is
    not used.
    """
    return pipeline(
        source="demo",
        file=None,
        sheet_url="",
        series_choice=series_choice,
        dual_axis=dual_axis,
        rolling_window=rolling_window,
        cmap_choice=cmap_choice,
        tiles_choice=tiles_choice,
    )
def update_detail(df: pd.DataFrame, choice: str):
    """Return the detail rows for the selected point label (see ``pick_detail``)."""
    detail_rows = pick_detail(df, choice)
    return detail_rows
# -----------------------------
# UI(將 Google 來源改成只有下拉選單)
# -----------------------------
# Gradio UI: source selection, display options, the four visual outputs, the
# dashboard JSON / downloads, and a point-detail viewer.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("## Grafana-like Demo + Folium Map(支援 Google Drive / Sheets,下拉選單選擇來源)")
    source_radio = gr.Radio(["upload", "drive", "demo"], label="資料來源", value="demo")
    file_in = gr.File(label="上傳 CSV(選 upload 時使用)", file_types=[".csv"])
    # Google sources are offered only through this dropdown; there is no
    # editable text box for the link.
    preset_dd = gr.Dropdown(
        label="Google 預設來源(3 個連結)",
        choices=DRIVE_PRESETS,
        value=DRIVE_PRESETS[0]
    )
    series_multiselect = gr.CheckboxGroup(label="數值欄位", choices=[])
    dual_axis_chk = gr.Checkbox(label="第二面板啟用雙軸", value=False)
    rolling_dd = gr.Dropdown(label="Rolling window", choices=["3", "5", "10", "20"], value="5")
    cmap_dd = gr.Dropdown(label="地圖配色 (colormap)",
                          choices=["viridis", "plasma", "inferno", "magma", "cividis", "coolwarm"],
                          value="viridis")
    # NOTE(review): the Stamen entries rely on folium's built-in tile names —
    # confirm they are still available in the installed folium version.
    tiles_dd = gr.Dropdown(label="地圖底圖 (tiles)",
                           choices=["OpenStreetMap", "Stamen Terrain", "Stamen Toner",
                                    "CartoDB positron", "CartoDB dark_matter"],
                           value="OpenStreetMap")
    with gr.Row():
        run_btn = gr.Button("產生 Dashboard", scale=1)
        regen_btn = gr.Button("🔁 重新產生示範資料", scale=1)
    plot1 = gr.Plot(label="1:Line")
    plot2 = gr.Plot(label="2:Bar / Dual Axis")
    plot3 = gr.Plot(label="3:Rolling Mean")
    map_out = gr.HTML(label="4:Geo Map (Interactive + Legend)")
    json_box = gr.Code(label="grafanalib Dashboard JSON", language="json")
    json_file = gr.File(label="下載 dashboard.json")
    demo_csv_file = gr.File(label="下載示範資料 demo.csv")
    df_view = gr.Dataframe(label="資料預覽(含 rolling)", wrap=True)
    gr.Markdown("### 🔎 點位詳情(對應地圖彈窗中的 #ID)")
    point_selector = gr.Dropdown(label="選擇點位(#ID | 時間 | 值)", choices=[], value=None)
    detail_view = gr.Dataframe(label="選取點詳細資料", wrap=True)

    def probe_columns(source, file, preset_url):
        """Load the selected source and offer its numeric columns for plotting.

        For the "drive" source the link comes from the preset dropdown.
        Returns an update for the column checkbox group (first two numeric
        columns preselected) and the loaded frame for the preview.
        """
        if source == "upload" and file is None:
            # "upload" was selected (or the file was cleared) before a file
            # exists: leave both outputs as they are instead of raising.
            return gr.update(), gr.update()
        sheet_url = preset_url if source == "drive" else ""
        df = load_data(source, file, sheet_url)
        numeric_cols = [c for c in df.columns if c not in ["time", "lat", "lon", "pid"] and pd.api.types.is_numeric_dtype(df[c])]
        return gr.CheckboxGroup(choices=numeric_cols, value=numeric_cols[:2]), df

    # Re-probe the columns whenever the source, the file or the preset changes.
    probe_inputs = [source_radio, file_in, preset_dd]
    probe_outputs = [series_multiselect, df_view]
    for trigger in (source_radio, file_in, preset_dd):
        trigger.change(probe_columns, inputs=probe_inputs, outputs=probe_outputs)

    # Components filled by `pipeline`, in the order of its return tuple.
    dashboard_outputs = [
        plot1, plot2, plot3, map_out,
        json_box, json_file, df_view,
        demo_csv_file,
        point_selector, detail_view,
    ]

    # Initial page load: use the first Google preset link.
    demo.load(
        lambda: pipeline("drive", None, DRIVE_PRESETS[0], [], False, "5", "viridis", "OpenStreetMap"),
        inputs=None,
        outputs=dashboard_outputs,
    )

    # Generate from the current settings / regenerate with fresh demo data.
    run_btn.click(
        pipeline,
        inputs=[source_radio, file_in, preset_dd, series_multiselect, dual_axis_chk, rolling_dd, cmap_dd, tiles_dd],
        outputs=dashboard_outputs,
    )
    regen_btn.click(
        regenerate_demo,
        inputs=[series_multiselect, dual_axis_chk, rolling_dd, cmap_dd, tiles_dd, point_selector],
        outputs=dashboard_outputs,
    )

    # Show the rows of the point picked in the dropdown.
    point_selector.change(
        update_detail,
        inputs=[df_view, point_selector],
        outputs=[detail_view]
    )

if __name__ == "__main__":
    demo.launch()