Spaces:
Sleeping
Sleeping
| import json | |
| import joblib | |
| import pandas as pd | |
| from sklearn.preprocessing import StandardScaler | |
| import numpy as np | |
class VAVPipeline:
    """Rolling-window preprocessing pipeline for the VAV zones of one RTU.

    The pipeline accumulates incoming dataframes in an internal buffer,
    keeps only the most recent ``window_size + 1`` rows, scales them with a
    pre-fitted scaler and reshapes the result into a single-sample batch.

    Attributes:
        rtu_id (int): The ID of the RTU (Roof Top Unit), 1 to 4.
        window_size (int): Number of rows fed to the model per sample.
        zones (list[int]): Zone numbers associated with ``rtu_id``.
        scaler: Object loaded from ``scaler_path`` (must expose
            ``transform``), or None when no path was supplied.
        input_col_names (list[str]): Names of the model input columns.
        output_col_names (list[str]): Names of the model output columns.
        column_names (list[str]): Output columns followed by input columns;
            this is the column order of every frame the pipeline produces.
        num_inputs (int): ``len(input_col_names)``.
        num_outputs (int): ``len(output_col_names)``.
        get_cols (bool): True until the column names have been derived from
            the first dataframe seen.
        df (pd.DataFrame): Rolling buffer of received rows.

    Methods:
        get_scaler(scaler_path): Loads the scaler from the given path.
        get_window(df): Returns the last ``window_size + 1`` rows of a frame.
        transform_window(df_window): Scales a frame's values with the scaler.
        prepare_input(df_trans): Reshapes scaled values into a model batch.
        get_input_output(df): Derives input/output column names from a frame.
        extract_data_from_message(df): Buffers new rows and returns a window.
        fit(df): Runs buffering, scaling and reshaping for new rows.
    """

    # Zone numbers per roof top unit, keyed by RTU id.
    _ZONES_BY_RTU = {
        1: (69, 68, 67, 66, 65, 64, 42, 41, 40, 39, 38, 37, 36),
        2: (
            72, 71, 63, 62, 60, 59, 58, 57, 50, 49,
            44, 43, 35, 34, 33, 32, 31, 30, 29, 28,
        ),
        3: (61, 56, 55, 48, 45, 26, 25, 18),
        4: (16, 17, 21, 23, 24, 46, 47, 51, 52, 53, 54),
    }
    # Zone columns whose name contains one of these are input setpoints.
    _SETPOINT_MARKERS = ("cooling_sp", "heating_sp")
    # Zone columns whose name contains one of these are never outputs.
    _NON_OUTPUT_MARKERS = ("co2", "hw_valve", "cooling_sp", "heating_sp")

    def __init__(self, rtu_id, scaler_path=None, window_size=30):
        """Initializes a VAVPipeline object.

        Args:
            rtu_id (int): The ID of the RTU (Roof Top Unit); must be one of
                1, 2, 3 or 4.
            scaler_path (str, optional): The path to the scaler file.
                Defaults to None, in which case no scaler is loaded.
            window_size (int, optional): The size of the sliding window.
                Defaults to 30.

        Raises:
            ValueError: If ``rtu_id`` has no known zone list.
        """
        try:
            zones = self._ZONES_BY_RTU[rtu_id]
        except (KeyError, TypeError):
            # Fail fast: without a zone list the first dataframe processed
            # would otherwise crash on a missing ``zones`` attribute.
            raise ValueError(
                f"Unknown rtu_id {rtu_id!r}; expected one of "
                f"{sorted(self._ZONES_BY_RTU)}"
            ) from None

        self.get_cols = True
        self.window_size = window_size
        self.rtu_id = rtu_id
        self.zones = list(zones)

        # Zone-specific columns are only known once a dataframe has been
        # seen (see get_input_output); start with the fixed inputs.
        self.output_col_names = []
        self.input_col_names = self._base_input_columns()
        self.column_names = self.output_col_names + self.input_col_names
        self.num_inputs = len(self.input_col_names)
        self.num_outputs = len(self.output_col_names)

        # Always define the buffer and the scaler so later attribute access
        # cannot fail with AttributeError.
        self.df = pd.DataFrame(columns=self.column_names)
        self.scaler = self.get_scaler(scaler_path) if scaler_path else None

    def _base_input_columns(self):
        """Returns the input columns that do not depend on the zone list."""
        return [
            f"rtu_00{self.rtu_id}_fltrd_sa_flow_tn",
            f"rtu_00{self.rtu_id}_sa_temp",
            "air_temp_set_1",
            "air_temp_set_2",
            "dew_point_temperature_set_1d",
            "relative_humidity_set_1",
            "solar_radiation_set_1",
        ]

    def get_scaler(self, scaler_path):
        """Loads the scaler from the given path.

        Args:
            scaler_path (str): The path to the scaler file.

        Returns:
            The object stored in the file (expected to expose ``transform``).
        """
        # SECURITY: joblib.load unpickles the file and can execute arbitrary
        # code; only load scaler files from a trusted source.
        return joblib.load(scaler_path)

    def get_window(self, df):
        """Returns the last ``window_size + 1`` rows of the given dataframe.

        Args:
            df (pd.DataFrame): The dataframe.

        Returns:
            pd.DataFrame: The trailing rows cast to float32, or None when the
            dataframe has ``window_size`` rows or fewer.
        """
        if len(df) <= self.window_size:
            return None
        return df.iloc[-(self.window_size + 1):].astype("float32")

    def transform_window(self, df_window):
        """Transforms the values of the dataframe using the scaler.

        Args:
            df_window (pd.DataFrame): The dataframe.

        Returns:
            np.ndarray: The transformed values.

        Raises:
            RuntimeError: If no scaler has been loaded.
        """
        scaler = getattr(self, "scaler", None)
        if scaler is None:
            raise RuntimeError(
                "No scaler loaded; pass scaler_path to VAVPipeline or set "
                "the 'scaler' attribute before transforming data."
            )
        return scaler.transform(df_window.values)

    def prepare_input(self, df_trans):
        """Prepares the input for the model.

        Args:
            df_trans (np.ndarray): The transformed values, one row per time
                step and one column per entry of ``column_names``.

        Returns:
            np.ndarray: The first ``window_size`` rows reshaped to
            ``(1, window_size, len(column_names))``.
        """
        return df_trans[: self.window_size, :].reshape(
            (1, self.window_size, len(self.column_names))
        )

    def get_input_output(self, df: pd.DataFrame):
        """Derives the input and output column names from the dataframe.

        A column belongs to a zone when its name contains ``zone_0<zone>``.
        Zone columns containing ``cooling_sp`` or ``heating_sp`` are appended
        to the fixed input columns; zone columns containing none of ``co2``,
        ``hw_valve``, ``cooling_sp`` or ``heating_sp`` become outputs. The
        column lists are rebuilt from scratch and the internal buffer is
        reset to an empty frame with the new columns.

        Args:
            df (pd.DataFrame): The dataframe whose columns are inspected.
        """
        columns = list(df.columns)
        # Zone-major, column-minor order: this fixes the column order of
        # every frame handed to the scaler, so it must not change.
        zone_columns = [
            column
            for zone in self.zones
            for column in columns
            if f"zone_0{zone}" in column
        ]

        self.output_col_names = [
            column
            for column in zone_columns
            if not any(marker in column for marker in self._NON_OUTPUT_MARKERS)
        ]
        self.input_col_names = self._base_input_columns() + [
            column
            for column in zone_columns
            if any(marker in column for marker in self._SETPOINT_MARKERS)
        ]

        self.column_names = self.output_col_names + self.input_col_names
        self.num_inputs = len(self.input_col_names)
        self.num_outputs = len(self.output_col_names)
        self.df = pd.DataFrame(columns=self.column_names)

    def extract_data_from_message(self, df: pd.DataFrame):
        """Buffers the new rows and returns the current window, if complete.

        On the first call the column names are derived from ``df``. The rows
        of ``df`` (restricted to ``column_names``) are appended to the
        internal buffer, which is then trimmed to its most recent
        ``window_size + 1`` rows.

        Args:
            df (pd.DataFrame): Newly received rows; must contain every column
                in ``column_names``.

        Returns:
            pd.DataFrame: The buffered ``window_size + 1`` rows followed by
            one row holding their column-wise mean, with a fresh integer
            index; or None while fewer than ``window_size + 1`` rows have
            been buffered.
        """
        if self.get_cols:
            self.get_input_output(df)
            self.get_cols = False

        new_rows = df[self.column_names]
        # Skip concatenating onto an empty buffer so the incoming dtypes
        # are kept as they are.
        if len(self.df):
            self.df = pd.concat([self.df, new_rows], axis=0)
        else:
            self.df = new_rows

        # Measure the buffer *after* appending so the newest rows are the
        # ones kept when trimming.
        rows_needed = self.window_size + 1
        if len(self.df) < rows_needed:
            return None
        self.df = self.df.iloc[-rows_needed:]

        # The mean row is added to the returned copy only; storing it in the
        # buffer would mix synthetic values into later windows.
        # NOTE(review): the purpose of the trailing mean row is not visible
        # in this file -- confirm with the consumer of fit()'s second value.
        mean_row = self.df.mean().to_frame().T
        return pd.concat([self.df, mean_row], axis=0, ignore_index=True)

    def fit(self, df: pd.DataFrame):
        """Buffers new rows and builds the scaled model input.

        Despite its name this method trains nothing: it feeds ``df`` through
        extract_data_from_message, transform_window and prepare_input.

        Args:
            df (pd.DataFrame): Newly received rows.

        Returns:
            tuple: ``(model_input, scaled_window)`` where ``model_input`` is
            the array from prepare_input and ``scaled_window`` is the full
            array from transform_window; ``(None, None)`` while the buffer
            does not yet hold a complete window.
        """
        df_window = self.extract_data_from_message(df)
        if df_window is None:
            return None, None
        df_trans = self.transform_window(df_window)
        return self.prepare_input(df_trans), df_trans