# OpenSound's picture
# Upload 544 files
# 3b6a091 verified
from torch.utils.data import Dataset
import pandas as pd
import os
import numpy as np
import torchaudio
import random
import torch
import glob
import h5py
from pathlib import Path
def to_mono(mixture, random_ch=False):
    """Reduce a multi-channel signal to a single channel.

    Args:
        mixture: tensor holding the audio. When it has more than one
            dimension, dimension 0 is treated as the channel axis.
        random_ch: if False the channels are averaged; if True one channel
            is selected uniformly at random.

    Returns:
        ``mixture`` unchanged when it is one-dimensional, otherwise the
        tensor with the channel axis reduced away.
    """
    if mixture.ndim > 1:  # multi channel
        if not random_ch:
            mixture = torch.mean(mixture, 0)
        else:  # randomly select one channel
            # np.random.randint excludes the upper bound, so the bound must be
            # the channel count itself; "count - 1" never selected the last
            # channel and raised ValueError for a single-channel input.
            indx = np.random.randint(0, mixture.shape[0])
            mixture = mixture[indx]
    return mixture
def pad_audio(audio, target_len, fs):
    """Zero-pad or randomly crop ``audio`` to ``target_len`` samples.

    The last dimension is treated as the time axis, so both 1-D signals and
    (channels, samples) tensors are handled the same way.

    Args:
        audio: tensor whose last dimension indexes samples.
        target_len: desired number of samples along the last dimension.
        fs: sample rate used to convert sample positions to seconds.

    Returns:
        Tuple ``(audio, onset_s, offset_s, padded_indx)`` where ``audio`` has
        ``target_len`` samples, ``onset_s`` is the start of the retained
        window in seconds (0.0 unless a crop happened), ``offset_s`` is
        ``onset_s + target_len / fs`` and ``padded_indx`` is ``[1.0]``.
    """
    n_samples = audio.shape[-1]
    onset_s = 0.000

    if n_samples < target_len:
        # Pad with zeros at the end of the time axis only.
        audio = torch.nn.functional.pad(
            audio, (0, target_len - n_samples), mode="constant"
        )
    elif n_samples > target_len:
        # Measure and slice the time axis; len(audio) would count channels
        # for a 2-D input and the crop would silently be skipped.
        rand_onset = random.randint(0, n_samples - target_len)
        audio = audio[..., rand_onset:rand_onset + target_len]
        onset_s = round(rand_onset / fs, 3)

    # NOTE(review): the ratio was historically computed as
    # target_len / len(audio) *after* padding/cropping, which is always 1.0
    # for 1-D audio. That value is kept here. If it is meant to be the
    # fraction of non-padded samples it must be derived from the length
    # before padding -- confirm against the consumers of padded_indx.
    padded_indx = [1.0]

    offset_s = round(onset_s + (target_len / fs), 3)
    return audio, onset_s, offset_s, padded_indx
def process_labels(df, onset, offset):
    """Re-reference event annotations to a cropped audio window.

    Event times are shifted so that ``onset`` becomes time zero, clamped to
    the window ``[0, offset - onset]``, and events that end up with no
    positive duration are dropped, as are duplicated rows.

    Args:
        df: DataFrame with numeric ``onset`` and ``offset`` columns (other
            columns are carried through untouched).
        onset: start of the retained window, in the same unit as the columns.
        offset: end of the retained window, in the same unit as the columns.

    Returns:
        A new DataFrame; the input frame is not modified.
    """
    # A frame built from an empty list has no columns at all, so there is
    # nothing to shift (and indexing "onset" would raise KeyError).
    if df.empty:
        return df.copy()

    # Upper bound follows the actual window length instead of a fixed 10 s;
    # rounding absorbs float noise so a 10 s window still yields exactly 10.0.
    window_len = round(offset - onset, 3)

    shifted = df.copy()
    shifted["onset"] = (shifted["onset"] - onset).clip(lower=0)
    shifted["offset"] = (shifted["offset"] - onset).clip(upper=window_len)

    kept = shifted[shifted["onset"] < shifted["offset"]]
    return kept.drop_duplicates()
def read_audio(file, multisrc, random_channel, pad_to):
    """Load an audio file and optionally downmix and pad/crop it.

    Args:
        file: path handed to ``torchaudio.load``.
        multisrc: when False the signal is passed through ``to_mono``.
        random_channel: forwarded to ``to_mono`` as its channel-selection flag.
        pad_to: target length forwarded to ``pad_audio`` together with the
            file's own sample rate; ``None`` leaves the length untouched.

    Returns:
        Tuple ``(mixture, onset_s, offset_s, padded_indx)``. ``mixture`` is a
        float tensor; when ``pad_to`` is ``None`` the onset and offset are
        ``None`` and ``padded_indx`` is ``[1.0]``.
    """
    waveform, sample_rate = torchaudio.load(file)

    if not multisrc:
        waveform = to_mono(waveform, random_channel)

    if pad_to is None:
        # No length normalisation requested: nothing was padded or cropped.
        onset_s, offset_s, padded_indx = None, None, [1.0]
    else:
        waveform, onset_s, offset_s, padded_indx = pad_audio(
            waveform, pad_to, sample_rate
        )

    return waveform.float(), onset_s, offset_s, padded_indx
class StronglyAnnotatedSet(Dataset):
    """Dataset of audio clips with strong (onset/offset) event annotations.

    Each item is a list ``[mixture, strong, padded_indx]`` optionally
    followed, in this order, by the output of ``feats_pipeline``, the audio
    path (``return_filename``) and the pre-computed embeddings.
    """

    def __init__(
        self,
        audio_folder,
        tsv_entries,
        encoder,
        pad_to=10,
        fs=16000,
        return_filename=False,
        random_channel=False,
        multisrc=False,
        feats_pipeline=None,
        embeddings_hdf5_file=None,
        embedding_type=None
    ):
        """Index the annotated files.

        Args:
            audio_folder: directory joined with every ``filename`` entry.
            tsv_entries: DataFrame with ``filename``, ``onset``, ``offset``
                and ``event_label`` columns; rows with missing values are
                dropped.
            encoder: object providing ``n_frames``, ``labels`` and
                ``encode_strong_df``.
            pad_to: clip length, multiplied by ``fs`` to obtain the target
                length handed to ``read_audio`` (presumably seconds).
            fs: sample rate used for that conversion.
            return_filename: append the audio path to each item.
            random_channel: forwarded to ``read_audio``.
            multisrc: forwarded to ``read_audio``.
            feats_pipeline: optional callable applied to the waveform.
            embeddings_hdf5_file: optional HDF5 file with a ``filenames``
                dataset plus ``global_embeddings`` / ``frame_embeddings``.
            embedding_type: ``"global"``, ``"frame"`` or ``None``.
        """
        self.encoder = encoder
        self.fs = fs
        self.pad_to = pad_to * fs
        self.return_filename = return_filename
        self.random_channel = random_channel
        self.multisrc = multisrc
        self.feats_pipeline = feats_pipeline
        self.embeddings_hdf5_file = embeddings_hdf5_file
        self.embedding_type = embedding_type
        assert embedding_type in ["global", "frame", None], "embedding type are either frame or global or None, got {}".format(embedding_type)

        tsv_entries = tsv_entries.dropna()

        # we construct a dictionary for each example
        examples = {}
        for _, r in tsv_entries.iterrows():
            fname = r["filename"]
            if fname not in examples:
                examples[fname] = {
                    "mixture": os.path.join(audio_folder, fname),
                    "events": [],
                }
            if not np.isnan(r["onset"]):
                examples[fname]["events"].append(
                    {
                        "event_label": r["event_label"],
                        "onset": r["onset"],
                        "offset": r["offset"],
                    }
                )

        self.examples = examples
        self.examples_list = list(examples.keys())

        if self.embeddings_hdf5_file is not None:
            assert self.embedding_type is not None, "If you use embeddings you need to specify also the type (global or frame)"
            # fetch dict of positions for each example; the file is only
            # needed for this scan, so close it right away.
            with h5py.File(self.embeddings_hdf5_file, "r") as f:
                self.ex2emb_idx = {
                    fname.decode('UTF-8'): i
                    for i, fname in enumerate(f["filenames"])
                }
        # Handle used by __getitem__, opened on first access via hdf5_file.
        self._opened_hdf5 = None

    def __len__(self):
        """Return the number of distinct audio files."""
        return len(self.examples_list)

    @property
    def hdf5_file(self):
        """Read-only handle on the embeddings file, opened on first access."""
        if self._opened_hdf5 is None:
            self._opened_hdf5 = h5py.File(self.embeddings_hdf5_file, "r")
        return self._opened_hdf5

    def __getitem__(self, item):
        """Load one clip with its strong target.

        Returns:
            ``[mixture, strong, padded_indx]`` followed by the optional
            features, audio path and embeddings. ``strong`` is the encoded
            target transposed with ``transpose(0, 1)``.
        """
        c_ex = self.examples[self.examples_list[item]]
        mixture, onset_s, offset_s, padded_indx = read_audio(
            c_ex["mixture"], self.multisrc, self.random_channel, self.pad_to
        )

        # labels
        labels = c_ex["events"]

        # to steps
        labels_df = pd.DataFrame(labels)
        # A frame built from an empty list has no onset/offset columns, so
        # only re-reference the events when there are some.
        if len(labels_df):
            labels_df = process_labels(labels_df, onset_s, offset_s)

        # check if labels exists:
        if not len(labels_df):
            max_len_targets = self.encoder.n_frames
            strong = torch.zeros(max_len_targets, len(self.encoder.labels)).float()
        else:
            strong = self.encoder.encode_strong_df(labels_df)
            strong = torch.from_numpy(strong).float()

        out_args = [mixture, strong.transpose(0, 1), padded_indx]

        if self.feats_pipeline is not None:
            # use this function to extract features in the dataloader and
            # possibly apply some data augmentation
            feats = self.feats_pipeline(mixture)
            out_args.append(feats)
        if self.return_filename:
            out_args.append(c_ex["mixture"])

        if self.embeddings_hdf5_file is not None:
            # Embeddings are indexed by the audio file name without extension.
            name = Path(c_ex["mixture"]).stem
            index = self.ex2emb_idx[name]

            if self.embedding_type == "global":
                embeddings = torch.from_numpy(self.hdf5_file["global_embeddings"][index]).float()
            elif self.embedding_type == "frame":
                embeddings = torch.from_numpy(np.stack(self.hdf5_file["frame_embeddings"][index])).float()
            else:
                raise NotImplementedError

            out_args.append(embeddings)

        return out_args
class WeakSet(Dataset):
    """Dataset of audio clips with weak (clip-level) event labels.

    Each item is a list ``[mixture, weak, padded_indx]`` optionally followed,
    in this order, by the output of ``feats_pipeline``, the audio path
    (``return_filename``) and the pre-computed embeddings.
    """

    def __init__(
        self,
        audio_folder,
        tsv_entries,
        encoder,
        pad_to=10,
        fs=16000,
        return_filename=False,
        random_channel=False,
        multisrc=False,
        feats_pipeline=None,
        embeddings_hdf5_file=None,
        embedding_type=None,
    ):
        """Index the weakly labelled files.

        Args:
            audio_folder: directory joined with every ``filename`` entry.
            tsv_entries: DataFrame with ``filename`` and ``event_labels``
                columns; ``event_labels`` is a comma-separated string. Only
                the first row seen for a given filename is used.
            encoder: object providing ``n_frames``, ``labels`` and
                ``encode_weak``.
            pad_to: clip length, multiplied by ``fs`` to obtain the target
                length handed to ``read_audio`` (presumably seconds);
                ``None`` disables padding/cropping.
            fs: sample rate used for that conversion.
            return_filename: append the audio path to each item.
            random_channel: forwarded to ``read_audio``.
            multisrc: forwarded to ``read_audio``.
            feats_pipeline: optional callable applied to the waveform.
            embeddings_hdf5_file: optional HDF5 file with a ``filenames``
                dataset plus ``global_embeddings`` / ``frame_embeddings``.
            embedding_type: ``"global"``, ``"frame"`` or ``None``.
        """
        self.encoder = encoder
        self.fs = fs
        # Accept None like UnlabeledSet does; read_audio already handles it.
        self.pad_to = pad_to * fs if pad_to is not None else None
        self.return_filename = return_filename
        self.random_channel = random_channel
        self.multisrc = multisrc
        self.feats_pipeline = feats_pipeline
        self.embeddings_hdf5_file = embeddings_hdf5_file
        self.embedding_type = embedding_type
        assert embedding_type in ["global", "frame",
                                  None], "embedding type are either frame or global or None, got {}".format(
            embedding_type)

        examples = {}
        for _, r in tsv_entries.iterrows():
            fname = r["filename"]
            if fname not in examples:
                raw_labels = r["event_labels"]
                examples[fname] = {
                    "mixture": os.path.join(audio_folder, fname),
                    # A missing cell is read back as a float NaN, which has no
                    # .split(); treat it as "no labels" instead of crashing.
                    "events": raw_labels.split(",") if isinstance(raw_labels, str) else [],
                }

        self.examples = examples
        self.examples_list = list(examples.keys())

        if self.embeddings_hdf5_file is not None:
            assert self.embedding_type is not None, "If you use embeddings you need to specify also the type (global or frame)"
            # fetch dict of positions for each example; the file is only
            # needed for this scan, so close it right away.
            with h5py.File(self.embeddings_hdf5_file, "r") as f:
                self.ex2emb_idx = {
                    fname.decode('UTF-8'): i
                    for i, fname in enumerate(f["filenames"])
                }
        # Handle used by __getitem__, opened on first access via hdf5_file.
        self._opened_hdf5 = None

    def __len__(self):
        """Return the number of distinct audio files."""
        return len(self.examples_list)

    @property
    def hdf5_file(self):
        """Read-only handle on the embeddings file, opened on first access."""
        if self._opened_hdf5 is None:
            self._opened_hdf5 = h5py.File(self.embeddings_hdf5_file, "r")
        return self._opened_hdf5

    def __getitem__(self, item):
        """Load one clip with its weak target.

        Returns:
            ``[mixture, weak, padded_indx]`` followed by the optional
            features, audio path and embeddings. ``weak`` is created as a
            zero ``(n_frames, n_labels)`` tensor whose first row receives the
            encoded clip-level labels, and is returned transposed.
        """
        file = self.examples_list[item]
        c_ex = self.examples[file]

        mixture, _, _, padded_indx = read_audio(
            c_ex["mixture"], self.multisrc, self.random_channel, self.pad_to
        )

        # labels
        labels = c_ex["events"]
        # check if labels exists:
        max_len_targets = self.encoder.n_frames
        weak = torch.zeros(max_len_targets, len(self.encoder.labels))
        if len(labels):
            weak_labels = self.encoder.encode_weak(labels)
            # Only the first frame carries the clip-level labels.
            weak[0, :] = torch.from_numpy(weak_labels).float()

        out_args = [mixture, weak.transpose(0, 1), padded_indx]

        if self.feats_pipeline is not None:
            feats = self.feats_pipeline(mixture)
            out_args.append(feats)

        if self.return_filename:
            out_args.append(c_ex["mixture"])

        if self.embeddings_hdf5_file is not None:
            # Embeddings are indexed by the audio file name without extension.
            name = Path(c_ex["mixture"]).stem
            index = self.ex2emb_idx[name]

            if self.embedding_type == "global":
                embeddings = torch.from_numpy(self.hdf5_file["global_embeddings"][index]).float()
            elif self.embedding_type == "frame":
                embeddings = torch.from_numpy(np.stack(self.hdf5_file["frame_embeddings"][index])).float()
            else:
                raise NotImplementedError

            out_args.append(embeddings)

        return out_args
class UnlabeledSet(Dataset):
    """Dataset of unlabelled ``.wav`` files found in a folder.

    Each item is a list ``[mixture, strong, padded_indx]`` where ``strong``
    is an all-zero target, optionally followed, in this order, by the output
    of ``feats_pipeline``, the audio path (``return_filename``) and the
    pre-computed embeddings.
    """

    def __init__(
        self,
        unlabeled_folder,
        encoder,
        pad_to=10,
        fs=16000,
        return_filename=False,
        random_channel=False,
        multisrc=False,
        feats_pipeline=None,
        embeddings_hdf5_file=None,
        embedding_type=None,
    ):
        """Index the ``*.wav`` files directly inside ``unlabeled_folder``.

        Args:
            unlabeled_folder: directory scanned (non-recursively) for
                ``*.wav`` files.
            encoder: object providing ``n_frames`` and ``labels``.
            pad_to: clip length, multiplied by ``fs`` to obtain the target
                length handed to ``read_audio`` (presumably seconds);
                ``None`` disables padding/cropping.
            fs: sample rate used for that conversion.
            return_filename: append the audio path to each item.
            random_channel: forwarded to ``read_audio``.
            multisrc: forwarded to ``read_audio``.
            feats_pipeline: optional callable applied to the waveform.
            embeddings_hdf5_file: optional HDF5 file with a ``filenames``
                dataset plus ``global_embeddings`` / ``frame_embeddings``.
            embedding_type: ``"global"``, ``"frame"`` or ``None``.
        """
        self.encoder = encoder
        self.fs = fs
        self.pad_to = pad_to * fs if pad_to is not None else None
        # glob returns files in arbitrary, filesystem-dependent order; sort so
        # that an index refers to the same file on every run and machine.
        self.examples = sorted(glob.glob(os.path.join(unlabeled_folder, "*.wav")))
        self.return_filename = return_filename
        self.random_channel = random_channel
        self.multisrc = multisrc
        self.feats_pipeline = feats_pipeline
        self.embeddings_hdf5_file = embeddings_hdf5_file
        self.embedding_type = embedding_type
        assert embedding_type in ["global", "frame",
                                  None], "embedding type are either frame or global or None, got {}".format(
            embedding_type)

        if self.embeddings_hdf5_file is not None:
            assert self.embedding_type is not None, "If you use embeddings you need to specify also the type (global or frame)"
            # fetch dict of positions for each example; the file is only
            # needed for this scan, so close it right away.
            with h5py.File(self.embeddings_hdf5_file, "r") as f:
                self.ex2emb_idx = {
                    fname.decode('UTF-8'): i
                    for i, fname in enumerate(f["filenames"])
                }
        # Handle used by __getitem__, opened on first access via hdf5_file.
        self._opened_hdf5 = None

    def __len__(self):
        """Return the number of audio files found."""
        return len(self.examples)

    @property
    def hdf5_file(self):
        """Read-only handle on the embeddings file, opened on first access."""
        if self._opened_hdf5 is None:
            self._opened_hdf5 = h5py.File(self.embeddings_hdf5_file, "r")
        return self._opened_hdf5

    def __getitem__(self, item):
        """Load one clip with an all-zero target.

        Returns:
            ``[mixture, strong, padded_indx]`` followed by the optional
            features, audio path and embeddings. ``strong`` is a zero
            ``(n_frames, n_labels)`` tensor returned transposed.
        """
        c_ex = self.examples[item]

        mixture, _, _, padded_indx = read_audio(
            c_ex, self.multisrc, self.random_channel, self.pad_to
        )

        max_len_targets = self.encoder.n_frames
        strong = torch.zeros(max_len_targets, len(self.encoder.labels)).float()
        out_args = [mixture, strong.transpose(0, 1), padded_indx]

        if self.feats_pipeline is not None:
            feats = self.feats_pipeline(mixture)
            out_args.append(feats)

        if self.return_filename:
            out_args.append(c_ex)

        if self.embeddings_hdf5_file is not None:
            # Embeddings are indexed by the audio file name without extension.
            name = Path(c_ex).stem
            index = self.ex2emb_idx[name]

            if self.embedding_type == "global":
                embeddings = torch.from_numpy(self.hdf5_file["global_embeddings"][index]).float()
            elif self.embedding_type == "frame":
                embeddings = torch.from_numpy(np.stack(self.hdf5_file["frame_embeddings"][index])).float()
            else:
                raise NotImplementedError

            out_args.append(embeddings)

        return out_args