|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
|
|
|
import pandas as pd |
|
|
import pytest |
|
|
from sklearn.datasets import make_classification |
|
|
from sklearn.metrics import accuracy_score, f1_score |
|
|
from sklearn.model_selection import train_test_split |
|
|
from sklearn.preprocessing import LabelEncoder |
|
|
from tensorflow.keras.layers import BatchNormalization, Concatenate, Dense, Dropout |
|
|
from tensorflow.keras.losses import CategoricalCrossentropy |
|
|
from tensorflow.keras.models import Model |
|
|
from tensorflow.keras.optimizers import SGD, Adam |
|
|
|
|
|
from src.classifiers_mlp import MultimodalDataset, create_early_fusion_model, train_mlp |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def correlated_sample_data():
    """
    Fixture to create a correlated synthetic dataset using make_classification for testing.

    Generates 20 samples with 8 features (6 informative, 3 classes); the
    first 4 columns are labelled as text features and the last 4 as image
    features to mimic a multimodal layout.

    Returns:

        train_df (pd.DataFrame): DataFrame with train data (70% split).

        test_df (pd.DataFrame): DataFrame with test data (30% split).

    """

    X, y = make_classification(
        n_samples=20, n_features=8, n_informative=6, n_classes=3, random_state=42
    )

    # Columns 0-3 act as the "text" modality, columns 4-7 as the "image" modality.
    feature_names = [f"text_{i}" for i in range(4)] + [
        f"image_{i}" for i in range(4, 8)
    ]

    df = pd.DataFrame(X, columns=feature_names)
    df["class_id"] = y

    train_df, test_df = train_test_split(df, test_size=0.3, random_state=42)

    return train_df, test_df
|
|
|
|
|
|
|
|
@pytest.fixture
def label_encoder(correlated_sample_data):
    """
    Fixture to create a label encoder fitted on the training labels.

    Args:
        correlated_sample_data: The (train_df, test_df) fixture tuple.

    Returns:
        LabelEncoder: Encoder fitted on train_df["class_id"].
    """
    train_df, _ = correlated_sample_data
    # Local name kept distinct from the fixture name to avoid shadowing.
    encoder = LabelEncoder()
    encoder.fit(train_df["class_id"])
    return encoder
|
|
|
|
|
|
|
|
def test_multimodal_dataset_image_only(correlated_sample_data, label_encoder):
    """
    Test the MultimodalDataset class with only image data.

    Verifies that only the image modality is instantiated and that a batch
    yields image inputs with labels of matching size.
    """
    # The test split is not needed for this dataset-construction check.
    train_df, _ = correlated_sample_data

    image_columns = [f"image_{i}" for i in range(4, 8)]
    label_column = "class_id"

    train_dataset = MultimodalDataset(
        train_df,
        text_cols=None,
        image_cols=image_columns,
        label_col=label_column,
        encoder=label_encoder,
    )

    assert train_dataset.image_data is not None, "Image data should be instantiated"
    assert train_dataset.text_data is None, "Text data should be None"

    batch_inputs, batch_labels = train_dataset[0]

    assert "image" in batch_inputs, "Batch should contain image data"
    assert "text" not in batch_inputs, "Batch should not contain text data"
    assert batch_inputs["image"].shape[1] == len(image_columns), (
        "Image data shape is incorrect"
    )
    assert batch_labels is not None, "Batch should contain labels"
    assert batch_labels.shape[0] == batch_inputs["image"].shape[0], (
        "Labels should match the batch size"
    )
|
|
|
|
|
|
|
|
def test_multimodal_dataset_text_only(correlated_sample_data, label_encoder):
    """
    Test the MultimodalDataset class with only text data.

    Verifies that only the text modality is instantiated and that a batch
    yields text inputs with labels of matching size.
    """
    # The test split is not needed for this dataset-construction check.
    train_df, _ = correlated_sample_data

    text_columns = [f"text_{i}" for i in range(4)]
    label_column = "class_id"

    train_dataset = MultimodalDataset(
        train_df,
        text_cols=text_columns,
        image_cols=None,
        label_col=label_column,
        encoder=label_encoder,
    )

    assert train_dataset.text_data is not None, "Text data should be instantiated"
    assert train_dataset.image_data is None, "Image data should be None"

    batch_inputs, batch_labels = train_dataset[0]

    assert "text" in batch_inputs, "Batch should contain text data"
    assert "image" not in batch_inputs, "Batch should not contain image data"
    assert batch_inputs["text"].shape[1] == len(text_columns), (
        "Text data shape is incorrect"
    )
    assert batch_labels is not None, "Batch should contain labels"
    assert batch_labels.shape[0] == batch_inputs["text"].shape[0], (
        "Labels should match the batch size"
    )
|
|
|
|
|
|
|
|
def test_multimodal_dataset_multimodal(correlated_sample_data, label_encoder):
    """
    Test the MultimodalDataset class with both text and image data.

    Verifies both modalities are instantiated and that a batch yields text
    and image inputs with labels of matching size.
    """
    # The test split is not needed for this dataset-construction check.
    train_df, _ = correlated_sample_data

    text_columns = [f"text_{i}" for i in range(4)]
    image_columns = [f"image_{i}" for i in range(4, 8)]
    label_column = "class_id"

    train_dataset = MultimodalDataset(
        train_df,
        text_cols=text_columns,
        image_cols=image_columns,
        label_col=label_column,
        encoder=label_encoder,
    )

    assert train_dataset.text_data is not None, "Text data should be instantiated"
    assert train_dataset.image_data is not None, "Image data should be instantiated"

    batch_inputs, batch_labels = train_dataset[0]
    assert "text" in batch_inputs, "Batch should contain text data"
    assert "image" in batch_inputs, "Batch should contain image data"
    assert batch_inputs["text"].shape[1] == len(text_columns), (
        "Text data shape is incorrect"
    )
    assert batch_inputs["image"].shape[1] == len(image_columns), (
        "Image data shape is incorrect"
    )
    assert batch_labels is not None, "Batch should contain labels"
    assert (
        batch_labels.shape[0]
        == batch_inputs["text"].shape[0]
        == batch_inputs["image"].shape[0]
    ), "Labels should match the batch size"
|
|
|
|
|
|
|
|
def test_create_early_fusion_model_single_modality_image():
    """
    Test the model creation with only image input.

    Ensures the architecture matches expectations: input/output shapes
    and the expected Dense/Dropout/BatchNormalization layer counts.
    """
    text_input_size = None
    image_input_size = 4
    output_size = 3

    model = create_early_fusion_model(
        text_input_size, image_input_size, output_size, hidden=[128, 64], p=0.3
    )

    assert isinstance(model, Model), "Model should be a Keras Model instance"

    # Single-modality model: one flat input matching the image feature size.
    assert model.input_shape == (None, image_input_size), (
        "Input shape should match image input size"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )

    dense_layers = [layer for layer in model.layers if isinstance(layer, Dense)]
    dropout_layers = [layer for layer in model.layers if isinstance(layer, Dropout)]
    batchnorm_layers = [
        layer for layer in model.layers if isinstance(layer, BatchNormalization)
    ]

    assert len(dense_layers) == 3, (
        "There should be 3 Dense layers (2 hidden + 1 output)"
    )
    assert len(dropout_layers) > 0, "There should be at least 1 Dropout layer"
    assert len(batchnorm_layers) > 0, (
        "There should be at least 1 BatchNormalization layer"
    )
|
|
|
|
|
|
|
|
def test_create_early_fusion_model_single_modality_text():
    """
    Test the model creation with only text input.

    Ensures the architecture matches expectations: input/output shapes
    and the expected Dense/Dropout/BatchNormalization layer counts.
    """
    text_input_size = 4
    image_input_size = None
    output_size = 3

    model = create_early_fusion_model(
        text_input_size, image_input_size, output_size, hidden=[128, 64], p=0.3
    )

    assert isinstance(model, Model), "Model should be a Keras Model instance"

    # Single-modality model: one flat input matching the text feature size.
    assert model.input_shape == (None, text_input_size), (
        "Input shape should match text input size"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )

    dense_layers = [layer for layer in model.layers if isinstance(layer, Dense)]
    dropout_layers = [layer for layer in model.layers if isinstance(layer, Dropout)]
    batchnorm_layers = [
        layer for layer in model.layers if isinstance(layer, BatchNormalization)
    ]

    assert len(dense_layers) == 3, (
        "There should be 3 Dense layers (2 hidden + 1 output)"
    )
    assert len(dropout_layers) > 0, "There should be at least 1 Dropout layer"
    assert len(batchnorm_layers) > 0, (
        "There should be at least 1 BatchNormalization layer"
    )
|
|
|
|
|
|
|
|
def test_create_early_fusion_model_multimodal():
    """
    Test the model creation with both text and image input.

    Ensures the architecture matches expectations: two inputs fused via a
    Concatenate layer, correct output shape, and the expected
    Dense/Dropout/BatchNormalization layer counts.
    """
    text_input_size = 4
    image_input_size = 4
    output_size = 3

    model = create_early_fusion_model(
        text_input_size, image_input_size, output_size, hidden=[128, 64], p=0.3
    )

    assert isinstance(model, Model), "Model should be a Keras Model instance"

    # Multimodal model: a list of two input shapes, text first, then image.
    assert model.input_shape == [(None, text_input_size), (None, image_input_size)], (
        "Input shape should match both text and image input sizes"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )

    assert any(isinstance(layer, Concatenate) for layer in model.layers), (
        "There should be a Concatenate layer for text and image inputs"
    )

    dense_layers = [layer for layer in model.layers if isinstance(layer, Dense)]
    dropout_layers = [layer for layer in model.layers if isinstance(layer, Dropout)]
    batchnorm_layers = [
        layer for layer in model.layers if isinstance(layer, BatchNormalization)
    ]

    assert len(dense_layers) == 3, (
        "There should be 3 Dense layers (2 hidden + 1 output)"
    )
    assert len(dropout_layers) > 0, "There should be at least 1 Dropout layer"
    assert len(batchnorm_layers) > 0, (
        "There should be at least 1 BatchNormalization layer"
    )
|
|
|
|
|
|
|
|
def test_train_mlp_single_modality_image(correlated_sample_data, label_encoder):
    """
    Test train_mlp with only image data.

    With train_model=False and test_mlp_model=False, this only verifies the
    model is built and compiled with the expected loss, shapes, and
    optimizer — no actual training or evaluation is performed.
    """
    train_df, test_df = correlated_sample_data

    image_columns = [f"image_{i}" for i in range(4, 8)]
    label_column = "class_id"

    train_dataset = MultimodalDataset(
        train_df,
        text_cols=None,
        image_cols=image_columns,
        label_col=label_column,
        encoder=label_encoder,
    )
    test_dataset = MultimodalDataset(
        test_df,
        text_cols=None,
        image_cols=image_columns,
        label_col=label_column,
        encoder=label_encoder,
    )

    image_input_size = len(image_columns)
    output_size = len(label_encoder.classes_)

    # Metrics are discarded: the model is never trained or evaluated here.
    model, _, _, _ = train_mlp(
        train_loader=train_dataset,
        test_loader=test_dataset,
        text_input_size=None,
        image_input_size=image_input_size,
        output_size=output_size,
        num_epochs=1,
        set_weights=True,
        adam=True,
        patience=10,
        save_results=False,
        train_model=False,
        test_mlp_model=False,
    )

    assert model is not None, "Model should not be None after training."

    assert (
        isinstance(model.loss, CategoricalCrossentropy)
        or model.loss == "categorical_crossentropy"
    ), f"Loss function should be categorical crossentropy, but got {model.loss}"

    assert model.input_shape == (None, image_input_size), (
        "Input shape should match image input size"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )

    assert isinstance(model.optimizer, (Adam, SGD)), (
        f"Optimizer should be Adam or SGD, but got {model.optimizer}"
    )
|
|
|
|
|
|
|
|
def test_train_mlp_single_modality_text(correlated_sample_data, label_encoder):
    """
    Test train_mlp with only text data.

    With train_model=False and test_mlp_model=False, this only verifies the
    model is built and compiled with the expected loss, shapes, and
    optimizer — no actual training or evaluation is performed.
    """
    train_df, test_df = correlated_sample_data

    text_columns = [f"text_{i}" for i in range(4)]
    label_column = "class_id"

    train_dataset = MultimodalDataset(
        train_df,
        text_cols=text_columns,
        image_cols=None,
        label_col=label_column,
        encoder=label_encoder,
    )
    test_dataset = MultimodalDataset(
        test_df,
        text_cols=text_columns,
        image_cols=None,
        label_col=label_column,
        encoder=label_encoder,
    )

    text_input_size = len(text_columns)
    output_size = len(label_encoder.classes_)

    # Metrics are discarded: the model is never trained or evaluated here.
    model, _, _, _ = train_mlp(
        train_loader=train_dataset,
        test_loader=test_dataset,
        text_input_size=text_input_size,
        image_input_size=None,
        output_size=output_size,
        num_epochs=1,
        set_weights=True,
        adam=True,
        patience=10,
        save_results=False,
        train_model=False,
        test_mlp_model=False,
    )

    assert model is not None, "Model should not be None after training."

    assert (
        isinstance(model.loss, CategoricalCrossentropy)
        or model.loss == "categorical_crossentropy"
    ), f"Loss function should be categorical crossentropy, but got {model.loss}"

    assert model.input_shape == (None, text_input_size), (
        "Input shape should match text input size"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )

    assert isinstance(model.optimizer, (Adam, SGD)), (
        f"Optimizer should be Adam or SGD, but got {model.optimizer}"
    )
|
|
|
|
|
|
|
|
def test_train_mlp_multimodal(correlated_sample_data, label_encoder):
    """
    Test train_mlp with both text and image data.

    With train_model=False and test_mlp_model=False, this only verifies the
    multimodal model is built and compiled with the expected loss, input
    shapes, and optimizer — no actual training or evaluation is performed.
    """
    train_df, test_df = correlated_sample_data

    text_columns = [f"text_{i}" for i in range(4)]
    image_columns = [f"image_{i}" for i in range(4, 8)]
    label_column = "class_id"

    train_dataset = MultimodalDataset(
        train_df,
        text_cols=text_columns,
        image_cols=image_columns,
        label_col=label_column,
        encoder=label_encoder,
    )
    test_dataset = MultimodalDataset(
        test_df,
        text_cols=text_columns,
        image_cols=image_columns,
        label_col=label_column,
        encoder=label_encoder,
    )

    text_input_size = len(text_columns)
    image_input_size = len(image_columns)
    output_size = len(label_encoder.classes_)

    # Metrics are discarded: the model is never trained or evaluated here.
    model, _, _, _ = train_mlp(
        train_loader=train_dataset,
        test_loader=test_dataset,
        text_input_size=text_input_size,
        image_input_size=image_input_size,
        output_size=output_size,
        num_epochs=1,
        set_weights=True,
        adam=True,
        patience=10,
        save_results=False,
        train_model=False,
        test_mlp_model=False,
    )

    assert model is not None, "Model should not be None after training."

    assert (
        isinstance(model.loss, CategoricalCrossentropy)
        or model.loss == "categorical_crossentropy"
    ), f"Loss function should be categorical crossentropy, but got {model.loss}"

    assert model.input_shape == [(None, text_input_size), (None, image_input_size)], (
        "Input shape should match both text and image input sizes"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )

    assert isinstance(model.optimizer, (Adam, SGD)), (
        f"Optimizer should be Adam or SGD, but got {model.optimizer}"
    )
|
|
|
|
|
|
|
|
|
|
|
def test_result_files():
    """
    Test if the result files are created for each modality and have the
    correct format (non-empty, with "Predictions" and "True Labels" columns).
    """
    # Resolve paths relative to this test file so the test works from any CWD.
    test_dir = os.path.dirname(os.path.abspath(__file__))

    multimodal_results_path = os.path.join(
        test_dir, "../results/multimodal_results.csv"
    )
    text_results_path = os.path.join(test_dir, "../results/text_results.csv")
    image_results_path = os.path.join(test_dir, "../results/image_results.csv")

    assert os.path.exists(multimodal_results_path), "Multimodal result file is missing!"
    assert os.path.exists(text_results_path), "Text result file is missing!"
    assert os.path.exists(image_results_path), "Image result file is missing!"

    for file_path in [multimodal_results_path, text_results_path, image_results_path]:
        df = pd.read_csv(file_path)
        assert not df.empty, f"{file_path} is empty!"
        # Check each required column separately so a failure names the culprit.
        for column in ("Predictions", "True Labels"):
            assert column in df.columns, (
                f"{file_path} is not in the correct format: missing '{column}' column!"
            )
|
|
|
|
|
|
|
|
|
|
|
def _assert_results_meet_thresholds(results, name, accuracy_threshold, f1_threshold):
    """Assert that accuracy and macro F1 of a results DataFrame exceed thresholds."""
    accuracy = accuracy_score(results["True Labels"], results["Predictions"])
    f1 = f1_score(results["True Labels"], results["Predictions"], average="macro")
    assert accuracy > accuracy_threshold, (
        f"{name} accuracy is below {accuracy_threshold}"
    )
    assert f1 > f1_threshold, f"{name} F1 score is below {f1_threshold}"


def test_model_performance():
    """
    Test if the accuracy and F1 score are above the required thresholds
    for each modality's saved result file.
    """
    # Resolve paths relative to this test file so the test works from any CWD.
    test_dir = os.path.dirname(os.path.abspath(__file__))

    # (display name, relative result path, accuracy threshold, F1 threshold)
    cases = [
        ("Multimodal", "../results/multimodal_results.csv", 0.85, 0.80),
        ("Text", "../results/text_results.csv", 0.85, 0.80),
        ("Image", "../results/image_results.csv", 0.75, 0.70),
    ]

    for name, rel_path, accuracy_threshold, f1_threshold in cases:
        results = pd.read_csv(os.path.join(test_dir, rel_path))
        _assert_results_meet_thresholds(
            results, name, accuracy_threshold, f1_threshold
        )
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate pytest's exit status so CI / shells can detect failures
    # when this file is run directly as a script.
    raise SystemExit(pytest.main())
|
|
|