Spaces:
Sleeping
Sleeping
| import numpy as np | |
| from tensorflow.keras.models import load_model | |
| import joblib | |
| class RTUAnomalizer2: | |
| """ | |
| Class for performing anomaly detection on RTU (Roof Top Unit) data. | |
| """ | |
| def __init__( | |
| self, | |
| prediction_model_path=None, | |
| clustering_model_paths=None, | |
| pca_model_paths=None, | |
| num_inputs=None, | |
| num_outputs=None, | |
| ): | |
| """ | |
| Initialize the RTUAnomalizer object. | |
| Args: | |
| prediction_model_path (str): Path to the prediction model file. | |
| clustering_model_paths (list): List of paths to the clustering model files. | |
| num_inputs (int): Number of input features. | |
| num_outputs (int): Number of output features. | |
| """ | |
| self.model = None | |
| self.kmeans_models = [] | |
| self.pca_models = [] | |
| self.num_inputs = num_inputs | |
| self.num_outputs = num_outputs | |
| if ( | |
| prediction_model_path is not None | |
| and clustering_model_paths is not None | |
| and pca_model_paths is not None | |
| ): | |
| self.load_models( | |
| prediction_model_path, clustering_model_paths, pca_model_paths | |
| ) | |
| ( | |
| self.actual_list, | |
| self.pred_list, | |
| self.resid_list, | |
| self.resid_pca_list, | |
| self.distance_list, | |
| ) = self.initialize_lists() | |
| self.fault_1 = 0 | |
| self.fault_2 = 0 | |
| def initialize_lists(self, size=30): | |
| """ | |
| Initialize lists for storing actual, predicted, and residual values. | |
| Args: | |
| size (int): Size of the lists. | |
| Returns: | |
| tuple: A tuple containing three lists initialized with zeros. | |
| """ | |
| initial_values = [[0] * self.num_outputs] * size | |
| initial_values1 = [[0] * 4] * size | |
| initial_values2 = [[0] * 2] * size * 2 | |
| return ( | |
| initial_values.copy(), | |
| initial_values.copy(), | |
| initial_values.copy(), | |
| initial_values1.copy(), | |
| initial_values2.copy(), | |
| ) | |
| def load_models( | |
| self, prediction_model_path, clustering_model_paths, pca_model_paths | |
| ): | |
| """ | |
| Load the prediction and clustering models. | |
| Args: | |
| prediction_model_path (str): Path to the prediction model file. | |
| clustering_model_paths (list): List of paths to the clustering model files. | |
| """ | |
| self.model = load_model(prediction_model_path) | |
| for path in clustering_model_paths: | |
| self.kmeans_models.append(joblib.load(path)) | |
| for path in pca_model_paths: | |
| self.pca_models.append(joblib.load(path)) | |
| def predict(self, df_new): | |
| """ | |
| Make predictions using the prediction model. | |
| Args: | |
| df_new (DataFrame): Input data for prediction. | |
| Returns: | |
| array: Predicted values. | |
| """ | |
| return self.model.predict(df_new, verbose=0) | |
| def calculate_residuals(self, df_trans, pred): | |
| """ | |
| Calculate the residuals between actual and predicted values. | |
| Args: | |
| df_trans (DataFrame): Transformed input data. | |
| pred (array): Predicted values. | |
| Returns: | |
| tuple: A tuple containing the actual values and residuals. | |
| """ | |
| actual = df_trans[30, : self.num_outputs] | |
| resid = actual - pred | |
| return actual, resid | |
| def resize_prediction(self, pred, df_trans): | |
| """ | |
| Resize the predicted values to match the shape of the transformed input data. | |
| Args: | |
| pred (array): Predicted values. | |
| df_trans (DataFrame): Transformed input data. | |
| Returns: | |
| array: Resized predicted values. | |
| """ | |
| pred = np.resize( | |
| pred, (pred.shape[0], pred.shape[1] + len(df_trans[30, self.num_outputs :])) | |
| ) | |
| pred[:, -len(df_trans[30, self.num_outputs :]) :] = df_trans[ | |
| 30, self.num_outputs : | |
| ] | |
| return pred | |
| def inverse_transform(self, scaler, pred, df_trans): | |
| """ | |
| Inverse transform the predicted and actual values. | |
| Args: | |
| scaler (object): Scaler object for inverse transformation. | |
| pred (array): Predicted values. | |
| df_trans (DataFrame): Transformed input data. | |
| Returns: | |
| tuple: A tuple containing the actual and predicted values after inverse transformation. | |
| """ | |
| pred = scaler.inverse_transform(np.array(pred)) | |
| actual = scaler.inverse_transform(np.array([df_trans[30, :]])) | |
| return actual, pred | |
| def update_lists(self, actual, pred, resid): | |
| """ | |
| Update the lists of actual, predicted, and residual values. | |
| Args: | |
| actual_list (list): List of actual values. | |
| pred_list (list): List of predicted values. | |
| resid_list (list): List of residual values. | |
| actual (array): Actual values. | |
| pred (array): Predicted values. | |
| resid (array): Residual values. | |
| Returns: | |
| tuple: A tuple containing the updated lists of actual, predicted, and residual values. | |
| """ | |
| self.actual_list.pop(0) | |
| self.pred_list.pop(0) | |
| self.resid_list.pop(0) | |
| self.actual_list.append(actual.flatten().tolist()) | |
| self.pred_list.append(pred.flatten().tolist()) | |
| self.resid_list.append(resid.flatten().tolist()) | |
| return self.actual_list, self.pred_list, self.resid_list | |
| def calculate_distances(self, resid): | |
| """ | |
| Calculate the distances between residuals and cluster centers. | |
| Args: | |
| resid (array): Residual values. | |
| Returns: | |
| array: Array of distances. | |
| """ | |
| dist = [] | |
| resid_pcas = [] | |
| for i, model in enumerate(self.kmeans_models): | |
| resid_pca = self.pca_models[i].transform( | |
| resid[:, (i * 7) + 1 : (i * 7) + 8] | |
| ) | |
| resid_pcas = resid_pcas + resid_pca.tolist() | |
| dist.append( | |
| np.linalg.norm( | |
| resid_pca - model.cluster_centers_[0], | |
| ord=2, | |
| axis=1, | |
| ) | |
| ) | |
| self.distance_list.pop(0) | |
| self.distance_list.append(np.concatenate(dist).tolist()) | |
| resid_pcas = np.array(resid_pcas).flatten().tolist() | |
| self.resid_pca_list.pop(0) | |
| self.resid_pca_list.append(resid_pcas) | |
| return np.array(dist) | |
| def fault_windowing(self): | |
| rtu_1_dist = np.array(self.distance_list).T[0] > 1.5 # rtu_3_threshold | |
| rtu_1_dist = [int(x) for x in rtu_1_dist] | |
| if sum(rtu_1_dist) > 0.8 * 60: # 80% of the 60 min window | |
| self.fault_1 = 1 | |
| else: | |
| self.fault_1 = 0 | |
| rtu_2_dist = np.array(self.distance_list).T[1] > 1 # rtu_4_threshold | |
| rtu_2_dist = [int(x) for x in rtu_2_dist] | |
| if sum(rtu_2_dist) > 0.8 * 60: # 80% of the 60 min window | |
| self.fault_2 = 1 | |
| else: | |
| self.fault_2 = 0 | |
| def pipeline(self, df_new, df_trans, scaler): | |
| """ | |
| Perform the anomaly detection pipeline. | |
| Args: | |
| df_new (DataFrame): Input data for prediction. | |
| df_trans (DataFrame): Transformed input data. | |
| scaler (object): Scaler object for inverse transformation. | |
| Returns: | |
| tuple: A tuple containing the lists of actual, predicted, and residual values, and the distances. | |
| """ | |
| pred = self.predict(df_new) | |
| actual, resid = self.calculate_residuals(df_trans, pred) | |
| pred = self.resize_prediction(pred, df_trans) | |
| actual, pred = self.inverse_transform(scaler, pred, df_trans) | |
| actual_list, pred_list, resid_list = self.update_lists(actual, pred, resid) | |
| dist = self.calculate_distances(resid) | |
| self.fault_windowing() | |
| return ( | |
| actual_list, | |
| pred_list, | |
| resid_list, | |
| self.resid_pca_list, | |
| dist, | |
| np.array(self.distance_list[30:]).T[0] > 1.5, # rtu_3_threshold | |
| np.array(self.distance_list[30:]).T[1] > 1, # rtu_4_threshold | |
| self.fault_1, | |
| self.fault_2, | |
| ) | |