import traceback
from abc import ABC, abstractmethod
from numpy import mean
import inspect
from collections import defaultdict
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from amorf.metrics import average_relative_root_mean_squared_error
from amorf.utils import EarlyStopping, printMessage
[docs]class NeuralNetRegressor:
"""Regressor that uses PyTorch models to predict multiple targets
Raises:
ValueError: If given model ist not instance of pytorch.NN.nodule
Args:
model (pytorch.NN.Module,optional): PyTorch Model to use. Default: None (will use Linear_NN_Model)
batch_size (int,optional): Otherwise training set is split into batches of given size. Default: None
shuffle (bool,optional) : Set to True to have the data reshuffled at every epoch. Default: False
learning_rate (float,optional): learning rate for optimizer. Default: 0.01
use_gpu (bool,optional): Flag that allows usage of cuda cores for calculations. Default: False
patience (int,optional): Stop training after p continous incrementations (stops at training limit if it is not none). Default: 0
training_limit (int,optional): After specified number of epochs training will be terminated, regardless of EarlyStopping stopping. Default: 100
verbosity (int,optional): 0 to only print errors, 1 (default) to print status information. Default: 1
print_after_epochs (int,optional): Specifies after how many epochs training and validation error will be printed to command line. Default: 10
"""
def __init__(self, model=None, batch_size=None, shuffle=False, learning_rate=0.01, use_gpu=False, patience=None, training_limit=1000, verbosity=1, print_after_epochs=100):
self.Device = 'cpu'
if use_gpu is True and torch.cuda.is_available():
torch.set_default_tensor_type('torch.cuda.FloatTensor')
self.Device = "cuda:0"
if model is not None:
if not isinstance(model, nn.Module):
raise ValueError(
'\'{}\' is not a valid instance of pytorch.nn'.format(model))
else:
self.model = model.to(self.Device)
else:
self.model = None
self.loss_fn = nn.MSELoss() #average_relative_root_mean_squared_error # nn.MSELoss()
self.patience = patience
self.learning_rate = learning_rate
self.verbosity = verbosity
self.print_after_epochs = print_after_epochs
self.batch_size = batch_size
self.shuffle = shuffle
self.training_limit = training_limit if isinstance(
training_limit, int) else None
if training_limit is None and patience is None:
raise ValueError('Either training_limit or patience must be set')
[docs] def fit(self, X_train, y_train):
"""Fits the model to the training data set
Args:
X_train (nd.array): Set of descriptive Variables
y_train (nd.array): Set of target Variables
Returns:
NeuralNetRegressor: fitted NeuralNetRegressor
"""
if self.model is None:
self.model = Linear_NN_Model(input_dim=len(X_train[0]), output_dim=len(
y_train[0]), selector='max', p_dropout_1=0.2, p_dropout_2=0.2).to(self.Device)
# Create Validation Set from train Set
X_train, X_validate, y_train, y_validate = train_test_split(
X_train, y_train, test_size=0.1)
X_train_t, y_train_t = self.model.convert_train_set_to_tensor(
X_train, y_train, self.Device)
X_validate_t, y_validate_t = self.model.convert_train_set_to_tensor(
X_validate, y_validate, self.Device)
batch_size = len(
X_train_t) if self.batch_size is None else self.batch_size
train_dataloader = DataLoader(TensorDataset(
X_train_t, y_train_t), batch_size=batch_size, shuffle=self.shuffle)
self.optimizer = optim.Adam(
self.model.parameters(), self.learning_rate)
self.model.train()
if self.patience is not None:
stopper = EarlyStopping(self.patience)
stop = False
epochs = 0
while(stop is False):
# train
for batch in train_dataloader:
batch_X = batch[0]
batch_y = batch[1]
self.optimizer.zero_grad()
y_pred_train = self.model(batch_X)
loss = self.loss_fn(y_pred_train, batch_y)
loss.backward()
self.optimizer.step()
# caculate validation loss an perform early stopping
y_pred_val = self.model(X_validate_t)
validation_loss = self.loss_fn(y_pred_val, y_validate_t)
if epochs % self.print_after_epochs == 0:
y_pred_train = self.model(X_train_t)
validation_loss = average_relative_root_mean_squared_error(
y_pred_val, y_validate_t)
train_loss = average_relative_root_mean_squared_error(
y_pred_train, y_train_t)
printMessage('Epoch: {}\nValidation Loss: {} \nTrain Loss: {}'.format(
epochs, validation_loss, train_loss), self.verbosity)
if self.patience is not None:
stop = stopper.stop(validation_loss, self.model)
if stop is True and self.patience > 1 :
self.model.load_state_dict(stopper.best_model['state_dict'])
epochs += 1
if self.training_limit is not None and self.training_limit <= epochs:
stop = True
y_pred_train = self.model(X_train_t)
final_train_loss = average_relative_root_mean_squared_error(
y_pred_train, y_train_t)
final_validation_loss = average_relative_root_mean_squared_error(
y_pred_val, y_validate_t)
printMessage("Final Epochs: {} \nFinal Train Loss: {}\nFinal Validation Loss: {}".format(
epochs, final_train_loss, final_validation_loss), self.verbosity)
return self
[docs] def predict(self, X_test):
"""Predicts the target variables for the given test set
Args:
X_test (np.ndarray): Test set with descriptive variables
Returns:
np.ndarray: Predicted target variables
"""
X_test_t = self.model.convert_test_set_to_tensor(X_test, self.Device)
self.model.eval()
with torch.no_grad():
y_pred_t = self.model(X_test_t)
return y_pred_t.detach().numpy() if self.Device is 'cpu' else y_pred_t.cpu().detach().numpy()
[docs] def save(self, store_path):
"""Save model and store it at given path
Args:
store_path (string): Path to store model at
"""
try:
torch.save(self.model, store_path)
except Exception:
printMessage(traceback.format_exc(), self.verbosity)
[docs] def load(self, load_path):
"""Load model from path
Args:
load_path (string): Path to saved model
"""
try:
model = torch.load(load_path).to(self.Device)
self.model = model
except Exception:
printMessage(traceback.format_exc(), self.verbosity)
[docs] def score(self, X_test, y_test):
"""Returns Average Relative Root Mean Squared Error for given test data and targets
Args:
X_test (np.ndarray): Test samples
y_test (np.ndarray): True targets
"""
return average_relative_root_mean_squared_error(self.predict(X_test), y_test)
### FOLLOWING FUNCTIONS ARE NECESSARY TO PERFORM GRID SEARCH
def _get_param_names(cls):
"""Get parameter names for the estimator"""
# fetch the constructor or the original constructor before
# deprecation wrapping if any
init = getattr(cls.__init__, 'deprecated_original', cls.__init__)
if init is object.__init__:
# No explicit constructor to introspect
return []
# introspect the constructor arguments to find the model parameters
# to represent
init_signature = inspect.signature(init)
# Consider the constructor parameters excluding 'self'
parameters = [p for p in init_signature.parameters.values()
if p.name != 'self' and p.kind != p.VAR_KEYWORD]
for p in parameters:
if p.kind == p.VAR_POSITIONAL:
raise RuntimeError("scikit-learn estimators should always "
"specify their parameters in the signature"
" of their __init__ (no varargs)."
" %s with constructor %s doesn't "
" follow this convention."
% (cls, init_signature))
# Extract and sort argument names excluding 'self'
return sorted([p.name for p in parameters])
[docs] def get_params(self, deep=True):
"""Get parameters for this estimator.
Parameters
----------
deep : boolean, optional
If True, will return the parameters for this estimator and
contained subobjects that are estimators.
Returns
-------
params : mapping of string to any
Parameter names mapped to their values.
"""
out = dict()
for key in self._get_param_names():
value = getattr(self, key, None)
if deep and hasattr(value, 'get_params'):
deep_items = value.get_params().items()
out.update((key + '__' + k, val) for k, val in deep_items)
out[key] = value
return out
[docs] def set_params(self, **params):
"""Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects
(such as pipelines). The latter have parameters of the form
``<component>__<parameter>`` so that it's possible to update each
component of a nested object.
Returns
-------
self
"""
if not params:
# Simple optimization to gain speed (inspect is slow)
return self
valid_params = self.get_params(deep=True)
nested_params = defaultdict(dict) # grouped by prefix
for key, value in params.items():
key, delim, sub_key = key.partition('__')
if key not in valid_params:
raise ValueError('Invalid parameter %s for estimator %s. '
'Check the list of available parameters '
'with `estimator.get_params().keys()`.' %
(key, self))
if delim:
nested_params[key][sub_key] = value
else:
setattr(self, key, value)
valid_params[key] = value
for key, sub_params in nested_params.items():
valid_params[key].set_params(**sub_params)
return self
[docs]class AbstractNeuralNet(ABC):
[docs] @abstractmethod
def convert_train_set_to_tensor(self, X_train, y_train, device):
pass
[docs] @abstractmethod
def convert_test_set_to_tensor(self, X_test, device):
pass
[docs]@AbstractNeuralNet.register
class Linear_NN_Model(nn.Module):
def __init__(self, input_dim, output_dim, selector='max', p_dropout_1=0.5, p_dropout_2=0.5):
MIDDLE_LAYER_NEURON_CALCULATION = {
'mean': mean([input_dim, output_dim]),
'max': max([input_dim, output_dim]),
'doubleInput': input_dim * 2
}
super().__init__()
if selector not in MIDDLE_LAYER_NEURON_CALCULATION:
raise ValueError('Selector \'{}\' is not valid')
self.dropout_1 = p_dropout_1
self.dropout_2 = p_dropout_2
middleLayerNeurons = int(MIDDLE_LAYER_NEURON_CALCULATION[selector])
self.batchNorm = nn.BatchNorm1d(middleLayerNeurons)
self.fc1 = nn.Linear(input_dim, middleLayerNeurons)
self.fc2 = nn.Linear(middleLayerNeurons, middleLayerNeurons)
self.fc3 = nn.Linear(middleLayerNeurons, output_dim)
[docs] def forward(self, x):
out = self.fc1(x)
out = self.batchNorm(out)
out = F.relu(out)
out = F.dropout(out, self.dropout_1)
out = self.fc2(out)
out = self.batchNorm(out)
out = F.relu(out)
out = F.dropout(out, self.dropout_2)
out = self.fc3(out)
return out
[docs] def convert_train_set_to_tensor(self, X_train, y_train, device):
X_train_t = torch.from_numpy(X_train).to(device).float()
y_train_t = torch.from_numpy(y_train).to(device).float()
return X_train_t, y_train_t
[docs] def convert_test_set_to_tensor(self, X_test, device):
X_test_t = torch.from_numpy(X_test).to(device).float()
return X_test_t
# Experimental
[docs]@AbstractNeuralNet.register
class Convolutional_NN_Model(nn.Module):
def __init__(self, input_dim, output_dim, p_dropout=0.5):
super().__init__()
self.input_dim = input_dim
self.output_dim = output_dim
self.p_dropout = p_dropout
width_out = self.__get_output_size(input_dim)
self.layer1 = nn.Sequential(
nn.Conv1d(1, 24, kernel_size=2, stride=2, padding=0),
nn.ReLU(),
nn.MaxPool1d(kernel_size=2, stride=2))
self.layer2 = nn.Sequential(
nn.Conv1d(24, 64, kernel_size=2, stride=2, padding=0),
nn.ReLU(),
nn.MaxPool1d(kernel_size=2, stride=2))
self.drop_out = nn.Dropout(self.p_dropout)
self.fc1 = nn.Linear(64 * width_out, 100)
self.fc2 = nn.Linear(100, output_dim)
[docs] def forward(self, x):
out = self.layer1(x)
out = self.layer2(out)
out = out.reshape(out.size(0), -1)
out = self.drop_out(out)
out = self.fc1(out)
out = self.fc2(out)
return out
[docs] def convert_train_set_to_tensor(self, X_train, y_train, device):
X_train_t = torch.from_numpy(X_train).to(device).float().reshape(
len(X_train), 1, len(X_train[0]))
y_train_t = torch.from_numpy(y_train).to(device).float()
return X_train_t, y_train_t
[docs] def convert_test_set_to_tensor(self, X_test, device):
X_test_t = torch.from_numpy(X_test).to(device).float().reshape(
len(X_test), 1, len(X_test[0]))
return X_test_t
def __get_output_size(self, input_dim):
k_c_1 = 2
k_p_1 = 2
k_c_2 = 2
k_p_2 = 2
s_c_1 = 2
s_c_2 = 2
s_p_1 = 2
s_p_2 = 2
wk1 = self.__get_size(input_dim, k_c_1, 0, s_c_1)
wp1 = self.__get_size(wk1, k_p_1, 0, s_p_1)
wk2 = self.__get_size(wp1, k_c_2, 0, s_c_2)
wp2 = self.__get_size(wk2, k_p_2, 0, s_p_2)
return int(wp2)
def __get_size(self, w, k, p=0, s=0):
return ((w - k + 2 * p) / s) + 1