Spaces:
Sleeping
Sleeping
| import numpy as np | |
| from typing import List, Dict, Any | |
| import streamlit as st | |
| from nearest_neighbor_grouping import NearestNeighborGrouping | |
class SoilLayerAnalyzer:
    """Heuristics for reviewing how a soil profile is divided into layers.

    A profile is a list of plain dicts, one per layer.  The keys read by this
    class are ``depth_from``, ``depth_to``, ``soil_type``, ``description``,
    ``strength_parameter``, ``strength_value``, ``color``, ``moisture``,
    ``consistency`` and ``layer_id``.  Depths are reported in metres in the
    messages this class produces.

    None of the ``suggest_*`` / ``optimize_*`` methods modify the profile;
    they only return suggestions.
    """

    def __init__(self):
        """Set up the consistency vocabulary and the nearest-neighbour helper."""
        # Consistency / density terms searched for inside ``soil_type`` text,
        # mapped to an ordinal rank (1 = weakest, 5 = strongest).
        self.consistency_mapping = {
            "soft": 1, "loose": 1,
            "medium": 2, "medium dense": 2,
            "stiff": 3, "dense": 3,
            "very stiff": 4, "very dense": 4,
            "hard": 5,
        }
        self.nn_grouping = NearestNeighborGrouping()

    def validate_layer_continuity(self, layers: List[Dict]) -> List[Dict]:
        """Return the layers ordered by depth with gaps and overlaps removed.

        The layers are sorted by ``depth_from``; the first one is forced to
        start at 0 and every following one starts where the previous one
        ends.  ``depth_to`` values are never changed.

        Args:
            layers: Layer dicts; every layer that is followed by another one
                must have ``depth_to``.

        Returns:
            A new list of shallow copies of the layers with corrected
            ``depth_from``.  The caller's dicts are left untouched.  An empty
            input is returned as is.
        """
        if not layers:
            return layers

        ordered = sorted(layers, key=lambda layer: layer.get("depth_from", 0))

        validated_layers: List[Dict] = []
        for layer in ordered:
            # Work on a copy so the caller's data is not rewritten in place.
            fixed = dict(layer)
            if validated_layers:
                fixed["depth_from"] = validated_layers[-1]["depth_to"]
            else:
                fixed["depth_from"] = 0
            validated_layers.append(fixed)
        return validated_layers

    def identify_similar_layers(self, layers: List[Dict], similarity_threshold: float = 0.8) -> List[List[int]]:
        """Group the indices of layers whose pairwise similarity is high.

        Two layers are linked when ``_calculate_layer_similarity`` returns at
        least ``similarity_threshold``.  Linked layers end up in the same
        group, also when they are only connected through a chain of other
        layers, so every index appears in at most one group.

        Args:
            layers: Layer dicts to compare.
            similarity_threshold: Minimum score (0..1) for two layers to be
                linked.

        Returns:
            A list of groups, each a list of indices into ``layers``.  Layers
            that are similar to no other layer are not listed.
        """
        similar_groups: List[List[int]] = []

        for i, layer1 in enumerate(layers):
            for j, layer2 in enumerate(layers[i + 1:], i + 1):
                if self._calculate_layer_similarity(layer1, layer2) < similarity_threshold:
                    continue

                pos_i = next((p for p, g in enumerate(similar_groups) if i in g), None)
                pos_j = next((p for p, g in enumerate(similar_groups) if j in g), None)

                if pos_i is None and pos_j is None:
                    similar_groups.append([i, j])
                elif pos_j is None:
                    similar_groups[pos_i].append(j)
                elif pos_i is None:
                    similar_groups[pos_j].append(i)
                elif pos_i != pos_j:
                    # The pair bridges two existing groups: fold the later
                    # group into the earlier one instead of leaving a layer
                    # listed in both.
                    keep, drop = sorted((pos_i, pos_j))
                    similar_groups[keep].extend(similar_groups[drop])
                    del similar_groups[drop]
                # else: both already share a group, nothing to do.

        return similar_groups

    def _calculate_layer_similarity(self, layer1: Dict, layer2: Dict) -> float:
        """Return a 0..1 similarity score between two layers.

        The score is a weighted vote over four criteria:

        * soil type text equal, ignoring case (weight 0.4);
        * strength values within 30 % of each other, relative to the larger
          magnitude (weight 0.3) - only counted when both layers have a
          ``strength_value``;
        * same consistency term found in the soil type text (weight 0.2);
        * colour text equal, ignoring case (weight 0.1).

        Missing or ``None`` soil type / colour is treated as an empty string,
        so two layers that both lack the value count as equal on it.
        """
        score = 0.0
        total_weight = 0.0

        soil1 = layer1.get("soil_type") or ""
        soil2 = layer2.get("soil_type") or ""

        # Soil type similarity (weight: 0.4)
        if soil1.lower() == soil2.lower():
            score += 0.4
        total_weight += 0.4

        # Strength parameter similarity (weight: 0.3)
        strength1 = layer1.get("strength_value")
        strength2 = layer2.get("strength_value")
        if strength1 is not None and strength2 is not None:
            # Scale by the larger magnitude; two zero strengths are equal and
            # must not divide by zero.
            scale = max(abs(strength1), abs(strength2))
            if scale == 0 or abs(strength1 - strength2) / scale < 0.3:
                score += 0.3
            total_weight += 0.3

        # Consistency similarity (weight: 0.2)
        if self._extract_consistency(soil1) == self._extract_consistency(soil2):
            score += 0.2
        total_weight += 0.2

        # Color similarity (weight: 0.1)
        color1 = layer1.get("color") or ""
        color2 = layer2.get("color") or ""
        if color1.lower() == color2.lower():
            score += 0.1
        total_weight += 0.1

        return score / total_weight if total_weight > 0 else 0.0

    def _extract_consistency(self, soil_type: str) -> str:
        """Return the consistency term contained in a soil type description.

        The match is a case-insensitive substring search over the keys of
        ``self.consistency_mapping``.  Longer terms are tried first so that
        "very stiff clay" yields "very stiff" rather than "stiff".

        Returns:
            The matching term, or ``""`` when none is found (or when
            ``soil_type`` is empty / ``None``).
        """
        soil_type_lower = (soil_type or "").lower()
        # sorted() is stable, so equally long terms keep their mapping order.
        for consistency in sorted(self.consistency_mapping, key=len, reverse=True):
            if consistency in soil_type_lower:
                return consistency
        return ""

    def suggest_layer_merging(self, layers: List[Dict], adjacency_tolerance: float = 0.5) -> Dict[str, Any]:
        """Suggest groups of similar, vertically adjacent layers to merge.

        Args:
            layers: Layer dicts; ``depth_from``, ``depth_to``, ``layer_id``
                and ``soil_type`` are required on layers that end up in a
                group.
            adjacency_tolerance: Largest gap or overlap (in the depth unit of
                the data, 0.5 m by default) allowed between consecutive
                layers of a group.

        Returns:
            ``{"suggestions": [...]}`` where each suggestion holds
            ``layer_indices`` (indices into ``layers``), a ``reason`` text and
            the proposed ``merged_layer``.
        """
        suggestions = []

        for group in self.identify_similar_layers(layers):
            if len(group) < 2:
                continue

            # Order the constituents top-down so that the merged layer takes
            # its id and representative fields from the shallowest layer.
            group_layers = sorted(
                (layers[i] for i in group),
                key=lambda layer: (layer["depth_from"], layer["depth_to"]),
            )
            depths = [(layer["depth_from"], layer["depth_to"]) for layer in group_layers]

            # Every layer must start (almost) where the one above it ends.
            is_adjacent = all(
                abs(upper[1] - lower[0]) <= adjacency_tolerance
                for upper, lower in zip(depths, depths[1:])
            )
            if is_adjacent:
                suggestions.append({
                    "layer_indices": group,
                    "reason": "Similar soil properties and adjacent depths",
                    "merged_layer": self._create_merged_layer(group_layers),
                })

        return {"suggestions": suggestions}

    def _create_merged_layer(self, layers: List[Dict]) -> Dict:
        """Build one layer dict that stands in for several similar layers.

        The merged layer spans from the smallest ``depth_from`` to the largest
        ``depth_to``.  Descriptive fields are taken from the first layer, the
        descriptions are concatenated, and ``strength_value`` is the mean of
        the values that are present (``None`` when no layer has one).

        Args:
            layers: Layers to merge; each needs ``depth_from`` and
                ``depth_to``, the first and last need ``layer_id`` and the
                first needs ``soil_type``.

        Returns:
            The merged layer dict, or ``{}`` for an empty input.
        """
        if not layers:
            return {}

        first = layers[0]
        strength_values = [
            layer["strength_value"]
            for layer in layers
            if layer.get("strength_value") is not None
        ]
        # np.mean([]) would yield nan plus a RuntimeWarning.
        mean_strength = float(np.mean(strength_values)) if strength_values else None
        descriptions = ", ".join(layer.get("description") or "" for layer in layers)

        return {
            "layer_id": f"merged_{first['layer_id']}_{layers[-1]['layer_id']}",
            "depth_from": min(layer["depth_from"] for layer in layers),
            "depth_to": max(layer["depth_to"] for layer in layers),
            "soil_type": first["soil_type"],  # Use first layer's type
            "description": f"Merged layer: {descriptions}",
            "strength_parameter": first.get("strength_parameter", ""),
            "strength_value": mean_strength,
            "color": first.get("color", ""),
            "moisture": first.get("moisture", ""),
            "consistency": first.get("consistency", ""),
        }

    def suggest_layer_splitting(self, layers: List[Dict]) -> Dict[str, Any]:
        """Suggest layers that should be split into sublayers.

        Two independent checks are made per layer, so one layer can receive
        two suggestions:

        * thickness above 5 m - split into sublayers of roughly 2.5 m;
        * description mentions variable conditions ("varying", "variable",
          "interbedded", "alternating") - split at mid-depth.

        Returns:
            ``{"suggestions": [...]}`` where each suggestion holds the
            ``layer_index``, a ``reason`` text and ``suggested_depths`` (the
            boundaries of the proposed sublayers, top to bottom).
        """
        variability_words = ["varying", "variable", "interbedded", "alternating"]
        suggestions = []

        for i, layer in enumerate(layers):
            top = layer["depth_from"]
            bottom = layer["depth_to"]
            thickness = bottom - top

            # Suggest splitting very thick layers (>5m)
            if thickness > 5.0:
                suggested_splits = int(thickness / 2.5)  # Split into ~2.5m sublayers
                suggestions.append({
                    "layer_index": i,
                    "reason": f"Layer is very thick ({thickness:.1f}m) - consider splitting into {suggested_splits} sublayers",
                    "suggested_depths": np.linspace(top, bottom, suggested_splits + 1).tolist(),
                })

            # Check for significant strength variation indication
            description = (layer.get("description") or "").lower()
            if any(word in description for word in variability_words):
                suggestions.append({
                    "layer_index": i,
                    "reason": "Description indicates variable conditions - consider splitting based on detailed log",
                    "suggested_depths": [top, (top + bottom) / 2, bottom],
                })

        return {"suggestions": suggestions}

    def optimize_layer_division(self, layers: List[Dict], merge_similar: bool = True, split_thick: bool = True) -> Dict[str, Any]:
        """Collect merge, split and nearest-neighbour suggestions for a profile.

        Args:
            layers: Layer dicts of the profile.
            merge_similar: Include the similarity-based merge suggestions.
            split_thick: Include the split suggestions.

        Returns:
            A dict with ``optimized_layers``, ``changes_made`` (one message
            per suggestion), ``merge_suggestions``, ``split_suggestions`` and
            ``nearest_neighbor_analysis``.
        """
        # NOTE: no suggestion is applied here - "optimized_layers" is a
        # shallow copy of the input list and "changes_made" only describes
        # the suggestions.
        optimized_layers = list(layers)
        changes_made = []

        # Traditional merge suggestions
        merge_suggestions = {"suggestions": []}
        if merge_similar:
            merge_suggestions = self.suggest_layer_merging(optimized_layers)
            for suggestion in merge_suggestions["suggestions"]:
                changes_made.append(f"Merged layers {suggestion['layer_indices']}: {suggestion['reason']}")

        # Nearest neighbor analysis
        nn_analysis = self.analyze_nearest_neighbors(optimized_layers)

        # Split suggestions
        split_suggestions = {"suggestions": []}
        if split_thick:
            split_suggestions = self.suggest_layer_splitting(optimized_layers)
            for suggestion in split_suggestions["suggestions"]:
                changes_made.append(f"Suggested splitting layer {suggestion['layer_index']}: {suggestion['reason']}")

        return {
            "optimized_layers": optimized_layers,
            "changes_made": changes_made,
            "merge_suggestions": merge_suggestions,
            "split_suggestions": split_suggestions,
            "nearest_neighbor_analysis": nn_analysis,
        }

    def analyze_nearest_neighbors(self, layers: List[Dict], k: int = 3, similarity_threshold: float = 0.55) -> Dict[str, Any]:
        """Run the nearest-neighbour grouping helper on the profile.

        Args:
            layers: Layer dicts of the profile.
            k: Number of neighbours passed to the neighbour report.
            similarity_threshold: Threshold passed to the helper's merge
                suggestion.

        Returns:
            ``{"message": ...}`` when fewer than two layers are given,
            ``{"error": ...}`` when the helper raises (the error is also shown
            through ``st.error``), otherwise a dict with ``neighbor_groups``,
            ``merge_recommendations``, ``cluster_labels``,
            ``neighbor_report`` and ``analysis_parameters``.
        """
        if len(layers) < 2:
            return {"message": "Insufficient layers for neighbor analysis"}

        try:
            # Get nearest neighbor analysis
            nn_suggestions = self.nn_grouping.suggest_layer_merging(layers, similarity_threshold)

            # Get detailed neighbor report
            neighbor_report = self.nn_grouping.get_layer_neighbors_report(layers, k)

            return {
                "neighbor_groups": nn_suggestions.get("groups", []),
                "merge_recommendations": nn_suggestions.get("recommendations", []),
                "cluster_labels": nn_suggestions.get("cluster_labels", []),
                "neighbor_report": neighbor_report,
                "analysis_parameters": {
                    "similarity_threshold": similarity_threshold,
                    "k_neighbors": k,
                    "total_layers": len(layers),
                },
            }
        except Exception as e:
            # UI boundary: report the failure to the user and hand the caller
            # an error payload instead of letting the exception propagate.
            st.error(f"Error in nearest neighbor analysis: {str(e)}")
            return {"error": str(e)}

    def get_grouping_summary(self, layers: List[Dict]) -> Dict[str, Any]:
        """Summarise the nearest-neighbour grouping of the profile.

        Returns:
            The ``{"error": ...}`` payload when the analysis failed, otherwise
            a dict with ``total_layers``, ``identified_groups``,
            ``merge_recommendations`` (both counts) and ``group_details`` (one
            entry per group).
        """
        nn_analysis = self.analyze_nearest_neighbors(layers)
        if "error" in nn_analysis:
            return nn_analysis

        groups = nn_analysis.get("neighbor_groups", [])
        summary = {
            "total_layers": len(layers),
            "identified_groups": len(groups),
            "merge_recommendations": len(nn_analysis.get("merge_recommendations", [])),
            "group_details": [],
        }

        # Add details for each group.  The group dicts come from the
        # nearest-neighbour helper; every key is read with a fallback.
        for position, group in enumerate(groups, start=1):
            depth_range = group.get("depth_range", {})
            soil_types = group.get("soil_types")
            if soil_types:
                # Soil type with the highest value in the helper's tally.
                dominant_soil_type = max(soil_types.items(), key=lambda item: item[1])[0]
            else:
                dominant_soil_type = "unknown"

            summary["group_details"].append({
                "group_id": group.get("group_id", position),
                "layers_in_group": group.get("group_size", 0),
                "depth_range": f"{depth_range.get('min', 0):.1f}-{depth_range.get('max', 0):.1f}m",
                "total_thickness": depth_range.get("total_thickness", 0),
                "dominant_soil_type": dominant_soil_type,
                "layer_ids": group.get("layer_ids", []),
            })

        return summary

    def calculate_layer_statistics(self, layers: List[Dict]) -> Dict[str, Any]:
        """Calculate summary statistics for the soil profile.

        Args:
            layers: Layer dicts; each needs ``depth_from`` and ``depth_to``.

        Returns:
            ``{}`` for an empty profile, otherwise a dict with
            ``total_depth`` (largest ``depth_to``), ``layer_count``,
            ``average_layer_thickness``, ``soil_type_distribution``
            (thickness per soil type as a percentage of ``total_depth``),
            ``thickest_layer`` and ``thinnest_layer``.
        """
        if not layers:
            return {}

        total_depth = max(layer["depth_to"] for layer in layers)
        thicknesses = [layer["depth_to"] - layer["depth_from"] for layer in layers]

        # Accumulated thickness per soil type
        soil_types: Dict[Any, Any] = {}
        for layer, thickness in zip(layers, thicknesses):
            soil_type = layer.get("soil_type", "unknown")
            soil_types[soil_type] = soil_types.get(soil_type, 0) + thickness

        # Convert to percentages; a profile of zero depth has no meaningful
        # share, so report 0 % instead of dividing by zero.
        if total_depth != 0:
            soil_type_percentages = {k: (v / total_depth) * 100 for k, v in soil_types.items()}
        else:
            soil_type_percentages = {k: 0.0 for k in soil_types}

        return {
            "total_depth": total_depth,
            "layer_count": len(layers),
            "average_layer_thickness": float(np.mean(thicknesses)),
            "soil_type_distribution": soil_type_percentages,
            "thickest_layer": max(thicknesses),
            "thinnest_layer": min(thicknesses),
        }