import os
import warnings

import numpy as np
import pandas as pd
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications import (
    DenseNet121,
    DenseNet169,
    InceptionV3,
    ResNet50,
    ResNet101,
)
from tensorflow.keras.layers import GlobalAveragePooling2D, Input
from tensorflow.keras.models import Model
from transformers import TFConvNextV2Model, TFSwinModel, TFViTModel

# 💬 NOTE: Suppress TensorFlow warnings
warnings.filterwarnings("ignore")
tf.get_logger().setLevel("ERROR")


def load_and_preprocess_image(image_path, target_size=(224, 224)):
    """
    Load and preprocess an image.

    Args:
    - image_path (str): Path to the image file.
    - target_size (tuple): Desired image size.

    Returns:
    - np.ndarray: Preprocessed image of shape (height, width, 3) with float32 values scaled to [0, 1].
    """
    # Open the image using PIL Image.open and convert it to RGB format
    img = Image.open(image_path).convert("RGB")

    # Resize the image to the target size
    img = img.resize(target_size)

    # Convert the image to a numpy array and scale the pixel values to [0, 1]
    img = np.array(img, dtype=np.float32) / 255.0

    return img
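

# A minimal, hedged usage sketch for load_and_preprocess_image; the image path below is
# hypothetical, so substitute any RGB image on disk:
#
#     img = load_and_preprocess_image("data/images/example.jpg", target_size=(224, 224))
#     # img is a float32 NumPy array of shape (224, 224, 3) with values in [0, 1]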


class FoundationalCVModel:
    """
    A Keras module for loading and using foundational computer vision models.

    This class allows you to load and use various foundational computer vision models for tasks like image classification
    or feature extraction. The user can choose between evaluation mode (non-trainable model) and fine-tuning mode (trainable model).

    Attributes:
    ----------
    backbone_name : str
        The name of the foundational CV model to load (e.g., 'resnet50', 'vit_base').
    model : keras.Model
        The compiled Keras model with the selected backbone.

    Parameters:
    ----------
    backbone : str
        The name of the foundational CV model to load. The available backbones can include:
        - ResNet variants: 'resnet50', 'resnet101'
        - DenseNet variants: 'densenet121', 'densenet169'
        - InceptionV3: 'inception_v3'
        - ConvNextV2 variants: 'convnextv2_tiny', 'convnextv2_base', 'convnextv2_large'
        - Swin Transformer variants: 'swin_tiny', 'swin_small', 'swin_base'
        - Vision Transformer (ViT) variants: 'vit_base', 'vit_large'

    mode : str, optional
        The mode of the model, either 'eval' for evaluation or 'fine_tune' for fine-tuning. Default is 'eval'.
    input_shape : tuple, optional
        The expected input shape as (height, width, channels). Default is (224, 224, 3).

    Methods:
    -------
    __init__(self, backbone, mode='eval', input_shape=(224, 224, 3)):
        Initializes the model with the specified backbone, mode, and input shape.

    predict(self, images):
        Given a batch of images, performs a forward pass through the model and returns predictions.
        Parameters:
        ----------
        images : numpy.ndarray
            A batch of images to perform prediction on, with shape (batch_size, 224, 224, 3).

        Returns:
        -------
        numpy.ndarray
            Model predictions or extracted features for the provided images.
    """

    def __init__(self, backbone, mode="eval", input_shape=(224, 224, 3)):
        self.backbone_name = backbone

        # Select the backbone from the possible foundational models
        input_layer = Input(shape=input_shape)

        if backbone == "resnet50":
            # Load the ResNet50 model from tensorflow.keras.applications
            self.base_model = ResNet50(
                include_top=False, weights="imagenet", input_tensor=input_layer
            )
        elif backbone == "resnet101":
            # Load the ResNet101 model from tensorflow.keras.applications
            self.base_model = ResNet101(
                include_top=False, weights="imagenet", input_tensor=input_layer
            )
        elif backbone == "densenet121":
            # Load the DenseNet121 model from tensorflow.keras.applications
            self.base_model = DenseNet121(
                include_top=False, weights="imagenet", input_tensor=input_layer
            )
        elif backbone == "densenet169":
            # Load the DenseNet169 model from tensorflow.keras.applications
            self.base_model = DenseNet169(
                include_top=False, weights="imagenet", input_tensor=input_layer
            )
        elif backbone == "inception_v3":
            # Load the InceptionV3 model from tensorflow.keras.applications
            self.base_model = InceptionV3(
                include_top=False, weights="imagenet", input_tensor=input_layer
            )
        elif backbone == "convnextv2_tiny":
            # Load the ConvNeXtV2 Tiny model from transformers
            self.base_model = TFConvNextV2Model.from_pretrained(
                "facebook/convnextv2-tiny-22k-224"
            )
        elif backbone == "convnextv2_base":
            # Load the ConvNeXtV2 Base model from transformers
            self.base_model = TFConvNextV2Model.from_pretrained(
                "facebook/convnextv2-base-22k-224"
            )
        elif backbone == "convnextv2_large":
            # Load the ConvNeXtV2 Large model from transformers
            self.base_model = TFConvNextV2Model.from_pretrained(
                "facebook/convnextv2-large-22k-224"
            )
        elif backbone == "swin_tiny":
            # Load the Swin Transformer Tiny model from transformers
            self.base_model = TFSwinModel.from_pretrained(
                "microsoft/swin-tiny-patch4-window7-224"
            )
        elif backbone == "swin_small":
            # Load the Swin Transformer Small model from transformers
            self.base_model = TFSwinModel.from_pretrained(
                "microsoft/swin-small-patch4-window7-224"
            )
        elif backbone == "swin_base":
            # Load the Swin Transformer Base model from transformers
            self.base_model = TFSwinModel.from_pretrained(
                "microsoft/swin-base-patch4-window7-224"
            )
        elif backbone in ["vit_base", "vit_large"]:
            # Load the Vision Transformer (ViT) model from transformers
            backbone_path = {
                "vit_base": "google/vit-base-patch16-224",
                "vit_large": "google/vit-large-patch16-224",
            }
            self.base_model = TFViTModel.from_pretrained(backbone_path[backbone])
        else:
            raise ValueError(f"Unsupported backbone model: {backbone}")

        if mode == "eval":
            # Set the model to evaluation mode (non-trainable)
            self.base_model.trainable = False
        elif mode == "fine_tune":
            # Keep the backbone weights trainable for fine-tuning
            self.base_model.trainable = True
        else:
            raise ValueError(f"Unsupported mode: {mode}. Expected 'eval' or 'fine_tune'.")

        # 💬 NOTE: Take the backbone's input requirements into account: transformers models expect
        # channels-first input, whereas keras.applications models expect channels-last input.
        # The output formats also differ, so in both cases we extract a pooled feature vector.

        # If is a model from transformers:
        if backbone in [
            "vit_base",
            "vit_large",
            "convnextv2_tiny",
            "convnextv2_base",
            "convnextv2_large",
            "swin_tiny",
            "swin_small",
            "swin_base",
        ]:
            # Adjust the input for channels first models within the model
            input_layer_transposed = tf.transpose(input_layer, perm=[0, 3, 1, 2])
            hf_outputs = self.base_model(input_layer_transposed)

            # Get the pooling output of the model "pooler_output"
            outputs = hf_outputs.pooler_output  # shape (batch_size, hidden_size)
        # If is a model from keras.applications
        else:
            # Get the pooling output of the model
            # In this case the pooling layer is not included in the model, we can use a pooling layer such as GlobalAveragePooling2D
            x = self.base_model.output
            outputs = GlobalAveragePooling2D()(x)

        # Create the final model with the input layer and the pooling output
        self.model = Model(inputs=input_layer, outputs=outputs)

    def get_output_shape(self):
        """
        Get the output shape of the model.

        Returns:
        -------
        tuple
            The shape of the model's output tensor.
        """
        return self.model.output_shape

    def predict(self, images):
        """
        Predict on a batch of images.

        Parameters:
        ----------
        images : numpy.ndarray
            A batch of images of shape (batch_size, 224, 224, 3).

        Returns:
        -------
        numpy.ndarray
            Predictions or features from the model for the given images.
        """
        # Perform a forward pass through the model and return the predictions
        images = tf.convert_to_tensor(images, dtype=tf.float32)

        # Forward pass (no training)
        predictions = self.model(images, training=False)

        # Convert back to numpy for usability
        return predictions.numpy()
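

# A minimal, hedged sketch of how FoundationalCVModel might be used. Instantiating it downloads
# pretrained weights on first use, so the example is kept as a comment rather than module-level code:
#
#     model = FoundationalCVModel(backbone="resnet50", mode="eval")
#     dummy_batch = np.random.rand(4, 224, 224, 3).astype(np.float32)
#     features = model.predict(dummy_batch)   # e.g. shape (4, 2048) for the resnet50 backbone
#     print(model.get_output_shape())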


class ImageFolderDataset:
    """
    A custom dataset class for loading and preprocessing images from a folder.

    This class helps in loading images from a given folder, automatically filtering valid image files and
    preprocessing them to a specified shape. It also handles any unreadable or corrupted images by excluding them.

    Attributes:
    ----------
    folder_path : str
        The path to the folder containing the images.
    shape : tuple
        The desired shape (width, height) to which the images will be resized.
    image_files : list
        A list of valid image file names that can be processed.

    Parameters:
    ----------
    folder_path : str
        The path to the folder containing image files.
    shape : tuple, optional
        The target shape to resize the images to. The default value is (224, 224).
    image_files : list, optional
        A pre-provided list of image file names. If not provided, it will automatically detect valid image files
        (with extensions '.jpg', '.jpeg', '.png', '.gif') in the specified folder.

    Methods:
    -------
    clean_unidentified_images():
        Cleans the dataset by removing images that cause an `UnidentifiedImageError` during loading. This helps ensure
        that only valid, readable images are kept in the dataset.

    __len__():
        Returns the number of valid images in the dataset after cleaning.

    __getitem__(idx):
        Given an index `idx`, retrieves the image file at that index, loads and preprocesses it, and returns the image
        along with its filename.

    """

    def __init__(self, folder_path, shape=(224, 224), image_files=None):
        """
        Initializes the dataset object by setting the folder path and target image shape.
        It also optionally accepts a list of image files to be processed, otherwise detects valid images in the folder.

        Parameters:
        ----------
        folder_path : str
            The directory containing the images.
        shape : tuple, optional
            The target shape to resize the images to. Default is (224, 224).
        image_files : list, optional
            A list of image files to load. If not provided, it will auto-detect valid images from the folder.
        """
        self.folder_path = folder_path
        self.shape = shape

        # If image files are provided, use them; otherwise, detect image files in the folder
        if image_files:
            self.image_files = image_files
        else:
            # List all files in the folder and filter only image files
            self.image_files = [
                f
                for f in os.listdir(folder_path)
                if f.lower().endswith((".jpg", ".jpeg", ".png", ".gif"))
            ]

        # Clean the dataset by removing images that cause errors during loading
        self.clean_unidentified_images()

    def clean_unidentified_images(self):
        """
        Clean the dataset by removing images that cannot be opened due to errors (e.g., `UnidentifiedImageError`).

        This method iterates over the list of detected image files and attempts to open and convert each image to RGB.
        If an image cannot be opened (e.g., due to corruption or unsupported format), it is excluded from the dataset.

        Any image that causes an error will be skipped, and a message will be printed to indicate which file was skipped.
        """
        cleaned_files = []
        # Iterate over the image files and check if they can be opened
        for img_name in self.image_files:
            img_path = os.path.join(self.folder_path, img_name)
            try:
                # Try to open the image and convert it to RGB format
                Image.open(img_path).convert("RGB")
                # If successful, add the image to the cleaned list
                cleaned_files.append(img_name)
            except Exception as e:
                print(f"Skipping {img_name} due to error: {e}")

        # Update the list of image files with only the cleaned files
        self.image_files = cleaned_files

    def __len__(self):
        """
        Returns the number of valid images in the dataset after cleaning.

        Returns:
        -------
        int
            The number of images in the cleaned dataset.
        """
        return len(self.image_files)

    def __getitem__(self, idx):
        """
        Retrieves the image and its filename at the specified index.

        Parameters:
        ----------
        idx : int
            The index of the image to retrieve.

        Returns:
        -------
        tuple
            A tuple (img_name, img) containing the image filename and the preprocessed image as a NumPy array.

        Raises:
        ------
        IndexError
            If the index is out of bounds for the dataset.
        """
        # Get an item from the list of image files
        img_name = self.image_files[idx]
        # Load and preprocess the image:
        img_path = os.path.join(self.folder_path, img_name)
        img = load_and_preprocess_image(img_path, self.shape)
        # Return the image filename and the preprocessed image
        return img_name, img
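

# A minimal, hedged usage sketch for ImageFolderDataset; "data/images" is a placeholder folder:
#
#     dataset = ImageFolderDataset("data/images", shape=(224, 224))
#     print(len(dataset))        # number of readable images after cleaning
#     name, img = dataset[0]     # filename and its preprocessed float32 array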


def get_embeddings_df(
    batch_size=32,
    path="data/images",
    dataset_name="",
    backbone="resnet50",
    directory="embeddings",
    image_files=None,
):
    """
    Generates embeddings for images in a dataset using a specified backbone model and saves them to a CSV file.

    This function processes images from a given folder in batches, extracts features (embeddings) using a specified
    pre-trained computer vision model, and stores the results in a CSV file. The embeddings can be used for
    downstream tasks such as image retrieval or clustering.

    Parameters:
    ----------
    batch_size : int, optional
        The number of images to process in each batch. Default is 32.
    path : str, optional
        The folder path containing the images. Default is "data/images".
    dataset_name : str, optional
        The name of the dataset to create subdirectories for saving embeddings. Default is an empty string.
    backbone : str, optional
        The name of the backbone model to use for generating embeddings. The default is 'resnet50'.
        Other possible options include models like 'convnextv2_tiny', 'vit_base', etc.
    directory : str, optional
        The root directory where the embeddings CSV file will be saved. Default is 'embeddings'.
    image_files : list, optional
        A pre-defined list of image file names to process. If not provided, the function will automatically detect
        image files in the `path` directory.

    Returns:
    -------
    None
        The function does not return any value. It saves a CSV file containing image names and their embeddings.

    Side Effects:
    ------------
    - Saves a CSV file in the specified directory containing image file names and their corresponding embeddings.

    Notes:
    ------
    - The images are loaded and preprocessed using the `ImageFolderDataset` class.
    - The embeddings are generated using a pre-trained model from the `FoundationalCVModel` class.
    - The embeddings are saved as a CSV file with the following structure:
        - `ImageName`: The name of the image file.
        - Columns corresponding to the embedding vector (one column per feature).

    Example:
    --------
    >>> get_embeddings_df(batch_size=16, path="data/images", dataset_name='sample_dataset', backbone="resnet50")

    This would generate a CSV file with image embeddings from the 'resnet50' backbone model for images in the "data/images" directory.
    """

    # Create an instance of the ImageFolderDataset class
    dataset = ImageFolderDataset(folder_path=path, image_files=image_files)
    # Create an instance of the FoundationalCVModel class
    model = FoundationalCVModel(backbone)

    img_names = []
    features = []
    # Calculate the number of batches based on the dataset size and batch size
    num_batches = len(dataset) // batch_size + (
        1 if len(dataset) % batch_size != 0 else 0
    )

    # Process images in batches and extract features
    for i in range(0, len(dataset), batch_size):
        # Get the image files and images for the current batch
        batch_files = dataset.image_files[i : i + batch_size]
        batch_imgs = np.array(
            [dataset[j][1] for j in range(i, min(i + batch_size, len(dataset)))]
        )

        # Generate embeddings for the batch of images
        batch_features = model.predict(batch_imgs)

        # Append the image names and features to the lists
        img_names.extend(batch_files)
        features.extend(batch_features)

        if (i // batch_size + 1) % 10 == 0:
            print(f"Batch {i // batch_size + 1}/{num_batches} done")

    # Create a DataFrame with the image names and embeddings
    df = pd.DataFrame({"ImageName": img_names, "Embeddings": features})

    # Split the embeddings into separate columns
    df_aux = pd.DataFrame(df["Embeddings"].tolist())
    df = pd.concat([df["ImageName"], df_aux], axis=1)

    # Save the DataFrame to a CSV file, creating the output directories if needed
    os.makedirs(f"{directory}/{dataset_name}", exist_ok=True)

    df.to_csv(f"{directory}/{dataset_name}/Embeddings_{backbone}.csv", index=False)
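

# Example entry point: a hedged sketch that generates embeddings for a local image folder.
# The paths and dataset name are placeholders and assume such a folder exists; adjust before running.
if __name__ == "__main__":
    get_embeddings_df(
        batch_size=32,
        path="data/images",
        dataset_name="sample_dataset",
        backbone="resnet50",
        directory="embeddings",
    )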