import glob
import os
import sys
from datetime import datetime

import numpy as np
import pandas as pd
from sklearn.neighbors import BallTree

# Earth radius in metres; used to convert haversine distances (radians) to metres.
EARTH_RADIUS_M = 6371000

# The parent directory of this file's directory is put on sys.path so that the
# `roadmap` package import below resolves.
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.append(PROJECT_ROOT)

from roadmap.utils import add_weather_to_df  # noqa: E402  (needs sys.path tweak above)


def compute_sensor_id(
    df: pd.DataFrame,
    lat_col: str = "Latitude",
    lon_col: str = "Longitude",
    decimals: int = 6,
    out_col: str = "sensor_id",
) -> pd.DataFrame:
    """Add a ``"<lat>;<lon>"`` string identifier column to *df*.

    Both coordinates are rounded to *decimals* places before being converted
    to strings and joined with ``";"``.

    Args:
        df: Frame containing the latitude and longitude columns.
        lat_col: Name of the latitude column.
        lon_col: Name of the longitude column.
        decimals: Number of decimal places kept in the identifier.
        out_col: Name of the column that receives the identifier.

    Returns:
        The same ``df`` object; it is modified in place.
    """
    df[out_col] = (
        df[lat_col].round(decimals).astype(str)
        + ";"
        + df[lon_col].round(decimals).astype(str)
    )
    return df


def prepare_data_df(
    df_data: pd.DataFrame, coordinate: pd.DataFrame, date: str
) -> pd.DataFrame:
    """Turn one day of raw measurements into an analysis-ready frame.

    Steps: combine *date* with the ``Time`` column into timestamps, attach
    coordinates, derive ``sensor_id`` and ``Time_hour``, try to add weather,
    and add the day of week as ``Day``.

    Args:
        df_data: Raw measurements; must contain ``Time`` (``HH:MM`` values)
            and ``Postmile (Abs)``.
        coordinate: Coordinate table with ``Abs PM``, ``Latitude`` and
            ``Longitude`` columns.
        date: Day of the measurements formatted as ``YYYY-MM-DD``.

    Returns:
        A new frame; the caller's ``df_data`` is left untouched.
    """
    # NOTE: filtering out poorly observed rows is currently disabled:
    # df_data.drop(df_data[df_data["% Observed"] < 50].index, inplace=True)

    # Build a new frame instead of overwriting the caller's "Time" column, so
    # calling this function twice on the same input keeps working.
    df_data = df_data.assign(
        Time=pd.to_datetime(
            date + " " + df_data["Time"].astype(str), format="%Y-%m-%d %H:%M"
        )
    )
    df_data = add_coordinate(coordinate, df_data)
    df_data = compute_sensor_id(df_data)
    df_data["Time_hour"] = df_data["Time"].dt.round("h")
    try:
        df_data = enrich_weather_hourly(df_data)
    # TODO find better way to do this
    except Exception as e:
        # Deliberately best-effort: a weather failure must not abort the
        # pipeline, so fall back to an empty "weather" column.
        print(f"Error enriching weather: {e}")
        df_data["weather"] = None
    df_data["Day"] = df_data["Time"].dt.dayofweek
    return df_data


def add_coordinate(df_coord: pd.DataFrame, df_data: pd.DataFrame) -> pd.DataFrame:
    """Attach the coordinates of the nearest postmile to every data row.

    Each value of ``Postmile (Abs)`` is matched to the row of *df_coord* whose
    ``Abs PM`` has the smallest absolute difference; on ties the first such
    row (after sorting by ``Abs PM``) wins.

    Args:
        df_coord: Table with ``Abs PM``, ``Latitude`` and ``Longitude``.
        df_data: Measurements with a ``Postmile (Abs)`` column.

    Returns:
        A copy of *df_data*, sorted by ``Postmile (Abs)`` with a fresh index
        and with ``Latitude`` / ``Longitude`` columns set.
    """
    df_coord = df_coord.sort_values(by="Abs PM").reset_index(drop=True)
    df_data = df_data.sort_values(by="Postmile (Abs)").reset_index(drop=True)

    coord_abs_pm = df_coord["Abs PM"].values
    coord_lat = df_coord["Latitude"].values
    coord_lon = df_coord["Longitude"].values

    postmiles = df_data["Postmile (Abs)"].to_numpy()
    # One argmin per row keeps memory at O(len(df_coord)); the explicit dtype
    # keeps the index array usable even when there are no rows at all.
    closest = np.array(
        [np.abs(coord_abs_pm - target).argmin() for target in postmiles],
        dtype=np.intp,
    )

    df_data["Latitude"] = coord_lat[closest]
    df_data["Longitude"] = coord_lon[closest]
    return df_data


def enrich_weather_hourly(full_df: pd.DataFrame) -> pd.DataFrame:
    """Call ``add_weather_to_df`` once per distinct ``Time_hour`` value.

    The frame is split by ``Time_hour``, every chunk is passed (as a copy) to
    ``add_weather_to_df`` together with that hour as a ``datetime``, and the
    results are concatenated and sorted by ``Time``.

    Note: pandas' ``groupby`` skips rows whose ``Time_hour`` is missing, and
    ``pd.concat`` raises ``ValueError`` when there are no groups at all.
    What ``add_weather_to_df`` adds is defined in ``roadmap.utils``;
    presumably a ``weather`` column -- TODO confirm.
    """
    pieces = []
    for t_hour, chunk in full_df.groupby("Time_hour", sort=False):
        enriched = add_weather_to_df(chunk.copy(), time=t_hour.to_pydatetime())  # type: ignore
        pieces.append(enriched)
    return pd.concat(pieces, ignore_index=True).sort_values("Time")


def build_sensor_index(data_df: pd.DataFrame) -> pd.DataFrame:
    """Return one row per distinct ``(Latitude, Longitude)`` pair in *data_df*.

    Returns:
        A new frame with the columns ``sensor_id``, ``Latitude`` and
        ``Longitude``; ``sensor_id`` is the ``"lat;lon"`` string produced by
        ``compute_sensor_id``.
    """
    sensors = (
        data_df
        .drop_duplicates(subset=["Latitude", "Longitude"])
        .loc[:, ["Latitude", "Longitude"]]
        .copy()
        .reset_index(drop=True)
    )
    sensors = compute_sensor_id(sensors)  # adds 'sensor_id' as "lat;lon"
    # Keep only what is needed; an integer 'sensor_idx' could be added here too.
    return sensors[["sensor_id", "Latitude", "Longitude"]]


def build_enriched_time_series(
    data_df: pd.DataFrame, sensor_map: pd.DataFrame, sensors: pd.DataFrame
) -> pd.DataFrame:
    """Join the sensor/network attributes with the measurement time series.

    Args:
        data_df: Measurements with ``Latitude``, ``Longitude``, ``Time``,
            ``AggSpeed``, ``% Observed`` and ``weather``. Its ``sensor_id``
            column is (re)computed in place as a side effect.
        sensor_map: Frame with ``sensor_id``, ``Latitude``, ``Longitude``,
            ``lanes``, ``maxspeed``, ``ref`` and ``direction``.
        sensors: Currently unused; kept so existing callers keep working.

    Returns:
        Left join of *sensor_map* with the measurements on ``sensor_id``,
        sorted by ``sensor_id`` and ``Time``. Rows of *sensor_map* without
        measurements are kept with missing measurement values.
    """
    # Ensure both sides have the same sensor_id key (derived from own lat/lon).
    data_df = compute_sensor_id(data_df)

    enriched = (
        sensor_map[["sensor_id", "Latitude", "Longitude", "lanes", "maxspeed", "ref", "direction"]]
        .merge(
            data_df[["sensor_id", "Time", "AggSpeed", "% Observed", "weather"]],
            on="sensor_id",
            how="left",
        )
        .sort_values(["sensor_id", "Time"])
        .reset_index(drop=True)
    )
    return enriched


def normalize_lanes(value) -> int | None:
    """Convert a lane value to a single integer.

    A list is reduced to the smallest of its integer-converted members; any
    other value is converted with ``int``.

    Returns:
        The lane count, or ``None`` when the value cannot be converted
        (non-numeric text, ``None``, NaN, an empty list, ...).
    """
    try:
        if isinstance(value, list):
            return min(int(x) for x in value)
        return int(value)
    except (TypeError, ValueError):
        # TypeError covers None and other non-convertible objects, which
        # previously escaped the ValueError-only handler and crashed.
        return None


def map_pms_to_sensors(
    network_df: pd.DataFrame,
    sensors: pd.DataFrame,
    max_distance_m: float | None = None,
) -> pd.DataFrame:
    """Assign to every network point its nearest sensor by great-circle distance.

    Args:
        network_df: Points with ``Latitude`` / ``Longitude`` in degrees; rows
            missing either coordinate are dropped.
        sensors: Frame with ``sensor_id``, ``Latitude`` and ``Longitude``;
            rows missing either coordinate are ignored.
        max_distance_m: When given, matches farther away than this many
            metres are discarded.

    Returns:
        A copy of the usable rows of *network_df* with the added columns
        ``sensor_id``, ``matched_sensor_lat``, ``matched_sensor_lon`` and
        ``distance_m``.

    Raises:
        ValueError: If no sensor with valid coordinates is available.
    """
    matched = network_df.dropna(subset=["Latitude", "Longitude"]).copy()
    # Same treatment as the network points: the tree cannot be built on NaN.
    sensors = sensors.dropna(subset=["Latitude", "Longitude"])
    if sensors.empty:
        raise ValueError(
            "map_pms_to_sensors needs at least one sensor with valid coordinates"
        )

    if matched.empty:
        # Nothing to query; return an empty frame that still has the output
        # columns instead of failing inside the tree query.
        no_sensor = sensors.iloc[:0]
        matched["sensor_id"] = no_sensor["sensor_id"].to_numpy()
        matched["matched_sensor_lat"] = no_sensor["Latitude"].to_numpy()
        matched["matched_sensor_lon"] = no_sensor["Longitude"].to_numpy()
        matched["distance_m"] = np.empty(0, dtype=float)
        return matched

    # The haversine metric expects (lat, lon) pairs expressed in radians.
    sensor_rad = np.radians(sensors[["Latitude", "Longitude"]].to_numpy())
    net_rad = np.radians(matched[["Latitude", "Longitude"]].to_numpy())

    tree = BallTree(sensor_rad, metric="haversine")
    dist_rad, idx = tree.query(net_rad, k=1)

    nearest = sensors.iloc[idx[:, 0]]
    matched["sensor_id"] = nearest["sensor_id"].values
    matched["matched_sensor_lat"] = nearest["Latitude"].values
    matched["matched_sensor_lon"] = nearest["Longitude"].values
    # Distances come back in radians on the unit sphere; scale to metres.
    matched["distance_m"] = dist_rad[:, 0] * EARTH_RADIUS_M

    if max_distance_m is not None:
        matched = matched[matched["distance_m"] <= max_distance_m].copy()

    return matched