import numpy as np
import pandas as pd
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN
from sklearn.metrics.pairwise import euclidean_distances
from typing import List, Dict, Tuple
class NearestNeighborGrouping:
    def __init__(self):
        self.scaler = StandardScaler()
        # Relative feature weights (sum to 1.0), applied after standardization
        self.feature_weights = {
            'depth_mid': 0.05,            # Depth position (less important for similarity)
            'thickness': 0.05,            # Layer thickness (less important)
            'soil_type_encoded': 0.35,    # Soil type (most important)
            'consistency_encoded': 0.30,  # Consistency/density (very important)
            'strength_value': 0.15,       # Strength parameter
            'moisture_encoded': 0.05,     # Moisture content
            'color_encoded': 0.05,        # Color
        }
    def encode_categorical_features(self, layers: List[Dict]) -> pd.DataFrame:
        """Convert categorical layer features to numerical values for clustering."""
        df_data = []
        for i, layer in enumerate(layers):
            # Use `or` fallbacks so None values don't break .lower()
            df_data.append({
                'layer_index': i,
                'layer_id': layer.get('layer_id', i + 1),
                'depth_from': layer.get('depth_from', 0),
                'depth_to': layer.get('depth_to', 0),
                'depth_mid': (layer.get('depth_from', 0) + layer.get('depth_to', 0)) / 2,
                'thickness': layer.get('depth_to', 0) - layer.get('depth_from', 0),
                'soil_type': (layer.get('soil_type') or 'unknown').lower(),
                'consistency': (layer.get('consistency') or 'unknown').lower(),
                'strength_value': layer.get('strength_value') or layer.get('calculated_su') or 0,
                'moisture': (layer.get('moisture') or 'unknown').lower(),
                'color': (layer.get('color') or 'unknown').lower(),
                'description': layer.get('description', ''),
            })
        df = pd.DataFrame(df_data)

        # Encode soil types
        soil_type_mapping = {
            'clay': 1, 'silt': 2, 'sand': 3, 'gravel': 4, 'rock': 5, 'unknown': 0
        }
        df['soil_type_encoded'] = df['soil_type'].map(soil_type_mapping).fillna(0)

        # Encode consistency (cohesive soils) / density (granular soils) on one ordinal scale
        consistency_mapping = {
            'very soft': 1, 'soft': 2, 'medium': 3, 'stiff': 4, 'very stiff': 5, 'hard': 6,
            'very loose': 1, 'loose': 2, 'medium dense': 3, 'dense': 4, 'very dense': 5,
            'unknown': 0
        }
        df['consistency_encoded'] = df['consistency'].map(consistency_mapping).fillna(0)

        # Encode moisture
        moisture_mapping = {'dry': 1, 'moist': 2, 'wet': 3, 'saturated': 4, 'unknown': 0}
        df['moisture_encoded'] = df['moisture'].map(moisture_mapping).fillna(0)

        # Encode colors (simplified)
        color_mapping = {
            'brown': 1, 'gray': 2, 'black': 3, 'red': 4, 'yellow': 5, 'white': 6, 'unknown': 0
        }
        df['color_encoded'] = df['color'].map(color_mapping).fillna(0)

        return df
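
    # Encoding sketch (hypothetical input; outputs follow the mappings above):
    #
    #   {'soil_type': 'Clay', 'consistency': 'Stiff', 'depth_from': 1.0, 'depth_to': 3.0}
    #   -> soil_type_encoded=1.0, consistency_encoded=4.0, depth_mid=2.0, thickness=2.0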
    def calculate_layer_similarity(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray]:
        """Calculate a similarity matrix between layers using weighted features."""
        feature_columns = [
            'depth_mid', 'thickness', 'soil_type_encoded',
            'consistency_encoded', 'strength_value', 'moisture_encoded', 'color_encoded'
        ]
        # Prepare the feature matrix and handle missing values
        features = df[feature_columns].fillna(0)

        # Standardize first, then weight: weighting before StandardScaler would be
        # cancelled out, since z-scoring removes any constant per-column scale
        features_scaled = self.scaler.fit_transform(features)
        weights = np.array([self.feature_weights.get(col, 1.0) for col in feature_columns])
        features_scaled = features_scaled * weights

        # Convert euclidean distances to similarities in (0, 1]
        distance_matrix = euclidean_distances(features_scaled)
        similarity_matrix = 1 / (1 + distance_matrix)
        return similarity_matrix, features_scaled
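
    # Worked example of the conversion above: a weighted, scaled distance of 0 gives
    # similarity 1/(1+0) = 1.0 (identical layers), a distance of 1 gives 0.5, and
    # similarity approaches 0 as distance grows.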
    def find_nearest_neighbors(self, df: pd.DataFrame, k: int = 3) -> List[Dict]:
        """Find the k nearest neighbors for each soil layer."""
        similarity_matrix, features_scaled = self.calculate_layer_similarity(df)

        # Request k+1 neighbors because each layer's nearest neighbor is itself
        nn_model = NearestNeighbors(n_neighbors=min(k + 1, len(df)), metric='euclidean')
        nn_model.fit(features_scaled)
        distances, indices = nn_model.kneighbors(features_scaled)

        nearest_neighbors = []
        for i, (layer_distances, layer_indices) in enumerate(zip(distances, indices)):
            neighbors = []
            for dist, idx in zip(layer_distances[1:], layer_indices[1:]):  # Skip self
                neighbors.append({
                    'neighbor_index': int(idx),
                    'neighbor_id': df.iloc[idx]['layer_id'],
                    'distance': float(dist),
                    'similarity_score': float(similarity_matrix[i, idx]),
                    'soil_type': df.iloc[idx]['soil_type'],
                    'consistency': df.iloc[idx]['consistency'],
                    'depth_range': f"{df.iloc[idx]['depth_from']:.1f}-{df.iloc[idx]['depth_to']:.1f}m"
                })
            nearest_neighbors.append({
                'layer_index': i,
                'layer_id': df.iloc[i]['layer_id'],
                'soil_type': df.iloc[i]['soil_type'],
                'consistency': df.iloc[i]['consistency'],
                'depth_range': f"{df.iloc[i]['depth_from']:.1f}-{df.iloc[i]['depth_to']:.1f}m",
                'nearest_neighbors': neighbors
            })
        return nearest_neighbors
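
    # Usage sketch: given a DataFrame from encode_categorical_features, each entry
    # pairs a layer with its k most similar layers (hypothetical values shown):
    #
    #   {'layer_id': 1, 'soil_type': 'clay', 'consistency': 'soft',
    #    'depth_range': '0.0-2.0m',
    #    'nearest_neighbors': [{'neighbor_id': 2, 'distance': 0.13,
    #                           'similarity_score': 0.88, ...}]}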
    def group_similar_layers(self, df: pd.DataFrame,
                             similarity_threshold: float = 0.7) -> Tuple[List[List[int]], np.ndarray]:
        """Group layers using DBSCAN clustering on the precomputed similarity matrix."""
        similarity_matrix, _ = self.calculate_layer_similarity(df)

        # Convert similarity to distance for DBSCAN; eps mirrors the similarity threshold
        distance_matrix = 1 - similarity_matrix
        eps = 1 - similarity_threshold
        clustering = DBSCAN(eps=eps, min_samples=1, metric='precomputed')
        cluster_labels = clustering.fit_predict(distance_matrix)

        # Group layer indices by cluster label
        clusters = {}
        for i, label in enumerate(cluster_labels):
            clusters.setdefault(label, []).append(i)

        # Keep only groups containing more than one layer
        layer_groups = [indices for indices in clusters.values() if len(indices) > 1]
        return layer_groups, cluster_labels
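
    # Threshold example: similarity_threshold=0.7 gives eps=0.3, so two layers are
    # directly linked when 1 - similarity <= 0.3 (i.e. similarity >= 0.7); with
    # min_samples=1, DBSCAN then chains such links into groups.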
    def analyze_group_properties(self, df: pd.DataFrame, group_indices: List[int]) -> Dict:
        """Analyze the properties of a group of similar layers."""
        group_layers = df.iloc[group_indices]
        return {
            'group_size': len(group_indices),
            'depth_range': {
                'min': group_layers['depth_from'].min(),
                'max': group_layers['depth_to'].max(),
                'total_thickness': group_layers['thickness'].sum()
            },
            'soil_types': group_layers['soil_type'].value_counts().to_dict(),
            'consistencies': group_layers['consistency'].value_counts().to_dict(),
            'strength_stats': {
                'mean': group_layers['strength_value'].mean(),
                'min': group_layers['strength_value'].min(),
                'max': group_layers['strength_value'].max(),
                'std': group_layers['strength_value'].std()
            },
            'layer_ids': group_layers['layer_id'].tolist(),
            'depth_ranges': [f"{row['depth_from']:.1f}-{row['depth_to']:.1f}m"
                             for _, row in group_layers.iterrows()]
        }
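
    # Result sketch (hypothetical values) for a group of two clay layers:
    #
    #   {'group_size': 2, 'soil_types': {'clay': 2}, 'consistencies': {'soft': 2},
    #    'depth_range': {'min': 0.0, 'max': 4.5, 'total_thickness': 4.5},
    #    'strength_stats': {'mean': 27.5, ...}, 'layer_ids': [1, 2], ...}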
    def suggest_layer_merging(self, layers: List[Dict], similarity_threshold: float = 0.8) -> Dict:
        """Suggest which layers should be merged based on nearest neighbor analysis."""
        if len(layers) < 2:
            return {'groups': [], 'recommendations': [], 'cluster_labels': []}

        # Encode features and find similar layer groups
        df = self.encode_categorical_features(layers)
        layer_groups, cluster_labels = self.group_similar_layers(df, similarity_threshold)

        group_analyses = []
        recommendations = []
        for i, group_indices in enumerate(layer_groups):
            group_analysis = self.analyze_group_properties(df, group_indices)
            group_analysis['group_id'] = i + 1
            group_analyses.append(group_analysis)

            # Only recommend merging when the grouped layers are adjacent (or nearly so) in depth
            group_df = df.iloc[group_indices].sort_values('depth_from')
            if self._check_adjacency(group_df):
                dominant_soil_type = max(group_analysis['soil_types'].items(), key=lambda x: x[1])[0]
                dominant_consistency = max(group_analysis['consistencies'].items(), key=lambda x: x[1])[0]
                recommendations.append({
                    'group_id': i + 1,
                    'action': 'merge',
                    'reason': f'Similar {dominant_consistency} {dominant_soil_type} layers at adjacent depths',
                    'layer_ids': group_analysis['layer_ids'],
                    'depth_ranges': group_analysis['depth_ranges'],
                    'merged_properties': {
                        'soil_type': dominant_soil_type,
                        'consistency': dominant_consistency,
                        'depth_from': group_analysis['depth_range']['min'],
                        'depth_to': group_analysis['depth_range']['max'],
                        'thickness': group_analysis['depth_range']['total_thickness'],
                        'avg_strength': group_analysis['strength_stats']['mean']
                    }
                })

        return {
            'groups': group_analyses,
            'recommendations': recommendations,
            'cluster_labels': cluster_labels.tolist()
        }
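
    # Recommendation sketch (hypothetical values): each entry proposes one merged
    # layer spanning the whole group, e.g.
    #
    #   {'group_id': 1, 'action': 'merge', 'layer_ids': [1, 2],
    #    'reason': 'Similar soft clay layers at adjacent depths',
    #    'merged_properties': {'soil_type': 'clay', 'depth_from': 0.0,
    #                          'depth_to': 4.5, 'avg_strength': 27.5, ...}}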
    def _check_adjacency(self, group_df: pd.DataFrame, max_gap: float = 0.5) -> bool:
        """Check whether the layers in a group are adjacent or nearly adjacent in depth."""
        if len(group_df) <= 1:
            return True

        # Check the gap between each pair of consecutive layers, sorted by depth
        sorted_df = group_df.sort_values('depth_from')
        for i in range(len(sorted_df) - 1):
            gap = sorted_df.iloc[i + 1]['depth_from'] - sorted_df.iloc[i]['depth_to']
            if gap > max_gap:
                return False
        return True
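
    # Adjacency example: layers at 0.0-2.0m and 2.3-4.0m leave a 0.3m gap, within
    # the default max_gap of 0.5m, so the group still counts as nearly adjacent.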
    def get_layer_neighbors_report(self, layers: List[Dict], k: int = 3) -> str:
        """Generate a detailed report of the nearest neighbors for each layer."""
        if len(layers) < 2:
            return "Insufficient layers for neighbor analysis."

        df = self.encode_categorical_features(layers)
        nearest_neighbors = self.find_nearest_neighbors(df, k)

        report_lines = [
            "NEAREST NEIGHBOR ANALYSIS REPORT",
            "=" * 50,
            ""
        ]
        for layer_info in nearest_neighbors:
            report_lines.append(
                f"Layer {layer_info['layer_id']}: {layer_info['consistency']} "
                f"{layer_info['soil_type']} ({layer_info['depth_range']})"
            )
            report_lines.append("  Nearest Neighbors:")
            for i, neighbor in enumerate(layer_info['nearest_neighbors'][:k], 1):
                similarity_pct = neighbor['similarity_score'] * 100
                report_lines.append(
                    f"    {i}. Layer {neighbor['neighbor_id']}: {neighbor['consistency']} "
                    f"{neighbor['soil_type']} ({neighbor['depth_range']}) "
                    f"- Similarity: {similarity_pct:.1f}%"
                )
            report_lines.append("")
        return "\n".join(report_lines)
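

# A minimal end-to-end sketch of how this class might be driven. The layer dicts
# below are hypothetical borehole-log entries, not data from any real project.
if __name__ == "__main__":
    sample_layers = [
        {'layer_id': 1, 'depth_from': 0.0, 'depth_to': 2.0, 'soil_type': 'clay',
         'consistency': 'soft', 'strength_value': 25, 'moisture': 'moist', 'color': 'brown'},
        {'layer_id': 2, 'depth_from': 2.0, 'depth_to': 4.5, 'soil_type': 'clay',
         'consistency': 'soft', 'strength_value': 30, 'moisture': 'moist', 'color': 'brown'},
        {'layer_id': 3, 'depth_from': 4.5, 'depth_to': 7.0, 'soil_type': 'sand',
         'consistency': 'dense', 'strength_value': 0, 'moisture': 'wet', 'color': 'gray'},
    ]

    grouping = NearestNeighborGrouping()

    # Per-layer nearest-neighbor report
    print(grouping.get_layer_neighbors_report(sample_layers, k=2))

    # Merge suggestions: here the two adjacent soft clay layers are similar enough
    # that a merge is typically recommended at this threshold
    suggestions = grouping.suggest_layer_merging(sample_layers, similarity_threshold=0.7)
    print(f"{len(suggestions['recommendations'])} merge recommendation(s)")
    for rec in suggestions['recommendations']:
        print(f"  Merge layers {rec['layer_ids']}: {rec['reason']}")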