# Multimodal_Product_Classification / src / vision_embeddings_tf.py
# Uploaded by iBrokeTheCode — "chore: Upload missing project files" (commit 238f86d)
import os
import warnings
import numpy as np
import pandas as pd
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications import (
DenseNet121,
DenseNet169,
InceptionV3,
ResNet50,
ResNet101,
)
from tensorflow.keras.layers import GlobalAveragePooling2D, Input
from tensorflow.keras.models import Model
from transformers import TFConvNextV2Model, TFSwinModel, TFViTModel
# 💬 NOTE: Suppress Python-level warnings and raise TensorFlow's log level
# to ERROR so batch-processing output stays readable.
warnings.filterwarnings("ignore")
tf.get_logger().setLevel("ERROR")
def load_and_preprocess_image(image_path, target_size=(224, 224)):
    """
    Read an image from disk and prepare it for model input.

    Args:
    - image_path (str): Path to the image file.
    - target_size (tuple): Desired output size as (width, height).

    Returns:
    - np.array: float32 image array with pixel values scaled to [0, 1].
    """
    # Decode with PIL, forcing 3-channel RGB regardless of the source mode.
    rgb_image = Image.open(image_path).convert("RGB")
    # PIL's resize takes (width, height).
    resized = rgb_image.resize(target_size)
    # Map 8-bit pixel values into the [0, 1] float range.
    return np.array(resized, dtype=np.float32) / 255.0
class FoundationalCVModel:
    """
    A Keras module for loading and using foundational computer vision models.

    This class loads a pre-trained backbone and wraps it in a Keras ``Model``
    that maps a batch of images to a pooled feature vector. The user can choose
    between evaluation mode (non-trainable backbone) and fine-tuning mode
    (trainable backbone).

    Attributes:
    ----------
    backbone_name : str
        The name of the foundational CV model that was loaded.
    base_model : keras.Model
        The raw pre-trained backbone.
    model : keras.Model
        The final Keras model: input image -> pooled feature vector.

    Parameters:
    ----------
    backbone : str
        The name of the foundational CV model to load. Supported values:
        - ResNet variants: 'resnet50', 'resnet101'
        - DenseNet variants: 'densenet121', 'densenet169'
        - InceptionV3: 'inception_v3'
        - ConvNextV2 variants: 'convnextv2_tiny', 'convnextv2_base', 'convnextv2_large'
        - Swin Transformer variants: 'swin_tiny', 'swin_small', 'swin_base'
        - Vision Transformer (ViT) variants: 'vit_base', 'vit_large'
    mode : str, optional
        Either 'eval' (backbone frozen) or 'fine_tune' (backbone trainable).
        Default is 'eval'.
    input_shape : tuple, optional
        Channels-last input shape fed to the model. Default is (224, 224, 3).

    Raises:
    ------
    ValueError
        If ``backbone`` or ``mode`` is not one of the supported values.
    """

    def __init__(self, backbone, mode="eval", input_shape=(224, 224, 3)):
        """
        Build the feature-extraction model for the requested backbone.

        Parameters:
        ----------
        backbone : str
            Name of the backbone to load (see class docstring).
        mode : str, optional
            'eval' to freeze the backbone, 'fine_tune' to keep it trainable.
        input_shape : tuple, optional
            Channels-last input shape. Default (224, 224, 3).
        """
        self.backbone_name = backbone

        # Keras-applications constructors (channels-last, headless, ImageNet weights).
        keras_backbones = {
            "resnet50": ResNet50,
            "resnet101": ResNet101,
            "densenet121": DenseNet121,
            "densenet169": DenseNet169,
            "inception_v3": InceptionV3,
        }
        # Hugging Face model classes and checkpoints (channels-first input).
        hf_backbones = {
            "convnextv2_tiny": (TFConvNextV2Model, "facebook/convnextv2-tiny-22k-224"),
            "convnextv2_base": (TFConvNextV2Model, "facebook/convnextv2-base-22k-224"),
            "convnextv2_large": (TFConvNextV2Model, "facebook/convnextv2-large-22k-224"),
            "swin_tiny": (TFSwinModel, "microsoft/swin-tiny-patch4-window7-224"),
            "swin_small": (TFSwinModel, "microsoft/swin-small-patch4-window7-224"),
            "swin_base": (TFSwinModel, "microsoft/swin-base-patch4-window7-224"),
            "vit_base": (TFViTModel, "google/vit-base-patch16-224"),
            "vit_large": (TFViTModel, "google/vit-large-patch16-224"),
        }

        input_layer = Input(shape=input_shape)

        if backbone in keras_backbones:
            self.base_model = keras_backbones[backbone](
                include_top=False, weights="imagenet", input_tensor=input_layer
            )
            is_hf_model = False
        elif backbone in hf_backbones:
            model_cls, checkpoint = hf_backbones[backbone]
            self.base_model = model_cls.from_pretrained(checkpoint)
            is_hf_model = True
        else:
            raise ValueError(f"Unsupported backbone model: {backbone}")

        if mode == "eval":
            # Evaluation mode: freeze the backbone.
            self.base_model.trainable = False
        elif mode == "fine_tune":
            self.base_model.trainable = True
        else:
            # BUG FIX: an unknown mode used to be silently ignored, leaving
            # the backbone in its library-default (trainable) state.
            raise ValueError(f"Unsupported mode: {mode}. Use 'eval' or 'fine_tune'.")

        # 💬 NOTE: transformers models expect channels-first input, while
        # keras.applications models expect channels-last. Their outputs also
        # differ: HF models expose a pooled vector, keras.applications return
        # a 4D feature map that we pool ourselves.
        if is_hf_model:
            # Transpose NHWC -> NCHW inside the graph for the HF backbone.
            input_layer_transposed = tf.transpose(input_layer, perm=[0, 3, 1, 2])
            hf_outputs = self.base_model(input_layer_transposed)
            # Pooled representation, shape (batch_size, hidden_size).
            outputs = hf_outputs.pooler_output
        else:
            # keras.applications models are built without a pooling head, so
            # reduce the spatial feature map with global average pooling.
            outputs = GlobalAveragePooling2D()(self.base_model.output)

        # Final model: image batch in, pooled embedding out.
        self.model = Model(inputs=input_layer, outputs=outputs)

    def get_output_shape(self):
        """
        Get the output shape of the model.

        Returns:
        -------
        tuple
            The shape of the model's output tensor.
        """
        return self.model.output_shape

    def predict(self, images):
        """
        Predict on a batch of images.

        Parameters:
        ----------
        images : numpy.ndarray
            A batch of images of shape (batch_size, 224, 224, 3).

        Returns:
        -------
        numpy.ndarray
            Predictions or features from the model for the given images.
        """
        # Ensure a float32 tensor for the forward pass.
        images = tf.convert_to_tensor(images, dtype=tf.float32)
        # Forward pass with training disabled (no dropout/batch-norm updates).
        predictions = self.model(images, training=False)
        # Convert back to numpy for usability.
        return predictions.numpy()
class ImageFolderDataset:
    """
    A custom dataset class for loading and preprocessing images from a folder.

    Loads images from a given folder, filtering to common image extensions and
    excluding any file PIL cannot decode, then serves (filename, image) pairs
    preprocessed to a fixed shape.

    Attributes:
    ----------
    folder_path : str
        The path to the folder containing the images.
    shape : tuple
        The desired (width, height) the images are resized to.
    image_files : list
        Valid image file names remaining after cleaning.

    Parameters:
    ----------
    folder_path : str
        The path to the folder containing image files.
    shape : tuple, optional
        The target shape to resize the images to. Default is (224, 224).
    image_files : list, optional
        A pre-provided list of image file names. If None, valid image files
        (extensions '.jpg', '.jpeg', '.png', '.gif') are auto-detected in the
        folder.

    Methods:
    -------
    clean_unidentified_images():
        Removes images that raise during loading (e.g. `UnidentifiedImageError`),
        keeping only readable files in the dataset.
    __len__():
        Returns the number of valid images after cleaning.
    __getitem__(idx):
        Returns the (filename, preprocessed image) pair at index `idx`.
    """

    # Accepted extensions, dot included so that e.g. "archivejpg" is not
    # mistaken for an image file.
    _IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".gif")

    def __init__(self, folder_path, shape=(224, 224), image_files=None):
        """
        Initialize the dataset with a folder path and target image shape.

        Parameters:
        ----------
        folder_path : str
            The directory containing the images.
        shape : tuple, optional
            The target shape to resize the images to. Default is (224, 224).
        image_files : list, optional
            Explicit list of files to load. If None, valid images are
            auto-detected from the folder.
        """
        self.folder_path = folder_path
        self.shape = shape
        # BUG FIX: compare against None rather than truthiness, so an
        # explicitly provided empty list is respected instead of silently
        # triggering a folder scan.
        if image_files is not None:
            self.image_files = image_files
        else:
            # List all files in the folder and keep only image files.
            self.image_files = [
                f
                for f in os.listdir(folder_path)
                if f.lower().endswith(self._IMAGE_EXTENSIONS)
            ]
        # Drop anything PIL cannot decode.
        self.clean_unidentified_images()

    def clean_unidentified_images(self):
        """
        Remove files that cannot be opened and decoded by PIL.

        Attempts to open and convert every listed file to RGB; files that
        raise any exception (corruption, unsupported format, ...) are skipped
        with a printed message. Updates ``self.image_files`` in place.
        """
        cleaned_files = []
        for img_name in self.image_files:
            img_path = os.path.join(self.folder_path, img_name)
            try:
                # BUG FIX: use a context manager so the underlying file handle
                # is closed (PIL opens lazily; .convert forces a real decode).
                with Image.open(img_path) as img:
                    img.convert("RGB")
                cleaned_files.append(img_name)
            except Exception as e:
                print(f"Skipping {img_name} due to error: {e}")
        self.image_files = cleaned_files

    def __len__(self):
        """
        Return the number of valid images in the dataset after cleaning.

        Returns:
        -------
        int
            The number of images in the cleaned dataset.
        """
        return len(self.image_files)

    def __getitem__(self, idx):
        """
        Retrieve the image and its filename at the specified index.

        Parameters:
        ----------
        idx : int
            The index of the image to retrieve.

        Returns:
        -------
        tuple
            (filename, preprocessed image as a NumPy array).

        Raises:
        ------
        IndexError
            If the index is out of bounds for the dataset.
        """
        img_name = self.image_files[idx]
        img_path = os.path.join(self.folder_path, img_name)
        # Load, resize, and scale to [0, 1].
        img = load_and_preprocess_image(img_path, self.shape)
        return img_name, img
def get_embeddings_df(
    batch_size=32,
    path="data/images",
    dataset_name="",
    backbone="resnet50",
    directory="embeddings",
    image_files=None,
):
    """
    Generate embeddings for images in a folder and save them to a CSV file.

    Processes images from `path` in batches, extracts features with the
    requested pre-trained backbone, and writes the result to
    ``{directory}/{dataset_name}/Embeddings_{backbone}.csv``.

    Parameters:
    ----------
    batch_size : int, optional
        Number of images per batch. Default is 32.
    path : str, optional
        Folder containing the images. Default is "data/images".
    dataset_name : str, optional
        Subdirectory name used when saving the embeddings. Default "".
    backbone : str, optional
        Backbone model name (see ``FoundationalCVModel``). Default 'resnet50'.
    directory : str, optional
        Root directory for the embeddings CSV. Default 'embeddings'.
    image_files : list, optional
        Pre-defined list of image file names; auto-detected from `path`
        when not provided.

    Returns:
    -------
    None
        The function only saves a CSV file with image names and embeddings.

    Side Effects:
    ------------
    - Creates the output directory tree if needed and writes a CSV with an
      `ImageName` column plus one column per embedding dimension.

    Example:
    --------
    >>> get_embeddings_df(batch_size=16, path="data/images", dataset_name='sample_dataset', backbone="resnet50")
    """
    # Dataset of (filename, preprocessed image) pairs.
    dataset = ImageFolderDataset(folder_path=path, image_files=image_files)
    # Frozen feature extractor for the chosen backbone.
    model = FoundationalCVModel(backbone)

    img_names = []
    features = []

    # Ceiling division: one extra batch covers any remainder.
    num_batches = -(-len(dataset) // batch_size)

    # Process images in batches and extract features.
    for i in range(0, len(dataset), batch_size):
        batch_files = dataset.image_files[i : i + batch_size]
        # Load and preprocess every image of the current batch.
        batch_imgs = np.array(
            [dataset[j][1] for j in range(i, min(i + batch_size, len(dataset)))]
        )
        # Forward pass: one embedding vector per image.
        batch_features = model.predict(batch_imgs)
        img_names.extend(batch_files)
        features.extend(batch_features)
        # Progress report every 10 batches.
        if (i // batch_size + 1) % 10 == 0:
            print(f"Batch {i // batch_size + 1}/{num_batches} done")

    # One row per image; split the embedding vector into one column per
    # feature, keyed by image name.
    df = pd.DataFrame({"ImageName": img_names, "Embeddings": features})
    df_aux = pd.DataFrame(df["Embeddings"].tolist())
    df = pd.concat([df["ImageName"], df_aux], axis=1)

    # BUG FIX: single race-free makedirs(exist_ok=True) replaces the two
    # exists()/makedirs() pairs, which could raise if the directory was
    # created between the check and the call (makedirs already creates
    # intermediate directories).
    output_dir = os.path.join(directory, dataset_name)
    os.makedirs(output_dir, exist_ok=True)
    df.to_csv(os.path.join(output_dir, f"Embeddings_{backbone}.csv"), index=False)