import os
import re

import numpy as np
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from sklearn import metrics
from torch import optim
from torch.utils.data import DataLoader
from torch.utils.data.dataset import random_split

from datasets.midlevel import df_get_midlevel_set
from datasets.mtgjamendo import df_get_mtg_set
from datasets.shared_data_utils import *
from utils import PATH_ANNOTATIONS, PATH_AUDIO


def my_loss(y_hat, y):
    return F.binary_cross_entropy(y_hat, y)


def training_step(model, data_batch, batch_nb):
    x, _, y = data_batch
    y_hat = model.forward(x)
    y = y.float()
    y_hat = y_hat.float()
    return {'loss': my_loss(y_hat, y)}  # my_loss is module-level, not a model attribute


def validation_step(model, data_batch, batch_nb):
    x, _, y = data_batch
    y_hat = model.forward(x)
    y = y.float()
    y_hat = y_hat.float()
    # return raw targets and predictions so epoch-level metrics
    # can be computed over the full set in validation_end
    return {'val_loss': my_loss(y_hat, y),
            'y': y.cpu().numpy(),
            'y_hat': y_hat.cpu().numpy()}


def validation_end(outputs):
    avg_loss = torch.stack([x['val_loss'] for x in outputs]).mean()
    # compute ROC-AUC / PR-AUC over the whole validation set, not per batch
    y = np.concatenate([output['y'] for output in outputs])
    y_hat = np.concatenate([output['y_hat'] for output in outputs])
    rocauc = metrics.roc_auc_score(y, y_hat, average='macro')
    prauc = metrics.average_precision_score(y, y_hat, average='macro')
    fscore = 0.  # placeholder: f-score is not computed
    return {'val_loss': avg_loss,
            'val_rocauc': rocauc,
            'val_prauc': prauc,
            'val_fscore': fscore}


def test_step(model, data_batch, batch_nb):
    x, _, y = data_batch
    y_hat = model.forward(x)
    y = y.float()
    y_hat = y_hat.float()
    return {'test_loss': my_loss(y_hat, y),
            'y': y.cpu().numpy(),
            'y_hat': y_hat.cpu().numpy()}


def test_end(outputs):
    avg_loss = torch.stack([x['test_loss'] for x in outputs]).mean()
    y = np.concatenate([output['y'] for output in outputs])
    y_hat = np.concatenate([output['y_hat'] for output in outputs])
    rocauc = metrics.roc_auc_score(y, y_hat, average='macro')
    prauc = metrics.average_precision_score(y, y_hat, average='macro')
    fscore = 0.  # placeholder: f-score is not computed
    return {'test_loss': avg_loss,
            'test_rocauc': rocauc,
            'test_prauc': prauc,
            'test_fscore': fscore}


def tng_dataloader(batch_size=32, augment_options=None):
    train_csv = os.path.join(PATH_ANNOTATIONS, 'train_processed.tsv')
    cache_x_name = "_ap_mtgjamendo44k"
    dataset = df_get_mtg_set('mtgjamendo', train_csv, PATH_AUDIO,
                             cache_x_name, augment_options)
    return DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True)


def val_dataloader(batch_size=32, augment_options=None):
    validation_csv = os.path.join(PATH_ANNOTATIONS, 'validation_processed.tsv')
    cache_x_name = "_ap_mtgjamendo44k"
    dataset = df_get_mtg_set('mtgjamendo_val', validation_csv, PATH_AUDIO,
                             cache_x_name, augment_options)
    # evaluation data does not need to be shuffled
    return DataLoader(dataset=dataset, batch_size=batch_size, shuffle=False)


def test_dataloader(batch_size=32):
    test_csv = os.path.join(PATH_ANNOTATIONS, 'test_processed.tsv')
    cache_x_name = "_ap_mtgjamendo44k"
    dataset = df_get_mtg_set('mtgjamendo_test', test_csv, PATH_AUDIO, cache_x_name)
    # evaluation data does not need to be shuffled
    return DataLoader(dataset=dataset, batch_size=batch_size, shuffle=False)


# example config dict
base_model_config = {
    'data_source': 'mtgjamendo',
    'training_metrics': ['loss'],
    'validation_metrics': ['loss', 'prauc', 'rocauc'],
    'test_metrics': ['loss', 'prauc', 'rocauc']
}


class BasePtlModel(pl.LightningModule):
    def __init__(self, config, hparams):
        super(BasePtlModel, self).__init__()
        self.data_source = config.get('data_source')
        self.hparams = hparams
        self.training_metrics = config.get('training_metrics')
        self.validation_metrics = config.get('validation_metrics')
        self.test_metrics = config.get('test_metrics')

        if self.data_source == 'midlevel':
            dataset, dataset_length = df_get_midlevel_set('midlevel',
                                                          path_midlevel_annotations,
                                                          path_midlevel_audio_dir,
                                                          "_ap_midlevel44k")
            # compute the 70/20/10 split sizes explicitly: random_split requires
            # the lengths to sum exactly to len(dataset), which rounding
            # int(i * dataset_length) per fraction does not guarantee
            train_len = int(0.7 * dataset_length)
            val_len = int(0.2 * dataset_length)
            test_len = dataset_length - train_len - val_len
            self.midlevel_trainset, self.midlevel_valset, self.midlevel_testset = \
                random_split(dataset, [train_len, val_len, test_len])

    def _load_model(self, load_from, map_location=None, on_gpu=True):
        """Restore the weights of the latest (highest-epoch) checkpoint in `load_from`."""
        last_epoch = -1
        last_ckpt_name = None
        for name in os.listdir(load_from):
            if 'hpc_' in name:  # ignore hpc checkpoints
                continue
            if '.ckpt' in name:
                epoch = name.split('epoch_')[1]
                epoch = int(re.sub('[^0-9]', '', epoch))
                if epoch > last_epoch:
                    last_epoch = epoch
                    last_ckpt_name = name

        # restore the last checkpoint
        if last_ckpt_name is not None:
            last_ckpt_path = os.path.join(load_from, last_ckpt_name)
            if on_gpu:
                if map_location is not None:
                    checkpoint = torch.load(last_ckpt_path, map_location=map_location)
                else:
                    checkpoint = torch.load(last_ckpt_path)
            else:
                # load GPU-trained weights onto the CPU
                checkpoint = torch.load(last_ckpt_path,
                                        map_location=lambda storage, loc: storage)
            self.load_state_dict(checkpoint['state_dict'])

    def training_step(self, data_batch, batch_i):
        x, _, y = data_batch
        y_hat = self.forward(x)
        y = y.float()
        y_hat = y_hat.float()
        return {'loss': self.loss(y_hat, y)}

    def validation_step(self, data_batch, batch_i):
        x, _, y = data_batch
        y_hat = self.forward(x)
        y = y.float()
        y_hat = y_hat.float()
        return {
            'val_loss': self.loss(y_hat, y),
            'y': y.cpu().numpy(),
            'y_hat': y_hat.cpu().numpy()
        }

    def test_step(self, data_batch, batch_i):
        x, _, y = data_batch
        y_hat = self.forward(x)
        y = y.float()
        y_hat = y_hat.float()
        return {
            'test_loss': self.loss(y_hat, y),
            'y': y.cpu().numpy(),
            'y_hat': y_hat.cpu().numpy()
        }

    def validation_end(self, outputs):
        avg_loss = torch.stack([x['val_loss'] for x in outputs]).mean()
        y = np.concatenate([output['y'] for output in outputs])
        y_hat = np.concatenate([output['y_hat'] for output in outputs])
        # named metrics_res rather than metrics to avoid shadowing sklearn.metrics
        metrics_res = self._compute_metrics(y, y_hat, self.validation_metrics)
        metrics_res['val_loss'] = avg_loss
        return metrics_res
    def test_end(self, outputs):
        avg_loss = torch.stack([x['test_loss'] for x in outputs]).mean()
        y = np.concatenate([output['y'] for output in outputs])
        y_hat = np.concatenate([output['y_hat'] for output in outputs])
        metrics_res = self._compute_metrics(y, y_hat, self.test_metrics)
        metrics_res['test_loss'] = avg_loss
        return metrics_res

    def _compute_metrics(self, y, y_hat, metrics_list):
        # 'loss' entries in metrics_list are handled by the callers above
        metrics_res = {}
        for metric in metrics_list:
            if metric in ['rocauc-macro', 'rocauc']:
                metrics_res[metric] = metrics.roc_auc_score(y, y_hat, average='macro')
            if metric == 'rocauc-micro':
                metrics_res[metric] = metrics.roc_auc_score(y, y_hat, average='micro')
            if metric in ['prauc-macro', 'prauc']:
                metrics_res[metric] = metrics.average_precision_score(y, y_hat, average='macro')
            if metric == 'prauc-micro':
                metrics_res[metric] = metrics.average_precision_score(y, y_hat, average='micro')
        return metrics_res

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=self.hparams.learning_rate)
        scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10, eta_min=1e-7)
        return [optimizer], [scheduler]

    def loss(self, y_hat, y):
        if self.data_source == 'midlevel':
            return F.mse_loss(y_hat, y)
        elif self.data_source == 'mtgjamendo':
            return F.binary_cross_entropy(y_hat, y)
        else:
            raise Exception(f"Loss not implemented for {self.data_source}")

    @pl.data_loader
    def tng_dataloader(self):
        if self.data_source == 'mtgjamendo':
            dataset = df_get_mtg_set('mtgjamendo',
                                     path_mtgjamendo_annotations_train,
                                     path_mtgjamendo_audio_dir,
                                     "_ap_mtgjamendo44k")
        elif self.data_source == 'midlevel':
            dataset = self.midlevel_trainset
        else:
            raise Exception(f"Data source {self.data_source} not defined")
        return DataLoader(dataset=dataset,
                          batch_size=self.hparams.batch_size,
                          shuffle=True)

    @pl.data_loader
    def val_dataloader(self):
        if self.data_source == 'mtgjamendo':
            dataset = df_get_mtg_set('mtgjamendo_val',
                                     path_mtgjamendo_annotations_val,
                                     path_mtgjamendo_audio_dir,
                                     "_ap_mtgjamendo44k")
        elif self.data_source == 'midlevel':
            dataset = self.midlevel_valset
        else:
            raise Exception(f"Data source {self.data_source} not defined")
        # evaluation data does not need to be shuffled
        return DataLoader(dataset=dataset,
                          batch_size=self.hparams.batch_size,
                          shuffle=False)

    @pl.data_loader
    def test_dataloader(self):
        if self.data_source == 'mtgjamendo':
            dataset = df_get_mtg_set('mtgjamendo_test',
                                     path_mtgjamendo_annotations_test,
                                     path_mtgjamendo_audio_dir,
                                     "_ap_mtgjamendo44k")
        elif self.data_source == 'midlevel':
            dataset = self.midlevel_testset
        else:
            raise Exception(f"Data source {self.data_source} not defined")
        # evaluation data does not need to be shuffled
        return DataLoader(dataset=dataset,
                          batch_size=self.hparams.batch_size,
                          shuffle=False)
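

# Usage sketch (illustrative, not part of the original module): shows how a
# concrete subclass of BasePtlModel could be trained with the pytorch-lightning
# 0.x API implied by the @pl.data_loader hooks above. ExampleModel, its single
# linear layer, and the num_inputs/num_tags sizes are hypothetical placeholders;
# real models in this repo define their own architecture and forward().
if __name__ == '__main__':
    from argparse import Namespace
    from pytorch_lightning import Trainer

    class ExampleModel(BasePtlModel):
        def __init__(self, config, hparams, num_inputs=256, num_tags=56):
            super(ExampleModel, self).__init__(config, hparams)
            # hypothetical architecture; the output size must match the tag
            # vocabulary of the chosen data source
            self.fc = torch.nn.Linear(num_inputs, num_tags)

        def forward(self, x):
            # flatten, project, and squash with sigmoid so outputs lie in (0, 1),
            # as required by binary_cross_entropy in loss()
            return torch.sigmoid(self.fc(x.view(x.size(0), -1)))

    hparams = Namespace(learning_rate=1e-4, batch_size=32)
    model = ExampleModel(base_model_config, hparams)
    trainer = Trainer(max_nb_epochs=1)  # 0.x-era argument name
    trainer.fit(model)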