|
|
import pandas as pd |
|
|
import glob |
|
|
import os |
|
|
import sys |
|
|
import numpy as np |
|
|
from sklearn.neighbors import BallTree |
|
|
from datetime import datetime |
|
|
|
|
|
# Mean Earth radius in metres; used to turn haversine distances (radians)
# into metres.
EARTH_RADIUS_M = 6_371_000

# Parent directory of this file's directory. It is appended to sys.path so
# that the project-local ``roadmap`` package imported below can be resolved
# when this module is run directly.
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
sys.path.append(PROJECT_ROOT)
|
|
|
|
|
from roadmap.utils import add_weather_to_df |
|
|
|
|
|
def compute_sensor_id(df: pd.DataFrame,
                      lat_col: str = "Latitude",
                      lon_col: str = "Longitude",
                      decimals: int = 6,
                      out_col: str = "sensor_id") -> pd.DataFrame:
    """Tag every row with a text id derived from its rounded coordinates.

    The id has the form ``"<lat>;<lon>"``. Both values are rounded to
    ``decimals`` places and converted with ``astype(str)``.

    Args:
        df: frame holding the coordinate columns. It is modified in place:
            ``out_col`` is written (or overwritten) on it.
        lat_col: name of the latitude column.
        lon_col: name of the longitude column.
        decimals: number of decimal places kept before stringifying.
        out_col: name of the column receiving the id.

    Returns:
        The same ``df`` object, returned for call chaining.
    """
    lat_text = df[lat_col].round(decimals).astype(str)
    lon_text = df[lon_col].round(decimals).astype(str)
    df[out_col] = lat_text.str.cat(lon_text, sep=";")
    return df
|
|
|
|
|
def prepare_data_df(df_data: pd.DataFrame, coordinate: pd.DataFrame, date: str):
    """Turn one day of raw readings into a timestamped, geolocated, weather-tagged frame.

    Steps:
        1. Build full timestamps in ``Time`` by prefixing ``date``. The result is
           parsed with the format "%Y-%m-%d %H:%M".
        2. Attach Latitude/Longitude from ``coordinate`` using ``add_coordinate``.
        3. Add ``sensor_id`` using ``compute_sensor_id``.
        4. Add ``Time_hour`` (``Time`` rounded to the nearest hour), then attach
           weather using ``enrich_weather_hourly``. Weather is best-effort: on any
           exception the error is printed to stderr and ``weather`` is set to
           None for every row.
        5. Add ``Day``, the day of week from pandas ``dt.dayofweek`` (Monday=0).

    NOTE(review): a previous docstring said points with no observations are
    removed first, but no rows are filtered here. Confirm whether that
    filtering is expected to happen upstream.

    Args:
        df_data: raw readings. Needs a "Time" column whose string form is
            "HH:MM", plus the columns ``add_coordinate`` reads. The caller's
            frame is not modified.
        coordinate: coordinate lookup table passed to ``add_coordinate``.
        date: day of the readings as "YYYY-MM-DD".

    Returns:
        A new DataFrame with the added columns.
    """
    # Work on a copy: the Time conversion below would otherwise overwrite the
    # caller's column.
    df_data = df_data.copy()
    df_data["Time"] = pd.to_datetime(
        date + " " + df_data["Time"].astype(str), format="%Y-%m-%d %H:%M"
    )

    df_data = add_coordinate(coordinate, df_data)
    df_data = compute_sensor_id(df_data)

    # Weather is looked up once per hour bucket rather than once per row.
    df_data["Time_hour"] = df_data["Time"].dt.round("h")

    try:
        df_data = enrich_weather_hourly(df_data)
    except Exception as e:  # best-effort: weather must not block the rest of the pipeline
        print(f"Error enriching weather: {e}", file=sys.stderr)
        df_data["weather"] = None

    df_data["Day"] = df_data["Time"].dt.dayofweek

    return df_data
|
|
|
|
|
def _nearest_sorted_index(sorted_keys: np.ndarray, targets: np.ndarray) -> np.ndarray:
    """Return, for each target, the index of the closest value in ``sorted_keys``.

    ``sorted_keys`` must be ascending and NaN-free. For every target the result
    equals ``np.abs(sorted_keys - target).argmin()``. Among equally close keys,
    including duplicate values, the lowest index wins. Uses binary search, so
    the cost is O(len(targets) * log(len(sorted_keys))) instead of
    O(len(targets) * len(sorted_keys)).
    """
    last = sorted_keys.size - 1
    # First position whose key is >= target. Clamp so that targets beyond the
    # largest key fall back to the last key.
    right = np.minimum(np.searchsorted(sorted_keys, targets, side="left"), last)
    # Neighbour just below, moved to the first occurrence of its value so that
    # duplicate keys resolve to the lowest index, as argmin would.
    left = np.searchsorted(sorted_keys, sorted_keys[np.maximum(right - 1, 0)], side="left")
    # "<=" keeps argmin's preference for the lower index on exact ties.
    take_left = np.abs(sorted_keys[left] - targets) <= np.abs(sorted_keys[right] - targets)
    return np.where(take_left, left, right)


def add_coordinate(df_coord: pd.DataFrame, df_data: pd.DataFrame):
    """Attach Latitude/Longitude to each data row from the nearest coordinate row.

    Each row's "Postmile (Abs)" is matched to the ``df_coord`` row with the
    closest "Abs PM". That row's Latitude and Longitude are then copied over.

    Matching rules:
        * ``df_coord`` rows with a missing "Abs PM" are ignored.
        * When several coordinate rows are equally close, the one with the
          smaller "Abs PM" wins. Among identical "Abs PM" values, the one that
          appears first in ``df_coord`` wins.
        * Data rows with a missing "Postmile (Abs)" get NaN coordinates.

    Args:
        df_coord: lookup table with "Abs PM", "Latitude", "Longitude". Not modified.
        df_data: rows with a "Postmile (Abs)" column. Not modified.

    Returns:
        A new frame holding ``df_data``'s rows sorted by "Postmile (Abs)",
        with a fresh 0..n-1 index and Latitude/Longitude columns set.

    Raises:
        ValueError: ``df_data`` has rows but ``df_coord`` has no usable "Abs PM".
    """
    # A NaN key cannot be compared by distance, so such rows are dropped. The
    # stable sort makes the tie-break among equal keys follow input order.
    df_coord = (
        df_coord.dropna(subset=["Abs PM"])
        .sort_values(by="Abs PM", kind="stable")
        .reset_index(drop=True)
    )
    df_data = df_data.sort_values(by="Postmile (Abs)").reset_index(drop=True)

    coord_abs_pm = df_coord["Abs PM"].to_numpy(dtype=float)
    targets = df_data["Postmile (Abs)"].to_numpy(dtype=float)

    if coord_abs_pm.size == 0 and targets.size:
        raise ValueError("df_coord has no rows with a non-null 'Abs PM' to match against")

    closest = _nearest_sorted_index(coord_abs_pm, targets)
    missing = np.isnan(targets)

    df_data["Latitude"] = np.where(missing, np.nan, df_coord["Latitude"].to_numpy()[closest])
    df_data["Longitude"] = np.where(missing, np.nan, df_coord["Longitude"].to_numpy()[closest])

    return df_data
|
|
|
|
|
def enrich_weather_hourly(full_df: pd.DataFrame) -> pd.DataFrame:
    """Attach weather to ``full_df`` one ``Time_hour`` bucket at a time.

    Rows are grouped by ``Time_hour`` in order of first appearance. Each group
    is handed to ``add_weather_to_df`` as a copy, so the helper cannot modify
    ``full_df`` through it. The group's hour is passed as a Python ``datetime``.

    The enriched groups are concatenated with a fresh 0..n-1 index, then
    sorted by ``Time``. The index is not reset after sorting.

    Note: ``groupby`` drops rows whose ``Time_hour`` is missing (its default
    ``dropna=True``). An empty ``full_df`` produces no groups, so ``pd.concat``
    raises ValueError.
    """
    enriched_groups = [
        add_weather_to_df(group.copy(), time=hour.to_pydatetime())
        for hour, group in full_df.groupby("Time_hour", sort=False)
    ]
    combined = pd.concat(enriched_groups, ignore_index=True)
    return combined.sort_values("Time")
|
|
|
|
|
def build_sensor_index(data_df: pd.DataFrame) -> pd.DataFrame:
    """Return one row per distinct (Latitude, Longitude) pair, tagged with its sensor_id.

    The first occurrence of each coordinate pair is kept. The index is reset
    to 0..n-1, and the columns are ordered sensor_id, Latitude, Longitude.
    ``data_df`` itself is left untouched.
    """
    coordinate_cols = ["Latitude", "Longitude"]

    unique_coords = data_df[coordinate_cols].drop_duplicates()
    unique_coords = unique_coords.reset_index(drop=True).copy()

    tagged = compute_sensor_id(unique_coords)
    return tagged[["sensor_id", *coordinate_cols]]
|
|
|
|
|
|
|
|
def build_enriched_time_series(data_df: pd.DataFrame,
                               sensor_map: pd.DataFrame,
                               sensors: pd.DataFrame) -> pd.DataFrame:
    """Join the road attributes in ``sensor_map`` with each sensor's readings.

    Readings in ``data_df`` are keyed by a ``sensor_id`` recomputed from their
    Latitude/Longitude with ``compute_sensor_id``. They are then left-joined
    onto ``sensor_map`` on ``sensor_id``. Every ``sensor_map`` row is kept;
    rows whose sensor has no readings get missing Time/AggSpeed/% Observed/weather.
    The result is sorted by (sensor_id, Time) and has a fresh 0..n-1 index.

    Args:
        data_df: readings with Latitude, Longitude, Time, AggSpeed,
            "% Observed" and weather columns. Not modified.
        sensor_map: needs sensor_id, Latitude, Longitude, lanes, maxspeed, ref
            and direction. Presumably this is the output of map_pms_to_sensors;
            verify against the caller.
        sensors: unused; kept so that existing callers keep working.

    Returns:
        The joined, sorted frame.
    """
    reading_cols = ["Time", "AggSpeed", "% Observed", "weather"]
    # Compute ids on a narrowed copy so the caller's frame does not gain (or
    # have overwritten) a sensor_id column.
    readings = compute_sensor_id(data_df[["Latitude", "Longitude", *reading_cols]].copy())

    road_cols = ["sensor_id", "Latitude", "Longitude", "lanes", "maxspeed", "ref", "direction"]
    enriched = (
        sensor_map[road_cols]
        .merge(readings[["sensor_id", *reading_cols]], on="sensor_id", how="left")
        .sort_values(["sensor_id", "Time"])
        .reset_index(drop=True)
    )
    return enriched
|
|
|
|
|
|
|
|
def normalize_lanes(value):
    """Convert ``value`` to an int, returning None when it cannot be converted.

    * A list is reduced to the smallest of its entries, each converted with ``int()``.
    * Any other value is converted directly with ``int()``. Floats are
      therefore truncated, and strings must be integer literals.
    * Anything that does not convert cleanly returns None instead of raising.
      This includes None, NaN, infinity, non-numeric strings, empty lists, and
      lists containing such entries.

    NOTE(review): multi-value strings such as "2;3" are not split and
    therefore return None. Confirm that this is acceptable for the input data.
    """
    # TypeError covers None and other non-numeric objects; OverflowError covers
    # float infinity; ValueError covers NaN, bad strings and empty lists (min()).
    unconvertible = (TypeError, ValueError, OverflowError)

    if isinstance(value, list):
        try:
            return min(int(x) for x in value)
        except unconvertible:
            return None

    try:
        return int(value)
    except unconvertible:
        return None
|
|
|
|
|
|
|
|
def map_pms_to_sensors(network_df: pd.DataFrame, sensors: pd.DataFrame, max_distance_m: float | None = None) -> pd.DataFrame:
    """Attach the nearest sensor to every network point that has coordinates.

    The nearest neighbour is found with a haversine BallTree over
    (latitude, longitude) in radians. Distances are converted to metres with
    ``EARTH_RADIUS_M``.

    Args:
        network_df: points with Latitude/Longitude in degrees. Rows missing
            either value are dropped. Not modified.
        sensors: frame with sensor_id, Latitude, Longitude (degrees). Rows are
            referenced by position, so any index is fine.
        max_distance_m: if given, matches farther than this many metres are dropped.

    Returns:
        A copy of the non-null ``network_df`` rows with added columns
        sensor_id, matched_sensor_lat, matched_sensor_lon and distance_m.

    Raises:
        ValueError: ``sensors`` is empty while there are points to match.
    """
    net = network_df.dropna(subset=["Latitude", "Longitude"]).copy()

    if net.empty:
        # Nothing to match. Skip the tree (scikit-learn rejects zero-row query
        # arrays) but keep the output schema for callers.
        return net.assign(
            sensor_id=pd.Series(dtype=object, index=net.index),
            matched_sensor_lat=pd.Series(dtype=float, index=net.index),
            matched_sensor_lon=pd.Series(dtype=float, index=net.index),
            distance_m=pd.Series(dtype=float, index=net.index),
        )
    if sensors.empty:
        raise ValueError("sensors is empty; there is nothing to match network points against")

    sensor_rad = np.radians(sensors[["Latitude", "Longitude"]].to_numpy())
    net_rad = np.radians(net[["Latitude", "Longitude"]].to_numpy())

    tree = BallTree(sensor_rad, metric="haversine")
    dist_rad, idx = tree.query(net_rad, k=1)

    # Positional lookup of each point's nearest sensor, done once.
    nearest = sensors.iloc[idx[:, 0]]
    net["sensor_id"] = nearest["sensor_id"].values
    net["matched_sensor_lat"] = nearest["Latitude"].values
    net["matched_sensor_lon"] = nearest["Longitude"].values
    # Haversine distances come back in radians on the unit sphere.
    net["distance_m"] = dist_rad[:, 0] * EARTH_RADIUS_M

    if max_distance_m is not None:
        net = net[net["distance_m"] <= max_distance_m].copy()

    return net