File size: 12,054 Bytes
65f2520
 
 
f5cad9a
65f2520
 
6373c5a
65f2520
6373c5a
65f2520
 
ec48f8d
f5cad9a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ec48f8d
f5cad9a
 
 
65f2520
6373c5a
b1b7e3c
6373c5a
b1b7e3c
6373c5a
 
 
 
f5cad9a
 
 
 
6373c5a
b1b7e3c
6373c5a
 
 
 
 
 
 
f5cad9a
6373c5a
 
 
 
 
 
65f2520
 
f5cad9a
 
 
 
6373c5a
 
 
 
 
 
f5cad9a
6373c5a
 
 
 
f5cad9a
 
 
 
6373c5a
 
 
 
 
 
 
 
 
 
f5cad9a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ec48f8d
f5cad9a
 
2a2cf15
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6373c5a
 
 
 
 
f5cad9a
6373c5a
f5cad9a
6373c5a
f5cad9a
 
6373c5a
 
f5cad9a
6373c5a
f5cad9a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6373c5a
f5cad9a
6373c5a
 
f5cad9a
6373c5a
 
f5cad9a
ec48f8d
f5cad9a
 
ec48f8d
 
 
f5cad9a
6373c5a
 
f5cad9a
6373c5a
 
f5cad9a
 
 
ec48f8d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f5cad9a
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
"""

Preprocessing utilities for polymer classification app.

Adapted from the original scripts/preprocess_dataset.py for Hugging Face Spaces deployment.

Supports both Raman and FTIR spectroscopy modalities.

"""

from __future__ import annotations
import numpy as np
from numpy.typing import DTypeLike
from scipy.interpolate import interp1d
from scipy.signal import savgol_filter
from typing import Tuple, Literal, Optional

# Default number of samples in a resampled spectrum.
# Frozen default per PREPROCESSING_BASELINE.
TARGET_LENGTH = 500

# Wavenumber window (cm⁻¹) used when validating a spectrum, keyed by modality.
MODALITY_RANGES = dict(
    raman=(200, 4000),  # Typical Raman range
    ftir=(400, 4000),  # FTIR wavenumber range
)

# Default preprocessing settings, keyed by modality.
MODALITY_PARAMS = dict(
    raman=dict(
        baseline_degree=2,
        smooth_window=11,
        smooth_polyorder=2,
        cosmic_ray_removal=False,
    ),
    ftir=dict(
        baseline_degree=2,
        smooth_window=13,  # Slightly larger window for FTIR
        smooth_polyorder=2,
        cosmic_ray_removal=False,
        atmospheric_correction=False,  # Placeholder for future implementation
    ),
)


def _ensure_1d_equal(x: np.ndarray, y: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Coerce *x* and *y* to float arrays and verify they form a usable pair.

    Returns:
        The two inputs converted to float ndarrays.

    Raises:
        ValueError: If either array is not 1-D, the sizes differ, or fewer
            than two samples are present.
    """
    xs = np.asarray(x, dtype=float)
    ys = np.asarray(y, dtype=float)

    both_flat = xs.ndim == 1 and ys.ndim == 1
    same_size = xs.size == ys.size
    long_enough = xs.size >= 2
    if not (both_flat and same_size and long_enough):
        raise ValueError("x and y must be 1D arrays of equal length >= 2")
    return xs, ys


def resample_spectrum(
    x: np.ndarray, y: np.ndarray, target_len: int = TARGET_LENGTH
) -> tuple[np.ndarray, np.ndarray]:
    """Linearly interpolate a spectrum onto an evenly spaced grid.

    Args:
        x: Sample positions; they are sorted ascending before interpolation.
        y: Values measured at ``x``.
        target_len: Number of points in the output grid.

    Returns:
        Tuple ``(x_new, y_new)`` where ``x_new`` spans the smallest to the
        largest input position in ``target_len`` equal steps.

    Raises:
        ValueError: If the inputs fail the 1-D / equal-length / size >= 2 check.
    """
    x, y = _ensure_1d_equal(x, y)

    # interp1d is told the data is sorted, so establish that ordering first.
    ascending = np.argsort(x)
    axis_sorted = x[ascending]
    values_sorted = y[ascending]

    grid = np.linspace(axis_sorted[0], axis_sorted[-1], int(target_len))
    interpolate = interp1d(
        axis_sorted, values_sorted, kind="linear", assume_sorted=True
    )
    return grid, interpolate(grid)


def remove_baseline(y: np.ndarray, degree: int = 2) -> np.ndarray:
    """Subtract a least-squares polynomial baseline from a signal.

    The polynomial is fitted against the sample index (0, 1, 2, ...), not
    against any physical axis.

    Args:
        y: Signal values.
        degree: Degree of the fitted polynomial (default 2).

    Returns:
        The signal with the fitted polynomial subtracted, as a float array.
    """
    signal = np.asarray(y, dtype=float)
    positions = np.arange(signal.size, dtype=float)
    fitted = np.polyval(np.polyfit(positions, signal, deg=int(degree)), positions)
    return signal - fitted


def smooth_spectrum(
    y: np.ndarray, window_length: int = 11, polyorder: int = 2
) -> np.ndarray:
    """Savitzky-Golay smoothing with safe/odd window enforcement.

    The requested window is adjusted so the filter can always be applied
    when the signal is long enough:

    * an even window is bumped to the next odd number;
    * the window is raised to the smallest odd value >= ``polyorder + 1``;
    * the window is capped at the largest odd value that fits in the signal,
      because ``savgol_filter(mode="interp")`` rejects windows longer than
      the data.

    Args:
        y: Signal values (expected to be 1-D).
        window_length: Requested filter window, in samples.
        polyorder: Order of the local polynomial fit.

    Returns:
        The smoothed signal as a float array.

    Raises:
        ValueError: If the signal is too short to hold even the minimum
            window required by ``polyorder``.
    """
    y = np.asarray(y, dtype=float)
    window_length = int(window_length)
    polyorder = int(polyorder)

    # Window must be odd and >= polyorder + 1.
    if window_length % 2 == 0:
        window_length += 1
    min_win = polyorder + 1
    if min_win % 2 == 0:
        min_win += 1
    window_length = max(window_length, min_win)

    # Largest odd window that still fits inside the signal.
    max_win = y.size if y.size % 2 == 1 else y.size - 1
    if window_length > max_win:
        if max_win < min_win:
            raise ValueError(
                f"Signal of length {y.size} is too short for Savitzky-Golay "
                f"smoothing with polyorder={polyorder}"
            )
        window_length = max_win

    return savgol_filter(
        y, window_length=window_length, polyorder=polyorder, mode="interp"
    )


def normalize_spectrum(y: np.ndarray) -> np.ndarray:
    """Rescale a signal linearly so its minimum maps to 0 and maximum to 1.

    A signal whose max-min spread is (numerically) zero cannot be rescaled;
    an all-zero array of the same shape is returned instead.
    """
    values = np.asarray(y, dtype=float)
    low = float(values.min())
    high = float(values.max())
    span = high - low
    if not np.isclose(span, 0.0):
        return (values - low) / span
    return np.zeros_like(values)


def validate_spectrum_range(x: np.ndarray, modality: str = "raman") -> bool:
    """Check that most wavenumbers fall inside the modality's expected window.

    Args:
        x: Wavenumber values (cm⁻¹); any array-like is accepted.
        modality: Key into ``MODALITY_RANGES``.

    Returns:
        True when at least 70% of the points lie within the inclusive
        ``(min, max)`` window registered for ``modality``.

    Raises:
        ValueError: If ``modality`` is not registered, or ``x`` is empty.
    """
    if modality not in MODALITY_RANGES:
        raise ValueError(
            f"Unknown modality '{modality}'. Supported: {list(MODALITY_RANGES.keys())}"
        )

    # Coerce first so plain sequences support the element-wise comparisons below.
    wavenumbers = np.asarray(x, dtype=float)
    if wavenumbers.size == 0:
        raise ValueError("x must contain at least one wavenumber")

    min_range, max_range = MODALITY_RANGES[modality]
    in_range = np.count_nonzero(
        (wavenumbers >= min_range) & (wavenumbers <= max_range)
    )

    # At least 70% of the points should be in range.
    return bool((in_range / wavenumbers.size) >= 0.7)


def validate_spectrum_modality(
    x_data: np.ndarray, y_data: np.ndarray, selected_modality: str
) -> Tuple[bool, list[str]]:
    """
    Validate that spectrum characteristics match the selected modality.

    Only the wavenumber axis is inspected; ``y_data`` is accepted for
    interface symmetry and is not examined.

    Args:
        x_data: Wavenumber array (cm⁻¹)
        y_data: Intensity array
        selected_modality: Selected modality ('raman' or 'ftir')

    Returns:
        Tuple of (is_valid, list_of_issues). ``is_valid`` is True only when
        no issue was recorded. An unknown modality or an empty ``x_data`` is
        reported as a single issue rather than raised.
    """
    x_data = np.asarray(x_data)
    issues: list[str] = []

    if selected_modality not in MODALITY_RANGES:
        issues.append(f"Unknown modality: {selected_modality}")
        return False, issues

    # min/max of an empty array raises; report it as an issue instead.
    if x_data.size == 0:
        issues.append("Spectrum contains no wavenumber data")
        return False, issues

    expected_min, expected_max = MODALITY_RANGES[selected_modality]
    actual_min, actual_max = np.min(x_data), np.max(x_data)

    # Check wavenumber range (20% tolerance on either bound).
    if actual_min < expected_min * 0.8:
        issues.append(
            f"Minimum wavenumber ({actual_min:.0f} cm⁻¹) is below typical {selected_modality.upper()} range (>{expected_min} cm⁻¹)"
        )

    if actual_max > expected_max * 1.2:
        issues.append(
            f"Maximum wavenumber ({actual_max:.0f} cm⁻¹) is above typical {selected_modality.upper()} range (<{expected_max} cm⁻¹)"
        )

    # The data should cover at least 30% of the expected range.
    data_range = actual_max - actual_min
    expected_range = expected_max - expected_min
    if data_range < expected_range * 0.3:
        issues.append(
            f"Data range ({data_range:.0f} cm⁻¹) seems narrow for {selected_modality.upper()} spectroscopy"
        )

    # FTIR-specific check: data starting above 1000 cm⁻¹ lacks the
    # fingerprint region.
    if selected_modality == "ftir" and actual_min > 1000:
        issues.append(
            "FTIR data should typically include fingerprint region (400-1500 cm⁻¹)"
        )

    # Raman-specific check: data ending below 1000 cm⁻¹ is flagged.
    if selected_modality == "raman" and actual_max < 1000:
        issues.append(
            "Raman data typically extends to higher wavenumbers (>1000 cm⁻¹)"
        )

    return len(issues) == 0, issues


def preprocess_spectrum(
    x: np.ndarray,
    y: np.ndarray,
    *,
    target_len: int = TARGET_LENGTH,
    modality: str = "raman",  # Selects modality-specific defaults
    do_baseline: bool = True,
    degree: int | None = None,  # Will use modality default if None
    do_smooth: bool = True,
    window_length: int | None = None,  # Will use modality default if None
    polyorder: int | None = None,  # Will use modality default if None
    do_normalize: bool = True,
    out_dtype: DTypeLike = np.float32,
    validate_range: bool = True,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Modality-aware preprocessing: resample -> baseline -> smooth -> normalize

    Args:
        x, y: Input spectrum data (any array-like)
        target_len: Target length for resampling
        modality: 'raman' or 'ftir' for modality-specific processing
        do_baseline: Enable baseline correction
        degree: Polynomial degree for baseline (uses modality default if None)
        do_smooth: Enable smoothing
        window_length: Smoothing window length (uses modality default if None)
        polyorder: Polynomial order for smoothing (uses modality default if None)
        do_normalize: Enable normalization
        out_dtype: Output data type
        validate_range: Check if wavenumbers are in expected range for modality;
            a failed check only prints a warning, processing continues

    Returns:
        Tuple of (resampled_x, processed_y), both cast to ``out_dtype``

    Raises:
        ValueError: If ``modality`` has no entry in ``MODALITY_PARAMS``.
    """
    # Validate modality
    if modality not in MODALITY_PARAMS:
        raise ValueError(
            f"Unsupported modality '{modality}'. Supported: {list(MODALITY_PARAMS.keys())}"
        )

    # Get modality-specific parameters
    modality_config = MODALITY_PARAMS[modality]

    # Use modality defaults if parameters not specified
    if degree is None:
        degree = modality_config["baseline_degree"]
    if window_length is None:
        window_length = modality_config["smooth_window"]
    if polyorder is None:
        polyorder = modality_config["smooth_polyorder"]

    # Validate spectrum range if requested. Coerce to an array first so that
    # plain sequences are accepted here just as they are by the resampler.
    if validate_range:
        wavenumbers = np.asarray(x, dtype=float)
        if not validate_spectrum_range(wavenumbers, modality):
            print(
                f"Warning: Spectrum wavenumbers may not be optimal for {modality.upper()} analysis"
            )

    # Standard preprocessing pipeline
    x_rs, y_rs = resample_spectrum(x, y, target_len=target_len)

    if do_baseline:
        y_rs = remove_baseline(y_rs, degree=degree)

    if do_smooth:
        y_rs = smooth_spectrum(y_rs, window_length=window_length, polyorder=polyorder)

    # FTIR-specific processing, driven by flags in the modality config
    if modality == "ftir":
        if modality_config.get("atmospheric_correction", False):
            y_rs = remove_atmospheric_interference(y_rs)
        if modality_config.get("water_correction", False):
            y_rs = remove_water_vapor_bands(y_rs, x_rs)

    if do_normalize:
        y_rs = normalize_spectrum(y_rs)

    # Coerce to a real dtype to satisfy static checkers & runtime
    out_dt = np.dtype(out_dtype)
    return x_rs.astype(out_dt, copy=False), y_rs.astype(out_dt, copy=False)


def remove_atmospheric_interference(y: np.ndarray) -> np.ndarray:
    """Soften sharp lines by blending the signal with a median-filtered copy.

    Intended for atmospheric CO2 and H2O interference common in FTIR. This is
    a basic implementation; in practice reference spectra would be used.

    Returns:
        ``0.7 * y + 0.3 * medfilt(y, 5)`` as a float array.
    """
    from scipy.signal import medfilt

    signal = np.asarray(y, dtype=float)

    # Weight kept for the original spectrum so that peak structure survives.
    alpha = 0.7

    # A 5-sample median filter reduces narrow spikes.
    despiked = medfilt(signal, kernel_size=5)
    return alpha * signal + (1 - alpha) * despiked


def remove_water_vapor_bands(y: np.ndarray, x: np.ndarray) -> np.ndarray:
    """Replace values inside fixed water-vapor bands with a straight line.

    For each band, the samples whose ``x`` lies inside it are overwritten by
    evenly spaced values running from the sample just before the band to the
    sample just after it. A band is left untouched when it holds two or fewer
    samples, or when it touches either end of the array (no neighbour on that
    side to anchor the line).

    Args:
        y: Intensity values.
        x: Wavenumbers (cm⁻¹) matching ``y``.

    Returns:
        A corrected copy of ``y``; the input is not modified.
    """
    y = np.asarray(y, dtype=float)
    x = np.asarray(x, dtype=float)
    corrected_y = y.copy()
    last_valid = len(y) - 1

    # Common water vapor regions in FTIR (cm⁻¹)
    for low, high in ((3500, 3800), (1300, 1800)):
        mask = (x >= low) & (x <= high)
        indices = np.nonzero(mask)[0]
        if len(indices) <= 2:
            continue

        start_idx, end_idx = indices[0], indices[-1]
        if start_idx <= 0 or end_idx >= last_valid:
            continue

        # Anchor values are read from the uncorrected input.
        corrected_y[mask] = np.linspace(
            y[start_idx - 1], y[end_idx + 1], len(indices)
        )

    return corrected_y


def apply_ftir_specific_processing(
    x: np.ndarray,
    y: np.ndarray,
    atmospheric_correction: bool = False,
    water_correction: bool = False,
) -> tuple[np.ndarray, np.ndarray]:
    """Run the optional FTIR corrections on a copy of ``y``.

    Args:
        x: Wavenumber axis; returned as-is.
        y: Intensity values; copied before any correction is applied.
        atmospheric_correction: Apply ``remove_atmospheric_interference``.
        water_correction: Apply ``remove_water_vapor_bands`` (runs after the
            atmospheric step when both are enabled).

    Returns:
        Tuple ``(x, processed_y)``.
    """
    result = y.copy()
    if atmospheric_correction:
        result = remove_atmospheric_interference(result)
    if water_correction:
        result = remove_water_vapor_bands(result, x)
    return x, result


def get_modality_info(modality: str) -> dict:
    """Look up the validation range and processing parameters of a modality.

    Returns:
        Dict with ``"range"`` (the entry from ``MODALITY_RANGES``) and
        ``"params"`` (a shallow copy of the entry from ``MODALITY_PARAMS``).

    Raises:
        ValueError: If ``modality`` is not a key of ``MODALITY_PARAMS``.
    """
    if modality in MODALITY_PARAMS:
        return {
            "range": MODALITY_RANGES[modality],
            "params": dict(MODALITY_PARAMS[modality]),
        }
    raise ValueError(f"Unknown modality '{modality}'")