|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from dcase_util.data import DecisionEncoder |
|
|
|
|
|
|
|
|
class ManyHotEncoder:
    """Encode event labels into many-hot numpy arrays and decode them back.

    A 1 marks the presence of a class and a 0 its absence. Several 1 can
    appear on the same line: this is meant for multi-label problems. Weak
    labels are encoded as one vector of ``len(labels)`` values, strong labels
    as a ``(n_frames, len(labels))`` matrix.

    Decoding of strong labels relies on
    ``DecisionEncoder.find_contiguous_regions`` from dcase_util:
    https://github.com/DCASE-REPO/dcase_util/blob/master/dcase_util/data/decisions.py

    Args:
        labels: list or numpy.ndarray, the classes which will be encoded.
            A numpy array is converted to a plain list.
        audio_len: length of a segment, in time units such that
            ``audio_len * fs`` is a number of samples (presumably seconds).
        frame_len: frame length; stored and serialised, but not used by any
            computation in this class.
        frame_hop: number of samples between two consecutive frames.
        net_pooling: int, (Default value = 1) factor by which the number of
            frames is divided.
        fs: int, (Default value = 16000) sampling rate.

    Attributes:
        labels: list, the classes which will be encoded
        audio_len, frame_len, frame_hop, net_pooling, fs: constructor values.
        n_frames: int, number of frames of a segment, computed as
            ``int(int(audio_len * fs / frame_hop) / net_pooling)``.
    """

    def __init__(
        self, labels, audio_len, frame_len, frame_hop, net_pooling=1, fs=16000
    ):
        # ``np.array`` is a factory function, not a type: only ``np.ndarray``
        # (and its subclasses) can be matched here.
        if isinstance(labels, np.ndarray):
            labels = labels.tolist()
        self.labels = labels
        self.audio_len = audio_len
        self.frame_len = frame_len
        self.frame_hop = frame_hop
        self.fs = fs
        self.net_pooling = net_pooling

        n_samples = self.audio_len * self.fs
        # Truncate twice on purpose: first to whole hops, then to whole
        # pooled frames.
        self.n_frames = int(int(n_samples / self.frame_hop) / self.net_pooling)

    def encode_weak(self, labels):
        """Encode a list of weak labels into a numpy array.

        Args:
            labels: list of labels to encode (to a vector of 0 and 1). Also
                accepted: a comma-separated string, the string ``"empty"``,
                or a pandas DataFrame (its ``event_label`` column is used
                when present).

        Returns:
            numpy.array
            A vector containing 1 for each label, and 0 everywhere else.
            For ``"empty"`` the vector is filled with -1 instead.

        Raises:
            ValueError: if a label is not part of ``self.labels``.
        """
        if isinstance(labels, str):
            if labels == "empty":
                return np.zeros(len(self.labels)) - 1
            labels = labels.split(",")

        if isinstance(labels, pd.DataFrame):
            if labels.empty:
                labels = []
            elif "event_label" in labels.columns:
                labels = labels["event_label"]

        y = np.zeros(len(self.labels))
        for label in labels:
            # Missing values (NaN / None) are silently ignored.
            if not pd.isna(label):
                y[self.labels.index(label)] = 1
        return y

    def _time_to_frame(self, time):
        """Convert a time into a (fractional) frame index in [0, n_frames]."""
        samples = time * self.fs
        frame = samples / self.frame_hop
        return np.clip(frame / self.net_pooling, a_min=0, a_max=self.n_frames)

    def _frame_to_time(self, frame):
        """Convert a frame index into a time clipped to [0, audio_len]."""
        time = frame * self.net_pooling / (self.fs / self.frame_hop)
        return np.clip(time, a_min=0, a_max=self.audio_len)

    def _mark_event(self, y, event_label, onset, offset):
        """Set to 1, in place, the frames of ``y`` covered by one event.

        The onset is rounded down and the offset rounded up, so every frame
        touched by the event is marked.
        """
        column = self.labels.index(event_label)
        first = int(self._time_to_frame(onset))
        last = int(np.ceil(self._time_to_frame(offset)))
        y[first:last, column] = 1

    def encode_strong_df(self, label_df):
        """Encode strong labels belonging to one file into a frame matrix.

        Args:
            label_df: one of
                - pandas DataFrame with ``onset``, ``offset`` and
                  ``event_label`` columns (one event per row); a DataFrame
                  without these columns yields an all-zero matrix,
                - pandas Series holding a single event (``onset``,
                  ``offset``, ``event_label`` in its index),
                - list / pandas Series / numpy array whose items are either
                  a label string (event active on all the frames) or a
                  ``(label, onset, offset)`` triplet,
                - the string ``"empty"``.
                Onsets and offsets are passed through ``_time_to_frame``,
                i.e. they are expected in time units, not in frames.

        Returns:
            numpy.array
            Matrix of shape ``(n_frames, len(labels))`` with 1 where the
            label is present, 0 otherwise. For ``"empty"`` the matrix is
            filled with -1 instead.

        Raises:
            NotImplementedError: if ``label_df`` or one of its items has an
                unsupported type.
            ValueError: if a label is not part of ``self.labels``.
        """
        assert any(
            [x is not None for x in [self.audio_len, self.frame_len, self.frame_hop]]
        )

        shape = (self.n_frames, len(self.labels))
        if isinstance(label_df, str) and label_df == "empty":
            return np.zeros(shape) - 1

        y = np.zeros(shape)
        event_fields = {"onset", "offset", "event_label"}

        if isinstance(label_df, pd.DataFrame):
            if event_fields.issubset(label_df.columns):
                for _, row in label_df.iterrows():
                    if not pd.isna(row["event_label"]):
                        self._mark_event(
                            y, row["event_label"], row["onset"], row["offset"]
                        )

        elif isinstance(label_df, (pd.Series, list, np.ndarray)):
            if isinstance(label_df, pd.Series) and event_fields.issubset(
                label_df.index
            ):
                # The Series is a single event row, not a collection.
                if not pd.isna(label_df["event_label"]):
                    self._mark_event(
                        y,
                        label_df["event_label"],
                        label_df["onset"],
                        label_df["offset"],
                    )
                return y

            for event_label in label_df:
                # isinstance (not an exact type check) so that the np.str_
                # items of a numpy array of labels are treated as strings.
                if isinstance(event_label, str):
                    # Label only: the event is active on all the frames.
                    if event_label != "":
                        y[:, self.labels.index(event_label)] = 1
                elif len(event_label) == 3:
                    # (label, onset, offset) triplet.
                    if event_label[0] != "":
                        self._mark_event(
                            y, event_label[0], event_label[1], event_label[2]
                        )
                else:
                    raise NotImplementedError(
                        "cannot encode strong, type mismatch: {}".format(
                            type(event_label)
                        )
                    )

        else:
            raise NotImplementedError(
                "To encode_strong, type is pandas.Dataframe with onset, offset and event_label "
                "columns, or it is a list or pandas Series of event labels, "
                "type given: {}".format(type(label_df))
            )
        return y

    def decode_weak(self, labels):
        """Decode the encoded weak labels.

        Args:
            labels: numpy.array, the encoded labels to be decoded

        Returns:
            list
            Decoded labels, list of string (classes whose value equals 1)
        """
        return [self.labels[i] for i, value in enumerate(labels) if value == 1]

    def decode_strong(self, labels):
        """Decode the encoded strong labels.

        Args:
            labels: numpy.array, the encoded labels to be decoded; it is
                transposed and read one class column at a time.

        Returns:
            list
            Decoded labels, list of list: [[label, onset, offset], ...]
            with onset and offset converted by ``_frame_to_time``.
        """
        region_finder = DecisionEncoder()
        result_labels = []
        for i, label_column in enumerate(labels.T):
            # Presumably rows of (first frame, last frame) per active region;
            # see the dcase_util documentation.
            change_indices = region_finder.find_contiguous_regions(label_column)
            for row in change_indices:
                result_labels.append(
                    [
                        self.labels[i],
                        self._frame_to_time(row[0]),
                        self._frame_to_time(row[1]),
                    ]
                )
        return result_labels

    def state_dict(self):
        """Return the constructor arguments as a dict (see load_state_dict)."""
        return {
            "labels": self.labels,
            "audio_len": self.audio_len,
            "frame_len": self.frame_len,
            "frame_hop": self.frame_hop,
            "net_pooling": self.net_pooling,
            "fs": self.fs,
        }

    @classmethod
    def load_state_dict(cls, state_dict):
        """Build an encoder from a dict produced by ``state_dict``.

        Raises:
            KeyError: if one of the expected keys is missing.
        """
        return cls(
            state_dict["labels"],
            state_dict["audio_len"],
            state_dict["frame_len"],
            state_dict["frame_hop"],
            state_dict["net_pooling"],
            state_dict["fs"],
        )
|
|
|