Source code for octis.optimization.optimizer_tool

import matplotlib.pyplot as plt
import numpy as np
from skopt.learning import (
    GaussianProcessRegressor, RandomForestRegressor, ExtraTreesRegressor)
from skopt import Optimizer as skopt_optimizer
from skopt.utils import dimensions_aslist
import os
import importlib
import sys
import octis.configuration.defaults as defaults
from pathlib import Path

framework_path = Path(os.path.dirname(os.path.realpath(__file__)))
framework_path = str(framework_path.parent)


[docs]def importClass(class_name, module_name, module_path): """ Import a class runtime based on its module and name :param class_name: name of the class :type class_name: str :param module_name: name of the module :type module_name: str :param module_path: absolute path to the module :type module_path: str :return: class object :rtype: class """ spec = importlib.util.spec_from_file_location( module_name, module_path, submodule_search_locations=[]) module = importlib.util.module_from_spec(spec) sys.modules[spec.name] = module spec.loader.exec_module(module) importlib.invalidate_caches() imported_class = getattr(module, class_name) return imported_class
[docs]def load_model(optimization_object): """ Load the topic model for the resume of the optimization :param optimization_object: dictionary of optimization attributes saved in the json file :type optimization_object: dict :return: topic model used during the BO. :rtype: object model """ model_parameters = optimization_object['model_attributes'] use_partitioning = optimization_object['use_partitioning'] model_name = optimization_object['model_name'] module_path = os.path.join(framework_path, "models") module_path = os.path.join(module_path, model_name + ".py") model = importClass(model_name, model_name, module_path) model_instance = model() model_instance.hyperparameters.update(model_parameters) model_instance.use_partitions = use_partitioning return model_instance
[docs]def select_metric(metric_parameters, metric_name): """ Select the metric for the resume of the optimization :param metric_parameters: metric parameters :type metric_parameters: list :param metric_name: name of the metric :type metric_name: str :return: metric :rtype: metric object """ module_path = os.path.join(framework_path, "evaluation_metrics") module_name = defaults.metric_parameters[metric_name]["module"] module_path = os.path.join(module_path, module_name + ".py") Metric = importClass(metric_name, metric_name, module_path) metric = Metric(**metric_parameters) return metric
[docs]def choose_optimizer(optimizer): """ Choose a surrogate model for Bayesian Optimization :param optimizer: list of setting of the BO experiment :type optimizer: Optimizer :return: surrogate model :rtype: scikit object """ params_space_list = dimensions_aslist(optimizer.search_space) estimator = None # Choice of the surrogate model # Random forest if optimizer.surrogate_model == "RF": estimator = RandomForestRegressor( n_estimators=100, min_samples_leaf=3, random_state=optimizer.random_state) # Extra Tree elif optimizer.surrogate_model == "ET": estimator = ExtraTreesRegressor( n_estimators=100, min_samples_leaf=3, random_state=optimizer.random_state) # GP Minimize elif optimizer.surrogate_model == "GP": estimator = GaussianProcessRegressor( kernel=optimizer.kernel, random_state=optimizer.random_state) # Random Search elif optimizer.surrogate_model == "RS": estimator = "dummy" if estimator == "dummy": opt = skopt_optimizer( params_space_list, base_estimator=estimator, acq_func=optimizer.acq_func, acq_optimizer='sampling', initial_point_generator=optimizer.initial_point_generator, random_state=optimizer.random_state) else: opt = skopt_optimizer( params_space_list, base_estimator=estimator, acq_func=optimizer.acq_func, acq_optimizer='sampling', n_initial_points=optimizer.n_random_starts, initial_point_generator=optimizer.initial_point_generator, # work only for version skopt 8.0!!! acq_optimizer_kwargs={ "n_points": 10000, "n_restarts_optimizer": 5, "n_jobs": 1}, acq_func_kwargs={"xi": 0.01, "kappa": 1.96}, random_state=optimizer.random_state) return opt
[docs]def convergence_res(values, optimization_type="minimize"): """ Compute the list of values to plot the convergence plot (i.e. the best seen at each iteration) :param values: the result(s) for which to compute the convergence trace. :type values: list :param optimization_type: "minimize" if the problem is a minimization problem, "maximize" otherwise :type optimization_type: str :return: a list with the best min seen for each iteration :rtype: list """ values2 = values.copy() if optimization_type == "minimize": for i in range(1, len(values2)): if values2[i] > values2[i - 1]: values2[i] = values2[i - 1] else: for i in range(1, len(values2)): if values2[i] < values2[i - 1]: values2[i] = values2[i - 1] return values2
[docs]def early_condition(values, n_stop, n_random): """ Compute the early-stop criterium to stop or not the optimization. :param values: values obtained by Bayesian Optimization :type values: list :param n_stop: Range of points without improvement :type n_stop: int :param n_random: Random starting points :type n_random: int :return: 'True' if early stop condition reached, 'False' otherwise :rtype: bool """ n_min_len = n_stop + n_random if len(values) >= n_min_len: values = convergence_res(values, optimization_type="minimize") worst = values[len(values) - n_stop] best = values[-1] diff = worst - best if diff == 0: return True return False
[docs]def plot_model_runs(model_runs, current_call, name_plot): """ Save a boxplot of the data (Works only when optimization_runs is 1). :param model_runs: dictionary of all the model runs. :type model_runs: dict :param current_call: number of calls computed by BO :type current_call: int :param name_plot: Name of the plot :type name_plot: str """ values = [model_runs["iteration_" + str(i)] for i in range(current_call + 1)] plt.ioff() plt.xlabel('number of calls') plt.grid(True) plt.boxplot(values) plt.savefig(name_plot + ".png") plt.close()
[docs]def plot_bayesian_optimization(values, name_plot, log_scale=False, conv_max=True): """ Save a convergence plot of the result of a Bayesian_optimization. :param values: List of objective function values :type values: list :param name_plot: Name of the plot :type name_plot: str :param log_scale: 'True' if log scale for y-axis, 'False' otherwise :type log_scale: bool, optional :param conv_max: 'True' for a minimization problem, 'False' for a maximization problem :type conv_max: bool, optional """ if conv_max: # minimization problem -->maximization problem values = [-val for val in values] media = convergence_res(values, optimization_type="maximize") xlabel = 'max f(x) after n calls' else: # minimization problem media = convergence_res(values, optimization_type="minimize") xlabel = 'min f(x) after n calls' array = [i for i in range(len(media))] plt.ioff() plt.plot(array, media, color='blue', label="res") if log_scale: plt.yscale('log') plt.ylabel(xlabel) plt.xlabel('Number of calls n') plt.legend(loc='best') plt.tight_layout() plt.grid(True) plt.savefig(name_plot + ".png") plt.close()
[docs]def convert_type(obj): """ Convert a numpy object to a python object :param obj: object to be checked :type obj: numpy object :return: python object :rtype: python object """ if isinstance(obj, np.integer): return int(obj) elif isinstance(obj, np.floating): return float(obj) elif isinstance(obj, np.ndarray): return obj.tolist() else: return obj
[docs]def check_instance(obj): """ Check if a specific object con be inserted in the json file. :param obj: an object of the optimization to be saved :type obj: [str,float, int, bool, etc.] :return: 'True' if the object is json format, 'False' otherwise :rtype: bool """ types = [str, float, int, bool] for t in types: if isinstance(obj, t): return True return False
[docs]def save_search_space(search_space): """ Save the search space in the json file :param search_space: dictionary of the search space (scikopt object) :type search_space: dict :return: dictionary for the seach space, which can be saved in a json file :rtype: dict """ from skopt.space.space import Real, Categorical, Integer ss = dict() for key in list(search_space.keys()): if type(search_space[key]) == Real: ss[key] = [ 'Real', search_space[key].bounds, search_space[key].prior] elif type(search_space[key]) == Integer: ss[key] = [ 'Integer', search_space[key].bounds, search_space[key].prior] elif type(search_space[key]) == Categorical: ss[key] = [ 'Categorical', search_space[key].categories, search_space[key].prior] return ss
[docs]def load_search_space(search_space): """ Load the search space from the json file :param search_space: dictionary of the search space (insertable in a json file) :type dict: :return: dictionary for the search space (for scikit optimize) :rtype: dict """ from skopt.space.space import Real, Categorical, Integer ss = dict() for key in list(search_space.keys()): if search_space[key][0] == 'Real': ss[key] = Real( low=search_space[key][1][0], high=search_space[key][1][1], prior=search_space[key][2]) elif search_space[key][0] == 'Integer': ss[key] = Integer( low=search_space[key][1][0], high=search_space[key][1][1], prior=search_space[key][2]) elif search_space[key][0] == 'Categorical': ss[key] = Categorical(categories=search_space[key][1]) return ss