import os
import warnings

import numpy as np
import pandas as pd
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications import (
    DenseNet121,
    DenseNet169,
    InceptionV3,
    ResNet50,
    ResNet101,
)
from tensorflow.keras.layers import GlobalAveragePooling2D, Input
from tensorflow.keras.models import Model
from transformers import TFConvNextV2Model, TFSwinModel, TFViTModel

# Silence library warnings and TensorFlow logging so batch progress
# messages remain readable.
warnings.filterwarnings("ignore")
tf.get_logger().setLevel("ERROR")

def load_and_preprocess_image(image_path, target_size=(224, 224)):
    """
    Load and preprocess an image.

    Args:
    - image_path (str): Path to the image file.
    - target_size (tuple): Desired image size as (width, height).

    Returns:
    - np.array: Preprocessed RGB image as a float32 array scaled to [0, 1].
    """
    img = Image.open(image_path).convert("RGB")
    img = img.resize(target_size)
    img = np.array(img, dtype=np.float32) / 255.0
    return img
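
# Example usage (a minimal sketch; "data/images/sample.jpg" is a
# hypothetical path):
#
#   img = load_and_preprocess_image("data/images/sample.jpg")
#   print(img.shape)  # (224, 224, 3), float32 values in [0, 1]
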
class FoundationalCVModel:
    """
    A wrapper for loading and using foundational computer vision models in Keras.

    This class allows you to load various foundational computer vision models
    for tasks like image classification or feature extraction. The user can
    choose between evaluation mode (non-trainable model) and fine-tuning mode
    (trainable model).

    Attributes:
    ----------
    backbone_name : str
        The name of the foundational CV model to load (e.g., 'resnet50', 'vit_base').
    model : keras.Model
        The Keras model built around the selected backbone.

    Parameters:
    ----------
    backbone : str
        The name of the foundational CV model to load. Available backbones include:
        - ResNet variants: 'resnet50', 'resnet101'
        - DenseNet variants: 'densenet121', 'densenet169'
        - InceptionV3: 'inception_v3'
        - ConvNextV2 variants: 'convnextv2_tiny', 'convnextv2_base', 'convnextv2_large'
        - Swin Transformer variants: 'swin_tiny', 'swin_small', 'swin_base'
        - Vision Transformer (ViT) variants: 'vit_base', 'vit_large'
    mode : str, optional
        The mode of the model, either 'eval' for evaluation or 'fine_tune' for
        fine-tuning. Default is 'eval'.
    input_shape : tuple, optional
        The expected input shape in channels-last order. Default is (224, 224, 3).

    Methods:
    -------
    __init__(self, backbone, mode='eval', input_shape=(224, 224, 3)):
        Initializes the model with the specified backbone and mode.
    get_output_shape(self):
        Returns the output shape of the underlying Keras model.
    predict(self, images):
        Given a batch of images of shape (batch_size, 224, 224, 3), performs a
        forward pass through the model and returns predictions or extracted
        features.
    """

    def __init__(self, backbone, mode="eval", input_shape=(224, 224, 3)):
        self.backbone_name = backbone

        input_layer = Input(shape=input_shape)

        # Keras applications backbones, built directly on the input tensor.
        if backbone == "resnet50":
            self.base_model = ResNet50(
                include_top=False, weights="imagenet", input_tensor=input_layer
            )
        elif backbone == "resnet101":
            self.base_model = ResNet101(
                include_top=False, weights="imagenet", input_tensor=input_layer
            )
        elif backbone == "densenet121":
            self.base_model = DenseNet121(
                include_top=False, weights="imagenet", input_tensor=input_layer
            )
        elif backbone == "densenet169":
            self.base_model = DenseNet169(
                include_top=False, weights="imagenet", input_tensor=input_layer
            )
        elif backbone == "inception_v3":
            self.base_model = InceptionV3(
                include_top=False, weights="imagenet", input_tensor=input_layer
            )
        # Hugging Face backbones, loaded with pretrained weights.
        elif backbone == "convnextv2_tiny":
            self.base_model = TFConvNextV2Model.from_pretrained(
                "facebook/convnextv2-tiny-22k-224"
            )
        elif backbone == "convnextv2_base":
            self.base_model = TFConvNextV2Model.from_pretrained(
                "facebook/convnextv2-base-22k-224"
            )
        elif backbone == "convnextv2_large":
            self.base_model = TFConvNextV2Model.from_pretrained(
                "facebook/convnextv2-large-22k-224"
            )
        elif backbone == "swin_tiny":
            self.base_model = TFSwinModel.from_pretrained(
                "microsoft/swin-tiny-patch4-window7-224"
            )
        elif backbone == "swin_small":
            self.base_model = TFSwinModel.from_pretrained(
                "microsoft/swin-small-patch4-window7-224"
            )
        elif backbone == "swin_base":
            self.base_model = TFSwinModel.from_pretrained(
                "microsoft/swin-base-patch4-window7-224"
            )
        elif backbone in ["vit_base", "vit_large"]:
            backbone_path = {
                "vit_base": "google/vit-base-patch16-224",
                "vit_large": "google/vit-large-patch16-224",
            }
            self.base_model = TFViTModel.from_pretrained(backbone_path[backbone])
        else:
            raise ValueError(f"Unsupported backbone model: {backbone}")

        if mode == "eval":
            self.base_model.trainable = False
        elif mode == "fine_tune":
            self.base_model.trainable = True
        else:
            raise ValueError(f"Unsupported mode: {mode}")

        if backbone in [
            "vit_base",
            "vit_large",
            "convnextv2_tiny",
            "convnextv2_base",
            "convnextv2_large",
            "swin_tiny",
            "swin_small",
            "swin_base",
        ]:
            # Hugging Face models expect channels-first (NCHW) inputs, so
            # transpose from the Keras channels-last (NHWC) layout.
            input_layer_transposed = tf.transpose(input_layer, perm=[0, 3, 1, 2])
            hf_outputs = self.base_model(input_layer_transposed)
            # Use the pooled representation as the feature vector.
            outputs = hf_outputs.pooler_output
        else:
            # Keras backbones return a spatial feature map; pool it to a vector.
            x = self.base_model.output
            outputs = GlobalAveragePooling2D()(x)

        self.model = Model(inputs=input_layer, outputs=outputs)

    def get_output_shape(self):
        """
        Get the output shape of the model.

        Returns:
        -------
        tuple
            The shape of the model's output tensor.
        """
        return self.model.output_shape

    def predict(self, images):
        """
        Predict on a batch of images.

        Parameters:
        ----------
        images : numpy.ndarray
            A batch of images of shape (batch_size, 224, 224, 3).

        Returns:
        -------
        numpy.ndarray
            Predictions or features from the model for the given images.
        """
        images = tf.convert_to_tensor(images, dtype=tf.float32)
        # Forward pass in inference mode (no dropout or batch-norm updates).
        predictions = self.model(images, training=False)
        return predictions.numpy()
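
# Example usage (a minimal sketch; the random batch below stands in for a
# batch of real, preprocessed images):
#
#   model = FoundationalCVModel(backbone="resnet50", mode="eval")
#   batch = np.random.rand(2, 224, 224, 3).astype(np.float32)
#   features = model.predict(batch)
#   print(features.shape)  # (2, 2048) for the ResNet50 backbone
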
class ImageFolderDataset:
    """
    A custom dataset class for loading and preprocessing images from a folder.

    This class loads images from a given folder, automatically filtering valid
    image files and preprocessing them to a specified shape. It also handles
    unreadable or corrupted images by excluding them.

    Attributes:
    ----------
    folder_path : str
        The path to the folder containing the images.
    shape : tuple
        The desired shape (width, height) to which the images will be resized.
    image_files : list
        A list of valid image file names that can be processed.

    Parameters:
    ----------
    folder_path : str
        The path to the folder containing image files.
    shape : tuple, optional
        The target shape to resize the images to. Default is (224, 224).
    image_files : list, optional
        A pre-provided list of image file names. If not provided, valid image
        files (with extensions '.jpg', '.jpeg', '.png', '.gif') are detected
        automatically in the specified folder.

    Methods:
    -------
    clean_unidentified_images():
        Removes images that cause an error (e.g., `UnidentifiedImageError`)
        during loading, so that only valid, readable images are kept.
    __len__():
        Returns the number of valid images in the dataset after cleaning.
    __getitem__(idx):
        Given an index `idx`, loads and preprocesses the image at that index
        and returns it along with its filename.
    """

    def __init__(self, folder_path, shape=(224, 224), image_files=None):
        """
        Initializes the dataset object by setting the folder path and target image shape.
        Optionally accepts a list of image files to process; otherwise, valid
        images are detected in the folder.

        Parameters:
        ----------
        folder_path : str
            The directory containing the images.
        shape : tuple, optional
            The target shape to resize the images to. Default is (224, 224).
        image_files : list, optional
            A list of image files to load. If not provided, valid images are
            auto-detected from the folder.
        """
        self.folder_path = folder_path
        self.shape = shape

        if image_files:
            self.image_files = image_files
        else:
            # Keep only files with a recognized image extension.
            self.image_files = [
                f
                for f in os.listdir(folder_path)
                if f.lower().endswith((".jpg", ".jpeg", ".png", ".gif"))
            ]

        self.clean_unidentified_images()

    def clean_unidentified_images(self):
        """
        Clean the dataset by removing images that cannot be opened due to errors
        (e.g., `UnidentifiedImageError`).

        This method iterates over the detected image files and attempts to open
        and convert each one to RGB. Any image that cannot be opened (e.g., due
        to corruption or an unsupported format) is excluded from the dataset,
        and a message is printed to indicate which file was skipped.
        """
        cleaned_files = []
        for img_name in self.image_files:
            img_path = os.path.join(self.folder_path, img_name)
            try:
                # Attempt a decode; corrupted files raise here. The context
                # manager ensures the file handle is closed afterwards.
                with Image.open(img_path) as img:
                    img.convert("RGB")
                cleaned_files.append(img_name)
            except Exception as e:
                print(f"Skipping {img_name} due to error: {e}")
        self.image_files = cleaned_files

    def __len__(self):
        """
        Returns the number of valid images in the dataset after cleaning.

        Returns:
        -------
        int
            The number of images in the cleaned dataset.
        """
        return len(self.image_files)

    def __getitem__(self, idx):
        """
        Retrieves the image and its filename at the specified index.

        Parameters:
        ----------
        idx : int
            The index of the image to retrieve.

        Returns:
        -------
        tuple
            A tuple containing the image filename and the preprocessed image
            as a NumPy array.

        Raises:
        ------
        IndexError
            If the index is out of bounds for the dataset.
        """
        img_name = self.image_files[idx]
        img_path = os.path.join(self.folder_path, img_name)
        img = load_and_preprocess_image(img_path, self.shape)
        return img_name, img
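
# Example usage (a minimal sketch; "data/images" is a hypothetical folder
# of .jpg/.png files):
#
#   dataset = ImageFolderDataset("data/images")
#   print(len(dataset))          # number of readable images
#   name, img = dataset[0]
#   print(name, img.shape)       # e.g. "cat.jpg" (224, 224, 3)
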
def get_embeddings_df(
    batch_size=32,
    path="data/images",
    dataset_name="",
    backbone="resnet50",
    directory="embeddings",
    image_files=None,
):
    """
    Generates embeddings for images in a dataset using a specified backbone model
    and saves them to a CSV file.

    This function processes images from a given folder in batches, extracts
    features (embeddings) using a specified pre-trained computer vision model,
    and stores the results in a CSV file. The embeddings can be used for
    downstream tasks such as image retrieval or clustering.

    Parameters:
    ----------
    batch_size : int, optional
        The number of images to process in each batch. Default is 32.
    path : str, optional
        The folder path containing the images. Default is "data/images".
    dataset_name : str, optional
        The name of the dataset, used as a subdirectory for saving embeddings.
        Default is an empty string.
    backbone : str, optional
        The name of the backbone model to use for generating embeddings.
        Default is 'resnet50'. Other options include models like
        'convnextv2_tiny', 'vit_base', etc.
    directory : str, optional
        The root directory where the embeddings CSV file will be saved.
        Default is 'embeddings'.
    image_files : list, optional
        A pre-defined list of image file names to process. If not provided,
        image files are detected automatically in the `path` directory.

    Returns:
    -------
    None
        The function does not return any value. It saves a CSV file containing
        image names and their embeddings.

    Side Effects:
    ------------
    - Saves a CSV file in the specified directory containing image file names
      and their corresponding embeddings.

    Notes:
    ------
    - The images are loaded and preprocessed using the `ImageFolderDataset` class.
    - The embeddings are generated using a pre-trained model from the
      `FoundationalCVModel` class.
    - The embeddings are saved as a CSV file with the following structure:
        - `ImageName`: The name of the image file.
        - Columns corresponding to the embedding vector (one column per feature).

    Example:
    --------
    >>> get_embeddings_df(batch_size=16, path="data/images", dataset_name='sample_dataset', backbone="resnet50")

    This would generate a CSV file with image embeddings from the 'resnet50'
    backbone model for images in the "data/images" directory.
    """
    dataset = ImageFolderDataset(folder_path=path, image_files=image_files)
    model = FoundationalCVModel(backbone)

    img_names = []
    features = []

    # Ceiling division: count full batches plus a final partial batch, if any.
    num_batches = len(dataset) // batch_size + (
        1 if len(dataset) % batch_size != 0 else 0
    )

    for i in range(0, len(dataset), batch_size):
        batch_files = dataset.image_files[i : i + batch_size]
        batch_imgs = np.array(
            [dataset[j][1] for j in range(i, min(i + batch_size, len(dataset)))]
        )

        batch_features = model.predict(batch_imgs)

        img_names.extend(batch_files)
        features.extend(batch_features)

        if (i // batch_size + 1) % 10 == 0:
            print(f"Batch {i // batch_size + 1}/{num_batches} done")

    # Expand each embedding vector into one column per feature.
    df = pd.DataFrame({"ImageName": img_names, "Embeddings": features})
    df_aux = pd.DataFrame(df["Embeddings"].tolist())
    df = pd.concat([df["ImageName"], df_aux], axis=1)

    os.makedirs(f"{directory}/{dataset_name}", exist_ok=True)
    df.to_csv(f"{directory}/{dataset_name}/Embeddings_{backbone}.csv", index=False)
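
# Example invocation (a minimal sketch; "data/images" and "sample_dataset"
# are hypothetical placeholders):
if __name__ == "__main__":
    get_embeddings_df(
        batch_size=16,
        path="data/images",
        dataset_name="sample_dataset",
        backbone="resnet50",
    )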