Spaces:
Sleeping
Sleeping
File size: 12,054 Bytes
65f2520 f5cad9a 65f2520 6373c5a 65f2520 6373c5a 65f2520 ec48f8d f5cad9a ec48f8d f5cad9a 65f2520 6373c5a b1b7e3c 6373c5a b1b7e3c 6373c5a f5cad9a 6373c5a b1b7e3c 6373c5a f5cad9a 6373c5a 65f2520 f5cad9a 6373c5a f5cad9a 6373c5a f5cad9a 6373c5a f5cad9a ec48f8d f5cad9a 2a2cf15 6373c5a f5cad9a 6373c5a f5cad9a 6373c5a f5cad9a 6373c5a f5cad9a 6373c5a f5cad9a 6373c5a f5cad9a 6373c5a f5cad9a 6373c5a f5cad9a ec48f8d f5cad9a ec48f8d f5cad9a 6373c5a f5cad9a 6373c5a f5cad9a ec48f8d f5cad9a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 |
"""
Preprocessing utilities for polymer classification app.
Adapted from the original scripts/preprocess_dataset.py for Hugging Face Spaces deployment.
Supports both Raman and FTIR spectroscopy modalities.
"""
from __future__ import annotations
import numpy as np
from numpy.typing import DTypeLike
from scipy.interpolate import interp1d
from scipy.signal import savgol_filter
from typing import Tuple, Literal, Optional
TARGET_LENGTH = 500  # Frozen default per PREPROCESSING_BASELINE

# Accepted wavenumber windows per modality (cm⁻¹).
MODALITY_RANGES = {
    "raman": (200, 4000),  # Typical Raman range
    "ftir": (400, 4000),  # FTIR wavenumber range
}

# Per-modality preprocessing defaults consumed by preprocess_spectrum().
MODALITY_PARAMS = {
    "raman": {
        "baseline_degree": 2,
        "smooth_window": 11,
        "smooth_polyorder": 2,
        "cosmic_ray_removal": False,
    },
    "ftir": {
        "baseline_degree": 2,
        "smooth_window": 13,  # Slightly larger window for FTIR
        "smooth_polyorder": 2,
        "cosmic_ray_removal": False,
        "atmospheric_correction": False,  # Placeholder for future implementation
    },
}


def _ensure_1d_equal(x: np.ndarray, y: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Coerce both inputs to float 1-D arrays and verify they form a usable spectrum."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    ok = x.ndim == 1 and y.ndim == 1 and x.size == y.size and x.size >= 2
    if not ok:
        raise ValueError("x and y must be 1D arrays of equal length >= 2")
    return x, y


def resample_spectrum(
    x: np.ndarray, y: np.ndarray, target_len: int = TARGET_LENGTH
) -> tuple[np.ndarray, np.ndarray]:
    """Resample (x, y) onto a uniform grid of ``target_len`` points.

    The input is sorted by wavenumber first, then linearly interpolated
    onto an evenly spaced grid spanning [min(x), max(x)].

    Returns:
        Tuple of (new_x, new_y), both length ``target_len``.
    """
    x, y = _ensure_1d_equal(x, y)
    idx = np.argsort(x)
    xs = x[idx]
    ys = y[idx]
    grid = np.linspace(xs[0], xs[-1], int(target_len))
    interpolate = interp1d(xs, ys, kind="linear", assume_sorted=True)
    return grid, interpolate(grid)
def remove_baseline(y: np.ndarray, degree: int = 2) -> np.ndarray:
    """Subtract a least-squares polynomial baseline fitted over the sample index.

    Args:
        y: Intensity values.
        degree: Degree of the fitted baseline polynomial (default quadratic).

    Returns:
        Baseline-corrected copy of ``y``.
    """
    signal = np.asarray(y, dtype=float)
    idx = np.arange(signal.size, dtype=float)
    baseline = np.polyval(np.polyfit(idx, signal, deg=int(degree)), idx)
    return signal - baseline
def smooth_spectrum(
    y: np.ndarray, window_length: int = 11, polyorder: int = 2
) -> np.ndarray:
    """Savitzky-Golay smoothing with safe window enforcement.

    The window is coerced to an odd value and to at least ``polyorder + 1``
    (also rounded up to odd), and — fixing a crash in the previous version —
    capped at the signal length so short spectra no longer make
    ``savgol_filter`` raise. Signals too short to support the requested
    polynomial order are returned unchanged.

    Args:
        y: Intensity values.
        window_length: Requested filter window (made odd if even).
        polyorder: Polynomial order of the local fit.

    Returns:
        Smoothed copy of ``y`` as a float array.
    """
    y = np.asarray(y, dtype=float)
    window_length = int(window_length)
    polyorder = int(polyorder)
    # Window must be odd ...
    if window_length % 2 == 0:
        window_length += 1
    # ... and at least polyorder + 1, rounded up to odd.
    min_win = polyorder + 1
    if min_win % 2 == 0:
        min_win += 1
    window_length = max(window_length, min_win)
    # Cap at the largest odd value <= y.size; savgol_filter requires the
    # window to fit inside the data.
    max_win = y.size if y.size % 2 == 1 else y.size - 1
    if max_win < min_win:
        # Too few samples to fit the requested polynomial — smoothing is a no-op.
        return y.copy()
    window_length = min(window_length, max_win)
    return savgol_filter(
        y, window_length=window_length, polyorder=polyorder, mode="interp"
    )
def normalize_spectrum(y: np.ndarray) -> np.ndarray:
    """Scale intensities into [0, 1]; a (near-)constant signal maps to all zeros."""
    values = np.asarray(y, dtype=float)
    lo = float(values.min())
    hi = float(values.max())
    span = hi - lo
    if np.isclose(span, 0.0):
        return np.zeros_like(values)
    return (values - lo) / span
def validate_spectrum_range(x: np.ndarray, modality: str = "raman") -> bool:
    """Check that a spectrum's wavenumber axis suits the given modality.

    Args:
        x: Wavenumber array (cm⁻¹).
        modality: 'raman' or 'ftir'.

    Returns:
        True when at least 70% of the points lie inside the modality's
        expected range; False otherwise (including for empty input, which
        previously raised an opaque numpy reduction error).

    Raises:
        ValueError: if ``modality`` is not in MODALITY_RANGES.
    """
    if modality not in MODALITY_RANGES:
        raise ValueError(
            f"Unknown modality '{modality}'. Supported: {list(MODALITY_RANGES.keys())}"
        )
    x = np.asarray(x)
    total_points = x.size
    if total_points == 0:
        # An empty spectrum cannot satisfy the coverage criterion.
        return False
    min_range, max_range = MODALITY_RANGES[modality]
    # Previous version also computed x.min()/x.max() and never used them.
    in_range = np.count_nonzero((x >= min_range) & (x <= max_range))
    return bool(in_range / total_points >= 0.7)  # At least 70% should be in range
def validate_spectrum_modality(
    x_data: np.ndarray, y_data: np.ndarray, selected_modality: str
) -> Tuple[bool, list[str]]:
    """Heuristically check that spectrum characteristics match the chosen modality.

    Args:
        x_data: Wavenumber array (cm⁻¹).
        y_data: Intensity array (currently unused by the checks).
        selected_modality: 'raman' or 'ftir'.

    Returns:
        Tuple of (is_valid, list_of_issue_messages); valid means no issues.
    """
    x_data = np.asarray(x_data)
    y_data = np.asarray(y_data)
    if selected_modality not in MODALITY_RANGES:
        return False, [f"Unknown modality: {selected_modality}"]

    issues: list[str] = []
    expected_min, expected_max = MODALITY_RANGES[selected_modality]
    actual_min = np.min(x_data)
    actual_max = np.max(x_data)

    # Wavenumber bounds, with 20% tolerance on each side.
    if actual_min < expected_min * 0.8:
        issues.append(
            f"Minimum wavenumber ({actual_min:.0f} cm⁻¹) is below typical {selected_modality.upper()} range (>{expected_min} cm⁻¹)"
        )
    if actual_max > expected_max * 1.2:
        issues.append(
            f"Maximum wavenumber ({actual_max:.0f} cm⁻¹) is above typical {selected_modality.upper()} range (<{expected_max} cm⁻¹)"
        )

    # The data should span a reasonable fraction (>= 30%) of the expected window.
    data_range = actual_max - actual_min
    if data_range < (expected_max - expected_min) * 0.3:
        issues.append(
            f"Data range ({data_range:.0f} cm⁻¹) seems narrow for {selected_modality.upper()} spectroscopy"
        )

    # Modality-specific sanity checks.
    if selected_modality == "ftir" and actual_min > 1000:
        # FTIR usually includes the fingerprint region.
        issues.append(
            "FTIR data should typically include fingerprint region (400-1500 cm⁻¹)"
        )
    if selected_modality == "raman" and actual_max < 1000:
        # Raman usually extends well past 1000 cm⁻¹.
        issues.append(
            "Raman data typically extends to higher wavenumbers (>1000 cm⁻¹)"
        )

    return not issues, issues
def preprocess_spectrum(
    x: np.ndarray,
    y: np.ndarray,
    *,
    target_len: int = TARGET_LENGTH,
    modality: str = "raman",
    do_baseline: bool = True,
    degree: int | None = None,
    do_smooth: bool = True,
    window_length: int | None = None,
    polyorder: int | None = None,
    do_normalize: bool = True,
    out_dtype: DTypeLike = np.float32,
    validate_range: bool = True,
) -> tuple[np.ndarray, np.ndarray]:
    """Modality-aware pipeline: resample -> baseline -> smooth -> normalize.

    Args:
        x, y: Input spectrum data.
        target_len: Target length for resampling.
        modality: 'raman' or 'ftir'; selects default parameters.
        do_baseline: Enable polynomial baseline correction.
        degree: Baseline polynomial degree (modality default if None).
        do_smooth: Enable Savitzky-Golay smoothing.
        window_length: Smoothing window (modality default if None).
        polyorder: Smoothing polynomial order (modality default if None).
        do_normalize: Enable min-max normalization.
        out_dtype: dtype of the returned arrays.
        validate_range: Warn (via stdout) if wavenumbers look unusual
            for the modality.

    Returns:
        Tuple of (resampled_x, processed_y), both cast to ``out_dtype``.

    Raises:
        ValueError: for a modality not present in MODALITY_PARAMS.
    """
    if modality not in MODALITY_PARAMS:
        raise ValueError(
            f"Unsupported modality '{modality}'. Supported: {list(MODALITY_PARAMS.keys())}"
        )

    config = MODALITY_PARAMS[modality]
    # Fall back to the modality's frozen defaults for any unspecified knob.
    degree = config["baseline_degree"] if degree is None else degree
    window_length = config["smooth_window"] if window_length is None else window_length
    polyorder = config["smooth_polyorder"] if polyorder is None else polyorder

    if validate_range and not validate_spectrum_range(x, modality):
        print(
            f"Warning: Spectrum wavenumbers may not be optimal for {modality.upper()} analysis"
        )

    # Core pipeline.
    x_rs, y_rs = resample_spectrum(x, y, target_len=target_len)
    if do_baseline:
        y_rs = remove_baseline(y_rs, degree=degree)
    if do_smooth:
        y_rs = smooth_spectrum(y_rs, window_length=window_length, polyorder=polyorder)

    # Optional FTIR extras; both flags are off in the shipped MODALITY_PARAMS.
    if modality == "ftir":
        if config.get("atmospheric_correction", False):
            y_rs = remove_atmospheric_interference(y_rs)
        if config.get("water_correction", False):
            y_rs = remove_water_vapor_bands(y_rs, x_rs)

    if do_normalize:
        y_rs = normalize_spectrum(y_rs)

    # Coerce to a concrete dtype for both static checkers and runtime.
    dtype = np.dtype(out_dtype)
    return x_rs.astype(dtype, copy=False), y_rs.astype(dtype, copy=False)
def remove_atmospheric_interference(y: np.ndarray) -> np.ndarray:
    """Attenuate sharp atmospheric (CO2/H2O) lines common in FTIR spectra.

    Basic placeholder implementation: a kernel-5 median filter suppresses
    narrow spikes, and the result is blended 30/70 with the original so
    genuine peak structure is mostly preserved. A production version would
    use reference spectra instead.
    """
    from scipy.signal import medfilt

    signal = np.asarray(y, dtype=float)
    despiked = medfilt(signal, kernel_size=5)
    weight = 0.7  # Weight for original spectrum
    return weight * signal + (1.0 - weight) * despiked
def remove_water_vapor_bands(y: np.ndarray, x: np.ndarray) -> np.ndarray:
    """Flatten known water-vapor bands by interpolating straight across them.

    Each affected band is replaced with a straight line between the samples
    immediately outside it. Bands with fewer than three samples, or bands
    touching either edge of the spectrum (no outside anchor point), are
    left untouched. The input array is not modified.
    """
    y = np.asarray(y, dtype=float)
    x = np.asarray(x, dtype=float)
    # Common water vapor interference regions in FTIR (cm⁻¹).
    water_regions = [(3500, 3800), (1300, 1800)]
    result = y.copy()
    for low, high in water_regions:
        hit = np.flatnonzero((x >= low) & (x <= high))
        if hit.size <= 2:
            continue
        first, last = hit[0], hit[-1]
        if first == 0 or last == len(y) - 1:
            continue  # Band reaches the spectrum edge — no anchors available.
        # Straight line between the boundary samples just outside the band.
        result[hit] = np.linspace(y[first - 1], y[last + 1], hit.size)
    return result
def apply_ftir_specific_processing(
    x: np.ndarray,
    y: np.ndarray,
    atmospheric_correction: bool = False,
    water_correction: bool = False,
) -> tuple[np.ndarray, np.ndarray]:
    """Run the optional FTIR-only corrections and return (x, corrected_y).

    Both corrections default to off; ``x`` passes through unchanged and
    ``y`` is copied before any correction is applied.
    """
    corrected = y.copy()
    if atmospheric_correction:
        corrected = remove_atmospheric_interference(corrected)
    if water_correction:
        corrected = remove_water_vapor_bands(corrected, x)
    return x, corrected
def get_modality_info(modality: str) -> dict:
    """Return a snapshot of a modality's wavenumber range and preprocessing params.

    The params dict is a shallow copy, so callers may mutate it freely.

    Raises:
        ValueError: for an unrecognized modality.
    """
    if modality not in MODALITY_PARAMS:
        raise ValueError(f"Unknown modality '{modality}'")
    return {
        "range": MODALITY_RANGES[modality],
        "params": dict(MODALITY_PARAMS[modality]),
    }
|