File size: 15,590 Bytes
b4123b8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 |
"""
Spectral feature extraction for the Sorghum Pipeline.
This module handles extraction of spectral features and analysis
of multispectral data.
"""
import numpy as np
import cv2
from sklearn.decomposition import PCA
from typing import Dict, Any, Optional, List, Tuple
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class SpectralExtractor:
    """Extracts spectral features from multispectral data.

    Bands are supplied as a dictionary mapping a band name to a NumPy array.
    The PCA, index and visualization steps look for the keys ``'nir'``,
    ``'red_edge'``, ``'red'`` and ``'green'``; per-band statistics and
    texture features are computed for every key present in the dictionary.
    """

    # Bands stacked (in this order) as the input channels for PCA.
    _PCA_BANDS = ('nir', 'red_edge', 'red', 'green')

    # (numerator, denominator) band pairs for the ratio / difference indices.
    # The output keys are built as '<a>_<b>_ratio' and '<a>_<b>_diff'.
    _INDEX_PAIRS = (
        ('nir', 'red'),
        ('nir', 'green'),
        ('red', 'green'),
        ('nir', 'red_edge'),
    )

    # Added to ratio denominators so a zero-valued pixel does not divide by zero.
    _EPS = 1e-10

    # Statistics reported for every band, in output order.
    _BAND_STAT_KEYS = (
        'mean', 'std', 'min', 'max', 'median', 'q25', 'q75',
        'skewness', 'kurtosis', 'entropy',
    )

    def __init__(self, n_components: int = 3):
        """
        Initialize spectral extractor.

        Args:
            n_components: Number of PCA components to extract
        """
        self.n_components = n_components

    def extract_spectral_features(self, spectral_stack: Dict[str, np.ndarray],
                                  mask: Optional[np.ndarray] = None) -> Dict[str, Any]:
        """
        Extract spectral features from multispectral data.

        Runs the band-statistics, PCA, spectral-index and texture steps in
        that order. If an exception escapes one of the steps it is logged and
        the features collected up to that point are returned.

        Args:
            spectral_stack: Dictionary of spectral bands
            mask: Optional binary mask; pixels where it is not positive are
                excluded. It is only applied to bands whose shape equals the
                mask shape.

        Returns:
            Dictionary with the keys ``'band_features'``, ``'pca_features'``,
            ``'spectral_indices'`` and ``'spectral_texture'`` (a key is
            missing if its step, or an earlier one, raised).
        """
        features = {}
        try:
            # Extract individual band features
            features['band_features'] = self._extract_band_features(spectral_stack, mask)
            # Extract PCA features
            features['pca_features'] = self._extract_pca_features(spectral_stack, mask)
            # Extract spectral indices
            features['spectral_indices'] = self._extract_spectral_indices(spectral_stack, mask)
            # Extract texture features from spectral bands
            features['spectral_texture'] = self._extract_spectral_texture(spectral_stack, mask)
            logger.debug("Spectral features extracted successfully")
        except Exception as e:
            logger.error(f"Spectral feature extraction failed: {e}")
        return features

    @staticmethod
    def _apply_mask(band: np.ndarray, mask: Optional[np.ndarray]) -> np.ndarray:
        """Return ``band`` with pixels where ``mask`` is not positive set to NaN.

        The band is returned unchanged when no mask is given or when the mask
        shape differs from the band shape.
        """
        if mask is not None and mask.shape == band.shape:
            return np.where(mask > 0, band, np.nan)
        return band

    def _extract_band_features(self, spectral_stack: Dict[str, np.ndarray],
                               mask: Optional[np.ndarray] = None) -> Dict[str, Dict[str, float]]:
        """Extract summary statistics from each individual spectral band.

        Args:
            spectral_stack: Dictionary of spectral bands
            mask: Optional binary mask (applied only when its shape matches)

        Returns:
            Mapping of band name to a statistics dictionary. A band with no
            valid (non-NaN, unmasked) pixels gets all-zero statistics; a band
            whose processing raised gets an empty dictionary.
        """
        band_features: Dict[str, Dict[str, float]] = {}
        for band_name, band_data in spectral_stack.items():
            try:
                # Squeeze to 2D if needed
                if band_data.ndim > 2:
                    band_data = band_data.squeeze()
                masked_data = self._apply_mask(band_data, mask)
                # Statistics are computed over non-NaN pixels only
                valid_data = masked_data[~np.isnan(masked_data)]
                if valid_data.size > 0:
                    band_features[band_name] = {
                        'mean': float(np.mean(valid_data)),
                        'std': float(np.std(valid_data)),
                        'min': float(np.min(valid_data)),
                        'max': float(np.max(valid_data)),
                        'median': float(np.median(valid_data)),
                        'q25': float(np.percentile(valid_data, 25)),
                        'q75': float(np.percentile(valid_data, 75)),
                        'skewness': float(self._compute_skewness(valid_data)),
                        'kurtosis': float(self._compute_kurtosis(valid_data)),
                        'entropy': float(self._compute_entropy(valid_data)),
                    }
                else:
                    band_features[band_name] = dict.fromkeys(self._BAND_STAT_KEYS, 0.0)
            except Exception as e:
                logger.error(f"Band feature extraction failed for {band_name}: {e}")
                band_features[band_name] = {}
        return band_features

    def _extract_pca_features(self, spectral_stack: Dict[str, np.ndarray],
                              mask: Optional[np.ndarray] = None) -> Dict[str, Any]:
        """Extract PCA features from spectral data.

        The available bands among ``'nir'``, ``'red_edge'``, ``'red'`` and
        ``'green'`` are stacked as channels and PCA is fitted on the pixels
        that are valid (non-NaN) in every channel.

        Args:
            spectral_stack: Dictionary of spectral bands
            mask: Optional binary mask (applied only when its shape matches)

        Returns:
            Dictionary with, for each extracted component ``k`` (1-based), an
            image ``'pca_k'`` (NaN at invalid pixels) and ``'pca_k_stats'``,
            plus ``'explained_variance_ratio'`` and
            ``'total_variance_explained'``. The number of components is
            ``n_components`` limited by the number of stacked bands and of
            valid pixels. An empty dictionary is returned when no band or no
            valid pixel is available, or when the computation fails.
        """
        try:
            # Stack all available bands as float channels
            band_data = [
                self._apply_mask(spectral_stack[band_name].squeeze().astype(float), mask)
                for band_name in self._PCA_BANDS
                if band_name in spectral_stack
            ]
            if not band_data:
                return {}
            full_stack = np.stack(band_data, axis=-1)
            h, w, c = full_stack.shape
            # Reshape for PCA: one row per pixel, one column per band
            flat_data = full_stack.reshape(-1, c)
            valid_mask = ~np.isnan(flat_data).any(axis=1)
            n_valid = int(valid_mask.sum())
            if n_valid == 0:
                return {}
            valid_data = flat_data[valid_mask]
            # PCA cannot return more components than there are features or
            # samples. The same effective count must size the result array
            # and drive the loop below, otherwise the assignment fails when
            # fewer bands than requested components are present.
            n_comp = min(self.n_components, c, n_valid)
            pca = PCA(n_components=n_comp)
            pca_result = pca.fit_transform(valid_data)
            # Scatter the scores back to image positions; invalid pixels stay NaN
            full_result = np.full((h * w, n_comp), np.nan)
            full_result[valid_mask] = pca_result
            pca_components: Dict[str, Any] = {}
            for i in range(n_comp):
                component = full_result[:, i].reshape(h, w)
                pca_components[f'pca_{i+1}'] = component
                # Compute statistics for this component
                valid_component = component[~np.isnan(component)]
                if valid_component.size > 0:
                    pca_components[f'pca_{i+1}_stats'] = {
                        'mean': float(np.mean(valid_component)),
                        'std': float(np.std(valid_component)),
                        'min': float(np.min(valid_component)),
                        'max': float(np.max(valid_component)),
                    }
            # Add PCA metadata
            pca_components['explained_variance_ratio'] = pca.explained_variance_ratio_.tolist()
            pca_components['total_variance_explained'] = float(np.sum(pca.explained_variance_ratio_))
            return pca_components
        except Exception as e:
            logger.error(f"PCA feature extraction failed: {e}")
            return {}

    def _extract_spectral_indices(self, spectral_stack: Dict[str, np.ndarray],
                                  mask: Optional[np.ndarray] = None) -> Dict[str, np.ndarray]:
        """Extract basic spectral indices.

        For each band pair that is present, a per-pixel ratio and difference
        image are produced; when NIR, red and green are all present their
        per-pixel sum and mean are added as well.

        Args:
            spectral_stack: Dictionary of spectral bands
            mask: Optional binary mask (applied only when its shape matches)

        Returns:
            Mapping of index name to index image (NaN at masked pixels). If
            an error occurs, the indices computed before it are returned.
        """
        indices: Dict[str, np.ndarray] = {}
        try:
            # Get the bands that are present, squeezed, as float and masked
            bands: Dict[str, np.ndarray] = {}
            for band_name in ('nir', 'red', 'green', 'red_edge'):
                raw = spectral_stack.get(band_name, None)
                if raw is not None:
                    bands[band_name] = self._apply_mask(raw.squeeze().astype(float), mask)
            # Pairwise ratio and difference indices
            for first, second in self._INDEX_PAIRS:
                if first in bands and second in bands:
                    indices[f'{first}_{second}_ratio'] = bands[first] / (bands[second] + self._EPS)
                    indices[f'{first}_{second}_diff'] = bands[first] - bands[second]
            # Three-band brightness: per-pixel sum and mean of NIR, red and green
            if 'nir' in bands and 'red' in bands and 'green' in bands:
                total = bands['nir'] + bands['red'] + bands['green']
                indices['nir_red_green_sum'] = total
                indices['nir_red_green_mean'] = total / 3
        except Exception as e:
            logger.error(f"Spectral index extraction failed: {e}")
        return indices

    def _extract_spectral_texture(self, spectral_stack: Dict[str, np.ndarray],
                                  mask: Optional[np.ndarray] = None) -> Dict[str, Any]:
        """Extract texture features from spectral bands.

        Each band is masked, rescaled to an 8-bit image and passed to
        ``TextureExtractor.extract_all_texture_features`` from the sibling
        ``texture`` module.

        Args:
            spectral_stack: Dictionary of spectral bands
            mask: Optional binary mask (applied only when its shape matches)

        Returns:
            Mapping of band name to that band's texture features (an empty
            dictionary for a band whose processing raised). The mapping is
            empty when ``TextureExtractor`` cannot be imported.
        """
        texture_features: Dict[str, Any] = {}
        try:
            from .texture import TextureExtractor
            texture_extractor = TextureExtractor()
        except ImportError:
            logger.warning("TextureExtractor not available for spectral texture analysis")
            return texture_features
        for band_name, band_data in spectral_stack.items():
            try:
                # Prepare grayscale image and apply mask
                gray_data = self._apply_mask(band_data.squeeze().astype(float), mask)
                # Normalize to 0-255; masked (NaN) pixels become 0
                normalized = self._normalize_band(gray_data)
                # Extract texture features
                texture_features[band_name] = texture_extractor.extract_all_texture_features(normalized)
            except Exception as e:
                logger.error(f"Spectral texture extraction failed for {band_name}: {e}")
                texture_features[band_name] = {}
        return texture_features

    def _compute_skewness(self, data: np.ndarray) -> float:
        """Compute the skewness (mean cubed z-score) of data.

        Returns 0.0 for fewer than three samples or zero standard deviation.
        """
        if len(data) < 3:
            return 0.0
        mean = np.mean(data)
        std = np.std(data)
        if std == 0:
            return 0.0
        return float(np.mean(((data - mean) / std) ** 3))

    def _compute_kurtosis(self, data: np.ndarray) -> float:
        """Compute the excess kurtosis (mean fourth-power z-score minus 3) of data.

        Returns 0.0 for fewer than four samples or zero standard deviation.
        """
        if len(data) < 4:
            return 0.0
        mean = np.mean(data)
        std = np.std(data)
        if std == 0:
            return 0.0
        return float(np.mean(((data - mean) / std) ** 4) - 3)

    def _compute_entropy(self, data: np.ndarray, bins: int = 256,
                         value_range: Tuple[float, float] = (0, 256)) -> float:
        """Compute the Shannon entropy (in bits) of the histogram of data.

        Args:
            data: Sample values
            bins: Number of histogram bins
            value_range: ``(low, high)`` range covered by the histogram;
                samples outside it are not counted

        Returns:
            Entropy of the normalized histogram, or 0.0 when ``data`` is
            empty or no sample falls inside ``value_range``.
        """
        if len(data) == 0:
            return 0.0
        # Create histogram
        hist, _ = np.histogram(data, bins=bins, range=value_range)
        total = hist.sum()
        if total == 0:
            # Every sample lies outside the range: normalizing would divide by zero
            return 0.0
        probabilities = hist / total
        # Remove zero probabilities (log2(0) is undefined)
        probabilities = probabilities[probabilities > 0]
        return float(-np.sum(probabilities * np.log2(probabilities)))

    def create_spectral_visualization(self, spectral_stack: Dict[str, np.ndarray],
                                      pca_features: Dict[str, Any]) -> np.ndarray:
        """
        Create visualization of spectral features.

        The first available option is used:
        a (Red, Red-Edge, Green) composite, then a (NIR, Red, Green)
        composite, then the first PCA component replicated on three channels.

        Args:
            spectral_stack: Original spectral data
            pca_features: PCA features (the ``'pca_1'`` image is used as a
                fallback)

        Returns:
            ``uint8`` image with three channels. A black 100x100 image is
            returned when nothing can be visualized or an error occurs.
        """
        try:
            # Channel orders to try: preferred composite first, then the fallback
            for channel_order in (('red', 'red_edge', 'green'), ('nir', 'red', 'green')):
                if all(band_name in spectral_stack for band_name in channel_order):
                    # Normalize each band independently, then stack as channels
                    channels = [
                        self._normalize_band(spectral_stack[band_name].squeeze())
                        for band_name in channel_order
                    ]
                    return np.stack(channels, axis=-1).astype(np.uint8)
            # Fallback to first PCA component
            if 'pca_1' in pca_features:
                pca1_norm = self._normalize_band(pca_features['pca_1'])
                return np.stack([pca1_norm, pca1_norm, pca1_norm], axis=-1).astype(np.uint8)
            # Return empty image
            return np.zeros((100, 100, 3), dtype=np.uint8)
        except Exception as e:
            logger.error(f"Spectral visualization creation failed: {e}")
            return np.zeros((100, 100, 3), dtype=np.uint8)

    def _normalize_band(self, band: np.ndarray) -> np.ndarray:
        """Normalize band to 0-255 range.

        The minimum and maximum are taken over the non-NaN pixels. NaN
        pixels map to 0 in the output.

        Args:
            band: Band image; may contain NaN for masked pixels

        Returns:
            ``uint8`` array with the shape of ``band``; all zeros when the
            band has no valid pixel or is constant.
        """
        # Work in float64 so narrow integer inputs cannot overflow in the
        # subtraction below.
        values = np.asarray(band, dtype=np.float64)
        invalid = np.isnan(values)
        valid_data = values[~invalid]
        if valid_data.size == 0:
            return np.zeros_like(band, dtype=np.uint8)
        m, M = np.min(valid_data), np.max(valid_data)
        if not M > m:
            return np.zeros_like(band, dtype=np.uint8)
        scaled = (values - m) / (M - m) * 255
        # Casting NaN to an integer type is undefined, so fill it first
        scaled = np.where(invalid, 0.0, scaled)
        return scaled.astype(np.uint8)
|