# TrafCast / data_process / process.py
# Page residue kept for provenance: amitom's picture · "Minimal app for HF Space" · 73e9c25
import pandas as pd
import glob
import os
import sys
import numpy as np
from sklearn.neighbors import BallTree
from datetime import datetime
# Earth radius in metres; used below to scale haversine distances (radians) to metres.
EARTH_RADIUS_M = 6371000
# Absolute path of the parent of the directory that contains this file.
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
# Put PROJECT_ROOT on the import path — presumably so the `roadmap` import below
# resolves when this file is run directly; confirm against the repository layout.
sys.path.append(PROJECT_ROOT)
from roadmap.utils import add_weather_to_df
def compute_sensor_id(df: pd.DataFrame,
                      lat_col: str = "Latitude",
                      lon_col: str = "Longitude",
                      decimals: int = 6,
                      out_col: str = "sensor_id") -> pd.DataFrame:
    """Write a "lat;lon" string key into ``df[out_col]`` and return ``df``.

    Both coordinate columns are rounded to ``decimals`` places and cast to
    ``str`` before being joined with a ``;`` separator.

    Args:
        df: Frame holding the coordinate columns. It is modified in place.
        lat_col: Name of the latitude column.
        lon_col: Name of the longitude column.
        decimals: Number of decimal places kept in each coordinate.
        out_col: Name of the column that receives the key.

    Returns:
        The same ``df`` object, now carrying ``out_col``.
    """
    lat_text = df[lat_col].round(decimals).astype(str)
    lon_text = df[lon_col].round(decimals).astype(str)
    df[out_col] = lat_text + ";" + lon_text
    return df
def prepare_data_df(df_data: pd.DataFrame, coordinate: pd.DataFrame, date: str):
    """Turn one day's raw observations into a timestamped, geo-tagged table.

    Steps, in order:
      1. Combine ``date`` with the "Time" column into full timestamps.
      2. Attach "Latitude"/"Longitude" via ``add_coordinate``.
      3. Add the "sensor_id" key via ``compute_sensor_id``.
      4. Add "Time_hour" ("Time" rounded to the nearest hour) and try to
         enrich with weather; on any failure the error is printed and a
         "weather" column filled with ``None`` is added instead.
      5. Add "Day" as the day of week of "Time" (Monday=0).

    The input frame is not modified; a new frame is returned.

    Args:
        df_data: Raw observations. Must contain "Time" values that parse as
            ``%H:%M`` once prefixed with ``date``, plus the columns
            ``add_coordinate`` needs.
        coordinate: Coordinate table passed through to ``add_coordinate``.
        date: Day of the observations, formatted ``%Y-%m-%d``.

    Returns:
        The enriched DataFrame.

    Raises:
        ValueError: If the combined date/time strings do not match
            ``%Y-%m-%d %H:%M``.
    """
    # NOTE: filtering on "% Observed" is currently disabled:
    # df_data.drop(df_data[df_data["% Observed"] < 50].index, inplace=True)
    timestamps = pd.to_datetime(
        date + " " + df_data["Time"].astype(str),
        format="%Y-%m-%d %H:%M",
    )
    # assign() returns a new frame, so the caller's "Time" column stays
    # untouched and the function can safely be called again on the same input.
    df_data = df_data.assign(Time=timestamps)
    df_data = add_coordinate(coordinate, df_data)
    df_data = compute_sensor_id(df_data)
    df_data["Time_hour"] = df_data["Time"].dt.round("h")
    try:
        df_data = enrich_weather_hourly(df_data)  # TODO find better way to do this
    except Exception as e:
        # Deliberate best-effort: weather is optional, so keep the rows and
        # fall back to an empty "weather" column.
        print(f"Error enriching weather: {e}")
        df_data["weather"] = None
    df_data["Day"] = df_data["Time"].dt.dayofweek
    return df_data
def add_coordinate(df_coord: pd.DataFrame, df_data: pd.DataFrame):
    """Attach the coordinates of the nearest postmile to every data row.

    For each "Postmile (Abs)" value in ``df_data`` the row of ``df_coord``
    whose "Abs PM" is closest (smallest absolute difference) is selected and
    its "Latitude"/"Longitude" are copied over. When two coordinate rows are
    equally close, the one with the smaller "Abs PM" wins.

    Args:
        df_coord: Table with "Abs PM", "Latitude" and "Longitude" columns.
        df_data: Table with a "Postmile (Abs)" column.

    Returns:
        A new frame: ``df_data`` sorted by "Postmile (Abs)" with a fresh
        0..n-1 index and "Latitude"/"Longitude" columns added. Neither input
        is modified.

    Raises:
        ValueError: If ``df_coord`` has no rows while ``df_data`` has some.
    """
    df_coord = df_coord.sort_values(by="Abs PM").reset_index(drop=True)
    df_data = df_data.sort_values(by="Postmile (Abs)").reset_index(drop=True)

    coord_abs_pm = df_coord["Abs PM"].to_numpy()
    coord_lat = df_coord["Latitude"].to_numpy()
    coord_lon = df_coord["Longitude"].to_numpy()

    # Resolve each distinct postmile once, then fan the answer back out to
    # all rows through `inverse`; this replaces a per-row Python callback.
    unique_pm, inverse = np.unique(
        df_data["Postmile (Abs)"].to_numpy(), return_inverse=True
    )
    if unique_pm.size:
        # Shape (n_unique, n_coord); argmin keeps the first minimum on ties.
        gaps = np.abs(coord_abs_pm[np.newaxis, :] - unique_pm[:, np.newaxis])
        closest = gaps.argmin(axis=1)[inverse]
    else:
        closest = np.empty(0, dtype=np.intp)

    df_data["Latitude"] = coord_lat[closest]
    df_data["Longitude"] = coord_lon[closest]
    return df_data
def enrich_weather_hourly(full_df: pd.DataFrame) -> pd.DataFrame:
    """Run ``add_weather_to_df`` once per distinct "Time_hour" and recombine.

    Rows are grouped by "Time_hour" in order of first appearance. A copy of
    each group is handed to ``add_weather_to_df`` together with the group's
    hour as a ``datetime``. What that helper adds is defined outside this
    file — presumably a "weather" column; confirm against ``roadmap.utils``.

    Args:
        full_df: Frame with "Time_hour" (timestamp) and "Time" columns.

    Returns:
        All enriched groups concatenated under a fresh index and then sorted
        by "Time".
    """
    enriched_chunks = [
        add_weather_to_df(hour_rows.copy(), time=hour_stamp.to_pydatetime())  # type: ignore
        for hour_stamp, hour_rows in full_df.groupby("Time_hour", sort=False)
    ]
    combined = pd.concat(enriched_chunks, ignore_index=True)
    return combined.sort_values("Time")
def build_sensor_index(data_df: pd.DataFrame) -> pd.DataFrame:
    """Return one row per distinct ("Latitude", "Longitude") pair with its id.

    Args:
        data_df: Frame containing "Latitude" and "Longitude" columns.

    Returns:
        A new frame with columns "sensor_id", "Latitude", "Longitude", one
        row per unique coordinate pair in order of first appearance.
        "sensor_id" is the "lat;lon" key produced by ``compute_sensor_id``.
    """
    coord_cols = ["Latitude", "Longitude"]
    unique_coords = data_df.loc[:, coord_cols].drop_duplicates()
    # Work on an independent copy so adding the id column cannot touch data_df.
    unique_coords = unique_coords.copy().reset_index(drop=True)
    unique_coords = compute_sensor_id(unique_coords)
    # An integer 'sensor_idx' could be kept here as well if ever needed.
    return unique_coords[["sensor_id", *coord_cols]]
def build_enriched_time_series(data_df: pd.DataFrame,
                               sensor_map: pd.DataFrame,
                               sensors: pd.DataFrame) -> pd.DataFrame:
    """Left-join per-sensor observations onto the sensor/road attribute map.

    Args:
        data_df: Observations with "Latitude", "Longitude", "Time",
            "AggSpeed", "% Observed" and "weather" columns. A "sensor_id"
            column is (re)computed on it in place from its own coordinates.
        sensor_map: Frame with "sensor_id", "Latitude", "Longitude", "lanes",
            "maxspeed", "ref" and "direction" columns.
        sensors: Not read by this function; kept in the signature for callers.

    Returns:
        Every ``sensor_map`` row paired with its matching observations
        (rows without a match keep missing values), sorted by "sensor_id"
        then "Time", under a fresh index.
    """
    # Both sides of the join must carry the same "sensor_id" key.
    data_df = compute_sensor_id(data_df)

    static_cols = ["sensor_id", "Latitude", "Longitude", "lanes", "maxspeed", "ref", "direction"]
    observed_cols = ["sensor_id", "Time", "AggSpeed", "% Observed", "weather"]

    joined = sensor_map[static_cols].merge(
        data_df[observed_cols], on="sensor_id", how="left"
    )
    ordered = joined.sort_values(["sensor_id", "Time"])
    return ordered.reset_index(drop=True)
def normalize_lanes(value):
if isinstance(value, list):
try:
return min(int(x) for x in value)
except ValueError:
return None
try:
return int(value)
except ValueError:
return None
def map_pms_to_sensors(network_df: pd.DataFrame, sensors: pd.DataFrame, max_distance_m: float | None = None) -> pd.DataFrame:
    """Match every network row to its nearest sensor by haversine distance.

    Args:
        network_df: Frame with "Latitude" and "Longitude" columns; rows
            missing either coordinate are dropped before matching.
        sensors: Frame with "sensor_id", "Latitude" and "Longitude" columns.
        max_distance_m: If given, only matches whose distance is at most this
            many metres are kept.

    Returns:
        A new frame holding the retained network rows plus "sensor_id",
        "matched_sensor_lat", "matched_sensor_lon" and "distance_m".
    """
    coord_cols = ["Latitude", "Longitude"]
    net = network_df.dropna(subset=coord_cols).copy()

    # The haversine metric works on (lat, lon) pairs expressed in radians.
    tree = BallTree(np.radians(sensors[coord_cols].to_numpy()), metric="haversine")
    dist_rad, idx = tree.query(np.radians(net[coord_cols].to_numpy()), k=1)

    nearest = sensors.iloc[idx[:, 0]]
    matched = net.assign(
        sensor_id=nearest["sensor_id"].values,
        matched_sensor_lat=nearest["Latitude"].values,
        matched_sensor_lon=nearest["Longitude"].values,
        distance_m=dist_rad[:, 0] * EARTH_RADIUS_M,  # radians -> metres
    )

    if max_distance_m is None:
        return matched
    return matched[matched["distance_m"] <= max_distance_m].copy()