"""Convenience classes and functions that allow you to quickly build gp apps."""
import abc
import os
import sys
import time
import inspect
import random
import logging
import argparse
import operator
import dill
import numpy as np
import toolz
import deap
import deap.tools
from . import gp
from . import utils
from .assessment import SingleProcessFactory
logger = logging.getLogger(__name__)
def update_pareto_front(runner):
    """Callback: update the runner's pareto front with its current population."""
    front = runner.pareto_front
    front.update(runner.population)
def update_logbook_record(runner):
    """Callback: compile statistics of the population and record them in the logbook.

    The statistics object is created on demand, sized by the number of fitness
    values of the first individual of the population.
    """
    if not runner.mstats:
        n_fitness_values = len(runner.population[0].fitness.values)
        runner.mstats = create_stats(n_fitness_values)
    compiled = runner.mstats.compile(runner.population)
    runner.logbook.record(gen=runner.step_count, evals=runner._evals, **compiled)
# Default value of the `callbacks` parameter of GPRunner and default_gprunner().
DEFAULT_CALLBACKS_GP_RUNNER = (update_pareto_front, update_logbook_record)
class GPRunner(object):
    def __init__(
        self, IndividualClass, algorithm_factory, assessment_runner, callbacks=DEFAULT_CALLBACKS_GP_RUNNER
    ):
        """Runner for gp problem sets.

        Handles initialization, execution, and accounting of a gp run
        (population creation, random state, generation count, pareto front, and
        logbook). Call init() once, then advance the evolution with step();
        both init() and step() invoke the assessment runner.

        :param IndividualClass: Class inherited from gp.AExpressionTree.
        :param algorithm_factory: callable() -> gp algorithm, as defined in
                                  gp.algorithms.
        :param assessment_runner: callable(population), updates fitness values
                                  of each invalid individual in population; its
                                  return value is stored and logged as `evals`.
        :param callbacks: iterable of callable(runner), called in order after
                          every run of the assessment runner.
        """
        self.IndividualClass = IndividualClass
        self.algorithm_factory = algorithm_factory
        self.assessment_runner = assessment_runner
        self.callbacks = callbacks
        # Placeholders until init() installs the deap containers.
        self.pareto_front = []
        self.logbook = ""
        self.mstats = None
        self.step_count = 0

    def init(self, pop_size):
        """Reset all accounting, create a population of `pop_size` and assess it."""
        self.pareto_front = deap.tools.ParetoFront()
        self.logbook = deap.tools.Logbook()
        self.mstats = None
        self.step_count = 0
        with utils.random_state(self):
            self.population = self.IndividualClass.create_population(pop_size)
        self.algorithm = self.algorithm_factory()
        self._update()

    def step(self):
        """Evolve the population by one generation and assess the result."""
        with utils.random_state(self):
            self.population = self.algorithm.evolve(self.population)
        self.step_count += 1
        self._update()

    def _update(self):
        """Run the assessment on the population, then every callback in order."""
        self._evals = self.assessment_runner(self.population)
        for callback in self.callbacks:
            callback(self)
def default_gprunner(Individual, assessment_runner, callbacks=DEFAULT_CALLBACKS_GP_RUNNER, **kwargs):
    """Create a GPRunner from default settings; keyword arguments override them.

    For config options see `MateFactory`, `MutateFactory`, `AlgorithmFactory`.
    """
    config = {
        "mating": "cxonepoint",
        "mating_max_height": 20,
        "mutation": "mutuniform",
        "mutation_max_height": 20,
        "algorithm": "nsga2",
        "crossover_prob": 0.5,
        "mutation_prob": 0.2,
        "tournament_size": 2,
        "select": "nsga2",
        "create_method": "halfandhalf",
        "create_min_height": 1,
        "create_max_height": 4,
        "mutate_tree_max": 2,
        "mutate_tree_min": 0,
    }
    config.update(kwargs)

    mate = MateFactory.create(config, Individual)
    mutate = MutateFactory.create(config, Individual)
    select = SelectFactory.create(config)
    create_method = CreateFactory.create(config, Individual)
    factory_args = (config, mate, mutate, select, create_method)

    # Build one algorithm right away so that bad config params fail here
    # instead of later, when the runner calls the factory.
    AlgorithmFactory.create(*factory_args)
    algorithm_factory = toolz.partial(AlgorithmFactory.create, *factory_args)
    return GPRunner(Individual, algorithm_factory, assessment_runner, callbacks=callbacks)
def make_checkpoint(app):
    """Callback: checkpoint the application every `checkpoint_frequency` generations.

    Logs a warning and does nothing else when the checkpoint settings are
    unusable, i.e. no checkpoint file is set or the frequency is missing or not
    a positive integer.

    :param app: application providing `checkpoint_file`, `args`,
                `gp_runner.step_count` and `checkpoint()`.
    """
    frequency = getattr(app.args, "checkpoint_frequency", None)
    # Require frequency > 0: a frequency of 0 would raise ZeroDivisionError in
    # the modulo below.
    valid_checkpointing = (
        app.checkpoint_file is not None and isinstance(frequency, int) and frequency > 0
    )
    if not valid_checkpointing:
        logger.warning("ValueError in checkpointing settings.")
        return
    if app.gp_runner.step_count % frequency == 0:
        app.checkpoint()
        logger.debug(f"Saved checkpoint to {app.checkpoint_file}.")
def log(app):
    """Callback: log the runner's logbook stream line by line at INFO level."""
    stream_text = app.gp_runner.logbook.stream
    for entry in stream_text.splitlines():
        logger.info(entry)
# Default value of the `callbacks` parameter of Application and default_console_app().
DEFAULT_CALLBACKS = make_checkpoint, log
class Application(object):
    def __init__(self, config, gp_runner, checkpoint_file=None, callbacks=DEFAULT_CALLBACKS):
        """An application based on `GPRunner`.

        Controls execution of the runner and adds checkpointing and logging
        functionality; also defines a set of available command line options and
        their default values.

        To create a full console application one can use the factory function
        default_console_app().

        :param config: Container holding all configs
        :type config: dict or argparse.Namespace
        :param gp_runner: Instance of `GPRunner`
        :param checkpoint_file: Path to checkpoint_file
        :param callbacks: Iterable of callable(application); each one is called
                          after the initialization and after every evolutionary
                          step.
        """
        self.args = to_argparse_namespace(config)
        self.gp_runner = gp_runner
        self.checkpoint_file = checkpoint_file
        self.pareto_fronts = []
        # True once the runner holds an initialized population, either through
        # run() or because the application was restored from a checkpoint.
        self._initialized = False
        self.callbacks = callbacks

    @property
    def assessment_runner(self):
        """The assessment runner of the underlying gp runner."""
        return self.gp_runner.assessment_runner

    @property
    def logbook(self):
        """The logbook of the underlying gp runner."""
        return self.gp_runner.logbook

    def run(self, break_condition=None):
        """Run gp app.

        The first call seeds the `random` module with `args.seed` and
        initializes the runner. Subsequent calls (and applications restored
        from a checkpoint) continue from the current population.

        :param break_condition: evaluated before every evolutionary step; the
                                run ends as soon as it returns a true value.
        :type break_condition: callable(application)
        :return: number of iterations executed during run.
        """
        if break_condition is None:
            break_condition = lambda app: False
        iterations = 0
        if self.args.pop_size < 1:
            return iterations
        if not self._initialized:
            random.seed(self.args.seed)
            self.gp_runner.init(self.args.pop_size)
            # Remember the initialization, otherwise a second call of run()
            # would re-seed and throw away the evolved population.
            self._initialized = True
            self._update()
            iterations += 1
        while self.gp_runner.step_count < self.args.num_generations and not break_condition(self):
            self.gp_runner.step()
            self._update()
            iterations += 1
        return iterations

    def _update(self):
        """Call every callback; a failing callback is logged and does not stop the run."""
        for cb in self.callbacks:
            try:
                logger.debug(f"Running callback {cb}.")
                cb(self)
            except Exception as e:
                logger.error(f"Error during execution of {cb}")
                logger.warning(e)

    def checkpoint(self):
        """Checkpoint current state of evolution."""
        safe(
            self.checkpoint_file,
            args=self.args,
            runner=self.gp_runner,
            random_state=random.getstate(),
            pareto_fronts=self.pareto_fronts,
            callbacks=self.callbacks,
        )

    @property
    def workdir(self):
        """Absolute path of the directory containing the checkpoint file."""
        return os.path.dirname(os.path.abspath(self.checkpoint_file))

    @classmethod
    def from_checkpoint(cls, file_name):
        """Create application from checkpoint file.

        Also restores the state of the `random` module saved in the checkpoint.
        """
        cp = load(file_name)
        app = cls(cp["args"], cp["runner"], file_name, callbacks=cp["callbacks"])
        app.pareto_fronts = cp["pareto_fronts"]
        app._initialized = True
        random.setstate(cp["random_state"])
        return app

    @staticmethod
    def add_options(parser):
        """Add available parser options."""
        parser.add_argument(
            "--pop_size",
            "-p",
            dest="pop_size",
            metavar="n",
            type=utils.argparse.non_negative_int,
            default=10,
            help="initial population size (default: 10)",
        )
        parser.add_argument(
            "--num_generations",
            "-n",
            dest="num_generations",
            metavar="n",
            type=utils.argparse.non_negative_int,
            default=10,
            help="number of generations to evolve (default: 10)",
        )
        parser.add_argument(
            "--seed",
            dest="seed",
            metavar="n",
            type=utils.argparse.non_negative_int,
            # Drawn once, when the options are registered.
            default=random.randint(0, sys.maxsize),
            help="a seed for the random genrator (default: random.randint(0, sys.maxsize))",
        )
        parser.add_argument(
            "--checkpoint_frequency",
            "-f",
            dest="checkpoint_frequency",
            metavar="n",
            type=utils.argparse.positive_int,
            default=1,
            help="do checkpointing every n generations (default: 1)",
        )
def default_console_app(IndividualClass, AssessmentRunnerClass, parser=None, callbacks=DEFAULT_CALLBACKS):
    """Factory function for a console application.

    Registers all command line options on `parser`, parses the command line,
    configures logging, and then either resumes an application from a
    checkpoint (``--resume``) or builds a new one.

    :param IndividualClass: individual class handed to the breeding factories
                            and to `GPRunner`.
    :param AssessmentRunnerClass: called with a parallelization factory to
                                  build the assessment runner.
    :param parser: parser to extend; a new `argparse.ArgumentParser` is created
                   when omitted.
    :param callbacks: callbacks of a newly built `Application`; a resumed
                      application uses the callbacks stored in its checkpoint.
    :return: tuple (application, parsed command line arguments).
    :raises RuntimeError: if the directory of the checkpoint file does not exist.
    """
    if parser is None:
        # Create the parser per call. A default instance in the signature would
        # be shared between calls and a second call would fail on re-registering
        # the same options.
        parser = argparse.ArgumentParser()

    Application.add_options(parser)
    cp_group = parser.add_mutually_exclusive_group(required=False)
    cp_group.add_argument(
        "--resume",
        dest="resume_file",
        metavar="FILE",
        type=str,
        help="continue previous run from a checkpoint file",
    )
    cp_group.add_argument(
        "-o",
        dest="checkpoint_file",
        metavar="FILE",
        type=str,
        default=os.path.join(".", "checkpoint.pickle"),
        help="checkpoint to FILE (default: ./checkpoint.pickle)",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        dest="verbosity",
        action="count",
        default=0,
        help="set verbose output; raise verbosity level with -vv, -vvv, -vvvv",
    )
    parser.add_argument(
        "--logging_config",
        "-l",
        type=str,
        default="logging.yaml",
        help="set config file for logging; overides --verbose (default: logging.yaml)",
    )
    AlgorithmFactory.add_options(parser.add_argument_group("algorithm"))
    group_breeding = parser.add_argument_group("breeding")
    MateFactory.add_options(group_breeding)
    MutateFactory.add_options(group_breeding)
    SelectFactory.add_options(group_breeding)
    CreateFactory.add_options(group_breeding)
    ParallelizationFactory.add_options(parser.add_argument_group("parallel execution"))

    args = parser.parse_args()

    workdir = os.path.dirname(os.path.abspath(args.checkpoint_file))
    if not os.path.exists(workdir):
        raise RuntimeError('Path does not exist: "{}"'.format(workdir))

    log_level = utils.logging.log_level(args.verbosity)
    utils.logging.load_config(
        config_file=args.logging_config, level=log_level, placeholders=dict(workdir=workdir)
    )

    if args.resume_file is not None:
        logger.debug("Loading checkpoint {}".format(args.resume_file))
        app = Application.from_checkpoint(args.resume_file)
        return app, args

    mate = MateFactory.create(args, IndividualClass)
    mutate = MutateFactory.create(args, IndividualClass)
    select = SelectFactory.create(args)
    create_method = CreateFactory.create(args, IndividualClass)
    algorithm_factory = toolz.partial(AlgorithmFactory.create, args, mate, mutate, select, create_method)
    parallel_factory = toolz.partial(ParallelizationFactory.create, args)
    assessment_runner = AssessmentRunnerClass(parallel_factory)
    gp_runner = GPRunner(IndividualClass, algorithm_factory, assessment_runner)
    app = Application(args, gp_runner, args.checkpoint_file, callbacks=callbacks)
    return app, args
class AFactory(object):
    """Base class of the factories.

    Subclasses provide `_create(config, ...)`, `add_options(parser)` and,
    where options are looked up by name, a `_mapping` from option name to
    object.
    """

    _mapping = {}

    @classmethod
    def create(cls, config, *args, **kwargs):
        """Convert `config` to an argparse.Namespace and delegate to `cls._create`.

        :param config: dict or argparse.Namespace holding the options.
        :return: whatever the subclass' `_create` returns.
        """
        config = to_argparse_namespace(config)
        return cls._create(config, *args, **kwargs)

    @staticmethod
    @abc.abstractmethod
    def add_options(parser):
        """Add available parser options."""
        raise NotImplementedError

    @classmethod
    def get_from_mapping(cls, key):
        """Return the object registered under `key` in `cls._mapping`.

        :raises RuntimeError: if `key` is not in the mapping; the original
                              KeyError is attached as the cause.
        """
        try:
            return cls._mapping[key]
        except KeyError as err:
            raise RuntimeError(f"Option {key} not supported") from err
def get_mapping(group):
    """Map the lower-cased `__name__` of each object in `group` to the object."""
    mapping = {}
    for member in group:
        mapping[member.__name__.lower()] = member
    return mapping
class AlgorithmFactory(AFactory):
    """Factory class for gp algorithms."""

    _mapping = get_mapping(gp.all_algorithms)

    @classmethod
    def _create(cls, args, mate_func, mutate_func, select, create_func):
        """Setup gp algorithm.

        The algorithm class is called with one positional argument per
        parameter of its signature, taken by name from this method's own
        arguments; parameters without a match receive None.
        """
        available = {
            "cls": cls,
            "args": args,
            "mate_func": mate_func,
            "mutate_func": mutate_func,
            "select": select,
            "create_func": create_func,
        }
        args.algorithm = args.algorithm.lower()
        algorithm_class = cls.get_from_mapping(args.algorithm)
        parameter_names = inspect.signature(algorithm_class).parameters
        positional = [available.get(name) for name in parameter_names]
        algorithm = algorithm_class(*positional)
        algorithm.crossover_prob = args.crossover_prob
        algorithm.mutation_prob = args.mutation_prob
        return algorithm

    @staticmethod
    def add_options(parser):
        """Add available parser options."""
        algorithm_names = list(AlgorithmFactory._mapping.keys())
        parser.add_argument(
            "--algorithm",
            type=str,
            default="nsga2",
            choices=algorithm_names,
            help="the gp algorithm (default: nsga2)",
        )
        unit_interval = utils.argparse.unit_interval
        parser.add_argument(
            "--crossover_prob",
            metavar="p",
            type=unit_interval,
            default=0.5,
            help="crossover probability for mating (default: 0.5)",
        )
        parser.add_argument(
            "--mutation_prob",
            metavar="p",
            type=unit_interval,
            default=0.2,
            help="mutation probability (default: 0.2)",
        )
        parser.add_argument(
            "--tournament_size",
            dest="tournament_size",
            metavar="n",
            type=utils.argparse.positive_int,
            default=2,
            help="tournament size for tournament selection (default: 2)",
        )
class MateFactory(AFactory):
    """Factory class for gp mating functions."""

    _mapping = get_mapping(gp.all_crossover)

    @staticmethod
    def _create(args, IndividualClass):
        """Setup mating function.

        The selected mating class is instantiated with all options of `args` as
        keyword arguments and wrapped in a static limit on the tree height.
        """
        args.mating = args.mating.lower()
        mating_class = MateFactory.get_from_mapping(args.mating)
        mate = mating_class(**vars(args))
        limit_height = deap.gp.staticLimit(
            key=operator.attrgetter("height"), max_value=args.mating_max_height
        )
        return limit_height(mate)

    @staticmethod
    def add_options(parser):
        """Add available parser options."""
        mating_names = list(MateFactory._mapping.keys())
        parser.add_argument(
            "--mating",
            dest="mating",
            type=str,
            default="cxonepoint",
            choices=mating_names,
            help="the mating method (default: cxonepoint)",
        )
        parser.add_argument(
            "--mating-max-height",
            dest="mating_max_height",
            metavar="n",
            type=utils.argparse.positive_int,
            default=20,
            help="limit for the expression tree height as a result of mating (default: 20)",
        )
class MutateFactory(AFactory):
    """Factory class for gp mutation functions."""

    _mapping = get_mapping(gp.all_mutations)

    @staticmethod
    def _create(args, IndividualClass):
        """Setup mutation function.

        The selected mutation class is instantiated with the primitive set of
        `IndividualClass` plus all options of `args` as keyword arguments, and
        wrapped in a static limit on the tree height.
        """
        args.mutation = args.mutation.lower()
        mutate = MutateFactory.get_from_mapping(args.mutation)(IndividualClass.pset, **vars(args))
        static_limit_decorator = deap.gp.staticLimit(
            key=operator.attrgetter("height"), max_value=args.mutation_max_height
        )
        mutate = static_limit_decorator(mutate)
        return mutate

    @staticmethod
    def add_options(parser):
        """Add available parser options."""
        parser.add_argument(
            "--mutation",
            dest="mutation",
            type=str,
            default="mutuniform",
            choices=list(MutateFactory._mapping.keys()),
            help="the mutation method (default: mutuniform)",
        )
        parser.add_argument(
            "--mutation-max-height",
            dest="mutation_max_height",
            metavar="n",
            type=utils.argparse.positive_int,
            default=20,
            help="limit for the expression tree height as a result of mutation (default: 20)",
        )
        parser.add_argument(
            "--mutate_tree_min",
            dest="mutate_tree_min",
            default=0,
            metavar="min_",
            # The default is 0, so the validator has to admit 0 as well;
            # presumably positive_int would reject an explicit
            # "--mutate_tree_min 0" (confirm against utils.argparse).
            type=utils.argparse.non_negative_int,
            help="minimum value for tree based mutation methods (default: 0)",
        )
        parser.add_argument(
            "--mutate_tree_max",
            dest="mutate_tree_max",
            default=2,
            metavar="max_",
            type=utils.argparse.positive_int,
            help="maximum value for tree based mutation methods (default: 2)",
        )
class SelectFactory(AFactory):
    """Factory class for selection"""

    _mapping = {"nsga2": deap.tools.selNSGA2, "spea2": deap.tools.selSPEA2}

    @staticmethod
    def _create(args):
        """Lower-case `args.select` in place and return the matching selection function."""
        method = args.select.lower()
        args.select = method
        return SelectFactory.get_from_mapping(method)

    @staticmethod
    def add_options(parser):
        """Add available parser options."""
        select_names = list(SelectFactory._mapping.keys())
        parser.add_argument(
            "--select",
            dest="select",
            type=str,
            default="nsga2",
            choices=select_names,
            help="the selection method (default: nsga2)",
        )
class CreateFactory(AFactory):
    """Factory class for creation"""

    _mapping = {"halfandhalf": deap.gp.genHalfAndHalf}

    @staticmethod
    def _create(args, IndividualClass):
        """Return `IndividualClass.create_population` bound to the configured create method.

        The generation method and the min/max heights from `args` are fixed as
        keyword arguments of the returned callable.
        """
        args.create_method = args.create_method.lower()
        m = CreateFactory.get_from_mapping(args.create_method)
        create_ = toolz.partial(
            IndividualClass.create_population,
            gen_method=m,
            min=args.create_min_height,
            max=args.create_max_height,
        )
        return create_

    # Declared static like the add_options of every other factory, so it also
    # works when called on an instance.
    @staticmethod
    def add_options(parser):
        """Add available parser options."""
        parser.add_argument(
            "--create_method",
            dest="create_method",
            type=str,
            default="halfandhalf",
            choices=list(CreateFactory._mapping.keys()),
            help="the create method (default: halfandhalf)",
        )
        parser.add_argument(
            "--create_max_height",
            dest="create_max_height",
            default=4,
            type=utils.argparse.positive_int,
            help="maximum value for tree based create methods (default: 4)",
        )
        parser.add_argument(
            "--create_min_height",
            dest="create_min_height",
            default=1,
            type=utils.argparse.positive_int,
            help="minimum value for tree based create methods (default: 1)",
        )
class ParallelizationFactory(AFactory):
    """Factory class for parallel execution schemes."""

    @staticmethod
    def _create(args):
        """Return `SingleProcessFactory`; `args` is not evaluated."""
        return SingleProcessFactory

    @staticmethod
    def add_options(parser):
        """Add available parser options."""
        # todo
class ConstraintsFactory(AFactory):
    """Factory class for constraints on individuals."""

    @staticmethod
    def add_options(parser):
        """Add available parser options."""
        parser.add_argument(
            "--constraints_timeout",
            type=utils.argparse.non_negative_int,
            default=60,
            help="Seconds before giving up and using a new random individual (default: 60)",
        )
        parser.add_argument(
            "--constraints_n_retries",
            type=utils.argparse.non_negative_int,
            default=30,
            help="Number of genetic operation before giving up and using a new random individual (default: 30)",
        )
        # The three flags below are store_false switches: the check is active
        # by default and passing the flag turns it off.
        parser.add_argument(
            "--constraints_zero",
            action="store_false",
            default=True,
            help="Discard zero individuals (default: True)",
        )
        parser.add_argument(
            "--constraints_constant",
            action="store_false",
            default=True,
            help="Discard constant individuals (default: True)",
        )
        parser.add_argument(
            "--constraints_infty",
            action="store_false",
            default=True,
            help="Discard individuals with infinities (default: True)",
        )
        parser.add_argument("--constraints_pretest", default=False, help="Path to pretest file.")
        parser.add_argument(
            "--constraints_pretest_function",
            type=str,
            default="chi",
            help="Name of the function to use from the pretest file (default: chi)",
        )
        parser.add_argument(
            "--constraints_pretest_service", action="store_true", help="Use service for pretesting."
        )

    @staticmethod
    def _create(config, com=None):
        """Build a `gp.Constraint` from the constraint options in `config`.

        :param config: namespace holding the `constraints_*` options.
        :param com: currently unused.
        :return: `gp.Constraint` wrapping the list of selected constraints.
        """
        constraints = []
        if config.constraints_zero or config.constraints_infty or config.constraints_constant:
            constraints.append(
                gp.NonFiniteExpression(
                    zero=config.constraints_zero,
                    infty=config.constraints_infty,
                    constant=config.constraints_constant,
                )
            )
        if config.constraints_pretest:
            constraints.append(gp.PreTest(config.constraints_pretest, fun=config.constraints_pretest_function))
        # if config.constraints_pretest_service:  # todo (enable after com refactor)
        #     constraints.append(gp.PreTestService(com))
        return gp.Constraint(constraints)
def safe(file_name, **kwargs):
    """Serialize the keyword arguments as one dict into `file_name` using dill."""
    with open(file_name, "wb") as handle:
        dill.dump(kwargs, handle)
def load(file_name):
    """Return the data stored in `file_name` by safe()."""
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    with open(file_name, "rb") as handle:
        return dill.load(handle)
def create_tmp_dir(prefix="run-"):
    """Create a directory named `prefix` plus the current local time and return its relative path."""
    # strftime() without a time tuple formats the current local time.
    timestamp = time.strftime("%Y-%m-%d-%H%M%S")
    workdir = os.path.relpath(prefix + timestamp)
    os.mkdir(workdir)
    return workdir
def _create_logger(verbosity, config_file, workdir):
    """Load the logging config for the given verbosity and return this module's logger."""
    level = utils.logging.log_level(verbosity)
    # NOTE(review): default_console_app() hands the level to load_config as
    # `level=`, whereas this helper uses `default_level=` - confirm which
    # keyword utils.logging.load_config accepts.
    utils.logging.load_config(
        config_file=config_file,
        default_level=level,
        placeholders={"workdir": workdir},
    )
    return logging.getLogger(__name__)
def create_stats(n):
    """Create a deap.tools.MultiStatistics with one "fit<i>" entry per fitness value.

    Every entry reads the i-th fitness value of an individual; "min" and "max"
    are registered as np.nanmin and np.nanmax.
    """

    def val(i, ind):
        return ind.fitness.values[i]

    per_fitness = {
        "fit{}".format(i): deap.tools.Statistics(toolz.partial(val, i)) for i in range(n)
    }
    mstats = deap.tools.MultiStatistics(**per_fitness)
    for label, reducer in (("min", np.nanmin), ("max", np.nanmax)):
        mstats.register(label, reducer)
    return mstats
def to_argparse_namespace(d):
    """Return `d` as argparse.Namespace.

    A Namespace is returned unchanged, a dict is unpacked into a new
    Namespace, and anything else raises RuntimeError.
    """
    if isinstance(d, argparse.Namespace):
        return d
    if not isinstance(d, dict):
        raise RuntimeError("Cannot convert {} to argparse.Namespace.".format(type(d)))
    return argparse.Namespace(**d)