"""Provide Individual class for gp."""
import abc
import copy
import functools
import itertools
import logging
import re
import sys
import deap.base
import deap.gp
import numpy as np
import sympy
import glyph.utils
logger = logging.getLogger(__name__)
[docs]def sc_qout(x, y):
"""SC is the quotient of the number of nodes of its left and right child-trees x and y"""
return x / y
[docs]def sc_mmqout(x, y, cmin=-1, cmax=1):
"""SC is the minimum-maximum quotient of the number of nodes of both
child-trees x and y mapped into the constant interval [cmin, cmax]"""
return cmin + min(x, y) / max(x, y) * (cmax - cmin)
[docs]class StructConst(deap.gp.Primitive):
def __init__(self, func, arity=2):
"""
:param func: evaluate left and right subtree and assign a constant.
"""
self.func = func
super().__init__("SC", [deap.gp.__type__] * arity, deap.gp.__type__)
[docs] @staticmethod
def get_len(expr, tokens=("(,")):
regex = "|".join("\\{}".format(t) for t in tokens)
return len(re.split(regex, expr))
[docs] def as_terminal(self, *args):
vals = list(map(len, args))
return deap.gp.Terminal(self.func(*vals), False, deap.gp.__type__)
[docs]def add_sc(pset, func):
"""Adds a structural constant to a given primitive set.
:param func: `callable(x, y) -> float` where x and y are the expressions of the left and right subtree
:param pset: You may want to use `sympy_primitive_set` or `numpy_primitive_set` without symbolic constants.
:type pset: `deap.gp.PrimitiveSet`
"""
sc = StructConst(func)
pset._add(sc)
pset.prims_count += 1
return pset
def _build_args_string(pset, consts):
args = ",".join(arg for arg in pset.args)
if consts:
if args:
args += ","
args += ",".join("{}=1.0".format(arg) for arg in consts)
return args
[docs]def sympy_primitive_set(
categories=("algebraic", "trigonometric", "exponential"), arguments=["y_0"], constants=[]
):
"""Create a primitive set with sympy primitves.
:param arguments: variables to use in primitive set
:param constants: symbolic constants to use in primitive set
:param categories: an optional list of function categories for the primitive set. The following are available
'algebraic', 'neg', 'trigonometric', 'exponential', 'exponential', 'logarithm', 'sqrt'.
:return: `deap.gp.PrimitiveSet`
"""
pset = deap.gp.PrimitiveSet("main", arity=0)
if "algebraic" in categories:
pset.addPrimitive(sympy.Add, arity=2)
pset.addPrimitive(sympy.Mul, arity=2)
if "neg" in categories:
pset.addPrimitive(functools.partial(sympy.Mul, -1.0), arity=1, name="Neg")
if "trigonometric" in categories:
pset.addPrimitive(sympy.sin, arity=1)
pset.addPrimitive(sympy.cos, arity=1)
if "exponential" in categories:
pset.addPrimitive(sympy.exp, arity=1)
if "logarithm" in categories:
pset.addPrimitive(sympy.ln, arity=1)
if "sqrt" in categories:
pset.addPrimitive(sympy.sqrt, arity=1)
# Use sympy symbols for argument representation.
for symbol in itertools.chain(arguments, constants):
pset.addTerminal(sympy.Symbol(symbol), name=symbol)
pset.args = arguments
pset.constants = constants
return pset
[docs]def sympy_phenotype(individual):
"""Compile python function from individual.
Uses sympy's lambdify function. Terminals from the primitive set will be
used as parameters to the constructed lambda function; primitives (like
sympy.exp) will be converted into numpy expressions (eg. numpy.exp).
:param individual:
:type individual: `glyph.gp.individual.AExpressionTree`
:return: lambda function
"""
pset = individual.pset
args = _build_args_string(pset, pset.constants)
expr = sympy.sympify(deap.gp.compile(str(individual), pset))
func = sympy.utilities.lambdify(args, expr, modules="numpy")
return func
[docs]def numpy_primitive_set(arity, categories=("algebraic", "trigonometric", "exponential", "symc")):
"""Create a primitive set based on numpys vectorized functions.
:param arity: Number of variables in the primitive set
:param categories:
:return: `deap.gp.PrimitiveSet`
.. note :: All functions will be closed, that is non-defined values will be mapped to 1. 1/0 = 1!
"""
pset = deap.gp.PrimitiveSet("main", arity)
# Use primitive set built-in for argument representation.
pset.renameArguments(**{"ARG{}".format(i): "x_{}".format(i) for i in range(arity)})
pset.args = pset.arguments
if "symc" in categories:
symc = 1.0
pset.addTerminal(symc, "Symc")
pset.constants = ["Symc"]
else:
pset.constants = []
def close_function(func, value):
@functools.wraps(func)
@glyph.utils.numeric.silent_numpy
def closure(*args):
res = func(*args)
if isinstance(res, np.ndarray):
res[np.isinf(res)] = value
res[np.isnan(res)] = value
elif np.isinf(res) or np.isnan(res):
res = value
return res
return closure
if "algebraic" in categories:
pset.addPrimitive(np.add, 2, name="Add")
pset.addPrimitive(np.subtract, 2, name="Sub")
pset.addPrimitive(np.multiply, 2, name="Mul")
_div = close_function(np.divide, 1)
pset.addPrimitive(_div, 2, name="Div")
if "trigonometric" in categories:
pset.addPrimitive(np.sin, arity=1)
pset.addPrimitive(np.cos, arity=1)
if "exponential" in categories:
pset.addPrimitive(np.exp, arity=1)
_log = close_function(np.log, 1)
pset.addPrimitive(_log, arity=1)
if "sqrt" in categories:
_sqrt = close_function(np.sqrt, 1)
pset.addPrimitive(_sqrt, arity=1)
if "constants" in categories:
pset.addTerminal(np.pi, name="pi")
pset.addTerminal(np.e, name="e")
return pset
def _get_index(ind, c):
return [i for i, node in enumerate(ind) if node.name == c]
[docs]def numpy_phenotype(individual):
""" Lambdify the individual
:param individual:
:type individual: `glyph.gp.individual.AExpressionTree`
:return: lambda function
:Note:
In constrast to sympy_phenotype the callable will have a variable number of keyword arguments depending on the number
of symbolic constants in the individual.
:Example:
>>> pset = numpy_primitive_set(1)
>>> MyIndividual = Individual(pset=pset)
>>> ind = MyIndividual.from_string("Add(x_0, Symc)")
>>> f = numpy_phenotype(ind)
>>> f(1, 1)
2
"""
pset = individual.pset
if pset.constants:
c = pset.constants[0]
index = _get_index(individual, c)
else:
index = []
consts = ["c_{}".format(i) for i in range(len(index))]
args = _build_args_string(pset, consts)
expr = str(individual)
for c_ in consts:
expr = expr.replace(c, c_, 1)
func = sympy.utilities.lambdify(args, expr, modules=pset.context)
return func
[docs]class Individual(type):
"""Metaclass to set the primitive set in ExpressionTree types."""
[docs] def __new__(mcs, pset, name="MyIndividual", **kwargs):
"""Construct a new expression tree type.
Args:
pset: :class:`deap.gp.PrimitiveSet`
name: name of the expression tree class
kwargs: additional attributes
Returns: expression tree class
"""
cls = super().__new__(mcs, name, (AExpressionTree,), dict(pset=pset, **kwargs))
setattr(sys.modules[__name__], name, cls)
return cls
def __init__(cls, pset, name="MyIndividual", **kwargs): # noqa
super().__init__(name, (AExpressionTree,), dict(pset=pset, **kwargs))
[docs]class AExpressionTree(deap.gp.PrimitiveTree):
hasher = str
def __init__(self, content):
"""Abstract base class for the genotype.
Derived classes need to specify a primitive set from which the expression
tree can be build, as well as a phenotype method.
"""
super(AExpressionTree, self).__init__(content)
self.fitness = Measure()
def __repr__(self):
return self.to_polish(replace_struct=False)
def __str__(self):
return self.to_polish()
[docs] def resolve_sc(self):
"""Evaluate StructConst in individual top to bottom."""
nodes = type(self)(self[:])
# structure based constants need to be evaluated top to bottom first
is_sc = lambda n: isinstance(n, StructConst)
while any(n for n in nodes if is_sc(n)):
# get the first structure based constant
i, node = next(filter((lambda x: is_sc(x[1])), enumerate(nodes)))
slice_ = nodes.searchSubtree(i)
# find its subtree
subtree = type(self)(nodes[slice_])
# replace the subtree by a Terminal representing its numeric value
# child_trees helper function yields a list of tree which are used as arguments by the first primitive
args = list(child_trees(subtree))
nodes[slice_] = [node.as_terminal(*args)]
return nodes
[docs] def to_polish(self, for_sympy=False, replace_struct=True):
"""Symbolic representation of the expression tree."""
nodes = self.resolve_sc() if replace_struct else self
repr = ""
stack = []
for node in nodes:
stack.append((node, []))
while len(stack[-1][1]) == stack[-1][0].arity:
prim, args = stack.pop()
repr = prim.format(*args) if not for_sympy else convert_inverse_prim(prim, args)
if len(stack) == 0:
break
stack[-1][1].append(repr)
return repr
def __hash__(self):
return hash(self.hasher(self))
def __eq__(self, other):
return hash(self) == hash(other)
@property
def const_opt(self):
return getattr(self, "popt", ())
@property
def terminals(self):
"""Return terminals that occur in the expression tree."""
return [primitive for primitive in self if primitive.arity == 0]
@property
@abc.abstractmethod
def pset(self):
pass
[docs] @classmethod
def from_string(cls, string):
return super(AExpressionTree, cls).from_string(string, cls.pset)
[docs] @classmethod
def create(cls, gen_method=deap.gp.genHalfAndHalf, min=1, max=4):
return cls(gen_method(cls.pset, min_=min, max_=max))
[docs] @classmethod
def create_population(cls, size, gen_method=deap.gp.genHalfAndHalf, min=1, max=4):
"""Create a list of individuals of class Individual."""
if size < 0:
raise RuntimeError("Cannot create population of size {}".format(size))
return [cls.create(gen_method=gen_method, min=min, max=max) for _ in range(size)]
[docs]def nd_phenotype(nd_tree, backend=sympy_phenotype):
"""
:param nd_tree:
:type nd_tree: ANDimTree
:param backend: sympy_phenotype or numpy_phenotype
:return: lambda function
"""
funcs = [backend(t) for t in nd_tree]
return lambda *x: [f(*x) for f in funcs]
[docs]class NDIndividual(type):
def __new__(mcs, base, name="MyNDIndividual", **kwargs):
cls = super().__new__(mcs, name, (ANDimTree,), dict(base=base, **kwargs))
setattr(sys.modules[__name__], name, cls)
return cls
def __init__(cls, base, name="MyNDIndividual", **kwargs): # noqa
"""Construct a new n-dimensional expression tree type.
Args:
base (Individual): one dimensional base class
name: name of the n-dimensional expression tree class
**kwargs: addtional attributes
Returns: n-dimensional expression tree class
"""
super().__init__(name, (ANDimTree,), dict(base=base, **kwargs))
[docs]class ANDimTree(list):
""" A naive tree class representing a vector-valued expression.
Each dimension is encoded as a expression tree.
"""
def __init__(self, trees):
super().__init__(trees)
self.dim = len(trees)
self.fitness = Measure()
@property
@abc.abstractmethod
def base(self):
pass
def __repr__(self):
return str([str(tree) for tree in self])
[docs] @classmethod
def create_individual(cls, ndim):
return cls(cls.base.create_population(ndim))
[docs] @classmethod
def create_population(cls, size, ndim):
return [cls.create_individual(ndim) for _ in range(size)]
@property
def height(self):
return len(self)
@property
def pset(self):
return self.base.pset
@property
def terminals(self):
"""Return terminals that occur in the expression tree."""
return [primitive for primitive in itertools.chain.from_iterable(self) if primitive.arity == 0]
[docs] @classmethod
def from_string(cls, strs):
return cls([cls.base.from_string(s) for s in strs])
[docs]class Measure(deap.base.Fitness):
"""
This is basically a wrapper around deap.base.Fitness.
It provides the following enhancements over the base class:
- more adequate naming
- copy constructable
- no weight attribute
"""
weights = itertools.repeat(-1)
def __init__(self, values=()):
if len(values) > 0:
self.values = values
def __iter__(self):
return iter(self.values)
def __len__(self):
return len(self.values)
[docs] def get_values(self):
return tuple(-1.0 * v for v in self.wvalues)
[docs] def set_values(self, values):
self.wvalues = tuple(-1.0 * v for v in values)
[docs] def del_values(self):
self.wvalues = ()
values = property(get_values, set_values, del_values)
[docs]def convert_inverse_prim(prim, args):
"""
Convert inverse prims according to:
[Dd]iv(a,b) -> Mul[a, 1/b]
[Ss]ub(a,b) -> Add[a, -b]
We achieve this by overwriting the corresponding format method of the sub and div prim.
"""
prim = copy.copy(prim)
converter = {
"sub": lambda *args_: "Add({}, Mul(-1,{}))".format(*args_),
"div": lambda *args_: "Mul({}, Pow({}, -1))".format(*args_),
"add": lambda *args_: "Add({}, {})".format(*args_),
"mul": lambda *args_: "Mul({}, {})".format(*args_),
}
prim_formatter = converter.get(prim.name.lower(), prim.format)
return prim_formatter(*args)
@glyph.utils.Memoize
def simplify_this(expr, timeout=5):
"""
:param expr:
:type expr: str
:return: Sympy representation of simplified expression
:warning: does not respect closures
"""
with glyph.utils.random_state(
simplify_this
): # to avoid strange side effects of sympy testcases and random
with glyph.utils.Timeout(timeout):
try:
return sympy.simplify(expr.to_polish(for_sympy=True))
except Exception as e:
logger.debug(f"Exception during simplification of {expr}: {e}.")
return expr
return expr
[docs]def child_trees(ind):
"""Yield all child tree which are used as arguments for the head node of ind."""
start = 1
while start < len(ind):
slice_ = ind.searchSubtree(start)
yield type(ind)(ind[slice_])
start += slice_.stop - 1
@glyph.utils.Memoize
def simplify_constant(ind):
"""Trims subtrees of symbolic constants.
Recursively applying these rules:
S f(*[symc]*f.arity) = symc
S f(x,... symc) = f(x,... symc)
S symc = symc
For deeper trees, try to trim down lower levels first.
An individual cannot be trimmed down further if its a fixpoint of S.
"""
symc = ind.pset.mapping[ind.pset.constants[0]]
if symc in ind:
root = ind.root
# the tree is just a function of constants
if len(ind) > 1 and all(i == symc for i in ind[1:]):
return type(ind)([symc])
# root is just a constant or tree is a function of a variable and cannot be trimmed down.
elif len(ind) == 1 or len(ind) == root.arity + 1:
return ind
# try to simplify all children of root
else:
acc = [simplify_constant(child) for child in child_trees(ind)]
new_ind = type(ind)([root] + sum(acc, []))
# cannot be simplified
if ind == new_ind:
return ind
# can it be simplified further?
else:
return simplify_constant(new_ind)
else:
return ind
def _constant_normal_form(expr, variables=()):
"""experimental
"""
if expr.is_constant(*variables):
return sympy.Symbol("c")
args = expr.args
if not args:
return expr
elif isinstance(expr, sympy.Pow):
return type(expr)(*[_constant_normal_form(a, variables=variables) for a in args[:-1]] + [args[-1]])
else:
res = type(expr)(*[_constant_normal_form(a, variables=variables) for a in args])
if res == expr:
return expr
else:
return _constant_normal_form(res, variables=variables)
[docs]def pretty_print(expr, constants, consts_values, count=0):
"""Replace symbolic constants in the str representation of an individual
by their numeric values.
This checks for
- c followed by ")" or ","
- c followed by infix operators
- c
"""
for k, v in zip(constants, consts_values):
c = str(k)
p1 = r"{c}(?=[,)])".format(c=c)
p2 = r"^{c}$".format(c=c)
p3 = r"(?<=[*+-/]){c}|(?<=[*+-/] ){c}|{c}(?=\s?[*+-/])".format(c=c)
pattern = r"|".join((p1, p2, p3))
expr = re.sub(pattern, str(v), expr, count=count)
return expr