# source myenv/bin/activate
# deactivate
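# To launch the app locally (assumed entry-point name; adjust if this file is
# saved under a different name, and make sure the model weights, scaler, and
# example input files referenced below are present):
#   streamlit run app.py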
import streamlit as st
import pandas as pd
import numpy as np
import torch
from torch.utils.data import TensorDataset
import matplotlib.pyplot as plt
import shap
import os
import torch.nn as nn
import math
from pytorch_lightning import LightningModule
from PIL import Image
from joblib import load

# Display logo
logo = Image.open('AI_logo.png')
st.image(logo, width=100)
# Model Components
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=0.1)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)
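# Shape sanity check for PositionalEncoding (illustrative note, not part of the
# original file; the 10-step, 64-dim tensor mirrors the soil-profile input used
# further down):
#   pe = PositionalEncoding(d_model=64)
#   pe(torch.zeros(8, 10, 64)).shape  # -> torch.Size([8, 10, 64])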
class EQ_encoder(nn.Module):
    def __init__(self):
        super(EQ_encoder, self).__init__()
        self.lstm_layer = nn.LSTM(input_size=1, hidden_size=100, num_layers=10, batch_first=True)
        self.dense1 = nn.Linear(100, 50)
        self.dense2 = nn.Linear(50, 16)
        self.relu = nn.ReLU()

    def forward(self, x):
        output, (hidden_last, cell_last) = self.lstm_layer(x)
        last_output = hidden_last[-1]
        x = last_output.reshape(x.size(0), -1)
        x = self.dense1(x)
        x = torch.relu(x)
        x = self.dense2(x)
        x = torch.relu(x)
        return x
class AttentionBlock(nn.Module):
    def __init__(self, d_model, num_heads, dropout=0.1):
        super(AttentionBlock, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        query = self.w_q(query).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        key = self.w_k(key).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        value = self.w_v(value).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        scores = torch.matmul(query, key.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.d_k, dtype=torch.float32))
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attention_weights = torch.softmax(scores, dim=-1)
        attention_weights = self.dropout(attention_weights)
        output = torch.matmul(attention_weights, value)
        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.num_heads * self.d_k)
        output = self.w_o(output)
        return output
class FFTAttentionReducer(nn.Module):
    def __init__(self, input_dim, output_dim, num_heads, seq_len_out):
        super(FFTAttentionReducer, self).__init__()
        self.positional_encoding = PositionalEncoding(d_model=64)
        self.embed_dim = 64
        self.heads = num_heads
        self.head_dim = self.embed_dim // self.heads
        assert (self.head_dim * self.heads == self.embed_dim), "Embed dim must be divisible by number of heads"
        self.input_proj = nn.Linear(2, 64)
        self.q = nn.Linear(self.embed_dim, self.embed_dim)
        self.k = nn.Linear(self.embed_dim, self.embed_dim)
        self.v = nn.Linear(self.embed_dim, self.embed_dim)
        self.fc_out = nn.Linear(self.embed_dim, self.embed_dim)
        self.fc1 = nn.Linear(self.embed_dim, output_dim)
        self.pool = nn.AdaptiveAvgPool1d(seq_len_out)
        self.norm1 = nn.LayerNorm(self.embed_dim)

    def forward(self, x):
        x = self.input_proj(x)
        x = self.positional_encoding(x)
        batch_size, seq_len, _ = x.shape
        for _ in range(1):
            residual = x
            q = self.q(x).reshape(batch_size, seq_len, self.heads, self.head_dim).permute(0, 2, 1, 3)
            k = self.k(x).reshape(batch_size, seq_len, self.heads, self.head_dim).permute(0, 2, 1, 3)
            v = self.v(x).reshape(batch_size, seq_len, self.heads, self.head_dim).permute(0, 2, 1, 3)
            attention_scores = torch.matmul(q, k.transpose(-2, -1)) / (self.embed_dim ** (1/2))
            attention_scores = torch.softmax(attention_scores, dim=-1)
            out = torch.matmul(attention_scores, v)
            out = out.transpose(1, 2).contiguous().view(batch_size, seq_len, self.embed_dim)
            x = self.norm1(out + residual)
        out = self.fc_out(x)
        out = self.fc1(out)
        out = out.transpose(1, 2)
        out = self.pool(out.contiguous())
        out = out.transpose(1, 2)
        return out
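# Shape sanity check for FFTAttentionReducer, matching how it is instantiated
# below (illustrative note, not part of the original file): a (batch, 5000, 2)
# FFT/depth feature map is reduced to a fixed-length (batch, 10, 64)
# representation via adaptive average pooling.
#   reducer = FFTAttentionReducer(input_dim=64, output_dim=64, num_heads=2, seq_len_out=10)
#   reducer(torch.zeros(2, 5000, 2)).shape  # -> torch.Size([2, 10, 64])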
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.relu = nn.ReLU()
        self.tanh = nn.Tanh()
        self.fc2 = nn.Linear(d_ff, d_model)
        self.leaky_relu = nn.LeakyReLU(negative_slope=0.01)

    def forward(self, x):
        return self.fc2(self.leaky_relu(self.fc1(x)))
class encoder(nn.Module):
    def __init__(self, dim=2):
        super(encoder, self).__init__()
        self.input_proj = nn.Linear(2, 64)
        self.dim = dim
        self.attention_layer = nn.MultiheadAttention(embed_dim=64, num_heads=4, dropout=0.1)
        self.norm1 = nn.LayerNorm(64)
        self.norm2 = nn.LayerNorm(64)
        self.dense1 = nn.Linear(40, 16)
        self.dense2 = nn.Linear(16, 2)
        self.softmax = nn.Softmax(dim=1)
        self.model_eq = EQ_encoder()
        self.positional_encoding = PositionalEncoding(d_model=64)
        self.feed_forward = PositionWiseFeedForward(d_model=64, d_ff=20)
        self.atten = AttentionBlock(d_model=64, num_heads=4, dropout=0.1)
        self.relu = nn.ReLU()
        self.tanh = nn.Tanh()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.input_proj(x)
        x = self.positional_encoding(x)
        for _ in range(1):
            residual = x
            x = self.atten(x, x, x)
            x = self.norm1(x)
            x = self.feed_forward(x)
            x = self.norm2(x)
            x = x + residual
        return x
class encoder_LSTM(nn.Module):
    def __init__(self):
        super(encoder_LSTM, self).__init__()
        self.lstm_layer = nn.LSTM(input_size=4, hidden_size=20, num_layers=5, batch_first=True)
        self.dense1 = nn.Linear(100, 50)
        self.dense2 = nn.Linear(50, 16)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        output, (hidden_last, cell_last) = self.lstm_layer(x)
        last_output = hidden_last[-1]
        x = last_output.reshape(x.size(0), -1)
        x = self.dense1(x)
        x = torch.sigmoid(x)
        x = self.dense2(x)
        return x
class com_model(LightningModule):
    def __init__(self):
        super(com_model, self).__init__()
        self.best_val_loss = float('inf')
        self.best_val_acc = 0
        self.train_loss_history = []
        self.train_loss_accuracy = []
        self.train_accuracy_history = []
        self.val_loss_history = []
        self.val_accuracy_history = []
        self.model_eq = EQ_encoder()
        self.encoder = encoder(dim=6)
        self.flatten = nn.Flatten()
        self.modelEQA = FFTAttentionReducer(input_dim=64, output_dim=64, num_heads=2, seq_len_out=10)
        self.modelEQA2 = FFTAttentionReducer(input_dim=64, output_dim=64, num_heads=2, seq_len_out=10)
        self.cross_attention_layer = nn.MultiheadAttention(embed_dim=64, num_heads=8)
        self.encoder_LSTM = encoder_LSTM()
        self.dense2 = nn.Linear(2*640, 100)
        self.dense3 = nn.Linear(100, 30)
        self.dense4 = nn.Linear(34, 2)
        self.relu = nn.ReLU()
        self.dropout = torch.nn.Dropout(0.4)
        self.leaky_relu = nn.LeakyReLU(negative_slope=0.01)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x1, x2, x3):
        int1_x = self.encoder(x1)
        int2_x = self.modelEQA(x2)
        concatenated_tensor = torch.cat((int1_x, int2_x), dim=2)
        x = concatenated_tensor.view(-1, 2*640)
        x = self.dense2(x)
        x = self.dropout(x)
        x = self.dense3(x)
        x = self.leaky_relu(x)
        x = torch.cat((x, x3), dim=1)
        x = self.dense4(x)
        x = self.leaky_relu(x)
        out_y = self.softmax(x)
        return out_y

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-4, weight_decay=1e-3)
        return optimizer
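# Expected inputs to com_model.forward, as produced by preprocess_data() below
# (illustrative shape note, not part of the original file):
#   x1: (N, 10, 2)   scaled SPT blow counts + normalized soil-type values per depth step
#   x2: (N, 5000, 2) FFT magnitude of the ground motion + normalized depth channel
#   x3: (N, 4)       scaled site features (water table, epicentral distance,
#                    distance to water, Vs30)
# Output: (N, 2) softmax probabilities ordered [no liquefaction, liquefaction].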
def create_waterfall_plot(shap_values, n_features, output_index, X, model, base_values, raw_data, sample_name, lique_y, test_data, df_spt=None, df_soil_type=None):
    """Create a waterfall plot for SHAP values"""
    model.eval()
    with torch.no_grad():
        x = test_data[X:X+1]
        # Flattened feature layout: [0:20] soil profile (10 SPT + 10 soil type),
        # [20:10020] EQ FFT (5000) + depth (5000), [10020:] site features
        split_idx1 = 20
        split_idx2 = split_idx1 + 10000
        x1 = x[:, :split_idx1].view(-1, 2, 10).permute(0, 2, 1)
        x2 = x[:, split_idx1:split_idx2].view(-1, 2, 5000).permute(0, 2, 1)
        x3 = x[:, split_idx2:]
        predictions = model(x1, x2, x3)

    # Get the liquefaction probability (1 - no_liquefaction_prob)
    model_prob = predictions[0, output_index].item()
    base_value = base_values[output_index]
    sample_shap = shap_values[X, :, output_index].copy()  # Make a copy to avoid modifying original

    # Scale SHAP values to match model prediction
    shap_sum = sample_shap.sum()
    target_sum = model_prob - base_value
    if shap_sum != 0:  # Avoid division by zero
        scaling_factor = target_sum / shap_sum
        sample_shap = sample_shap * scaling_factor

    verification_results = {
        'base_value': base_value,
        'model_prediction': model_prob,
        'shap_sum': sample_shap.sum(),
        'final_probability': base_value + sample_shap.sum(),
        'prediction_difference': abs(model_prob - (base_value + sample_shap.sum()))
    }

    # Process features
    feature_names = []
    feature_values = []
    shap_values_list = []

    # Process SPT and Soil features (first 20)
    for idx in range(20):
        if idx < 10:
            name = f'SPT_{idx+1}'
            val = df_spt.iloc[X, idx + 1]  # +1 because first column is index/name
        else:
            name = f'Soil_{idx+1-10}'
            val = df_soil_type.iloc[X, idx - 9]  # -9 to get correct soil type column
        feature_names.append(name)
        feature_values.append(float(val))
        shap_values_list.append(float(sample_shap[idx]))

    # Add combined EQ feature
    eq_sum = float(np.sum(sample_shap[20:5020]))
    if abs(eq_sum) > 0:
        feature_names.append('EQ')
        feature_values.append(0)  # EQ feature is already normalized
        shap_values_list.append(eq_sum)

    # Add combined Depth feature
    depth_sum = float(np.sum(sample_shap[5020:10020]))
    if abs(depth_sum) > 0:
        feature_names.append('Depth')
        feature_values.append(df_spt.iloc[X, 17])
        shap_values_list.append(depth_sum)

    # Add site features
    feature_names.extend(['WT'])
    feature_values.append(df_spt.iloc[X, 11])
    shap_values_list.append(sample_shap[10020])
    feature_names.extend(['Dist_epi'])
    feature_values.append(df_spt.iloc[X, 12])
    shap_values_list.append(sample_shap[10021])
    feature_names.extend(['Dist_Water'])
    feature_values.append(df_spt.iloc[X, 18])
    shap_values_list.append(sample_shap[10022])
    feature_names.extend(['Vs30'])
    feature_values.append(df_spt.iloc[X, 19])
    shap_values_list.append(sample_shap[10023])

    # Convert to numpy arrays for consistent handling
    abs_values = np.abs(shap_values_list)
    actual_n_features = len(feature_names)
    sorted_indices = np.argsort(abs_values)
    top_indices = sorted_indices[-actual_n_features:].tolist()

    # Create final arrays
    final_names = []
    final_values = []
    final_shap = []
    for i in reversed(top_indices):
        if 0 <= i < len(feature_names):
            final_names.append(feature_names[i])
            final_values.append(feature_values[i])
            final_shap.append(shap_values_list[i])

    # Create SHAP explanation
    explainer = shap.Explanation(
        values=np.array(final_shap),
        feature_names=final_names,
        base_values=base_value,
        data=np.array(final_values)
    )

    # Create plot
    plt.clf()
    plt.close('all')
    fig = plt.figure(figsize=(12, 16))
    shap.plots.waterfall(explainer, max_display=len(final_names), show=False)
    plt.title(
        f'Sample {X+1}, {sample_name[X][0]} ({lique_y[X][0]})',
        fontsize=16,
        pad=20,
        fontweight='bold'
    )

    # Save plot
    os.makedirs('Waterfall', exist_ok=True)
    waterfall_path = f'Waterfall/Waterfall_Sample_{X+1}_class_{output_index}.png'
    fig.savefig(waterfall_path, dpi=300, bbox_inches='tight')
    plt.close()
    return waterfall_path, verification_results
def load_model():
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = com_model()
    model.load_state_dict(torch.load('R4V6.3_Model.pth', map_location=device))
    model = model.to(device)
    model.eval()
    return model
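# Note: load_model() expects the pretrained checkpoint 'R4V6.3_Model.pth' in the
# working directory. A minimal usage sketch (assuming tensors shaped as produced
# by preprocess_data() below):
#   model = load_model()
#   with torch.no_grad():
#       probs = model(x1, x2, x3)   # -> (N, 2) class probabilities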
def preprocess_fft_eq(data):
    """Apply FFT preprocessing to earthquake data"""
    # Ensure data is float32
    data = data.astype(np.float32)
    # Reshape to 2D if needed (samples, time_steps)
    orig_shape = data.shape
    if len(orig_shape) == 3:
        data = data.reshape(orig_shape[0], orig_shape[1])
    # Convert to torch tensor
    data = torch.from_numpy(data).float()
    # Apply FFT
    fft_result = torch.fft.fft(data, dim=1)
    # Get magnitude spectrum
    magnitude = torch.abs(fft_result)
    # Normalize
    magnitude = magnitude / 150
    # Convert back to numpy and reshape to original dimensions
    magnitude = magnitude.numpy()
    if len(orig_shape) == 3:
        magnitude = magnitude.reshape(orig_shape)
    return magnitude
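# Illustrative shape check (not part of the original file): the transform is
# shape-preserving, e.g.
#   preprocess_fft_eq(np.zeros((3, 5000, 1), dtype=np.float32)).shape  # -> (3, 5000, 1)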
def preprocess_data(df_spt, df_soil_type, df_EQ_data):
    # Initialize scalers
    scalers = load('fitted_scalers/all_scalers.joblib')
    scaler1 = scalers['scaler1']
    scaler2 = scalers['scaler2']
    scaler3 = scalers['scaler3']
    scaler6 = scalers['scaler6']

    # Convert dataframes to numpy arrays
    spt = np.array(df_spt)
    soil_type = np.array(df_soil_type)
    EQ_dta = np.array(df_EQ_data)

    # Process SPT data
    data_spt = scaler1.transform(spt[:, 1:11])
    data_soil_type = soil_type[:, 1:11] / 2  # normalize

    # Process feature data
    feature_n = spt[:, 11:13]
    feature = scaler2.transform(feature_n)

    # Process water and vs30 data
    dis_water = spt[:, 18:19]
    vs_30 = spt[:, 19:20]
    dis_water = scaler3.transform(dis_water)
    vs_30r = scaler6.transform(vs_30)

    # Process EQ data
    EQ_data = EQ_dta[:, 1:5001]
    EQ_depth_S = spt[:, 17:18] / 30

    # Reshape EQ data
    EQ_data = EQ_data.astype(np.float32)
    EQ_data = np.reshape(EQ_data, (-1, EQ_data.shape[1], 1))
    EQ_data_fft = preprocess_fft_eq(EQ_data)

    # Create EQ feature
    EQ_feature = np.zeros((EQ_data_fft.shape[0], EQ_data_fft.shape[1], 2))
    EQ_feature[:, :, 0:1] = EQ_data_fft
    for i in range(0, (EQ_data.shape[0])):
        EQ_feature[i, :, 1] = EQ_depth_S[i, 0]

    # Create soil data
    soil_data = np.stack([data_spt, data_soil_type], axis=2)
    X_train_CNN = np.zeros((soil_data.shape[0], soil_data.shape[1], feature.shape[1]))
    X_train_CNN[:, :, 0:2] = soil_data

    # Create feature_sta
    feature_sta = np.concatenate((feature, dis_water, vs_30r), axis=1)

    return X_train_CNN, EQ_feature, feature_sta
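# Returned array shapes for N rows in the uploaded workbook (illustrative note,
# not part of the original file):
#   X_train_CNN: (N, 10, 2), EQ_feature: (N, 5000, 2), feature_sta: (N, 4)
# These are cast to float tensors in main() and fed to com_model as x1, x2, x3.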
def main():
    st.title("Liquefaction Probability Calculator V 1.0")

    # Initialize session state
    if 'processed' not in st.session_state:
        st.session_state.processed = False

    # Add example file download
    with open('input.xlsx', 'rb') as file:
        st.download_button(
            label="Download Example Input File",
            data=file,
            file_name="example_input.xlsx",
            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        )

    # File upload
    uploaded_file = st.file_uploader("Upload Excel file", type=['xlsx'])
    if uploaded_file is not None:
        try:
            if not st.session_state.processed:
                # Read the Excel file
                df_spt = pd.read_excel(uploaded_file, sheet_name='SPT')
                df_soil_type = pd.read_excel(uploaded_file, sheet_name='soil_type')
                df_EQ_data = pd.read_excel(uploaded_file, sheet_name='EQ_data')
                st.success("File uploaded successfully!")

                # Add calculate button
                if st.button("Calculate Liquefaction Probability"):
                    with st.spinner("Processing data and calculating probabilities..."):
                        # Preprocess data
                        X_train_CNN, EQ_feature, feature_sta = preprocess_data(df_spt, df_soil_type, df_EQ_data)

                        # Load model
                        model = load_model()

                        # Convert to tensors
                        X_train_CNN = torch.FloatTensor(X_train_CNN)
                        EQ_feature = torch.FloatTensor(EQ_feature)
                        feature_sta = torch.FloatTensor(feature_sta)

                        # Make prediction
                        with torch.no_grad():
                            predictions = model(X_train_CNN, EQ_feature, feature_sta)

                        # Display results
                        st.subheader("Prediction Results")

                        # Create a DataFrame for results
                        liquefaction_probs = [pred[1].item() for pred in predictions]
                        results_df = pd.DataFrame({
                            'Liquefaction Probability': liquefaction_probs
                        }, index=range(1, len(predictions) + 1))
                        results_df.index.name = 'Sample'

                        # Display results in a table
                        st.dataframe(
                            results_df.style.format({
                                'Liquefaction Probability': '{:.4f}'
                            }),
                            use_container_width=True
                        )

                        # Create and display SHAP waterfall plots
                        st.subheader("SHAP Analysis")

                        # Load pre-computed SHAP values
                        loaded_shap_values = np.load('V10.1_shap_values.npy')
                        for i in range(len(predictions)):
                            with st.expander(f"Sample {i+1}"):
                                # Create waterfall plot
                                waterfall_path, _ = create_waterfall_plot(
                                    shap_values=loaded_shap_values,
                                    n_features=25,
                                    output_index=1,
                                    X=i,
                                    model=model,
                                    base_values=[0.4510177, 0.5489824],
                                    raw_data=torch.cat([
                                        X_train_CNN.reshape(len(X_train_CNN), 10, 2).transpose(-1, 1).reshape(len(X_train_CNN), -1),
                                        EQ_feature.reshape(len(EQ_feature), 5000, 2).transpose(-1, 1).reshape(len(EQ_feature), -1),
                                        feature_sta
                                    ], dim=1),
                                    sample_name=df_spt.iloc[:, :1].values,
                                    lique_y=df_spt.iloc[:, 16:17].values,
                                    test_data=torch.cat([
                                        X_train_CNN.reshape(len(X_train_CNN), 10, 2).transpose(-1, 1).reshape(len(X_train_CNN), -1),
                                        EQ_feature.reshape(len(EQ_feature), 5000, 2).transpose(-1, 1).reshape(len(EQ_feature), -1),
                                        feature_sta
                                    ], dim=1),
                                    df_spt=df_spt,
                                    df_soil_type=df_soil_type
                                )
                                if os.path.exists(waterfall_path):
                                    st.image(waterfall_path)

                        st.session_state.processed = True
        except Exception as e:
            st.error(f"An error occurred: {str(e)}")
    else:
        st.session_state.processed = False


if __name__ == "__main__":
    main()