Source code for niapy.algorithms.modified.shade

# encoding=utf8
import logging

import numpy as np

from niapy.algorithms.algorithm import Individual
from niapy.algorithms.basic.de import DifferentialEvolution
from niapy.util import objects_to_array

logging.basicConfig()
logger = logging.getLogger('niapy.algorithms.modified')
logger.setLevel('INFO')

__all__ = [
    'parent_medium',
    'cross_curr2pbest1',
    'SolutionSHADE',
    'SuccessHistoryAdaptiveDifferentialEvolution',
    'LpsrSuccessHistoryAdaptiveDifferentialEvolution'
]


def parent_medium(x, p, lower, upper, **_kwargs):
    r"""Repair solution and put the solution to the medium of the parent's value.

    Args:
        x (numpy.ndarray): Solution to check and repair if needed.
        p (numpy.ndarray): The parent of the solution.
        lower (numpy.ndarray): Lower bounds of search space.
        upper (numpy.ndarray): Upper bounds of search space.

    Returns:
        numpy.ndarray: Solution in search space.

    """
    ir = np.where(x < lower)    # values under the range are repaired to a medium of x_min and parents value
    x[ir] = (lower[ir] + p[ir]) / 2.0
    ir = np.where(x > upper)    # values over the range are repaired to a medium of x_max and parents value
    x[ir] = (upper[ir] + p[ir]) / 2.0
    return x


def cross_curr2pbest1(pop, ic, f, cr, rng, p_num, archive, arc_ind_cnt, task, **_kwargs):
    r"""Mutation strategy with crossover.

    Mutation:
        Name: current-to-pbest/1

        :math:`\mathbf{v}_{i, G} = \mathbf{x}_{i, G} + differential_weight \cdot (\mathbf{x}_{pbest, G} - \mathbf{x}_{i, G}) + differential_weight \cdot (\mathbf{x}_{r_1, G} - \mathbf{x}_{r_2, G})`
        where individual :math:`\mathbf{x}_{pbest, G}` is randomly selected from the top :math:`N \cdot pbest_factor (pbest_factor \in [0,1])` current population members,
        :math:`r_1` is an index representing a random current population member and :math:`r_2` is an index representing a random member of :math:`N_{G} \cup A`

    Crossover:
        Name: Binomial crossover

        :math:`\mathbf{u}_{j, i, G} = \begin{cases} \mathbf{v}_{j, i, G}, & \text{if $rand[0,1) \leq crossover_rate$ or $j=j_{rand}$}, \\ \mathbf{x}_{j, i, G}, & \text{otherwise}. \end{cases}`
        where :math:`j_{rand}` is an index representing a random problem dimension.

    Args:
        pop (numpy.ndarray[Individual]): Current population.
        ic (int): Index of individual being mutated.
        f (float): Scale factor.
        cr (float): Crossover probability.
        rng (numpy.random.Generator): Random generator.
        pbest_factor (float): Greediness factor.
        archive (numpy.ndarray): External archive.
        arc_ind_cnt (int): Number of individuals in the archive.
        task (Task): Optimization task.

    Returns:
        numpy.ndarray: mutated and mixed individual.

    """
    # Note: the population passed in the argument must be sorted by fitness!
    x_pbest = rng.integers(p_num)
    # a random individual is selected from the best p_num individuals of the population rng.integers
    p = [1 / (len(pop) - 1.0) if i != ic else 0 for i in range(len(pop))]
    r1 = rng.choice(len(pop), p=p)  # a random individual != to the current individual is selected from the population
    p = [1 / (len(pop) + arc_ind_cnt - 2.0) if i != ic and i != r1 else 0 for i in range(len(pop) + arc_ind_cnt)]
    r2 = rng.choice(len(pop) + arc_ind_cnt, p=p)
    # a second random individual != to the current individual and r1 is selected from the population U archive
    j = rng.integers(task.dimension)
    if r2 >= len(pop):
        r2 -= len(pop)
        v = [pop[ic][i] + f * (pop[x_pbest][i] - pop[ic][i]) + f * (pop[r1][i] - archive[r2][i])
             if rng.random() < cr or i == j else pop[ic][i] for i in range(task.dimension)]
        return parent_medium(np.asarray(v), pop[ic].x, task.lower, task.upper)
        # the mutant vector is repaired if needed

    else:
        v = [pop[ic][i] + f * (pop[x_pbest][i] - pop[ic][i]) + f * (pop[r1][i] - pop[r2][i])
             if rng.random() < cr or i == j else pop[ic][i] for i in range(task.dimension)]
        return parent_medium(np.asarray(v), pop[ic].x, task.lower, task.upper)
        # the mutant vector is repaired if needed


class SolutionSHADE(Individual):
    r"""Individual for SHADE algorithm.

    Attributes:
        differential_weight (float): Scale factor.
        crossover_probability (float): Crossover probability.

    See Also:
        :class:`niapy.algorithms.Individual`

    """

    def __init__(self, differential_weight=0.5, crossover_probability=0.5, **kwargs):
        r"""Initialize SolutionSHADE.

        Attributes:
            differential_weight (float): Scale factor.
            crossover_probability (float): Crossover probability.

        See Also:
            :func:`niapy.algorithm.Individual.__init__`

        """
        super().__init__(**kwargs)
        self.differential_weight = differential_weight
        self.crossover_probability = crossover_probability


[docs]class SuccessHistoryAdaptiveDifferentialEvolution(DifferentialEvolution): r"""Implementation of Success-history based adaptive differential evolution algorithm. Algorithm: Success-history based adaptive differential evolution algorithm Date: 2022 Author: Aleš Gartner License: MIT Reference paper: Ryoji Tanabe and Alex Fukunaga: Improving the Search Performance of SHADE Using Linear Population Size Reduction, Proc. IEEE Congress on Evolutionary Computation (CEC-2014), Beijing, July, 2014. Attributes: Name (List[str]): List of strings representing algorithm name extern_arc_rate (float): External archive size factor. pbest_factor (float): Greediness factor for current-to-pbest/1 mutation. hist_mem_size (int): Size of historical memory. See Also: * :class:`niapy.algorithms.basic.DifferentialEvolution` """ Name = ['SuccessHistoryAdaptiveDifferentialEvolution', 'SHADE']
[docs] @staticmethod def info(): r"""Get algorithm information. Returns: str: Algorithm information. See Also: * :func:`niapy.algorithms.Algorithm.info` """ return r"""Ryoji Tanabe and Alex Fukunaga: Improving the Search Performance of SHADE Using Linear Population Size Reduction, Proc. IEEE Congress on Evolutionary Computation (CEC-2014), Beijing, July, 2014."""
[docs] def __init__(self, population_size=540, extern_arc_rate=2.6, pbest_factor=0.11, hist_mem_size=6, *args, **kwargs): """Initialize SHADE. Args: population_size (Optional[int]): Population size. extern_arc_rate (Optional[float]): External archive size factor. pbest_factor (Optional[float]): Greediness factor for current-to-pbest/1 mutation. hist_mem_size (Optional[int]): Size of historical memory. See Also: * :func:`niapy.algorithms.basic.DifferentialEvolution.__init__` """ super().__init__(population_size, individual_type=kwargs.pop('individual_type', SolutionSHADE), *args, **kwargs) self.extern_arc_rate = extern_arc_rate self.pbest_factor = pbest_factor self.hist_mem_size = hist_mem_size
[docs] def set_parameters(self, population_size=540, extern_arc_rate=2.6, pbest_factor=0.11, hist_mem_size=6, **kwargs): r"""Set the parameters of an algorithm. Args: population_size (Optional[int]): Population size. extern_arc_rate (Optional[float]): External archive size factor. pbest_factor (Optional[float]): Greediness factor for current-to-pbest/1 mutation. hist_mem_size (Optional[int]): Size of historical memory. See Also: * :func:`niapy.algorithms.basic.DifferentialEvolution.set_parameters` """ super().set_parameters(population_size=population_size, individual_type=kwargs.pop('individual_type', SolutionSHADE), **kwargs) self.extern_arc_rate = extern_arc_rate self.pbest_factor = pbest_factor self.hist_mem_size = hist_mem_size
[docs] def get_parameters(self): r"""Get algorithm parameters. Returns: Dict[str, Any]: Algorithm parameters. """ d = DifferentialEvolution.get_parameters(self) d.update({ 'extern_arc_rate': self.extern_arc_rate, 'pbest_factor': self.pbest_factor, 'hist_mem_size': self.hist_mem_size, }) return d
[docs] def cauchy(self, loc, gamma): r"""Get cauchy random distribution with mean "loc" and standard deviation "gamma". Args: loc (float): Mean of the cauchy random distribution. gamma (float): Standard deviation of the cauchy random distribution. Returns: Union[numpy.ndarray[float], float]: Array of numbers. """ c = loc + gamma * np.tan(np.pi * (self.random() - 0.5)) return c if c > 0 else self.cauchy(loc, gamma)
[docs] def gen_ind_params(self, x, hist_cr, hist_f): r"""Generate new individual with new scale factor and crossover probability. Args: x (IndividualSHADE): Individual to apply function on. hist_cr (numpy.ndarray[float]): Historic values of crossover probability. hist_f (numpy.ndarray[float]): Historic values of scale factor. Returns: Individual: New individual with new parameters """ mi = self.integers(self.hist_mem_size) # a random pair of f cr is selected form historical memory m_cr = hist_cr[mi] m_f = hist_f[mi] cr = self.normal(m_cr, 0.1) if m_cr != -1 else 0 # cr is randomised from normal distribution and then repaired if needed cr = np.clip(cr, 0, 1) f = self.cauchy(m_f, 0.1) # f is randomised from cauchy distribution until the value is >0 and then repaired if needed f = np.clip(f, 0, 1) return self.individual_type(x=x.x, differential_weight=f, crossover_probability=cr, e=False)
[docs] def evolve(self, pop, hist_cr, hist_f, archive, arc_ind_cnt, task, **_kwargs): r"""Evolve current population. Args: pop (numpy.ndarray[IndividualSHADE]): Current population. hist_cr (numpy.ndarray[float]): Historic values of crossover probability. hist_f (numpy.ndarray[float]): Historic values of scale factor. archive (numpy.ndarray): External archive. arc_ind_cnt (int): Number of individuals in the archive. task (Task): Optimization task. Returns: numpy.ndarray: New population. """ new_pop = objects_to_array([self.gen_ind_params(xi, hist_cr, hist_f) for xi in pop]) p_num = np.int_(np.around(len(pop) * self.pbest_factor)) if p_num < 2: p_num = 2 # cr and f for mutation are computed for i, xi in enumerate(new_pop): new_pop[i].x = cross_curr2pbest1(pop, i, xi.differential_weight, xi.crossover_probability, self.rng, p_num, archive, arc_ind_cnt, task) # trial vectors are created for xi in new_pop: xi.evaluate(task, rng=self.random) return new_pop
[docs] def selection(self, pop, new_pop, archive, arc_ind_cnt, best_x, best_fitness, task, **kwargs): r"""Operator for selection. Args: pop (numpy.ndarray): Current population. new_pop (numpy.ndarray): New Population. archive (numpy.ndarray): External archive. arc_ind_cnt (int): Number of individuals in the archive. best_x (numpy.ndarray): Current global best solution. best_fitness (float): Current global best solutions fitness/objective value. task (Task): Optimization task. Returns: Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, int, numpy.ndarray, float]: 1. New selected individuals. 2. Scale factor values of successful new individuals. 3. Crossover probability values of successful new individuals. 4. Updated external archive. 5. Updated number of individuals in the archive. 6. New global best solution. 7. New global best solutions fitness/objective value. """ success_f = np.asarray([]) # array for storing successful f values success_cr = np.asarray([]) # array for storing successful cr values fitness_diff = np.asarray([]) # array for storing the difference of fitness of new individuals archive_size = np.int_(np.around(len(pop) * self.extern_arc_rate)) arr = np.copy(pop) for i, vi in enumerate(new_pop): if vi.f == pop[i].f: arr[i] = vi elif vi.f < pop[i].f: # trial vectors that have a better or equal fitness value are selected for the next generation if archive_size > 1: if arc_ind_cnt < archive_size: archive[arc_ind_cnt] = pop[i].x # parents that have worse fitness then their trial vector are stored into the external archive arc_ind_cnt += 1 else: rand_ind = self.integers(archive_size) # if the archive is full random archive members are replaced archive[rand_ind] = pop[i].x fitness_diff = np.append(fitness_diff, np.absolute(pop[i].f - vi.f)) success_f = np.append(success_f, vi.differential_weight) success_cr = np.append(success_cr, vi.crossover_probability) arr[i] = vi best_x, best_fitness = self.get_best(arr, np.asarray([ui.f for ui in arr]), best_x, best_fitness) return arr, success_f, success_cr, fitness_diff, archive, arc_ind_cnt, best_x, best_fitness
[docs] def post_selection(self, pop, arc, arc_ind_cnt, task, xb, fxb, **kwargs): r"""Post selection operator. Args: pop (numpy.ndarray): Current population. arc (numpy.ndarray): External archive. arc_ind_cnt (int): Number of individuals in the archive. task (Task): Optimization task. xb (numpy.ndarray): Global best solution. fxb (float): Global best fitness. Returns: Tuple[numpy.ndarray, numpy.ndarray, int, numpy.ndarray, float]: 1. Changed current population. 2. Updated external archive. 3. Updated number of individuals in the archive. 4. New global best solution. 5. New global best solutions fitness/objective value. """ return pop, arc, arc_ind_cnt, xb, fxb
[docs] def init_population(self, task): r"""Initialize starting population of optimization algorithm. Args: task (Task): Optimization task. Returns: Tuple[numpy.ndarray, numpy.ndarray, Dict[str, Any]]: 1. New population. 2. New population fitness values. 3. Additional arguments: * h_mem_cr (numpy.ndarray[float]): Historical values of crossover probability. * h_mem_f (numpy.ndarray[float]): Historical values of scale factor. * k (int): Historical memory current index. * archive (numpy.ndarray): External archive. * arc_ind_cnt (int): Number of individuals in the archive. See Also: * :func:`niapy.algorithms.Algorithm.init_population` """ pop, fitness, _ = DifferentialEvolution.init_population(self, task) # pop vectors are initialized randomly h_mem_cr = np.full(self.hist_mem_size, 0.5) h_mem_f = np.full(self.hist_mem_size, 0.5) # all values in the historical memory for parameters f and cr are initialized to 0.5 k = 0 # the starting memory position is set to 1 arc_size = np.int_(np.around(self.population_size * self.extern_arc_rate)) archive = np.zeros((arc_size, task.dimension)) # the external archive of max size pop_size * arc_rate is initialized arc_ind_cnt = 0 # the number of archive elements is set to 0 return pop, fitness, {'h_mem_cr': h_mem_cr, 'h_mem_f': h_mem_f, 'k': k, 'archive': archive, 'arc_ind_cnt': arc_ind_cnt}
[docs] def run_iteration(self, task, population, population_fitness, best_x, best_fitness, **params): r"""Core function of Success-history based adaptive differential evolution algorithm. Args: task (Task): Optimization task. population (numpy.ndarray): Current population. population_fitness (numpy.ndarray[float]): Current population function/fitness values. best_x (numpy.ndarray): Global best individual. best_fitness (float): Global best individual fitness/function value. **params (Dict[str, Any]): Additional arguments. Returns: Tuple[numpy.ndarray, numpy.ndarray[float], Dict[str, Any]]: 1. New population. 2. New population fitness/function values. 3. Additional arguments: * h_mem_cr (numpy.ndarray[float]): Historical values of crossover probability. * h_mem_f (numpy.ndarray[float]): Historical values of scale factor. * k (int): Historical memory current index. * archive (numpy.ndarray): External archive. * arc_ind_cnt (int): Number of individuals in the archive. """ h_mem_cr = params.pop('h_mem_cr') h_mem_f = params.pop('h_mem_f') k = params.pop('k') archive = params.pop('archive') arc_ind_cnt = params.pop('arc_ind_cnt') indexes = np.argsort(population_fitness) sorted_pop = population[indexes] # sorted population new_population = self.evolve(sorted_pop, h_mem_cr, h_mem_f, archive, arc_ind_cnt, task) # mutant vectors are created population, s_f, s_cr, fit_diff, archive, arc_ind_cnt, best_x, best_fitness = self.selection( sorted_pop, new_population, archive, arc_ind_cnt, best_x, best_fitness, task=task) # best individuals are selected for the next population num_of_success_params = len(s_f) if num_of_success_params > 0: # if children better than their parents were created the historical memory is updated m_sf_k = 0 m_cr_k = 0 sum_sf = 0 sum_cr = 0 diff_sum = np.sum(fit_diff) for i in range(num_of_success_params): weight = fit_diff[i] / diff_sum m_sf_k += weight * s_f[i] * s_f[i] sum_sf += weight * s_f[i] m_cr_k += weight * s_cr[i] * s_cr[i] sum_cr += weight * s_cr[i] h_mem_f[k] = m_sf_k / sum_sf # f and cr that are stored into the historic memory are calculated with the use of weighted Lehmer mean h_mem_cr[k] = -1 if sum_cr == 0 or h_mem_cr[k] == -1 else m_cr_k / sum_cr # the value of cr is updated if the previous value isn't terminal and max(s_cr)!=0 k = 0 if k + 1 >= self.hist_mem_size else k + 1 # historical memory position is increased, if over the size of the memory its set back to 1 population, archive, arc_ind_cnt, best_x, best_fitness = self.post_selection(population, archive, arc_ind_cnt, task, best_x, best_fitness) population_fitness = np.asarray([x.f for x in population]) best_x, best_fitness = self.get_best(population, population_fitness, best_x, best_fitness) return population, population_fitness, best_x, best_fitness, {'h_mem_cr': h_mem_cr, 'h_mem_f': h_mem_f, 'k': k, 'archive': archive, 'arc_ind_cnt': arc_ind_cnt}
[docs]class LpsrSuccessHistoryAdaptiveDifferentialEvolution(SuccessHistoryAdaptiveDifferentialEvolution): r"""Implementation of Success-history based adaptive differential evolution algorithm with Linear population size reduction. Algorithm: Success-history based adaptive differential evolution algorithm with Linear population size reduction Date: 2022 Author: Aleš Gartner License: MIT Reference paper: Ryoji Tanabe and Alex Fukunaga: Improving the Search Performance of SHADE Using Linear Population Size Reduction, Proc. IEEE Congress on Evolutionary Computation (CEC-2014), Beijing, July, 2014. Attributes: Name (List[str]): List of strings representing algorithm name See Also: * :class:`niapy.algorithms.modified.SuccessHistoryAdaptiveDifferentialEvolution` """ Name = ['LpsrSuccessHistoryAdaptiveDifferentialEvolution', 'L-SHADE']
[docs] def post_selection(self, pop, arc, arc_ind_cnt, task, xb, fxb, **kwargs): r"""Post selection operator. In this algorithm the post selection operator linearly reduces the population size. The size of external archive is also updated. Args: pop (numpy.ndarray): Current population. arc (numpy.ndarray): External archive. arc_ind_cnt (int): Number of individuals in the archive. task (Task): Optimization task. xb (numpy.ndarray): Global best solution. fxb (float): Global best fitness. Returns: Tuple[numpy.ndarray, numpy.ndarray, int, numpy.ndarray, float]: 1. Changed current population. 2. Updated external archive. 3. Updated number of individuals in the archive. 4. New global best solution. 5. New global best solutions fitness/objective value. """ pop_size = len(pop) max_nfe = task.max_evals nfe = task.evals next_pop_size = np.int_(np.around((((4.0 - self.population_size) / np.float_(max_nfe)) * nfe) + self.population_size)) if next_pop_size < 4: next_pop_size = 4 # the size of the next population is calculated # if the size of the new population is smaller than the current, # the worst pop_size - new_pop_size individuals are deleted if next_pop_size < pop_size: reduction = pop_size - next_pop_size for i in range(reduction): worst = 0 for j, e in enumerate(pop): worst = j if e.f > pop[worst].f else worst pop = np.delete(pop, worst) next_arc_size = np.int_(next_pop_size * self.extern_arc_rate) # the size of the new archive if arc_ind_cnt > next_arc_size: arc_ind_cnt = next_arc_size return pop, arc, arc_ind_cnt, xb, fxb