#!/usr/bin/env python
# Created by "Thieu" at 09:48, 17/08/2023 ----------%
# Email: nguyenthieu2102@gmail.com %
# Github: https://github.com/thieu1995 %
# --------------------------------------------------%
import inspect
import numbers
import pickle
import pprint
import numpy as np
import pandas as pd
from pathlib import Path
from permetrics import RegressionMetric, ClassificationMetric
from sklearn.base import BaseEstimator
from mealpy import get_optimizer_by_class, Optimizer, get_all_optimizers, FloatVar
from sklearn.cluster import KMeans
from evorbf.helpers import validator
from evorbf.helpers.metrics import get_all_regression_metrics, get_all_classification_metrics
[docs]class CustomRBF:
"""Radial Basis Function
This class defines the general RBF model that:
+ use non-linear Gaussian function
+ use inverse matrix multiplication instead of Gradient-based
+ set up regulation term with hyperparameter `lamda`
Parameters
----------
size_hidden : int, default=10
The number of hidden nodes
center_finder : str, default="kmeans"
The method is used to find the cluster centers
sigmas : float, int, np.ndarray, list, tuple, default=2.0
The sigma values that are used in Gaussian function. In traditional RBF model, 1 sigma value is used
for all of hidden nodes. But in Nature-inspired Algorithms (NIAs) based RBF model, each
sigma is assigned to 1 hidden node.
reg_lambda : float, default=0.1
The lamda value is used in regularization term. If set to 0, then no L2 is applied
seed : int, default=None
The seed value is used for reproducibility.
"""
def __init__(self, size_hidden=10, center_finder="kmeans", sigmas=2.0, reg_lambda=0.1, seed=None):
self.size_hidden = size_hidden
self.center_finder = center_finder
self.sigmas = sigmas
self.reg_lambda = reg_lambda
self.seed = seed
self.centers, self.weights, self.weights_shape = None, None, None
self.regularization = None
[docs] def check_reg_lambda(self, reg_lambda):
if type(reg_lambda) is float and reg_lambda > 0.0:
return reg_lambda, True
else:
return reg_lambda, False
[docs] def set_reg_lambda(self, reg_lambda):
if type(reg_lambda) is float and reg_lambda > 0.0:
self.reg_lambda = reg_lambda
self.regularization = True
else:
self.reg_lambda, self.regularization = reg_lambda, False
[docs] @staticmethod
def calculate_centers(X, method="kmeans", n_clusters=5, seed=42):
if method == "kmeans":
kobj = KMeans(n_clusters=n_clusters, n_init='auto', random_state=seed).fit(X)
return kobj.cluster_centers_
elif method == "random":
generator = np.random.default_rng(seed)
return X[generator.choice(len(X), n_clusters, replace=False)]
[docs] @staticmethod
def calculate_rbf(X, c, sigma):
# Calculate Radial Basis Function (Gaussian)
# return np.exp(-np.sum((X - c)**2, axis=1) / (2 * sigmas**2))
return np.exp(-np.linalg.norm(X - c, axis=1)**2 / (2 * sigma**2))
[docs] def fit(self, X, y):
"""Fit the core to data matrix X and target(s) y.
Parameters
----------
X : ndarray or sparse matrix of shape (n_samples, n_features)
The input data.
y : ndarray of shape (n_samples,) or (n_samples, n_outputs)
The target values (class labels in classification, real numbers in regression).
Returns
-------
self : object
Returns a trained RBF core.
"""
# Check regularization
self.reg_lambda, self.regularization = self.check_reg_lambda(self.reg_lambda)
# Check sigmas
if isinstance(self.sigmas, (int, float, np.number)):
self.sigmas = [self.sigmas, ] * self.size_hidden
elif isinstance(self.sigmas, (list, tuple, np.ndarray)):
if len(self.sigmas) != self.size_hidden:
raise ValueError("sigmas must have equal length to size_hidden")
else:
raise ValueError("sigmas must be an number or list, tuple, or number array")
# Initialize centers
self.centers = self.calculate_centers(X, self.center_finder, self.size_hidden, self.seed)
# Calculate RBF layer outputs
rbf_layer = self.transform_X(X)
if self.regularization:
# Solve for weights using ridge regression (L2 regularization)
lambda_iden = self.reg_lambda * np.eye(rbf_layer.shape[1]) # L2 regularization
self.weights = np.linalg.inv(rbf_layer.T @ rbf_layer + lambda_iden) @ rbf_layer.T @ y
else:
# Solve for weights using pseudo-inverse
self.weights = np.linalg.pinv(rbf_layer) @ y
return self
[docs] def predict(self, X):
"""Predict using the Radial Basis Function core.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
The input data.
Returns
-------
y : ndarray of shape (n_samples, n_outputs)
The predicted values.
"""
rbf_layer = self.transform_X(X)
return rbf_layer @ self.weights
[docs] def update_weights_from_solution(self, solution, X, y):
"""
This function is used for NIA-based RBF model. Whenever a solution is generated, it will call this function.
"""
if self.centers is None:
self.centers = self.calculate_centers(X, self.center_finder, self.size_hidden, self.seed)
if self.regularization:
self.sigmas = solution[:self.size_hidden]
self.reg_lambda = solution[-1]
rbf_layer = self.transform_X(X)
# Solve for weights using ridge regression (L2 regularization)
lambda_iden = self.reg_lambda * np.eye(rbf_layer.shape[1]) # L2 regularization
self.weights = np.linalg.inv(rbf_layer.T @ rbf_layer + lambda_iden) @ rbf_layer.T @ y
else:
self.sigmas = solution
rbf_layer = self.transform_X(X)
# Solve for weights using pseudo-inverse
self.weights = np.linalg.pinv(rbf_layer) @ y
[docs] def get_weights(self):
return self.weights
[docs] def set_weights(self, weights):
self.weights = weights
[docs] def get_weights_size(self):
return self.weights.size()
[docs]class BaseRbf(BaseEstimator):
"""
Defines the most general class for RBF network that inherits the BaseEstimator class of Scikit-Learn library.
Parameters
----------
size_hidden : int, default=10
The number of hidden nodes
center_finder : str, default="kmeans"
The method is used to find the cluster centers
sigmas : float, default=2.0
The sigma values that are used in Gaussian function. In traditional RBF model, 1 sigma value is used
for all of hidden nodes. But in Nature-inspired Algorithms (NIAs) based RBF model, each
sigma is assigned to 1 hidden node.
reg_lambda : float, default=0.1
The lamda value is used in regularization term. If set to 0, then no L2 is applied
seed : int, default=None
The seed value is used for reproducibility.
"""
SUPPORTED_CLS_METRICS = get_all_classification_metrics()
SUPPORTED_REG_METRICS = get_all_regression_metrics()
CLS_OBJ_LOSSES = None
def __init__(self, size_hidden=10, center_finder="kmeans", sigmas=2.0, reg_lambda=0.1, seed=None):
super().__init__()
self._net_class = CustomRBF
self.size_hidden = size_hidden
self.center_finder = center_finder
self.sigmas = sigmas
self.reg_lambda = reg_lambda
self.seed = seed
self.parameters = {}
self.network, self.obj_scaler, self.loss_train, self.n_labels = None, None, None, None
@staticmethod
def _check_method(method=None, list_supported_methods=None) -> str:
if type(method) is str:
return validator.check_str("method", method, list_supported_methods)
else:
raise ValueError(f"method should be a string and belongs to {list_supported_methods}")
def __repr__(self, **kwargs):
"""Pretty-print parameters like scikit-learn's Estimator.
"""
param_order = list(inspect.signature(self.__init__).parameters.keys())
param_dict = {k: getattr(self, k) for k in param_order}
param_str = ", ".join(f"{k}={repr(v)}" for k, v in param_dict.items())
if len(param_str) <= 80:
return f"{self.__class__.__name__}({param_str})"
else:
formatted_params = ",\n ".join(f"{k}={pprint.pformat(v)}" for k, v in param_dict.items())
return f"{self.__class__.__name__}(\n {formatted_params}\n)"
[docs] def create_network(self, X, y):
return None, None
[docs] def fit(self, X, y):
self.network, self.obj_scaler = self.create_network(X, y)
y_scaled = self.obj_scaler.transform(y)
self.network.fit(X, y_scaled)
return self
[docs] def predict(self, X):
"""Predict the outcome of the feature X"""
pred = self.network.predict(X)
return self.obj_scaler.inverse_transform(pred)
[docs] def predict_proba(self, X):
"""
It is used for classification problem. The returned results are the probability for each sample
"""
return self.network.predict(X)
def __evaluate_reg(self, y_true, y_pred, list_metrics=("MSE", "MAE")):
rm = RegressionMetric(y_true=y_true, y_pred=y_pred)
return rm.get_metrics_by_list_names(list_metrics)
def __evaluate_cls(self, y_true, y_pred, list_metrics=("AS", "RS")):
cm = ClassificationMetric(y_true, y_pred)
return cm.get_metrics_by_list_names(list_metrics)
def __score_reg(self, X, y):
y_pred = self.network.predict(X)
return RegressionMetric().pearson_correlation_coefficient_square(y_true=y, y_pred=y_pred)
def __scores_reg(self, X, y, list_metrics=("MSE", "MAE")):
y_pred = self.network.predict(X)
return self.__evaluate_reg(y_true=y, y_pred=y_pred, list_metrics=list_metrics)
def __score_cls(self, X, y):
y_pred = self.predict(X)
return ClassificationMetric().accuracy_score(y_true=y, y_pred=y_pred)
def __scores_cls(self, X, y, list_metrics=("AS", "RS")):
list_errors = list(set(list_metrics) & set(self.CLS_OBJ_LOSSES))
list_scores = list((set(self.SUPPORTED_CLS_METRICS.keys()) - set(self.CLS_OBJ_LOSSES)) & set(list_metrics))
t1 = {}
if len(list_errors) > 0:
if self.n_labels > 2:
y_pred = self.predict_proba(X)
else:
y_pred = self.predict(X)
t1 = self.__evaluate_cls(y_true=y, y_pred=y_pred, list_metrics=list_errors)
y_pred = self.predict(X)
t2 = self.__evaluate_cls(y_true=y, y_pred=y_pred, list_metrics=list_scores)
return {**t2, **t1}
[docs] def evaluate(self, y_true, y_pred, list_metrics=None):
"""Return the list of performance metrics of the prediction.
You can get metrics from Permetrics library: https://github.com/thieu1995/permetrics
"""
pass
[docs] def score(self, X, y):
"""Return the default metric of the prediction."""
pass
[docs] def scores(self, X, y, list_metrics=None):
"""Return the list of metrics of the prediction."""
pass
[docs] def save_loss_train(self, save_path="history", filename="loss.csv"):
## Save loss train to csv file
Path(save_path).mkdir(parents=True, exist_ok=True)
if self.loss_train is None:
print(f"{self.__class__.__name__} core doesn't have training loss!")
else:
data = {"epoch": list(range(1, len(self.loss_train) + 1)), "loss": self.loss_train}
pd.DataFrame(data).to_csv(f"{save_path}/{filename}", index=False)
[docs] def save_metrics(self, y_true, y_pred, list_metrics=("RMSE", "MAE"), save_path="history", filename="metrics.csv"):
## Save metrics to csv file
Path(save_path).mkdir(parents=True, exist_ok=True)
results = self.evaluate(y_true, y_pred, list_metrics)
df = pd.DataFrame.from_dict(results, orient='index').T
df.to_csv(f"{save_path}/{filename}", index=False)
[docs] def save_y_predicted(self, X, y_true, save_path="history", filename="y_predicted.csv"):
## Save the predicted results to csv file
Path(save_path).mkdir(parents=True, exist_ok=True)
y_pred = self.predict(X)
data = {"y_true": np.squeeze(np.asarray(y_true)), "y_pred": np.squeeze(np.asarray(y_pred))}
pd.DataFrame(data).to_csv(f"{save_path}/{filename}", index=False)
[docs] def save_model(self, save_path="history", filename="core.pkl"):
## Save core to pickle file
Path(save_path).mkdir(parents=True, exist_ok=True)
if filename[-4:] != ".pkl":
filename += ".pkl"
pickle.dump(self, open(f"{save_path}/{filename}", 'wb'))
[docs] @staticmethod
def load_model(load_path="history", filename="core.pkl"):
if filename[-4:] != ".pkl":
filename += ".pkl"
return pickle.load(open(f"{load_path}/{filename}", 'rb'))
[docs]class BaseNiaRbf(BaseRbf):
"""
Defines the most general class for Nature-inspired Algorithm-based RBF models that inherits the BaseRbf class
Note
----
+ In this model, the sigmas will be learned during the training process.
+ So the `sigmas` parameter is removed in the init function.
+ Besides, the `sigmas` is a list of value, each value represent a `sigma` for Gaussian function used in hidden node.
Parameters
----------
size_hidden : int, default=10
The number of hidden nodes
center_finder : str, default="kmeans"
The method is used to find the cluster centers
regularization : bool, default=True
Determine if L2 regularization technique is used or not.
If set to True, then the regularization lambda is learned during the training.
obj_name : None or str, default=None
The name of objective for the problem, also depend on the problem is classification and regression.
optim : str or instance of Optimizer class (from Mealpy library), default = "BaseGA"
The Metaheuristic Algorithm that use to solve the feature selection problem.
Current supported list, please check it here: https://github.com/thieu1995/mealpy.
If a custom optimizer is passed, make sure it is an instance of `Optimizer` class.
optim_params : None or dict of parameter, default=None
The parameter for the `optimizer` object.
If `None`, the default parameters of optimizer is used (defined in https://github.com/thieu1995/mealpy.)
If `dict` is passed, make sure it has at least `epoch` and `pop_size` parameters.
verbose : bool, default=True
Whether to print progress messages to stdout.
seed : int, default=None
The seed value is used for reproducibility.
lb : int, float, tuple, list, np.ndarray, optional
Lower bounds for sigmas in network.
ub : int, float, tuple, list, np.ndarray, optional
Upper bounds for sigmas in network.
mode : str, optional
Mode for optimizer (default is 'single').
n_workers : int, optional
Number of workers for parallel processing in optimizer (default is None).
termination : any, optional
Termination criteria for optimizer (default is None).
Notes
-----
- This class is designed to be easily extended for hybrid metaheuristic-based RBF models.
- Metrics can be customized using the Permetrics library: https://github.com/thieu1995/permetrics
"""
SUPPORTED_OPTIMIZERS = list(get_all_optimizers(verbose=False).keys())
SUPPORTED_CLS_OBJECTIVES = get_all_classification_metrics()
SUPPORTED_REG_OBJECTIVES = get_all_regression_metrics()
def __init__(self, size_hidden=10, center_finder="kmeans", regularization=True,
obj_name=None, optim="BaseGA", optim_params=None, verbose=True, seed=None,
lb=None, ub=None, mode='single', n_workers=None, termination=None):
super().__init__(size_hidden=size_hidden, center_finder=center_finder, seed=seed)
self.regularization = regularization
self.obj_name = obj_name
self.optim_params = optim_params
self.optim = optim
self.lb = lb
self.ub = ub
self.mode = mode
self.n_workers = n_workers
self.termination = termination
self.verbose = verbose
self.network, self.obj_scaler, self.loss_train, self.optimizer = None, None, None, None
def _set_optimizer(self, optim=None, optim_params=None):
"""
Validates the real optimizer based on the provided `optim` and `optim_pras`.
Parameters
----------
optim : str or Optimizer
The optimizer name or instance to be set.
optim_params : dict, optional
Parameters to configure the optimizer.
Returns
-------
Optimizer
An instance of the selected optimizer.
Raises
------
TypeError
If the provided optimizer is neither a string nor an instance of Optimizer.
"""
if isinstance(optim, str):
opt_class = get_optimizer_by_class(optim)
if isinstance(optim_params, dict):
return opt_class(**optim_params)
else:
return opt_class(epoch=500, pop_size=30)
elif isinstance(optim, Optimizer):
if isinstance(optim_params, dict):
if "name" in optim_params: # Check if key exists and remove it
optim.name = optim_params.pop("name")
optim.set_parameters(optim_params)
return optim
else:
raise TypeError(f"optimizer needs to set as a string and supported by Mealpy library.")
def _set_lb_ub(self, lb=None, ub=None, n_dims=None, lb_default=None, ub_default=None):
"""
Validates and sets the lower and upper bounds for optimization.
Parameters
----------
lb : list, tuple, np.ndarray, int, or float, optional
The lower bounds for sigmas in network.
ub : list, tuple, np.ndarray, int, or float, optional
The upper bounds for sigmas in network.
n_dims : int
The number of dimensions.
Returns
-------
tuple
A tuple containing validated lower and upper bounds.
Raises
------
ValueError
If the bounds are not valid.
"""
if lb is None:
lb = (lb_default,) * n_dims
elif isinstance(lb, numbers.Number):
lb = (lb, ) * n_dims
elif isinstance(lb, (list, tuple, np.ndarray)):
if len(lb) == 1:
lb = np.array(lb * n_dims, dtype=float)
else:
lb = np.array(lb, dtype=float).ravel()
if ub is None:
ub = (ub_default,) * n_dims
elif isinstance(ub, numbers.Number):
ub = (ub, ) * n_dims
elif isinstance(ub, (list, tuple, np.ndarray)):
if len(ub) == 1:
ub = np.array(ub * n_dims, dtype=float)
else:
ub = np.array(ub, dtype=float).ravel()
if len(lb) != len(ub):
raise ValueError(f"Invalid lb and ub. Their length should be equal to 1 or {n_dims}.")
return np.array(lb).ravel(), np.array(ub).ravel()
[docs] def objective_function(self, solution=None):
pass
[docs] def fit(self, X, y):
self.network, self.obj_scaler = self.create_network(X, y)
y_scaled = self.obj_scaler.transform(y)
self.X_temp, self.y_temp = X, y_scaled
self.optimizer = self._set_optimizer(self.optim, self.optim_params)
if self.regularization:
n_dims = self.size_hidden + 1
else:
n_dims = self.size_hidden
lb, ub = self._set_lb_ub(self.lb, self.ub, n_dims, lb_default=1e-6, ub_default=np.mean(np.max(X, axis=0)))
log_to = "console" if self.verbose else "None"
if self.obj_name is None:
raise ValueError("obj_name can't be None")
else:
if self.obj_name in self.SUPPORTED_REG_OBJECTIVES.keys():
minmax = self.SUPPORTED_REG_OBJECTIVES[self.obj_name]
elif self.obj_name in self.SUPPORTED_CLS_OBJECTIVES.keys():
minmax = self.SUPPORTED_CLS_OBJECTIVES[self.obj_name]
else:
raise ValueError("obj_name is not supported. Please check the library: permetrics to see the supported objective function.")
problem = {
"obj_func": self.objective_function,
"bounds": FloatVar(lb=lb, ub=ub),
"minmax": minmax,
"log_to": log_to,
}
self.optimizer.solve(problem, seed=self.seed, mode=self.mode, n_workers=self.n_workers, termination=self.termination)
self.network.update_weights_from_solution(self.optimizer.g_best.solution, X, y_scaled)
self.loss_train = np.array(self.optimizer.history.list_global_best_fit)
return self