Source code for amorf.problemTransformation

from sklearn.multioutput import MultiOutputRegressor
from sklearn.ensemble import GradientBoostingRegressor, AdaBoostRegressor
from sklearn.neighbors import KNeighborsRegressor
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR
import xgboost as xgb
from sklearn.neural_network import MLPRegressor

# AutoEncoderRegression
from amorf.utils import EarlyStopping, printMessage
import torch
from torch import nn
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
from amorf.metrics import average_relative_root_mean_squared_error


[docs]class SingleTargetMethod: """ Performs regression for each target variable separately. This method is a wrapper around scikit learns MultiOutputRegressor class. It has some estimators readily provided and allows for custom estimators to be used. Args: selector (string): Can be one of the following linear', 'kneighbors', 'adaboost', 'gradientboost', 'mlp', 'svr', 'xgb' custom_regressor (object): Custom Estimator that must implement 'fit()' and 'predict()' function. Raises: Warning: If Custom Regressor is not valid, default estimator will be used instead ValueError: If selector is not a valid value """ def __init__(self, selector='gradientboost', custom_regressor=None): super().__init__() ESTIMATORS = { 'linear': LinearRegression(), 'kneighbors': KNeighborsRegressor(), 'adaboost': AdaBoostRegressor(), 'gradientboost': GradientBoostingRegressor(), 'mlp': MLPRegressor(solver='adam', alpha=1e-5, hidden_layer_sizes=(15, ), max_iter=1000, random_state=1), 'svr': SVR(gamma='auto'), 'xgb': xgb.XGBRegressor(verbosity=0, objective='reg:squarederror', colsample_bytree=1, learning_rate=0.2, max_depth=6, alpha=10, n_estimators=10) } if custom_regressor is not None and _implements_SciKitLearn_API(custom_regressor): try: self.MORegressor = MultiOutputRegressor(custom_regressor) finally: pass return elif isinstance(selector, str) and selector.lower() in ESTIMATORS: self.MORegressor = MultiOutputRegressor( ESTIMATORS[selector.lower()]) if custom_regressor is not None: raise Warning('\'{}\' is not valid regressor using \'{}\' instead'.format( custom_regressor, selector)) else: raise ValueError( '\'{}\' is not a valid selector for SingleTargetMethod'.format(selector))
[docs] def fit(self, X_train, y_train): """Fits the estimator to the training data Args: X_train (np.ndarray): Training set descriptive variables y_train (np.ndarray): Training set target variables Returns: [sklearn.MultiOutputRegressor]: Trained estimator """ self.MORegressor.fit(X_train, y_train) return self.MORegressor
[docs] def predict(self, X_test): """Predicts the target variables for a given set of descriptive variables Args: X_test (np.ndarray): Array with descriptive variables Returns: np.ndarray: Array with predicted target variables """ result = self.MORegressor.predict(X_test) return result
# FIXME: Wrong Output (100..0..100)
[docs]class AutoEncoderRegression: """Regressor that uses an Autoencoder to reduce dimensionality of target variables Raises: Warning: If Custom Regressor is not valid, default estimator will be used instead ValueError: If selector is not a valid value Args: regressor (string,optional): Can be one of the following linear', 'kneighbors', 'adaboost', 'gradientboost', 'mlp', 'svr', 'xgb'. Default: 'gradientboost' custom_regressor (object,optional): Custom Estimator that must implement 'fit()' and 'predict()' function. Default: None batch_size (int,optional): Otherwise training set is split into batches of given size. Default: None shuffle (bool,optional) Set to True to have the data reshuffled at every epoch. Default: False learning_rate (float,optional): Learning rate for optimizer. Default: 1e-3 use_gpu (bool,optional): Flag that allows usage of cuda cores for calculations. Default: False patience (int,optional): Stop training after p continous incrementations. Default: None training_limit (int,optional): After specified number of epochs training will be terminated, regardless of EarlyStopping stopping. Default: 100 verbosity (int,optional): 0 to only print errors, 1 (default) to print status information. Default: 1 print_after_epochs (int,optional): Specifies after how many epochs training and validation loss will be printed to command line. Default: 500 """ def __init__(self, regressor='gradientboost', custom_regressor=None, batch_size=None, shuffle=False, learning_rate=1e-3, use_gpu=False, patience=None, training_limit=100, verbosity=1, print_after_epochs=500): self.learning_rate = learning_rate self.path = ".autoncoder_bestmodel_validation" self.print_after_epochs = print_after_epochs self.patience = patience self.batch_size = batch_size self.shuffle = shuffle self.training_limit = training_limit self.verbosity = verbosity self.Device = 'cpu' if use_gpu is True and torch.cuda.is_available(): torch.set_default_tensor_type('torch.cuda.FloatTensor') self.Device = "cuda:0" if training_limit is None and patience is None: raise ValueError('Either training_limit or patience must be set') ESTIMATORS = { 'linear': LinearRegression(), 'kneighbors': KNeighborsRegressor(), 'adaboost': AdaBoostRegressor(), 'gradientboost': GradientBoostingRegressor(), 'mlp': MLPRegressor(solver='adam', alpha=1e-5, hidden_layer_sizes=(15,), max_iter=1000, random_state=1), 'svr': SVR(gamma='auto'), 'xgb': xgb.XGBRegressor(verbosity=0, objective='reg:squarederror', colsample_bytree=1, learning_rate=0.2, max_depth=6, alpha=10, n_estimators=10) } if custom_regressor is not None and _implements_SciKitLearn_API(custom_regressor): try: self.regressor = custom_regressor finally: pass return elif isinstance(regressor, str) and regressor.lower() in ESTIMATORS: self.regressor = ESTIMATORS[regressor.lower()] if custom_regressor is not None: raise Warning('\'{}\' is not valid regressor using \'{}\' instead'.format( custom_regressor, regressor)) else: raise ValueError( '\'{}\' is not a valid selector for AutoEncoderRegression'.format(regressor))
[docs] def fit(self, X_train, y_train): """Fits the model to the training data set Trains an AutoEncoder to encode multidimensional target variables into scalar. The resulting data set is used to train the given regressor to predict these scalars. Args: X_train (nd.array): Set of descriptive Variables y_train (nd.array): Set of target Variables Returns: AutoEncoderRegressor: fitted AutoEncoderRegressor """ n_targets = len(y_train[0]) X_train, X_val, y_train, y_val = train_test_split( X_train, y_train, test_size=0.1) y_train_t = torch.tensor(y_train, dtype=torch.float).to(self.Device) y_validate_t = torch.tensor(y_val, dtype=torch.float).to(self.Device) model = autoencoder(n_targets).to(self.Device) best_model, best_score = None, np.inf criterion = nn.MSELoss() optimizer = torch.optim.Adam(model.parameters(), lr=self.learning_rate) val_losses = [] if self.patience is not None: stopper = EarlyStopping(self.patience) stop = False epochs = 0 self.batch_size = len( y_train_t) if self.batch_size is None else self.batch_size train_dataloader = DataLoader(TensorDataset( y_train_t), batch_size=self.batch_size, shuffle=self.shuffle) while(stop is False): model.train() for batch in train_dataloader: batch_y = batch[0] # ===================forward===================== output = model(batch_y) loss = criterion(output, batch_y) # ===================backward==================== optimizer.zero_grad() loss.backward() optimizer.step() # ===================validate======================== model.eval() y_pred_validate = model(y_validate_t) validation_loss = criterion(y_pred_validate, y_validate_t) if validation_loss < best_score: best_score = validation_loss torch.save(model.state_dict(), self.path) if self.patience is not None: stop = stopper.stop(validation_loss, model) if stop is True and self.patience > 1: model.load_state_dict(stopper.best_model['state_dict']) # ===================log======================== if epochs % self.print_after_epochs == 0: printMessage('Epoch {}\nValidation Loss: {}\nTrain Loss:{}'.format( epochs, loss, validation_loss), self.verbosity) epochs += 1 if self.training_limit is not None and self.training_limit <= epochs: stop = True y_pred_train = model(y_train_t) final_train_loss = criterion(y_pred_train, y_train_t) final_validation_loss = criterion(y_pred_validate, y_validate_t) printMessage("Final Epochs: {} \nFinal Train Loss: {}\nFinal Validation Loss: {}".format( epochs, final_train_loss, final_validation_loss), self.verbosity) self.best_model = autoencoder(n_targets) self.best_model.load_state_dict(torch.load(self.path)) self.best_model.to(self.Device) y_enc_train = self.best_model.encoder(y_train_t) if self.Device is 'cpu': self.regressor.fit(X_train, y_enc_train.detach().numpy().ravel()) else: self.regressor.fit( X_train, y_enc_train.cpu().detach().numpy().ravel()) return self
[docs] def predict(self, X_test): """Predicts the encoded target variables and decodes them for the given test set Args: X_test (np.ndarray): Test set with descriptive variables Returns: np.ndarray: Predicted target variables """ y_pred_test = self.regressor.predict(X_test) y_pred_test_t = torch.tensor( y_pred_test, dtype=torch.float).unsqueeze(1).to(self.Device) y_pred_dec = self.best_model.decoder(y_pred_test_t) return y_pred_dec.detach().numpy() if self.Device is 'cpu' else y_pred_dec.cpu().detach().numpy()
def __split_training_set_to_batches(self, y_train_t, batch_size): if batch_size is None: return torch.split(y_train_t, len(y_train_t)) else: return torch.split(y_train_t, batch_size)
[docs] def score(self, X_test, y_test): """Returns Average Relative Root Mean Squared Error for given test data and targets Args: X_test (np.ndarray): Test samples y_test (np.ndarray): True targets """ return average_relative_root_mean_squared_error(self.predict(X_test), y_test)
[docs]class autoencoder(nn.Module): def __init__(self, n_targets): super(autoencoder, self).__init__() self.encoder = nn.Sequential( nn.Linear(n_targets, 1024), nn.ReLU(True), nn.Linear(1024, 512), nn.ReLU(True), nn.Linear(512, 256), nn.ReLU(True), nn.Linear(256, 128), nn.ReLU(True), nn.Linear(128, 64), nn.BatchNorm1d(64), nn.ReLU(True), nn.Linear(64, 12), nn.ReLU(True), # nn.Dropout(0.1), nn.Linear(12, 1)) self.decoder = nn.Sequential( nn.Linear(1, 12), # nn.Dropout(0.1), nn.ReLU(True), nn.Linear(12, 64), nn.ReLU(True), nn.BatchNorm1d(64), nn.Linear(64, 128), nn.ReLU(True), nn.Linear(128, 256), nn.ReLU(True), nn.Linear(256, 512), nn.ReLU(True), nn.Linear(512, 1024), nn.ReLU(True), nn.Linear(1024, n_targets))
[docs] def forward(self, x): x = self.encoder(x) x = self.decoder(x) return x
def _implements_SciKitLearn_API(object): fit = getattr(object, 'fit', None) predict = getattr(object, 'predict', None) if(fit is not None and predict is not None): return True return False