import os

import numpy as np
import pandas as pd
import psds_eval
import sed_eval
from psds_eval import PSDSEval, plot_psd_roc

def get_event_list_current_file(df, fname):
    """
    Get the list of events for a given filename

    Args:
        df: pd.DataFrame, the dataframe to search in
        fname: str, the filename for which to extract events from the dataframe

    Returns:
        list of events (dictionaries) for the given filename
    """
    event_file = df[df["filename"] == fname]
    if len(event_file) == 1:
        if pd.isna(event_file["event_label"].iloc[0]):
            # A single row with a NaN label means the file contains no events
            event_list_for_current_file = [{"filename": fname}]
        else:
            event_list_for_current_file = event_file.to_dict("records")
    else:
        event_list_for_current_file = event_file.to_dict("records")

    return event_list_for_current_file

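
# Illustrative usage of get_event_list_current_file (hypothetical data, kept as a
# comment so it does not run at import time). A file represented by a single row
# with a NaN label yields [{"filename": fname}]; otherwise the matching rows are
# returned as records:
#
#   df = pd.DataFrame(
#       {
#           "filename": ["a.wav", "a.wav", "b.wav"],
#           "onset": [0.5, 3.0, np.nan],
#           "offset": [2.0, 4.5, np.nan],
#           "event_label": ["Speech", "Dog", np.nan],
#       }
#   )
#   get_event_list_current_file(df, "a.wav")
#   # -> [{'filename': 'a.wav', 'onset': 0.5, 'offset': 2.0, 'event_label': 'Speech'},
#   #     {'filename': 'a.wav', 'onset': 3.0, 'offset': 4.5, 'event_label': 'Dog'}]
#   get_event_list_current_file(df, "b.wav")
#   # -> [{'filename': 'b.wav'}]
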
def psds_results(psds_obj):
    """ Compute and print PSDS scores for three (alpha_ct, alpha_st, max_efpr) settings

    Args:
        psds_obj: psds_eval.PSDSEval object with operating points already added.

    Returns:
        None, the scores are printed.
    """
    try:
        psds_score = psds_obj.psds(alpha_ct=0, alpha_st=0, max_efpr=100)
        print(f"\nPSD-Score (0, 0, 100): {psds_score.value:.5f}")
        psds_score = psds_obj.psds(alpha_ct=1, alpha_st=0, max_efpr=100)
        print(f"\nPSD-Score (1, 0, 100): {psds_score.value:.5f}")
        psds_score = psds_obj.psds(alpha_ct=0, alpha_st=1, max_efpr=100)
        print(f"\nPSD-Score (0, 1, 100): {psds_score.value:.5f}")
    except psds_eval.psds.PSDSEvalError as e:
        print("psds did not work ....")
        raise EnvironmentError("PSDS computation failed") from e

def event_based_evaluation_df(
    reference, estimated, t_collar=0.200, percentage_of_length=0.2
):
    """ Calculate EventBasedMetrics given a reference and estimated dataframe

    Args:
        reference: pd.DataFrame containing "filename", "onset", "offset" and "event_label" columns which describe the
            reference events
        estimated: pd.DataFrame containing "filename", "onset", "offset" and "event_label" columns which describe the
            estimated events to be compared with reference
        t_collar: float, in seconds, the time collar allowed on onsets and offsets
        percentage_of_length: float, between 0 and 1, the percentage of the reference event length allowed as
            tolerance on the offset

    Returns:
        sed_eval.sound_event.EventBasedMetrics with the scores
    """

    evaluated_files = reference["filename"].unique()

    # Collect the union of classes present in the reference and the estimates
    classes = []
    classes.extend(reference.event_label.dropna().unique())
    classes.extend(estimated.event_label.dropna().unique())
    classes = list(set(classes))

    event_based_metric = sed_eval.sound_event.EventBasedMetrics(
        event_label_list=classes,
        t_collar=t_collar,
        percentage_of_length=percentage_of_length,
        empty_system_output_handling="zero_score",
    )

    for fname in evaluated_files:
        reference_event_list_for_current_file = get_event_list_current_file(
            reference, fname
        )
        estimated_event_list_for_current_file = get_event_list_current_file(
            estimated, fname
        )

        event_based_metric.evaluate(
            reference_event_list=reference_event_list_for_current_file,
            estimated_event_list=estimated_event_list_for_current_file,
        )

    return event_based_metric

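
# Illustrative usage (hypothetical file paths; segment_based_evaluation_df below is
# called the same way, with time_resolution instead of the collar parameters).
# sed_eval metric objects can be printed for a full report, and results() returns
# the same numbers as a nested dict:
#
#   reference = pd.read_csv("groundtruth.tsv", sep="\t")
#   estimated = pd.read_csv("predictions.tsv", sep="\t")
#   event_metrics = event_based_evaluation_df(reference, estimated)
#   print(event_metrics)  # formatted overall / class-wise report
#   overall_f1 = event_metrics.results()["overall"]["f_measure"]["f_measure"]
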
def segment_based_evaluation_df(reference, estimated, time_resolution=1.0):
    """ Calculate SegmentBasedMetrics given a reference and estimated dataframe

    Args:
        reference: pd.DataFrame containing "filename", "onset", "offset" and "event_label" columns which describe the
            reference events
        estimated: pd.DataFrame containing "filename", "onset", "offset" and "event_label" columns which describe the
            estimated events to be compared with reference
        time_resolution: float, the time resolution of the segment-based metric

    Returns:
        sed_eval.sound_event.SegmentBasedMetrics with the scores
    """
    evaluated_files = reference["filename"].unique()

    classes = []
    classes.extend(reference.event_label.dropna().unique())
    classes.extend(estimated.event_label.dropna().unique())
    classes = list(set(classes))

    segment_based_metric = sed_eval.sound_event.SegmentBasedMetrics(
        event_label_list=classes, time_resolution=time_resolution
    )

    for fname in evaluated_files:
        reference_event_list_for_current_file = get_event_list_current_file(
            reference, fname
        )
        estimated_event_list_for_current_file = get_event_list_current_file(
            estimated, fname
        )

        segment_based_metric.evaluate(
            reference_event_list=reference_event_list_for_current_file,
            estimated_event_list=estimated_event_list_for_current_file,
        )

    return segment_based_metric

def compute_sed_eval_metrics(predictions, groundtruth):
    """ Compute sed_eval event-based and segment-based metrics with the default parameters used in the task.

    Args:
        predictions: pd.DataFrame, predictions dataframe
        groundtruth: pd.DataFrame, groundtruth dataframe

    Returns:
        tuple, (sed_eval.sound_event.EventBasedMetrics, sed_eval.sound_event.SegmentBasedMetrics)
    """
    metric_event = event_based_evaluation_df(
        groundtruth, predictions, t_collar=0.200, percentage_of_length=0.2
    )
    metric_segment = segment_based_evaluation_df(
        groundtruth, predictions, time_resolution=1.0
    )

    return metric_event, metric_segment

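
# Usage sketch (hypothetical dataframes): note the argument order. The wrapper takes
# (predictions, groundtruth) and forwards them as (reference=groundtruth,
# estimated=predictions) to the helpers above.
#
#   event_metrics, segment_metrics = compute_sed_eval_metrics(predictions, groundtruth)
#   print(event_metrics)
#   print(segment_metrics)
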
def compute_per_intersection_macro_f1(
    prediction_dfs,
    ground_truth_file,
    durations_file,
    dtc_threshold=0.5,
    gtc_threshold=0.5,
    cttc_threshold=0.3,
):
    """ Compute the intersection-based macro F1-score, using the default PSDSEval parameters

    Args:
        prediction_dfs: dict, a dictionary with thresholds as keys and prediction dataframes as values
        ground_truth_file: str, path to the groundtruth tsv file
        durations_file: str, path to the audio durations tsv file
        dtc_threshold: float, the parameter used in PSDSEval, percentage of tolerance for groundtruth intersection
            with predictions
        gtc_threshold: float, the parameter used in PSDSEval, percentage of tolerance for predictions intersection
            with groundtruth
        cttc_threshold: float, the parameter used in PSDSEval to know the percentage needed to count FP as cross-trigger

    Returns:
        float, the intersection-based macro F1-score averaged over the thresholds in prediction_dfs
    """
    gt = pd.read_csv(ground_truth_file, sep="\t")
    durations = pd.read_csv(durations_file, sep="\t")

    psds = PSDSEval(
        ground_truth=gt,
        metadata=durations,
        dtc_threshold=dtc_threshold,
        gtc_threshold=gtc_threshold,
        cttc_threshold=cttc_threshold,
    )
    psds_macro_f1 = []
    for threshold in prediction_dfs.keys():
        if not prediction_dfs[threshold].empty:
            threshold_f1, _ = psds.compute_macro_f_score(prediction_dfs[threshold])
        else:
            threshold_f1 = 0
        if np.isnan(threshold_f1):
            threshold_f1 = 0.0
        psds_macro_f1.append(threshold_f1)
    psds_macro_f1 = np.mean(psds_macro_f1)
    return psds_macro_f1

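
# Illustrative usage (hypothetical paths and thresholds): prediction_dfs maps each
# decision threshold to the dataframe of events obtained with that threshold.
#
#   prediction_dfs = {
#       0.5: pd.read_csv("predictions_th_0.50.tsv", sep="\t"),
#       0.7: pd.read_csv("predictions_th_0.70.tsv", sep="\t"),
#   }
#   macro_f1 = compute_per_intersection_macro_f1(
#       prediction_dfs, "groundtruth.tsv", "durations.tsv"
#   )
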
def compute_psds_from_operating_points(
    prediction_dfs,
    ground_truth_file,
    durations_file,
    dtc_threshold=0.5,
    gtc_threshold=0.5,
    cttc_threshold=0.3,
    alpha_ct=0,
    alpha_st=0,
    max_efpr=100,
    save_dir=None,
):
    """ Compute the PSDS from a set of operating points (one prediction dataframe per threshold)

    Args:
        prediction_dfs: dict, a dictionary with thresholds as keys and prediction dataframes as values
        ground_truth_file: str, path to the groundtruth tsv file
        durations_file: str, path to the audio durations tsv file
        dtc_threshold: float, detection tolerance criterion used in PSDSEval
        gtc_threshold: float, groundtruth intersection criterion used in PSDSEval
        cttc_threshold: float, cross-trigger tolerance criterion used in PSDSEval
        alpha_ct: float, weight of the cross-trigger cost in the PSDS
        alpha_st: float, weight of the instability across classes cost in the PSDS
        max_efpr: float, maximum effective false positive rate per hour used to compute the PSDS
        save_dir: str or None, if given, directory where the per-threshold predictions and the PSD-ROC plot are saved

    Returns:
        float, the PSDS value
    """
    gt = pd.read_csv(ground_truth_file, sep="\t")
    durations = pd.read_csv(durations_file, sep="\t")
    # Named psds_eval_metric to avoid shadowing the imported psds_eval module
    psds_eval_metric = PSDSEval(
        ground_truth=gt,
        metadata=durations,
        dtc_threshold=dtc_threshold,
        gtc_threshold=gtc_threshold,
        cttc_threshold=cttc_threshold,
    )

    for i, k in enumerate(prediction_dfs.keys()):
        det = prediction_dfs[k]

        # Each set of predictions is registered as one operating point of the PSD-ROC
        det["index"] = range(1, len(det) + 1)
        det = det.set_index("index")
        psds_eval_metric.add_operating_point(
            det, info={"name": f"Op {i + 1:02d}", "threshold": k}
        )

    psds_score = psds_eval_metric.psds(
        alpha_ct=alpha_ct, alpha_st=alpha_st, max_efpr=max_efpr
    )

    if save_dir is not None:
        os.makedirs(save_dir, exist_ok=True)

        pred_dir = os.path.join(
            save_dir,
            f"predictions_dtc{dtc_threshold}_gtc{gtc_threshold}_cttc{cttc_threshold}",
        )
        os.makedirs(pred_dir, exist_ok=True)
        for k in prediction_dfs.keys():
            prediction_dfs[k].to_csv(
                os.path.join(pred_dir, f"predictions_th_{k:.2f}.tsv"),
                sep="\t",
                index=False,
            )

        plot_psd_roc(
            psds_score,
            filename=os.path.join(
                save_dir, f"PSDS_ct{alpha_ct}_st{alpha_st}_{max_efpr}.png"
            ),
        )

    return psds_score.value
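
# Illustrative usage (hypothetical paths; the thresholds and PSDS parameters below are
# example values, not the official task settings):
#
#   prediction_dfs = {
#       th: pd.read_csv(f"predictions_th_{th:.2f}.tsv", sep="\t")
#       for th in (0.3, 0.5, 0.7)
#   }
#   psds_value = compute_psds_from_operating_points(
#       prediction_dfs,
#       "groundtruth.tsv",
#       "durations.tsv",
#       alpha_ct=0.5,
#       alpha_st=1,
#       save_dir="exp/psds",  # also writes per-threshold tsv files and the PSD-ROC plot
#   )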