# FlexSED/src/utils/evaluation_measures.py
# Adapted from https://github.com/DCASE-REPO/DESED_task
import os
import numpy as np
import pandas as pd
import psds_eval
import sed_eval
from psds_eval import PSDSEval, plot_psd_roc
def get_event_list_current_file(df, fname):
"""
Get list of events for a given filename
Args:
df: pd.DataFrame, the dataframe to search on
        fname: str, the filename whose events should be extracted from the dataframe
Returns:
list of events (dictionaries) for the given filename
"""
event_file = df[df["filename"] == fname]
if len(event_file) == 1:
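        # a single row with a NaN event_label marks a file that contains no annotated events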
if pd.isna(event_file["event_label"].iloc[0]):
event_list_for_current_file = [{"filename": fname}]
else:
event_list_for_current_file = event_file.to_dict("records")
else:
event_list_for_current_file = event_file.to_dict("records")
return event_list_for_current_file
def psds_results(psds_obj):
""" Compute psds scores
Args:
psds_obj: psds_eval.PSDSEval object with operating points.
Returns:
"""
try:
psds_score = psds_obj.psds(alpha_ct=0, alpha_st=0, max_efpr=100)
print(f"\nPSD-Score (0, 0, 100): {psds_score.value:.5f}")
psds_score = psds_obj.psds(alpha_ct=1, alpha_st=0, max_efpr=100)
print(f"\nPSD-Score (1, 0, 100): {psds_score.value:.5f}")
psds_score = psds_obj.psds(alpha_ct=0, alpha_st=1, max_efpr=100)
print(f"\nPSD-Score (0, 1, 100): {psds_score.value:.5f}")
    except psds_eval.psds.PSDSEvalError as e:
        print("PSDS computation failed.")
        raise EnvironmentError("Could not compute PSDS scores") from e
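
# A minimal usage sketch for psds_results, assuming hypothetical groundtruth,
# duration and prediction dataframes with the columns expected by PSDSEval:
#
#     psds_obj = PSDSEval(ground_truth=gt_df, metadata=durations_df)
#     psds_obj.add_operating_point(pred_df)
#     psds_results(psds_obj)
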
def event_based_evaluation_df(
reference, estimated, t_collar=0.200, percentage_of_length=0.2
):
""" Calculate EventBasedMetric given a reference and estimated dataframe
Args:
reference: pd.DataFrame containing "filename" "onset" "offset" and "event_label" columns which describe the
reference events
estimated: pd.DataFrame containing "filename" "onset" "offset" and "event_label" columns which describe the
estimated events to be compared with reference
        t_collar: float, in seconds, the tolerance allowed on onsets and offsets
        percentage_of_length: float, between 0 and 1, the percentage of the reference event length allowed as
            tolerance on the offset
Returns:
sed_eval.sound_event.EventBasedMetrics with the scores
"""
evaluated_files = reference["filename"].unique()
classes = []
classes.extend(reference.event_label.dropna().unique())
classes.extend(estimated.event_label.dropna().unique())
classes = list(set(classes))
event_based_metric = sed_eval.sound_event.EventBasedMetrics(
event_label_list=classes,
t_collar=t_collar,
percentage_of_length=percentage_of_length,
empty_system_output_handling="zero_score",
)
for fname in evaluated_files:
reference_event_list_for_current_file = get_event_list_current_file(
reference, fname
)
estimated_event_list_for_current_file = get_event_list_current_file(
estimated, fname
)
event_based_metric.evaluate(
reference_event_list=reference_event_list_for_current_file,
estimated_event_list=estimated_event_list_for_current_file,
)
return event_based_metric
def segment_based_evaluation_df(reference, estimated, time_resolution=1.0):
""" Calculate SegmentBasedMetrics given a reference and estimated dataframe
Args:
reference: pd.DataFrame containing "filename" "onset" "offset" and "event_label" columns which describe the
reference events
estimated: pd.DataFrame containing "filename" "onset" "offset" and "event_label" columns which describe the
estimated events to be compared with reference
time_resolution: float, the time resolution of the segment based metric
Returns:
sed_eval.sound_event.SegmentBasedMetrics with the scores
"""
evaluated_files = reference["filename"].unique()
classes = []
classes.extend(reference.event_label.dropna().unique())
classes.extend(estimated.event_label.dropna().unique())
classes = list(set(classes))
segment_based_metric = sed_eval.sound_event.SegmentBasedMetrics(
event_label_list=classes, time_resolution=time_resolution
)
for fname in evaluated_files:
reference_event_list_for_current_file = get_event_list_current_file(
reference, fname
)
estimated_event_list_for_current_file = get_event_list_current_file(
estimated, fname
)
segment_based_metric.evaluate(
reference_event_list=reference_event_list_for_current_file,
estimated_event_list=estimated_event_list_for_current_file,
)
return segment_based_metric
def compute_sed_eval_metrics(predictions, groundtruth):
""" Compute sed_eval metrics event based and segment based with default parameters used in the task.
Args:
predictions: pd.DataFrame, predictions dataframe
groundtruth: pd.DataFrame, groundtruth dataframe
Returns:
tuple, (sed_eval.sound_event.EventBasedMetrics, sed_eval.sound_event.SegmentBasedMetrics)
"""
metric_event = event_based_evaluation_df(
groundtruth, predictions, t_collar=0.200, percentage_of_length=0.2
)
metric_segment = segment_based_evaluation_df(
groundtruth, predictions, time_resolution=1.0
)
return metric_event, metric_segment
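
# A minimal usage sketch, assuming hypothetical "predictions.tsv" and "groundtruth.tsv"
# files with "filename", "onset", "offset" and "event_label" columns:
#
#     predictions = pd.read_csv("predictions.tsv", sep="\t")
#     groundtruth = pd.read_csv("groundtruth.tsv", sep="\t")
#     event_metrics, segment_metrics = compute_sed_eval_metrics(predictions, groundtruth)
#     print(event_metrics)    # sed_eval metric objects print a full report when converted to str
#     print(segment_metrics)
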
def compute_per_intersection_macro_f1(
prediction_dfs,
ground_truth_file,
durations_file,
dtc_threshold=0.5,
gtc_threshold=0.5,
cttc_threshold=0.3,
):
""" Compute F1-score per intersection, using the defautl
Args:
prediction_dfs: dict, a dictionary with thresholds keys and predictions dataframe
ground_truth_file: pd.DataFrame, the groundtruth dataframe
durations_file: pd.DataFrame, the duration dataframe
dtc_threshold: float, the parameter used in PSDSEval, percentage of tolerance for groundtruth intersection
with predictions
gtc_threshold: float, the parameter used in PSDSEval percentage of tolerance for predictions intersection
with groundtruth
gtc_threshold: float, the parameter used in PSDSEval to know the percentage needed to count FP as cross-trigger
Returns:
"""
gt = pd.read_csv(ground_truth_file, sep="\t")
durations = pd.read_csv(durations_file, sep="\t")
psds = PSDSEval(
ground_truth=gt,
metadata=durations,
dtc_threshold=dtc_threshold,
gtc_threshold=gtc_threshold,
cttc_threshold=cttc_threshold,
)
psds_macro_f1 = []
for threshold in prediction_dfs.keys():
if not prediction_dfs[threshold].empty:
threshold_f1, _ = psds.compute_macro_f_score(prediction_dfs[threshold])
else:
threshold_f1 = 0
if np.isnan(threshold_f1):
threshold_f1 = 0.0
psds_macro_f1.append(threshold_f1)
psds_macro_f1 = np.mean(psds_macro_f1)
return psds_macro_f1
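
# A minimal usage sketch, assuming hypothetical paths and a dict of prediction
# dataframes keyed by the decision threshold that produced them:
#
#     prediction_dfs = {0.5: pd.read_csv("predictions_th_0.50.tsv", sep="\t")}
#     macro_f1 = compute_per_intersection_macro_f1(
#         prediction_dfs, "groundtruth.tsv", "durations.tsv"
#     )
#     print(f"Intersection-based macro F1: {macro_f1:.3f}")
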
def compute_psds_from_operating_points(
prediction_dfs,
ground_truth_file,
durations_file,
dtc_threshold=0.5,
gtc_threshold=0.5,
cttc_threshold=0.3,
alpha_ct=0,
alpha_st=0,
max_efpr=100,
save_dir=None,
):
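    """ Compute PSDS from a set of operating points (one prediction dataframe per threshold).
    Args:
        prediction_dfs: dict, a dictionary with thresholds as keys and prediction dataframes as values
        ground_truth_file: str, path to the groundtruth tsv file
        durations_file: str, path to the tsv file with audio durations
        dtc_threshold: float, detection tolerance criterion threshold used by PSDSEval
        gtc_threshold: float, groundtruth intersection criterion threshold used by PSDSEval
        cttc_threshold: float, cross-trigger tolerance criterion threshold used by PSDSEval
        alpha_ct: float, weight of the cross-trigger cost in the PSDS computation
        alpha_st: float, weight of the penalty on instability across classes
        max_efpr: float, maximum effective FPR (per hour) used to integrate the PSD-ROC
        save_dir: str or None, if given, per-threshold predictions and the PSD-ROC plot are saved there
    Returns:
        float, the PSDS value
    """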
gt = pd.read_csv(ground_truth_file, sep="\t")
durations = pd.read_csv(durations_file, sep="\t")
    # PSDSEval object that accumulates one operating point per decision threshold
    psds_eval_obj = PSDSEval(
        ground_truth=gt,
        metadata=durations,
        dtc_threshold=dtc_threshold,
        gtc_threshold=gtc_threshold,
        cttc_threshold=cttc_threshold,
    )
    for i, k in enumerate(prediction_dfs.keys()):
        det = prediction_dfs[k]
        # see issue https://github.com/audioanalytic/psds_eval/issues/3
        det["index"] = range(1, len(det) + 1)
        det = det.set_index("index")
        psds_eval_obj.add_operating_point(
            det, info={"name": f"Op {i + 1:02d}", "threshold": k}
        )
    psds_score = psds_eval_obj.psds(
        alpha_ct=alpha_ct, alpha_st=alpha_st, max_efpr=max_efpr
    )
if save_dir is not None:
os.makedirs(save_dir, exist_ok=True)
pred_dir = os.path.join(
save_dir,
f"predictions_dtc{dtc_threshold}_gtc{gtc_threshold}_cttc{cttc_threshold}",
)
os.makedirs(pred_dir, exist_ok=True)
for k in prediction_dfs.keys():
prediction_dfs[k].to_csv(
os.path.join(pred_dir, f"predictions_th_{k:.2f}.tsv"),
sep="\t",
index=False,
)
plot_psd_roc(
psds_score,
            filename=os.path.join(save_dir, f"PSDS_ct{alpha_ct}_st{alpha_st}_{max_efpr}.png"),
)
return psds_score.value
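

if __name__ == "__main__":
    # A minimal sketch of the PSDS evaluation flow. The tsv paths below are hypothetical
    # placeholders; in practice the prediction dataframes come from decoding the model
    # output at several thresholds (one operating point per threshold).
    thresholds = np.arange(0.1, 1.0, 0.1)
    prediction_dfs = {
        th: pd.read_csv(f"predictions_th_{th:.2f}.tsv", sep="\t") for th in thresholds
    }
    psds_value = compute_psds_from_operating_points(
        prediction_dfs,
        "groundtruth.tsv",
        "durations.tsv",
        alpha_ct=0,
        alpha_st=1,
        save_dir="./psds_outputs",
    )
    print(f"PSDS: {psds_value:.5f}")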