# Utils
import json
import time
from pathlib import Path
import numpy as np
# utils from skopt and sklearn; the wildcard imports are needed so that the
# default Matern kernel below resolves and so that eval() can reconstruct a
# saved kernel in _restore_parameters
from sklearn.gaussian_process.kernels import *
from skopt.space.space import *
from octis.dataset.dataset import Dataset
# utils from other files of the framework
from octis.models.model import save_model_output
from octis.optimization.optimizer_evaluation import OptimizerEvaluation
from octis.optimization.optimizer_tool import (
    choose_optimizer, early_condition, load_model, load_search_space,
    plot_bayesian_optimization, plot_model_runs, select_metric)
class Optimizer:
"""
Class Optimizer to perform Bayesian Optimization on Topic Model
"""
    def optimize(
self, model, dataset, metric, search_space, extra_metrics=None,
number_of_call=5, n_random_starts=1,
initial_point_generator="lhs", # work only for version skopt 8.0!!!
optimization_type='Maximize', model_runs=5, surrogate_model="RF",
kernel=1.0 * Matern(
length_scale=1.0, length_scale_bounds=(1e-1, 10.0), nu=1.5),
acq_func="LCB", random_state=False, x0=None, y0=None,
save_models=True, save_step=1, save_name="result",
save_path="results/", early_stop=False, early_step=5,
plot_best_seen=False, plot_model=False, plot_name="B0_plot",
log_scale_plot=False, topk=10):
"""
Perform hyper-parameter optimization for a Topic Model
:param model: model with hyperparameters to optimize
:type model: OCTIS Topic Model
        :param dataset: dataset used to train the model
:type dataset: OCTIS dataset
:param metric: metric used for the optimization
:type metric: OCTIS metric
        :param search_space: dictionary of the hyperparameters to optimize,
            mapping each hyperparameter name to a skopt space object
        :type search_space: dict
:param extra_metrics: list of extra-metrics to compute during the
optimization
:type extra_metrics: list of metrics, optional
:param number_of_call: number of evaluations of metric
:type number_of_call: int, optional
:param n_random_starts: number of evaluations of metric with random
points before approximating it with surrogate model
:type n_random_starts: int, optional
        :param initial_point_generator: set an initial point generator.
            Can be either "random", "sobol", "halton", "hammersly",
            "grid" or "lhs"
        :type initial_point_generator: str, optional
        :param optimization_type: set "Maximize" to maximize the metric,
            "Minimize" to minimize it
        :type optimization_type: str, optional
        :param model_runs: number of times the model is trained for each
            hyperparameter configuration; the median of the metric scores
            over the runs is used as the objective value
        :type model_runs: int, optional
        :param surrogate_model: set a surrogate model. Can be either "GP"
            (Gaussian Process), "RF" (Random Forest) or "ET" (Extra-Tree)
        :type surrogate_model: str, optional
        :param kernel: set a kernel function for the "GP" surrogate model
        :param acq_func: function to minimize over the surrogate model.
            Can be either "LCB" (Lower Confidence Bound), "EI" (Expected
            Improvement) or "PI" (Probability of Improvement)
        :type acq_func: str, optional
        :param random_state: set the random state to something other than
            None for reproducible results
        :type random_state: int, optional
        :param x0: dictionary that maps each hyperparameter name to the
            list of its initial values; the i-th initial point is given by
            the i-th entry of each list
        :type x0: dict, optional
        :param y0: evaluations of the initial points x0
        :type y0: list, optional
        :param save_models: if True, save all the topic models generated
            during the optimization process
        :type save_models: bool, optional
        :param save_step: interval, in number of iterations, at which the
            results of the optimization are saved
        :type save_step: int, optional
        :param save_name: name of the file where the results of the
            optimization will be saved
        :type save_name: str, optional
:param save_path: Path where the results of the optimization (json file
) will be saved
:type save_path: str, optional
:param early_stop: if "True" stop the optimization if there is no
improvement after early_step evaluations
:type early_stop: bool, optional
:param early_step: number of iterations with no improvement after which
optimization will be stopped (if early_stop is True)
:type early_step: int, optional
:param plot_best_seen: If "True" save a convergence plot of the result
of a Bayesian_optimization (i.e. the best seen for each iteration)
:type plot_best_seen: bool, optional
:param plot_model: If "True" save the boxplot of all the model runs
:type plot_model: bool, optional
:param plot_name: Set the name of the plots (best_seen and model_runs).
:type plot_name: str, optional
:param log_scale_plot: if "True" use the logarithmic scale for the
plots.
:type log_scale_plot: bool, optional
        :param topk: number of top words per topic considered in the
            evaluation of the topic model
        :type topk: int, optional
        :return: object with the results of the optimization
        :rtype: OptimizerEvaluation
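
        Example (a minimal sketch, not taken from the library's test suite;
        the dataset, model and metric below are illustrative choices)::

            from skopt.space.space import Real
            from octis.dataset.dataset import Dataset
            from octis.models.LDA import LDA
            from octis.evaluation_metrics.coherence_metrics import Coherence
            from octis.optimization.optimizer import Optimizer

            dataset = Dataset()
            dataset.fetch_dataset("20NewsGroup")
            model = LDA(num_topics=20)
            metric = Coherence(texts=dataset.get_corpus())
            search_space = {"alpha": Real(low=0.001, high=5.0),
                            "eta": Real(low=0.001, high=5.0)}
            optimizer = Optimizer()
            result = optimizer.optimize(
                model, dataset, metric, search_space,
                number_of_call=10, model_runs=3, save_path="results/")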
"""
# Set the attributes
if extra_metrics is None:
extra_metrics = []
if y0 is None:
y0 = []
if x0 is None:
x0 = dict()
self.model = model
self.dataset = dataset
self.metric = metric
self.search_space = search_space
self.extra_metrics = extra_metrics
self.optimization_type = optimization_type
self.number_of_call = number_of_call
self.n_random_starts = n_random_starts
self.initial_point_generator = initial_point_generator
self.model_runs = model_runs
self.surrogate_model = surrogate_model
self.kernel = kernel
self.acq_func = acq_func
self.random_state = random_state
self.x0 = x0
self.y0 = y0
self.save_path = save_path
self.save_step = save_step
self.save_name = save_name
self.save_models = save_models
self.early_stop = early_stop
self.early_step = early_step
self.plot_model = plot_model
self.plot_best_seen = plot_best_seen
self.plot_name = plot_name
self.log_scale_plot = log_scale_plot
self.topk = topk
        self.hyperparameters = sorted(self.search_space.keys())
self.dict_model_runs = dict()
self.number_of_previous_calls = 0
self.current_call = 0
self.time_eval = []
self.name_optimized_metric = metric.__class__.__name__
self.dict_model_runs[self.name_optimized_metric] = dict()
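        # dict_model_runs maps each metric name to a dictionary of the form
        # {"iteration_<i>": [scores of the model_runs trainings at call i]}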
        # Info about extra metrics
        self.extra_metric_names = []
        for i, extra_metric in enumerate(extra_metrics):
            self.extra_metric_names.append(
                str(i) + '_' + extra_metric.__class__.__name__)
            self.dict_model_runs[self.extra_metric_names[i]] = dict()
        # Check the correctness of the BO parameters
        if self._check_bo_parameters() == -1:
            print("ERROR: wrong initialization of BO parameters")
            return None
# Create the directory where the results are saved
Path(self.save_path).mkdir(parents=True, exist_ok=True)
        # Initialize the directories for the model runs
if self.save_models:
self.model_path_models = self.save_path + "models/"
Path(self.model_path_models).mkdir(parents=True, exist_ok=True)
# Choice of the optimizer
opt = choose_optimizer(self)
# Perform Bayesian Optimization
results = self._optimization_loop(opt)
return results
    def resume_optimization(self, name_path, extra_evaluations=0):
"""
Restart the optimization from the json file.
:param name_path: path of the json file
:type name_path: str
:param extra_evaluations: extra iterations for the BO optimization
:type extra_evaluations: int
        :return: object with the results of the optimization
        :rtype: OptimizerEvaluation
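
        Example (a sketch; assumes a previous run saved its state to
        "results/result.json", i.e. the default save_path and save_name)::

            optimizer = Optimizer()
            results = optimizer.resume_optimization(
                "results/result.json", extra_evaluations=5)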
"""
        # Restore the parameters
res, opt = self._restore_parameters(name_path)
# Set the number of total calls
self.number_of_call = self.number_of_call + extra_evaluations
# Check if there are other iterations to do
if self.number_of_previous_calls == self.number_of_call:
return OptimizerEvaluation(self, BO_results=res)
        # Check the correctness of the BO parameters
if self._check_bo_parameters() == -1:
print("ERROR: wrong initialization of BO parameters")
return None
results = self._optimization_loop(opt)
return results
def _objective_function(self, hyperparameter_values):
"""
Evaluate the objective function
:param hyperparameter_values: hyper-parameters of the Topic Model
:type hyperparameter_values: list
:return: value of the objective function
:rtype: float
"""
        # Pair each hyperparameter name with the corresponding value
        params = dict(zip(self.hyperparameters, hyperparameter_values))
# Compute the score of the hyper-parameter configuration
different_model_runs = []
        different_model_runs_extra_metrics = [
            [] for _ in range(len(self.extra_metrics))]
for i in range(self.model_runs):
# Prepare model
model_output = self.model.train_model(
self.dataset, params, self.topk)
# Score of the model
score = self.metric.score(model_output)
different_model_runs.append(score)
# Update of the extra metric values
for j, extra_metric in enumerate(self.extra_metrics):
different_model_runs_extra_metrics[j].append(
extra_metric.score(model_output))
# Save the model for each run
if self.save_models:
name = str(self.current_call) + "_" + str(i)
save_model_path = self.model_path_models + name
save_model_output(model_output, save_model_path)
# Update of the dictionaries
self.dict_model_runs[self.name_optimized_metric][
'iteration_' + str(self.current_call)] = different_model_runs
for j, extra_metric in enumerate(self.extra_metrics):
self.dict_model_runs[self.extra_metric_names[j]][
'iteration_' + str(self.current_call)] = (
different_model_runs_extra_metrics[j])
# The output for BO is the median over different_model_runs
result = np.median(different_model_runs)
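        # scikit-optimize always minimizes, so the score is negated when
        # the metric has to be maximized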
if self.optimization_type == 'Maximize':
result = - result
        # Boxplot of the model runs for the optimized metric
if self.plot_model:
name_plot = ''.join([
self.save_path, self.plot_name,
"_model_runs_", self.name_optimized_metric])
plot_model_runs(
self.dict_model_runs[
self.name_optimized_metric], self.current_call, name_plot)
        # Boxplot of the extra metrics (if any)
for j in range(len(self.extra_metrics)):
name_plot = ''.join([
self.save_path, self.plot_name,
"_model_runs_" + self.extra_metric_names[j]])
plot_model_runs(self.dict_model_runs[
self.extra_metric_names[j]], self.current_call, name_plot)
return result
def _optimization_loop(self, opt):
"""
Perform the optimization through Bayesian Optimization
        :return: result of the optimization
        :rtype: OptimizerEvaluation
"""
results = None
# For loop to perform Bayesian Optimization
for i in range(self.number_of_previous_calls, self.number_of_call):
print("Current call: ", self.current_call)
start_time = time.time()
# Next point proposed by BO and evaluation of the objective
# function
if i < self.lenx0:
next_x = [self.x0[name][i] for name in self.hyperparameters]
if len(self.y0) == 0:
f_val = self._objective_function(next_x)
else:
self.dict_model_runs[self.name_optimized_metric][
'iteration_' + str(i)] = self.y0[i]
f_val = -self.y0[i] if (
self.optimization_type == 'Maximize') else self.y0[i]
else:
next_x = opt.ask()
f_val = self._objective_function(next_x)
# Update the opt using (next_x,f_val)
res = opt.tell(next_x, f_val)
            # Record the time taken by this call (BO + function evaluation)
end_time = time.time()
total_time_function = end_time - start_time
self.time_eval.append(total_time_function)
# Plot best seen
if self.plot_best_seen:
plot_bayesian_optimization(
res.func_vals,
self.save_path + self.plot_name + "_best_seen",
self.log_scale_plot,
conv_max=self.optimization_type == 'Maximize')
# Create an object related to the BO optimization
results = OptimizerEvaluation(self, BO_results=res)
# Save the object
if i % self.save_step == 0:
name_json = self.save_path + self.save_name + ".json"
results.save(name_json)
# Early stop condition
            if i >= self.lenx0 and self.early_stop and early_condition(
                    res.func_vals, self.early_step, self.n_random_starts):
print("Stop because of early stopping condition")
break
# Update current_call
self.current_call = self.current_call + 1
return results
def _load_metric(self, optimization_object, dataset):
"""
Load the metric from the json file, useful for the resume method
:param optimization_object: dictionary of the information saved during
the optimization
:type optimization_object: dict
:param dataset: the considered dataset
:type dataset: OCTIS dataset object
"""
# Optimized Metric
self.name_optimized_metric = optimization_object['metric_name']
metric_parameters = optimization_object['metric_attributes']
if self.name_optimized_metric.startswith('Coherence'):
metric_parameters.update({'texts': dataset.get_corpus()})
if self.name_optimized_metric.startswith('F1Score'):
metric_parameters.update({'dataset': dataset})
self.metric = select_metric(
metric_parameters, self.name_optimized_metric)
# Extra metrics
self.extra_metrics = []
self.extra_metric_names = optimization_object['extra_metric_names']
dict_extra_metric_parameters = optimization_object[
'extra_metric_attributes']
for name in self.extra_metric_names:
metric_parameters = dict_extra_metric_parameters[name]
if 'Coherence' in name:
metric_parameters.update({'texts': dataset.get_corpus()})
if 'F1Score' in name:
metric_parameters.update({'dataset': dataset})
            # strip the "<index>_" prefix to recover the metric class name
            metric = select_metric(metric_parameters, name[2:])
self.extra_metrics.append(metric)
def _restore_parameters(self, name_path):
"""
Restore the BO parameters from the json file
:param name_path: name of the json file
:type name_path: str
        :return: results of the previous BO iterations (scikit-optimize
            OptimizeResult) and the restored optimizer
        :rtype: tuple
"""
# Load the previous results
with open(name_path, 'rb') as file:
optimization_object = json.load(file)
self.search_space = load_search_space(
optimization_object["search_space"])
self.acq_func = optimization_object["acq_func"]
self.surrogate_model = optimization_object["surrogate_model"]
self.kernel = eval(optimization_object["kernel"])
self.optimization_type = optimization_object["optimization_type"]
self.model_runs = optimization_object["model_runs"]
self.save_models = optimization_object["save_models"]
self.save_step = optimization_object["save_step"]
self.save_name = optimization_object["save_name"]
self.save_models = optimization_object["save_models"]
self.save_path = optimization_object["save_path"]
self.early_stop = optimization_object["early_stop"]
self.early_step = optimization_object["early_step"]
self.plot_model = optimization_object["plot_model"]
self.plot_best_seen = optimization_object["plot_best_seen"]
self.plot_name = optimization_object["plot_name"]
self.log_scale_plot = optimization_object["log_scale_plot"]
self.random_state = optimization_object["random_state"]
self.dict_model_runs = optimization_object['dict_model_runs']
self.number_of_previous_calls = optimization_object['current_call'] + 1
self.current_call = optimization_object['current_call'] + 1
self.number_of_call = optimization_object['number_of_call']
self.x0 = optimization_object['x0']
self.y0 = optimization_object['y0']
self.n_random_starts = optimization_object['n_random_starts']
self.initial_point_generator = optimization_object[
'initial_point_generator']
self.topk = optimization_object['topk']
self.time_eval = optimization_object["time_eval"]
res = None
# Load the dataset
dataset = Dataset()
if not optimization_object["is_cached"]:
dataset.load_custom_dataset_from_folder(
optimization_object["dataset_path"])
else:
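            # recover data_home by stripping "<dataset_name>_py3.pkz" from
            # the cached dataset path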
dp = optimization_object["dataset_path"][:-(len(
optimization_object["dataset_name"]) + len("_py3.pkz"))]
dataset.fetch_dataset(
optimization_object["dataset_name"], data_home=dp)
self.dataset = dataset
# Load the metric
self._load_metric(optimization_object, dataset)
# Load the model
self.model = load_model(optimization_object)
        # Recreate the sorted list of hyperparameter names
        self.hyperparameters = sorted(self.search_space.keys())
# Choice of the optimizer
opt = choose_optimizer(self)
        # Replay the previous evaluations to rebuild the surrogate model
for i in range(self.number_of_previous_calls):
next_x = [optimization_object["x_iters"][key][i]
for key in self.hyperparameters]
if self.optimization_type == 'Maximize':
f_val = -optimization_object["f_val"][i]
else:
f_val = optimization_object["f_val"][i]
res = opt.tell(next_x, f_val)
# Create the directory where the results are saved
Path(self.save_path).mkdir(parents=True, exist_ok=True)
self.model_path_models = self.save_path + "models/"
return res, opt
def _check_bo_parameters(self):
"""
Check the correctness of BO parameters
        :return: -1 if there is an error, 0 otherwise
        :rtype: int
"""
if self.optimization_type not in ['Maximize', 'Minimize']:
print("Error: optimization type must be Maximize or Minimize")
return -1
if self.surrogate_model not in ['RF', 'RS', 'GP', 'ET']:
print("Error: surrogate model must be RF, ET, RS or GP")
return -1
if self.acq_func not in ['PI', 'EI', 'LCB']:
print("Error: acquisition function must be PI, EI or LCB")
return -1
if self.number_of_call <= 0:
print("Error: number_of_call can't be <= 0")
return -1
        if self.number_of_call - len(self.x0) <= 0:
            print("Error: number_of_call must be greater than len(x0)")
            return -1
        if not isinstance(self.model_runs, int):
            print("Error: model_runs must be an integer")
            return -1
if not isinstance(self.number_of_call, int):
print("Error: number_of_call must be an integer")
return -1
if not isinstance(self.n_random_starts, int):
print("Error: n_random_starts must be an integer")
return -1
if not isinstance(self.save_step, int):
print("Error: save_step must be an integer")
return -1
        if self.n_random_starts <= 0:
            print("Error: the number of initial points must be >= 1")
return -1
if self.initial_point_generator not in [
'lhs', 'sobol', 'halton', 'hammersly', 'grid', 'random']:
print("Error: wrong initial_point_generator")
return -1
if not isinstance(self.x0, dict):
print("Error: x0 must be a dictionary!")
return -1
        if not isinstance(self.y0, list):
            print("Error: y0 must be a list!")
            return -1
        if len(self.x0) > 0:
            self.lenx0 = len(list(self.x0.values())[0])
            for values in self.x0.values():
                if len(values) != self.lenx0:
                    print("Error: dimension of x0 is not consistent!")
                    return -1
            if len(self.y0) > 0:
                if self.lenx0 != len(self.y0):
                    print("Error: different dimension for x0 and y0!")
                    return -1
        else:
            self.lenx0 = 0
            self.leny0 = 0
if self.plot_name.endswith(".png"):
self.plot_name = self.plot_name[:-4]
if self.save_name.endswith(".json"):
self.save_name = self.save_name[:-5]
        if not self.save_path.endswith('/'):
            self.save_path = self.save_path + '/'
return 0