|
|
from torch.utils.data import Dataset |
|
|
import pandas as pd |
|
|
import os |
|
|
import numpy as np |
|
|
import torchaudio |
|
|
import random |
|
|
import torch |
|
|
import glob |
|
|
import h5py |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
def to_mono(mixture, random_ch=False):
    """Reduce a multi-channel signal to a single channel.

    Args:
        mixture: tensor holding the audio. When it has more than one
            dimension, the first dimension is treated as the channel axis.
            Tensors with a single dimension are returned unchanged.
        random_ch: if False, the channels are averaged; if True, one channel
            is drawn uniformly at random with NumPy's global RNG.

    Returns:
        The input with its first (channel) dimension removed, or the input
        itself when it has at most one dimension.
    """
    if mixture.ndim > 1:
        if not random_ch:
            mixture = torch.mean(mixture, 0)
        else:
            # The upper bound of np.random.randint is exclusive, so it must be
            # the channel count itself: using ``shape[0] - 1`` would never
            # select the last channel and raises for a single-channel input.
            indx = np.random.randint(0, mixture.shape[0])
            mixture = mixture[indx]
    return mixture
|
|
|
|
|
|
|
|
def pad_audio(audio, target_len, fs):
    """Zero-pad or randomly crop ``audio`` to ``target_len`` samples.

    The length is always measured and modified along the last dimension, so
    both 1-D signals and ``(channels, samples)`` tensors are handled.

    Args:
        audio: tensor whose last dimension is time (in samples).
        target_len: desired number of samples.
        fs: sampling rate, used only to express the crop position in seconds.

    Returns:
        A tuple ``(audio, onset_s, offset_s, padded_indx)`` where ``audio`` has
        ``target_len`` samples, ``onset_s`` is the crop start in seconds
        (``0.0`` when nothing was cropped), ``offset_s`` is
        ``onset_s + target_len / fs`` and ``padded_indx`` is a one-element
        list (see the note in the body).
    """
    n_samples = audio.shape[-1]
    onset_s = 0.000

    if n_samples < target_len:
        # Right-pad the time axis with zeros.
        audio = torch.nn.functional.pad(
            audio, (0, target_len - n_samples), mode="constant"
        )
    elif n_samples > target_len:
        # Keep a random window of target_len samples; slice the time axis
        # explicitly so multi-channel input is not cut along channels.
        rand_onset = random.randint(0, n_samples - target_len)
        audio = audio[..., rand_onset:rand_onset + target_len]
        onset_s = round(rand_onset / fs, 3)

    # Historically this ratio was computed against the already padded/cropped
    # signal, so for mono input it was always 1.0; that value is preserved.
    # NOTE(review): if consumers expect the fraction of real (non-padded)
    # audio, this should be min(n_samples, target_len) / target_len - confirm.
    padded_indx = [1.0]

    offset_s = round(onset_s + (target_len / fs), 3)
    return audio, onset_s, offset_s, padded_indx
|
|
|
|
|
|
|
|
def process_labels(df, onset, offset):
    """Re-reference event annotations to a cropped clip and trim them to it.

    Event times are shifted so that ``onset`` becomes time zero, clipped to
    the window ``[0, offset - onset]``, and events that end up empty (onset
    not strictly before offset) are dropped. The input frame is not modified.

    Args:
        df: DataFrame with numeric ``onset`` and ``offset`` columns (any other
            columns are carried through untouched). May be empty.
        onset: start of the retained audio window, in the same unit as the
            annotation times.
        offset: end of the retained audio window, same unit.

    Returns:
        A new DataFrame with the shifted/clipped events, duplicates removed.
    """
    # A frame built from an empty event list has no columns at all; there is
    # nothing to shift, and indexing "onset" would raise KeyError.
    if df.empty:
        return df.copy()

    # Window length in the shifted time base; rounding strips floating-point
    # noise left by the subtraction (e.g. 10.000000000000002).
    clip_len = round(offset - onset, 6)

    shifted = df.copy()
    shifted["onset"] = (shifted["onset"] - onset).clip(lower=0)
    shifted["offset"] = (shifted["offset"] - onset).clip(upper=clip_len)

    kept = shifted[shifted["onset"] < shifted["offset"]]
    return kept.drop_duplicates()
|
|
|
|
|
|
|
|
def read_audio(file, multisrc, random_channel, pad_to):
    """Load an audio file and optionally downmix and pad/crop it.

    Args:
        file: path handed to ``torchaudio.load``.
        multisrc: when False the loaded signal is passed through ``to_mono``;
            when True it is kept as loaded.
        random_channel: forwarded to ``to_mono`` as its ``random_ch`` flag.
        pad_to: target length in samples passed to ``pad_audio``, or None to
            leave the length untouched.

    Returns:
        ``(mixture, onset_s, offset_s, padded_indx)``. ``mixture`` is a float
        tensor. When ``pad_to`` is None, ``onset_s`` and ``offset_s`` are None
        and ``padded_indx`` is ``[1.0]``; otherwise the three values are the
        ones returned by ``pad_audio``.
    """
    waveform, sample_rate = torchaudio.load(file)

    if not multisrc:
        waveform = to_mono(waveform, random_channel)

    if pad_to is None:
        # No length normalisation requested: there is no crop window to report.
        onset_s, offset_s, padded_indx = None, None, [1.0]
    else:
        waveform, onset_s, offset_s, padded_indx = pad_audio(
            waveform, pad_to, sample_rate
        )

    return waveform.float(), onset_s, offset_s, padded_indx
|
|
|
|
|
|
|
|
class StronglyAnnotatedSet(Dataset):
    """Dataset of audio clips with time-stamped (strong) event annotations.

    Items are lists ``[mixture, strong, padded_indx]`` optionally followed, in
    this order, by the output of ``feats_pipeline``, the audio path and the
    pre-computed embeddings.

    Args:
        audio_folder: directory joined with each ``filename`` to locate audio.
        tsv_entries: DataFrame with ``filename``, ``onset``, ``offset`` and
            ``event_label`` columns; rows containing NaN are discarded.
        encoder: label encoder; this class uses its ``n_frames`` and
            ``labels`` attributes and its ``encode_strong_df`` method.
        pad_to: clip duration in seconds (multiplied by ``fs`` to get
            samples), or None to keep every clip at its native length.
        fs: sampling rate used to convert ``pad_to`` to samples.
        return_filename: append the audio path to each item.
        random_channel: forwarded to ``read_audio``.
        multisrc: forwarded to ``read_audio``.
        feats_pipeline: optional callable applied to the mixture.
        embeddings_hdf5_file: optional HDF5 file with a ``filenames`` dataset
            and ``global_embeddings`` / ``frame_embeddings`` datasets.
        embedding_type: ``"global"``, ``"frame"`` or None.
    """

    def __init__(
        self,
        audio_folder,
        tsv_entries,
        encoder,
        pad_to=10,
        fs=16000,
        return_filename=False,
        random_channel=False,
        multisrc=False,
        feats_pipeline=None,
        embeddings_hdf5_file=None,
        embedding_type=None
    ):
        self.encoder = encoder
        self.fs = fs
        # None disables padding/cropping, matching UnlabeledSet and read_audio.
        self.pad_to = pad_to * fs if pad_to is not None else None
        self.return_filename = return_filename
        self.random_channel = random_channel
        self.multisrc = multisrc
        self.feats_pipeline = feats_pipeline
        self.embeddings_hdf5_file = embeddings_hdf5_file
        self.embedding_type = embedding_type
        assert embedding_type in ["global", "frame", None], "embedding type are either frame or global or None, got {}".format(embedding_type)

        tsv_entries = tsv_entries.dropna()

        # Group the annotation rows by file, keeping first-seen file order.
        examples = {}
        for _, r in tsv_entries.iterrows():
            if r["filename"] not in examples:
                examples[r["filename"]] = {
                    "mixture": os.path.join(audio_folder, r["filename"]),
                    "events": [],
                }
            if not np.isnan(r["onset"]):
                examples[r["filename"]]["events"].append(
                    {
                        "event_label": r["event_label"],
                        "onset": r["onset"],
                        "offset": r["offset"],
                    }
                )

        self.examples = examples
        self.examples_list = list(examples.keys())

        # Handle used by __getitem__; opened lazily through ``hdf5_file``.
        self._opened_hdf5 = None
        if self.embeddings_hdf5_file is not None:
            assert self.embedding_type is not None, "If you use embeddings you need to specify also the type (global or frame)"

            # Only the name -> row index map is needed here, so the file is
            # closed again as soon as the map is built.
            self.ex2emb_idx = {}
            with h5py.File(self.embeddings_hdf5_file, "r") as f:
                for i, fname in enumerate(f["filenames"]):
                    self.ex2emb_idx[fname.decode('UTF-8')] = i

    def __len__(self):
        """Return the number of distinct audio files."""
        return len(self.examples_list)

    @property
    def hdf5_file(self):
        """Read-only handle on the embeddings file, opened on first access."""
        if self._opened_hdf5 is None:
            self._opened_hdf5 = h5py.File(self.embeddings_hdf5_file, "r")
        return self._opened_hdf5

    def _load_embeddings(self, audio_path):
        """Fetch the stored embeddings whose name matches the file stem."""
        index = self.ex2emb_idx[Path(audio_path).stem]
        if self.embedding_type == "global":
            return torch.from_numpy(self.hdf5_file["global_embeddings"][index]).float()
        if self.embedding_type == "frame":
            return torch.from_numpy(np.stack(self.hdf5_file["frame_embeddings"][index])).float()
        raise NotImplementedError

    def __getitem__(self, item):
        """Load one clip and encode its annotations.

        Returns:
            ``[mixture, strong, padded_indx, ...]`` where ``strong`` is the
            encoded target transposed on its first two axes (for a clip
            without events: zeros of shape ``(n_labels, n_frames)``).
        """
        c_ex = self.examples[self.examples_list[item]]
        mixture, onset_s, offset_s, padded_indx = read_audio(
            c_ex["mixture"], self.multisrc, self.random_channel, self.pad_to
        )

        labels_df = pd.DataFrame(c_ex["events"])
        # Annotations only need re-referencing when the audio was padded or
        # cropped; without pad_to there is no window and they are kept as is.
        if onset_s is not None:
            labels_df = process_labels(labels_df, onset_s, offset_s)

        if not len(labels_df):
            max_len_targets = self.encoder.n_frames
            strong = torch.zeros(max_len_targets, len(self.encoder.labels)).float()
        else:
            strong = self.encoder.encode_strong_df(labels_df)
            strong = torch.from_numpy(strong).float()

        out_args = [mixture, strong.transpose(0, 1), padded_indx]

        if self.feats_pipeline is not None:
            out_args.append(self.feats_pipeline(mixture))
        if self.return_filename:
            out_args.append(c_ex["mixture"])
        if self.embeddings_hdf5_file is not None:
            out_args.append(self._load_embeddings(c_ex["mixture"]))

        return out_args
|
|
|
|
|
|
|
|
class WeakSet(Dataset):
    """Dataset of audio clips with clip-level (weak) event labels.

    Items are lists ``[mixture, weak, padded_indx]`` optionally followed, in
    this order, by the output of ``feats_pipeline``, the audio path and the
    pre-computed embeddings.

    Args:
        audio_folder: directory joined with each ``filename`` to locate audio.
        tsv_entries: DataFrame with ``filename`` and comma-separated
            ``event_labels`` columns; only the first row of each file is used.
        encoder: label encoder; this class uses its ``n_frames`` and
            ``labels`` attributes and its ``encode_weak`` method.
        pad_to: clip duration in seconds (multiplied by ``fs`` to get
            samples), or None to keep every clip at its native length.
        fs: sampling rate used to convert ``pad_to`` to samples.
        return_filename: append the audio path to each item.
        random_channel: forwarded to ``read_audio``.
        multisrc: forwarded to ``read_audio``.
        feats_pipeline: optional callable applied to the mixture.
        embeddings_hdf5_file: optional HDF5 file with a ``filenames`` dataset
            and ``global_embeddings`` / ``frame_embeddings`` datasets.
        embedding_type: ``"global"``, ``"frame"`` or None.
    """

    def __init__(
        self,
        audio_folder,
        tsv_entries,
        encoder,
        pad_to=10,
        fs=16000,
        return_filename=False,
        random_channel=False,
        multisrc=False,
        feats_pipeline=None,
        embeddings_hdf5_file=None,
        embedding_type=None,
    ):
        self.encoder = encoder
        self.fs = fs
        # None disables padding/cropping, matching UnlabeledSet and read_audio.
        self.pad_to = pad_to * fs if pad_to is not None else None
        self.return_filename = return_filename
        self.random_channel = random_channel
        self.multisrc = multisrc
        self.feats_pipeline = feats_pipeline
        self.embeddings_hdf5_file = embeddings_hdf5_file
        self.embedding_type = embedding_type
        assert embedding_type in ["global", "frame",
                                  None], "embedding type are either frame or global or None, got {}".format(
            embedding_type)

        # One entry per file; later rows for an already-seen file are ignored.
        examples = {}
        for _, r in tsv_entries.iterrows():
            if r["filename"] not in examples:
                examples[r["filename"]] = {
                    "mixture": os.path.join(audio_folder, r["filename"]),
                    "events": r["event_labels"].split(","),
                }

        self.examples = examples
        self.examples_list = list(examples.keys())

        # Handle used by __getitem__; opened lazily through ``hdf5_file``.
        self._opened_hdf5 = None
        if self.embeddings_hdf5_file is not None:
            assert self.embedding_type is not None, "If you use embeddings you need to specify also the type (global or frame)"

            # Only the name -> row index map is needed here, so the file is
            # closed again as soon as the map is built.
            self.ex2emb_idx = {}
            with h5py.File(self.embeddings_hdf5_file, "r") as f:
                for i, fname in enumerate(f["filenames"]):
                    self.ex2emb_idx[fname.decode('UTF-8')] = i

    def __len__(self):
        """Return the number of distinct audio files."""
        return len(self.examples_list)

    @property
    def hdf5_file(self):
        """Read-only handle on the embeddings file, opened on first access."""
        if self._opened_hdf5 is None:
            self._opened_hdf5 = h5py.File(self.embeddings_hdf5_file, "r")
        return self._opened_hdf5

    def _load_embeddings(self, audio_path):
        """Fetch the stored embeddings whose name matches the file stem."""
        index = self.ex2emb_idx[Path(audio_path).stem]
        if self.embedding_type == "global":
            return torch.from_numpy(self.hdf5_file["global_embeddings"][index]).float()
        if self.embedding_type == "frame":
            return torch.from_numpy(np.stack(self.hdf5_file["frame_embeddings"][index])).float()
        raise NotImplementedError

    def __getitem__(self, item):
        """Load one clip and encode its clip-level labels.

        Returns:
            ``[mixture, weak, padded_indx, ...]`` where ``weak`` has shape
            ``(n_labels, n_frames)`` and only its first frame column carries
            the encoded labels; every other entry is zero.
        """
        file = self.examples_list[item]
        c_ex = self.examples[file]

        mixture, _, _, padded_indx = read_audio(
            c_ex["mixture"], self.multisrc, self.random_channel, self.pad_to
        )

        labels = c_ex["events"]

        # The weak target reuses the strong-target layout; the clip-level
        # vector is stored in the first frame only.
        max_len_targets = self.encoder.n_frames
        weak = torch.zeros(max_len_targets, len(self.encoder.labels))
        if len(labels):
            weak_labels = self.encoder.encode_weak(labels)
            weak[0, :] = torch.from_numpy(weak_labels).float()

        out_args = [mixture, weak.transpose(0, 1), padded_indx]

        if self.feats_pipeline is not None:
            out_args.append(self.feats_pipeline(mixture))
        if self.return_filename:
            out_args.append(c_ex["mixture"])
        if self.embeddings_hdf5_file is not None:
            out_args.append(self._load_embeddings(c_ex["mixture"]))

        return out_args
|
|
|
|
|
|
|
|
class UnlabeledSet(Dataset):
    """Dataset of unlabeled ``*.wav`` clips found directly in one folder.

    Items are lists ``[mixture, strong, padded_indx]`` where ``strong`` is an
    all-zero placeholder target, optionally followed, in this order, by the
    output of ``feats_pipeline``, the audio path and the pre-computed
    embeddings.

    Args:
        unlabeled_folder: directory searched (non-recursively) for ``*.wav``.
        encoder: label encoder; only its ``n_frames`` and ``labels``
            attributes are used, to size the placeholder target.
        pad_to: clip duration in seconds (multiplied by ``fs`` to get
            samples), or None to keep every clip at its native length.
        fs: sampling rate used to convert ``pad_to`` to samples.
        return_filename: append the audio path to each item.
        random_channel: forwarded to ``read_audio``.
        multisrc: forwarded to ``read_audio``.
        feats_pipeline: optional callable applied to the mixture.
        embeddings_hdf5_file: optional HDF5 file with a ``filenames`` dataset
            and ``global_embeddings`` / ``frame_embeddings`` datasets.
        embedding_type: ``"global"``, ``"frame"`` or None.
    """

    def __init__(
        self,
        unlabeled_folder,
        encoder,
        pad_to=10,
        fs=16000,
        return_filename=False,
        random_channel=False,
        multisrc=False,
        feats_pipeline=None,
        embeddings_hdf5_file=None,
        embedding_type=None,
    ):
        self.encoder = encoder
        self.fs = fs
        self.pad_to = pad_to * fs if pad_to is not None else None
        # NOTE(review): glob returns paths in an arbitrary, filesystem
        # dependent order; sort here if a reproducible index is required.
        self.examples = glob.glob(os.path.join(unlabeled_folder, "*.wav"))
        self.return_filename = return_filename
        self.random_channel = random_channel
        self.multisrc = multisrc
        self.feats_pipeline = feats_pipeline
        self.embeddings_hdf5_file = embeddings_hdf5_file
        self.embedding_type = embedding_type
        assert embedding_type in ["global", "frame",
                                  None], "embedding type are either frame or global or None, got {}".format(
            embedding_type)

        # Handle used by __getitem__; opened lazily through ``hdf5_file``.
        self._opened_hdf5 = None
        if self.embeddings_hdf5_file is not None:
            assert self.embedding_type is not None, "If you use embeddings you need to specify also the type (global or frame)"

            # Only the name -> row index map is needed here, so the file is
            # closed again as soon as the map is built.
            self.ex2emb_idx = {}
            with h5py.File(self.embeddings_hdf5_file, "r") as f:
                for i, fname in enumerate(f["filenames"]):
                    self.ex2emb_idx[fname.decode('UTF-8')] = i

    def __len__(self):
        """Return the number of audio files found."""
        return len(self.examples)

    @property
    def hdf5_file(self):
        """Read-only handle on the embeddings file, opened on first access."""
        if self._opened_hdf5 is None:
            self._opened_hdf5 = h5py.File(self.embeddings_hdf5_file, "r")
        return self._opened_hdf5

    def _load_embeddings(self, audio_path):
        """Fetch the stored embeddings whose name matches the file stem."""
        index = self.ex2emb_idx[Path(audio_path).stem]
        if self.embedding_type == "global":
            return torch.from_numpy(self.hdf5_file["global_embeddings"][index]).float()
        if self.embedding_type == "frame":
            return torch.from_numpy(np.stack(self.hdf5_file["frame_embeddings"][index])).float()
        raise NotImplementedError

    def __getitem__(self, item):
        """Load one clip together with an all-zero placeholder target.

        Returns:
            ``[mixture, strong, padded_indx, ...]`` where ``strong`` is zeros
            of shape ``(n_labels, n_frames)``.
        """
        c_ex = self.examples[item]

        mixture, _, _, padded_indx = read_audio(
            c_ex, self.multisrc, self.random_channel, self.pad_to
        )

        max_len_targets = self.encoder.n_frames
        strong = torch.zeros(max_len_targets, len(self.encoder.labels)).float()
        out_args = [mixture, strong.transpose(0, 1), padded_indx]

        if self.feats_pipeline is not None:
            out_args.append(self.feats_pipeline(mixture))
        if self.return_filename:
            out_args.append(c_ex)
        if self.embeddings_hdf5_file is not None:
            out_args.append(self._load_embeddings(c_ex))

        return out_args
|
|
|