Source code for niapy.task

# encoding=utf8

"""The implementation of tasks."""

import logging
from enum import Enum

import numpy as np
from matplotlib import pyplot as plt
import matplotlib.ticker as ticker
from niapy.problems import Problem
from import limit
from niapy.util.factory import get_problem

logger = logging.getLogger("niapy.task.Task")

[docs]class OptimizationType(Enum): r"""Enum representing type of optimization. Attributes: MINIMIZATION (int): Represents minimization problems and is default optimization type of all algorithms. MAXIMIZATION (int): Represents maximization problems. """ MINIMIZATION = 1.0 MAXIMIZATION = -1.0
[docs]class Task: r"""Class representing an optimization task. Date: 2019 Author: Klemen Berkovič and others Attributes: problem (Problem): Optimization problem. dimension (int): Dimension of the problem. lower (numpy.ndarray): Lower bounds of the problem. upper (numpy.ndarray): Upper bounds of the problem. range (numpy.ndarray): Search range between upper and lower limits. optimization_type (OptimizationType): Optimization type to use. iters (int): Number of algorithm iterations/generations. evals (int): Number of function evaluations. max_iters (int): Maximum number of algorithm iterations/generations. max_evals (int): Maximum number of function evaluations. cutoff_value (float): Reference function/fitness values to reach in optimization. x_f (float): Best found individual function/fitness value. """
[docs] def __init__(self, problem=None, dimension=None, lower=None, upper=None, optimization_type=OptimizationType.MINIMIZATION, repair_function=limit, max_evals=np.inf, max_iters=np.inf, cutoff_value=None, enable_logging=False): r"""Initialize task class for optimization. Args: problem (Union[str, Problem]): Optimization problem. dimension (Optional[int]): Dimension of the problem. Will be ignored if problem is instance of the `Problem` class. lower (Optional[Union[float, Iterable[float]]]): Lower bounds of the problem. Will be ignored if problem is instance of the `Problem` class. upper (Optional[Union[float, Iterable[float]]]): Upper bounds of the problem. Will be ignored if problem is instance of the `Problem` class. optimization_type (Optional[OptimizationType]): Set the type of optimization. Default is minimization. repair_function (Optional[Callable[[numpy.ndarray, numpy.ndarray, numpy.ndarray, Dict[str, Any]], numpy.ndarray]]): Function for repairing individuals components to desired limits. max_evals (Optional[int]): Number of function evaluations. max_iters (Optional[int]): Number of generations or iterations. cutoff_value (Optional[float]): Reference value of function/fitness function. enable_logging (Optional[bool]): Enable/disable logging of improvements. """ if isinstance(problem, str): params = dict(dimension=dimension, lower=lower, upper=upper) params = {key: val for key, val in params.items() if val is not None} self.problem = get_problem(problem, **params) elif isinstance(problem, Problem): self.problem = problem if dimension is not None or lower is not None or upper is not None: logger.warning('An instance of the Problem class was passed in, `dimension`, `lower` and `upper` parameters will be ignored.') else: raise TypeError('Unsupported type for problem: {}'.format(type(problem))) self.optimization_type = optimization_type self.dimension = self.problem.dimension self.lower = self.problem.lower self.upper = self.problem.upper self.range = self.upper - self.lower self.repair_function = repair_function self.iters = 0 self.evals = 0 self.cutoff_value = -np.inf * optimization_type.value if cutoff_value is None else cutoff_value self.enable_logging = enable_logging self.x_f = np.inf * optimization_type.value self.max_evals = max_evals self.max_iters = max_iters self.n_evals = [] self.fitness_evals = [] # fitness improvements at self.n_evals evaluations self.fitness_iters = [] # best fitness at each iteration
[docs] def repair(self, x, rng=None): r"""Repair solution and put the solution in the random position inside of the bounds of problem. Args: x (numpy.ndarray): Solution to check and repair if needed. rng (Optional[numpy.random.Generator]): Random number generator. Returns: numpy.ndarray: Fixed solution. See Also: * :func:`` * :func:`` * :func:`` * :func:`` * :func:`` """ return self.repair_function(x, self.lower, self.upper, rng=rng)
[docs] def next_iter(self): r"""Increments the number of algorithm iterations.""" self.fitness_iters.append(self.x_f) self.iters += 1
[docs] def eval(self, x): r"""Evaluate the solution A. Args: x (numpy.ndarray): Solution to evaluate. Returns: float: Fitness/function values of solution. """ if self.stopping_condition(): return np.inf self.evals += 1 x_f = self.problem.evaluate(x) * self.optimization_type.value if x_f < self.x_f * self.optimization_type.value: self.x_f = x_f * self.optimization_type.value self.n_evals.append(self.evals) self.fitness_evals.append(x_f) if self.enable_logging:'evals:%d => %s' % (self.evals, self.x_f)) return x_f
[docs] def is_feasible(self, x): r"""Check if the solution is feasible. Args: x (Union[numpy.ndarray, Individual]): Solution to check for feasibility. Returns: bool: `True` if solution is in feasible space else `False`. """ return np.all((x >= self.lower) & (x <= self.upper))
[docs] def stopping_condition(self): r"""Check if optimization task should stop. Returns: bool: `True` if number of function evaluations or number of algorithm iterations/generations or reference values is reach else `False`. """ return (self.evals >= self.max_evals) or (self.iters >= self.max_iters) or (self.cutoff_value * self.optimization_type.value >= self.x_f * self.optimization_type.value)
[docs] def stopping_condition_iter(self): r"""Check if stopping condition reached and increase number of iterations. Returns: bool: `True` if number of function evaluations or number of algorithm iterations/generations or reference values is reach else `False`. """ r = self.stopping_condition() self.next_iter() return r
[docs] def convergence_data(self, x_axis='iters'): r"""Get values of x and y-axis for plotting covariance graph. Args: x_axis (Literal['iters', 'evals']): Quantity to be displayed on the x-axis. Either 'iters' or 'evals'. Returns: Tuple[np.ndarray, np.ndarray]: 1. array of function evaluations. 2. array of fitness values. """ if x_axis == 'iters': return np.arange(self.iters), np.array(self.fitness_iters) else: # x_axis == 'evals' r1, r2 = [], [] for i, v in enumerate(self.n_evals): r1.append(v) r2.append(self.fitness_evals[i]) if i >= len(self.n_evals) - 1: break diff = self.n_evals[i + 1] - v if diff <= 1: continue for j in range(diff - 1): r1.append(v + j + 1) r2.append(self.fitness_evals[i]) return np.array(r1), np.array(r2)
[docs] def plot_convergence(self, x_axis='iters', title='Convergence Graph'): """Plot a simple convergence graph. Args: x_axis (Literal['iters', 'evals']): Quantity to be displayed on the x-axis. Either 'iters' or 'evals'. title (str): Title of the graph. """ x, fitness = self.convergence_data(x_axis) _, ax = plt.subplots() ax.plot(x, fitness) ax.xaxis.set_major_locator(ticker.MaxNLocator(integer=True)) if x_axis == 'iters': plt.xlabel('Iterations') else: plt.xlabel('Fitness Evaluations') plt.ylabel('Fitness') plt.title(title)