Source code for glyph.assessment

"""Some usefull classes/functions for the fitness assessment part in gp problems."""

import numpy as np
import scipy.optimize
import toolz
import functools
import warnings
import logging

import stopit
import deprecated

from glyph.gp.individual import _get_index
from glyph.utils import timeoutable


logger = logging.getLogger(__name__)


class SingleProcessFactory:
    map = map

    def __call__(self):
        return self
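# SingleProcessFactory is the default scheme: the builtin map() in the current
# process. Any callable returning an object with a map() method satisfies the
# parallel-factory contract; a sketch using multiprocessing (an assumption for
# illustration, not part of this module):
#
#     import multiprocessing
#
#     class PoolFactory:
#         def __call__(self):
#             return multiprocessing.Pool()  # Pool.map provides the map() method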
class AAssessmentRunner(object):
    def __init__(self, parallel_factory=SingleProcessFactory()):
        """Abstract runner for the (parallel) assessment of individuals in a population.

        Child classes have to at least override the measure() method, which
        might be executed in a different process or even on a remote machine,
        depending on the parallelization scheme.

        Child classes may override the setup() method, which is executed once
        on object instantiation.

        Child classes may override the assign_fitness() method, which is
        executed in the main process. This can be useful if you want to
        locally post-process the results of measure() when they are collected
        from remote processes.

        :param parallel_factory: callable() -> obj, where obj has to implement
                                 some kind of (parallel) map() method.
        """
        self.parallel_factory = parallel_factory
        self._parallel = None
        self.setup()
    def __call__(self, population):
        """Update the fitness of each individual in population that has an invalid fitness.

        :param population: a sequence of individuals.
        """
        if self._parallel is None:
            self._parallel = self.parallel_factory()
        invalid = [p for p in population if not p.fitness.valid]
        fitnesses = self._parallel.map(self.measure, invalid)
        for ind, fit in zip(invalid, fitnesses):
            self.assign_fitness(ind, fit)
        return len(invalid)
    def __getstate__(self):
        """Modify pickling behavior for the class.

        All attributes except '_parallel' can be pickled.
        """
        state = self.__dict__.copy()
        del state["_parallel"]  # Remove the unpicklable parallelization scheme.
        return state
    def __setstate__(self, state):
        """Modify unpickling behavior for the class."""
        self.__dict__.update(state)  # Restore instance attributes.
        self._parallel = None  # Restoring the parallelization scheme happens when the functor is called.
    def setup(self):
        """Default implementation."""
        pass
    def assign_fitness(self, individual, fitness):
        """Assign a fitness value (as returned by self.measure()) to individual.

        Default implementation.
        """
        individual.fitness.values = fitness
    def measure(self, individual):
        """Return a fitness value for individual."""
        raise NotImplementedError
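# Minimal usage sketch (``phenotype_error`` and ``population`` are hypothetical
# placeholders, not part of this module):
#
#     class MyRunner(AAssessmentRunner):
#         def measure(self, individual):
#             # Must return a fitness tuple compatible with assign_fitness().
#             return (phenotype_error(individual),)
#
#     runner = MyRunner()
#     n_updated = runner(population)  # re-evaluates individuals with invalid fitness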
def measure(*funcs, pre=toolz.identity, post=toolz.identity):
    """Combine several measurement functions into one. Optionally do pre- and/or post-processing.

    :param funcs: a sequence of measure functions as returned by measure()
                  (e.g. `callable(*a, **kw) -> tuple`), and/or single-valued
                  functions (e.g. `callable(*a, **kw) -> numerical value`).
    :param pre: some pre-processing function that is applied to the input
                *once* before passing the result to each function in funcs.
    :param post: some post-processing function that is applied to the tuple
                 of measure values as returned by the combined funcs.
    :returns: callable(input) -> tuple of measure values, where input is
              usually a phenotype (e.g. an expression tree).
    """
    def closure(*args, **kwargs) -> tuple:
        m = toolz.compose(post, _tt_flatten, toolz.juxt([tuple_wrap(f) for f in funcs]), pre)
        return m(*args, **kwargs)

    closure.size = sum(getattr(f, "size", 1) for f in funcs)
    return closure
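# Example sketch (``rmse`` is a hypothetical single-valued measure; ``ind`` an
# expression tree):
#
#     >>> combined = measure(rmse, len, post=replace_nan)
#     >>> combined(ind)       # -> (rmse_value, len(ind)), with NaNs replaced
#     >>> combined.size
#     2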
def default_constants(ind):
    """Return an initial value of one for each distinct constant in the primitive set.

    :param ind: the individual to derive initial constant values for.
    :type ind: `glyph.gp.individual.AExpressionTree`
    :returns: A value for each constant in the primitive set.
    """
    if ind.pset.constants:
        consts_types = ind.pset.constants
        if len(consts_types) == 1 and "Symc" in consts_types:  # symc case
            values = np.ones(len(_get_index(ind, consts_types[0])))
        else:  # sympy case
            values = np.ones(len(consts_types))
    else:
        values = []
    return values
def const_opt(measure, individual, lsq=False, default_constants=default_constants, f_kwargs=None, **kwargs):
    """Apply constant optimization.

    :param measure: callable(individual, *f_args) -> scalar.
    :param individual: an individual that is passed on to measure.
    :param lsq: use scipy.optimize.least_squares instead of scipy.optimize.minimize.
    :param default_constants: callable(individual) -> initial constant values.
    :param f_kwargs: additional keyword arguments passed on to measure.
    :param kwargs: passed on to the solver, e.g. bounds for the constant
                   values or method (see `scipy.optimize.minimize`).

    :returns: (popt, measure_opt); popt: the optimal values for the constants;
              measure_opt: the measure evaluated at popt.
    """
    opt = scipy.optimize.minimize if not lsq else scipy.optimize.least_squares
    f_kwargs = f_kwargs or {}

    @functools.wraps(measure)
    def closure(consts):
        return measure(individual, *consts, **f_kwargs)

    p0 = default_constants(individual)
    popt = p0
    measure_opt = None
    terminals = [t.name for t in individual.terminals]
    if any(constant in terminals for constant in individual.pset.constants):
        try:
            with warnings.catch_warnings():
                # Filter the "unknown options for optimizer" warning.
                warnings.filterwarnings("ignore", category=scipy.optimize.OptimizeWarning)
                res = opt(fun=closure, x0=p0, **kwargs)
            popt = res.x if res.x.shape else np.array([res.x])
            measure_opt = res.fun
            if not res.success:
                logger.debug(f"Evaluating {str(individual)}: {res.message}")
        except ValueError:
            return p0, closure(p0)
    if measure_opt is None:
        measure_opt = closure(popt)
    return popt, measure_opt
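# Usage sketch (``error_measure`` and ``ind`` are hypothetical; ``ind`` is an
# expression tree containing symbolic constants):
#
#     >>> popt, err = const_opt(error_measure, ind, method="Nelder-Mead")
#     >>> popt  # optimized constant values, starting from default_constants(ind)
#     >>> err   # error_measure evaluated at popt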
[docs]@deprecated.deprecated(reason="Use const_opt instead.", version="0.4") def const_opt_scalar(*args, **kwargs): return const_opt(*args, **kwargs)
[docs]@deprecated.deprecated(reason="Use const_opt(*args, lsq=True, **kwargs) instead.", version="0.4") def const_opt_leastsq(measure, individual, default_constants=default_constants, f_kwargs=None, **kwargs): return const_opt( measure, individual, lsq=True, default_constants=default_constants, f_kwargs=f_kwargs, **kwargs )
def replace_nan(x, rep=np.inf):
    """Replace occurrences of np.nan in x.

    :param x: Any data structure
    :type x: list, tuple, float, np.ndarray
    :param rep: value to replace np.nan with
    :returns: x without NaNs
    """
    @np.vectorize
    def repl(x):
        return rep if np.isnan(x) else x

    t = toolz.identity if isinstance(x, np.ndarray) else type(x)
    return t(repl(x))
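# Example:
#
#     >>> replace_nan([1.0, np.nan, 3.0])   # -> [1.0, inf, 3.0]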
def _tt_flatten(tt):
    """Flatten a tuple of tuples."""
    return tuple(sum(tt, ()))


def _argcount(func):
    """Return argument count of func."""
    return func.__code__.co_argcount
def annotate(func, annotations):
    """Add annotations to func."""
    if not hasattr(func, "__annotations__"):
        setattr(func, "__annotations__", annotations)
    else:
        func.__annotations__.update(annotations)
    return func
def returns(func, types):
    """Check whether func's return annotation is one of types."""
    try:
        return func.__annotations__["return"] in types
    except (AttributeError, KeyError):
        return False
def tuple_wrap(func):
    """Wrap func's return value into a tuple if it is not one already."""
    types = tuple, list
    if returns(func, types=types):
        return func  # No need to wrap.
    else:
        @functools.wraps(func)
        def closure(*args, **kwargs):
            res = func(*args, **kwargs)
            if isinstance(res, types):
                return tuple(res)
            else:
                return (res,)

        annotate(closure, {"return": tuple})
        return closure
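# Example:
#
#     >>> f = tuple_wrap(lambda x: x ** 2)
#     >>> f(3)
#     (9,)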
def max_fitness_on_timeout(max_fitness, timeout):
    """Decorate a function to associate max_fitness with long-running individuals.

    :param max_fitness: fitness assigned to aborted individual calls.
    :param timeout: time until timeout.
    :returns: fitness or max_fitness
    """
    def decorate(f):
        @functools.wraps(f)
        def inner(*args, **kwargs):
            return timeoutable(default=max_fitness)(f)(*args, timeout=timeout, **kwargs)
        return inner
    return decorate
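# Sketch (``slow_measure`` is a hypothetical, possibly non-terminating measure):
#
#     @max_fitness_on_timeout((np.inf,), timeout=1)
#     def slow_measure(individual):
#         ...
#
# Calls exceeding the timeout return (np.inf,) instead of blocking the run.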
def expressional_complexity(ind):
    """Sum of the lengths of all subtrees of the individual."""
    return sum(len(ind[ind.searchSubtree(i)]) for i in range(len(ind)))
complexity_measures = {
    "ec_genotype": expressional_complexity,
    "ec_phenotype": lambda ind: expressional_complexity(ind.resolve_sc()),
    "num_nodes_genotype": len,
    "num_nodes_phenotype": lambda ind: len(ind.resolve_sc()),
}
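# The mapping allows selecting a complexity measure by name, e.g. (``ind`` is
# a hypothetical expression tree):
#
#     >>> complexity = complexity_measures["ec_genotype"]
#     >>> complexity(ind)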