# Copyright 2019 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict, List, Optional
import os
import logging
import math
import re
from lale.search.search_space import *
from lale.util.Visitor import Visitor
from lale.search import schema2search_space as opt
from lale import helpers
from hyperopt import hp
from hyperopt.pyll import scope
# Importing this module configures the root logger at WARNING level
# (``basicConfig`` only has an effect if the root logger has no handlers yet)
# and creates a module-level logger named after this module.
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
def search_space_to_hp_expr(space:SearchSpace, name:str):
    """Translate ``space`` into a hyperopt expression.

    Delegates to ``SearchSpaceHPExprVisitor.run``; ``name`` is used as the
    root label from which the labels of nested hyperopt nodes are derived.
    """
    hp_expr = SearchSpaceHPExprVisitor.run(space, name)
    return hp_expr
def search_space_to_hp_str(space:SearchSpace, name:str)->str:
    """Render ``space`` as the source text of a hyperopt expression.

    Delegates to ``SearchSpaceHPStrVisitor.run`` with its default counter
    settings, so generated labels carry their counter suffixes.
    """
    hp_source = SearchSpaceHPStrVisitor.run(space, name)
    return hp_source
def search_space_to_str_for_comparison(space:SearchSpace, name:str)->str:
    """Render ``space`` as hyperopt source text with counter suffixes disabled.

    Same as ``search_space_to_hp_str`` except that ``useCounter=False`` is
    passed down, so ``mk_label`` returns labels without a counter appended.
    """
    comparison_source = SearchSpaceHPStrVisitor.run(
        space,
        name,
        counter=None,
        useCounter=False,
    )
    return comparison_source
def mk_label(label, counter, useCounter = True):
    """Return ``label``, optionally suffixed with ``counter``.

    The counter is appended (formatted via an f-string) only when
    ``useCounter`` is truthy and ``counter`` is not ``None``; a counter of
    ``0`` is therefore still appended.
    """
    append_counter = useCounter and counter is not None
    return f"{label}{counter}" if append_counter else label
@scope.define
def pgo_sample(pgo, sample):
    """Look up ``sample`` in ``pgo`` by subscription.

    Registered with ``scope.define`` so that it can be referenced as
    ``scope.pgo_sample(...)`` when building hyperopt expressions.
    """
    selected = pgo[sample]
    return selected
class SearchSpaceHPExprVisitor(Visitor):
    """Visitor that turns a ``SearchSpace`` tree into a hyperopt expression.

    Every ``visit*`` method receives the node being visited, the label
    ``path`` built up so far, and an optional ``counter`` which ``mk_label``
    appends to the path to form the hyperopt label for that node.
    """

    @classmethod
    def run(cls, space:SearchSpace, name:str):
        """Create a visitor and return the hyperopt expression for ``space``."""
        hp_visitor = cls(name)
        # mypy does not know about the accept method, since it was
        # added via the VisitorMeta class
        untyped_space:Any = space
        return untyped_space.accept(hp_visitor, name)

    def __init__(self, name:str):
        """Initialise the visitor; ``name`` is accepted but not stored."""
        super(SearchSpaceHPExprVisitor, self).__init__()

    def visitSearchSpaceEnum(self, space:SearchSpaceEnum, path:str, counter=None):
        """Return the single value directly, or an ``hp.choice`` over all values."""
        def wrap_for_hyperopt(value):
            # Lists are not "safe" to pass to hyperopt without wrapping
            if isinstance(value, (list,tuple)):
                return helpers.val_wrapper(value)
            return value

        if len(space.vals) == 1:
            return wrap_for_hyperopt(space.vals[0])
        options = [wrap_for_hyperopt(value) for value in space.vals]
        return hp.choice(mk_label(path, counter), options)

    # Constants and booleans are handled exactly like enumerations.
    visitSearchSpaceConstant = visitSearchSpaceEnum
    visitSearchSpaceBool = visitSearchSpaceEnum

    def visitSearchSpaceNumber(self, space:SearchSpaceNumber, path:str, counter=None):
        """Return the hyperopt expression for a numeric range.

        Raises:
            ValueError: if a required bound is missing, if the ``integer``
                distribution is requested for a non-discrete number, if a
                ``loguniform`` range has a non-positive minimum, or if the
                distribution name is not recognised.
        """
        label = mk_label(path, counter)

        # A PGO distribution, when present, takes precedence over the bounds:
        # sample an index into it and map that through ``pgo_sample``.
        if space.pgo is not None:
            index_expr = hp.quniform(label, 0, len(space.pgo)-1, 1)
            return scope.pgo_sample(space.pgo, index_expr)

        dist = space.distribution or "uniform"

        if space.maximum is None:
            raise ValueError(f"maximum not specified for a number with distribution {dist} for {path}")
        upper = space.getInclusiveMax()

        # These distributions need only a maximum
        if dist == "integer":
            if not space.discrete:
                raise ValueError(f"integer distribution specified for a non discrete numeric type for {path}")
            return hp.randint(label, upper)

        if space.minimum is None:
            raise ValueError(f"minimum not specified for a number with distribution {dist} for {path}")
        lower = space.getInclusiveMin()

        if dist == "uniform":
            if space.discrete:
                return scope.int(hp.quniform(label, lower, upper, 1))
            return hp.uniform(label, lower, upper)

        if dist != "loguniform":
            raise ValueError(f"Unknown distribution type: {dist} for {path}")

        # for log distributions, hyperopt requires that we provide the log of the min/max
        if lower <= 0:
            raise ValueError(f"minimum of 0 specified with a {dist} distribution for {path}. This is not allowed; please set it (possibly using minimumForOptimizer) to be positive")
        if lower > 0:
            lower = math.log(lower)
        if upper > 0:
            upper = math.log(upper)
        if space.discrete:
            return scope.int(hp.qloguniform(label, lower, upper, 1))
        return hp.loguniform(label, lower, upper)

    def array_single_expr_(self, space:SearchSpaceArray, path:str, num):
        """Return the expressions for an array of exactly ``num`` elements.

        The result is a tuple when ``space.is_tuple`` is set, otherwise a list.
        """
        element_path = mk_label(path, num) + "_"
        # mypy does not know about the accept method, since it was
        # added via the VisitorMeta class
        element_space:Any = space.contents
        elements = [element_space.accept(self, element_path, counter=idx) for idx in range(num)]
        if space.is_tuple:
            return tuple(elements)
        return elements

    def visitSearchSpaceArray(self, space:SearchSpaceArray, path:str, counter=None):
        """Return the expression for an array whose length may vary.

        A fixed-length array yields the element expressions directly; otherwise
        an ``hp.choice`` selects among one alternative per permitted length.
        """
        assert space.maximum >= space.minimum
        label = mk_label(path, counter)
        child_path = label + "_"
        if space.minimum == space.maximum:
            return self.array_single_expr_(space, child_path, space.minimum)
        per_length = [
            self.array_single_expr_(space, child_path, length)
            for length in range(space.minimum, space.maximum+1)
        ]
        return hp.choice(label, per_length)

    def visitSearchSpaceList(self, space:SearchSpaceList, path:str, counter=None):
        """Return the expressions for each member of a heterogeneous list.

        The result is a tuple when ``space.is_tuple`` is set, otherwise a list.
        """
        label = mk_label(path, counter)
        # mypy does not know about the accept method, since it was
        # added via the VisitorMeta class
        members:List[Any] = space.contents
        exprs = [member.accept(self, label, counter=idx) for idx, member in enumerate(members)]
        if space.is_tuple:
            return tuple(exprs)
        return exprs

    def visitSearchSpaceObject(self, space:SearchSpaceObject, path:str, counter=None):
        """Return a dict mapping ``'name'`` and every key of ``space`` to expressions.

        All valid hyperparameter combinations are placed under one
        ``hp.choice``; each key is then bound to its position in the chosen
        combination.
        """
        combos_label = mk_label(path, counter) + "_" + "combos"
        result = {}
        result['name']=space.longName

        # Shared across all combinations: the first visited child gets no
        # counter, subsequent ones get 1, 2, ...
        child_counter = None

        def child_expr(key, child):
            nonlocal child_counter
            if child is None:
                return None
            expr = child.accept(self, path + "_" + key, counter=child_counter)
            child_counter = 1 if child_counter is None else child_counter + 1
            return expr

        def combination_exprs(combination):
            assert len(space.keys) == len(combination)
            return [child_expr(space.keys[pos], child) for pos, child in enumerate(combination)]

        combinations = [combination_exprs(combination) for combination in space.choices]
        chosen_combination = hp.choice(combos_label, combinations)
        for pos, key in enumerate(space.keys):
            result[key] = chosen_combination[pos]
        return result
class SearchSpaceHPStrVisitor(Visitor):
    """Visitor that renders a ``SearchSpace`` tree as hyperopt source text.

    Every ``visit*`` method receives the node being visited, the label
    ``path`` built up so far, an optional ``counter`` and a ``useCounter``
    flag; the latter two are forwarded to ``mk_label`` to form the label.
    """

    # Maps a number's label to its PGO distribution; filled in by
    # visitSearchSpaceNumber and consumed by visitSearchSpaceObject.
    pgo_dict:Dict[str, FrequencyDistribution]

    @classmethod
    def run(cls, space:SearchSpace, name:str, counter=None, useCounter=True):
        """Create a visitor and return the rendered text for ``space``."""
        visitor = cls(name)
        # mypy does not know about the accept method, since it was
        # added via the VisitorMeta class
        space_:Any = space
        return space_.accept(visitor, name, counter=counter, useCounter=useCounter)

    def __init__(self, name:str):
        """Initialise the visitor with no recorded PGO distributions; ``name`` is not stored."""
        super(SearchSpaceHPStrVisitor, self).__init__()
        self.pgo_dict = {}

    def visitSearchSpaceEnum(self, space:SearchSpaceEnum, path:str, counter=None, useCounter=True):
        """Render the single value directly, or an ``hp.choice`` over all values."""
        def val_as_str(v):
            if v is None:
                return "null"
            if isinstance(v, str):
                return f"'{v}'"
            return str(v)

        if len(space.vals) == 1:
            return val_as_str(space.vals[0])
        vals_str = "[" + ", ".join(val_as_str(v) for v in space.vals) + "]"
        return f"hp.choice('{mk_label(path, counter, useCounter)}', {vals_str})"

    # Constants and booleans are rendered exactly like enumerations.
    visitSearchSpaceConstant = visitSearchSpaceEnum
    visitSearchSpaceBool = visitSearchSpaceEnum

    def visitSearchSpaceNumber(self, space:SearchSpaceNumber, path:str, counter=None, useCounter=True):
        """Render the hyperopt expression text for a numeric range.

        When the number carries a PGO distribution it is recorded in
        ``self.pgo_dict`` under the label, and the rendered text refers to it
        as ``pgo_<label>``.

        Raises:
            ValueError: if a required bound is missing, if the ``integer``
                distribution is requested for a non-discrete number, if a
                ``loguniform`` range has a non-positive minimum, or if the
                distribution name is not recognised.
        """
        label = mk_label(path, counter, useCounter)

        if space.pgo is not None:
            self.pgo_dict[label] = space.pgo
            return f"scope.pgo_sample(pgo_{label}, hp.quniform('{label}', 0, {len(space.pgo)-1}, 1))"

        dist = space.distribution or "uniform"

        if space.maximum is None:
            raise ValueError(f"maximum not specified for a number with distribution {dist} for {path}")
        hi = space.getInclusiveMax()

        # These distributions need only a maximum
        if dist == "integer":
            if not space.discrete:
                raise ValueError(f"integer distribution specified for a non discrete numeric type for {path}")
            return f"hp.randint('{label}', {hi})"

        if space.minimum is None:
            raise ValueError(f"minimum not specified for a number with distribution {dist} for {path}")
        lo = space.getInclusiveMin()

        if dist == "uniform":
            if space.discrete:
                return f"hp.quniform('{label}', {lo}, {hi}, 1)"
            return f"hp.uniform('{label}', {lo}, {hi})"

        if dist == "loguniform":
            # for log distributions, hyperopt requires that we provide the log of the min/max
            if lo <= 0:
                raise ValueError(f"minimum of 0 specified with a {dist} distribution for {path}. This is not allowed; please set it (possibly using minimumForOptimizer) to be positive")
            # lo is known to be positive here, so the log is always taken.
            lo = math.log(lo)
            # A non-positive maximum is passed through unchanged, as before.
            if hi > 0:
                hi = math.log(hi)
            if space.discrete:
                return f"hp.qloguniform('{label}', {lo}, {hi}, 1)"
            return f"hp.loguniform('{label}', {lo}, {hi})"

        raise ValueError(f"Unknown distribution type: {dist} for {path}")

    def array_single_str_(self, space:SearchSpaceArray, path:str, num, useCounter=True)->str:
        """Render an array of exactly ``num`` elements.

        Elements are comma-separated and enclosed in parentheses when
        ``space.is_tuple`` is set, otherwise in square brackets.
        """
        p = mk_label(path, num, useCounter=useCounter) + "_"
        opener, closer = ("(", ")") if space.is_tuple else ("[", "]")
        # mypy does not know about the accept method, since it was
        # added via the VisitorMeta class
        contents:Any = space.contents
        body = ",".join(contents.accept(self, p, counter=x, useCounter=useCounter) for x in range(num))
        return opener + body + closer

    def visitSearchSpaceArray(self, space:SearchSpaceArray, path:str, counter=None, useCounter=True)->str:
        """Render an array whose length may vary.

        A fixed-length array is rendered directly; otherwise an ``hp.choice``
        selects among one alternative per permitted length. The label is
        emitted as a quoted string literal, like every other ``hp.choice``
        rendered by this visitor.
        """
        assert space.maximum >= space.minimum
        p = mk_label(path, counter, useCounter=useCounter)
        cp = p + "_"
        if space.minimum == space.maximum:
            return self.array_single_str_(space, cp, space.minimum, useCounter=useCounter)
        alternatives = ",".join(
            self.array_single_str_(space, cp, x, useCounter=useCounter)
            for x in range(space.minimum, space.maximum+1)
        )
        return f"hp.choice('{p}', [{alternatives}])"

    def visitSearchSpaceList(self, space:SearchSpaceList, path:str, counter=None, useCounter=True)->str:
        """Render each member of a heterogeneous list.

        Members are comma-separated and enclosed in parentheses when
        ``space.is_tuple`` is set, otherwise in square brackets.
        """
        p = mk_label(path, counter, useCounter=useCounter)
        opener, closer = ("(", ")") if space.is_tuple else ("[", "]")
        # mypy does not know about the accept method, since it was
        # added via the VisitorMeta class
        contents:List[Any] = space.contents
        body = ",".join(sub.accept(self, p, counter=x, useCounter=useCounter) for x,sub in enumerate(contents))
        return opener + body + closer

    # Note that this breaks in bad ways if there is a nested SearchSpaceObject
    # It really relies on there being a top level object and no sub-objects
    def visitSearchSpaceObject(self, space:SearchSpaceObject, path:str, counter=None, useCounter=True):
        """Render the statements that build the ``search_space`` dictionary.

        Returns:
            A newline-joined string of statements. When any PGO distribution
            was recorded while visiting the children, the text is preceded by
            the ``pgo_sample`` definition and one ``pgo_<label> = [...]``
            declaration per distribution.
        """
        space_name = "search_space"
        any_name = "valid_hyperparam_combinations"
        any_path = mk_label(path, counter, useCounter=useCounter) + "_" + "combos"

        # NOTE(review): longName is emitted without quotes; left as-is because
        # external string comparisons may depend on this exact text.
        statements = [
            space_name + " = {}",
            f"{space_name}['name'] = {space.longName}",
        ]

        # Shared across all combinations: the first visited child gets no
        # counter, subsequent ones get 1, 2, ...
        child_counter = None

        def cstr(key, x):
            nonlocal child_counter
            if x is None:
                return "None"
            s = x.accept(self, path + "_" + key, child_counter, useCounter=useCounter)
            child_counter = 1 if child_counter is None else child_counter + 1
            return s

        def choice_as_tuple_str(c):
            assert len(space.keys) == len(c)
            return [cstr(space.keys[ind], e) for ind, e in enumerate(c)]

        str_choices = "[" + ",".join("(" + ",".join(choice_as_tuple_str(c)) + ")" for c in space.choices) + "]"
        statements.append(f"{any_name} = hp.choice('{any_path}', {str_choices})")
        statements.extend(f"{space_name}['{k}'] = {any_name}[{i}]" for i, k in enumerate(space.keys))

        # pgo_dict is only populated while the children are visited above,
        # so it must be inspected after the choices have been rendered.
        if not self.pgo_dict:
            return "\n".join(statements)

        header = "@scope.define\ndef pgo_sample(pgo, sample):\n    return pgo[sample]\n"

        # use this to sort the pgo_labels by the unique ind
        # appended to the key.
        # This puts them in the order they appear in the hyperopt
        # expression, making it easier to read
        def last_num(kv):
            matches = re.search(r'(\d+)$', kv[0])
            return 0 if matches is None else int(matches.group(0))

        pgo_decls:List[str] = [
            f"pgo_{k} = {v.freq_dist.tolist()}"
            for k, v in sorted(self.pgo_dict.items(), key=last_num)
        ]
        # Each declaration is emitted exactly once, ahead of the statements.
        return header + "\n" + "\n".join(pgo_decls + statements)