Source code for lale.helpers

# Copyright 2019 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ast
import contextlib
import json
import jsonschema
import jsonsubschema
import numpy as np
import pandas as pd
import os
import re
import sys
import time
import traceback
import urllib
import warnings
import yaml
import scipy.sparse
import importlib
import graphviz
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, log_loss, make_scorer
from sklearn.metrics.scorer import check_scoring
from sklearn.utils.metaestimators import _safe_split
from lale.util.numpy_to_torch_dataset import NumpyTorchDataset
from lale.util.hdf5_to_torch_dataset import HDF5TorchDataset
from torch.utils.data import DataLoader, TensorDataset
import copy
import logging
import importlib
import inspect
import pkgutil
import torch
import h5py
from typing import List
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

[docs]class assert_raises: def __init__(self, expected_exc_type): self.__expected_exc_type = expected_exc_type def __enter__(self): pass def __exit__(self, exc_type, exc_value, traceback): assert exc_type is self.__expected_exc_type print_yaml('error', str(exc_value), file=sys.stderr) return True
[docs]class assert_warns: def __init__(self, expected_exc_type): self.__expected_exc_type = expected_exc_type def __enter__(self): warnings.filterwarnings("error", category=self.__expected_exc_type) def __exit__(self, exc_type, exc_value, traceback): warnings.resetwarnings() assert exc_type is self.__expected_exc_type print_yaml('error', str(exc_value), file=sys.stderr) return True
[docs]def assignee_name(level=1): tb = traceback.extract_stack() file_name, line_number, function_name, text = tb[-(level+2)] tree = ast.parse(text, file_name) assert isinstance(tree, ast.Module) if len(tree.body) == 1 and isinstance(tree.body[0], ast.Assign): lhs = tree.body[0].targets if len(lhs) == 1 and isinstance(lhs[0], ast.Name): return lhs[0].id return None
[docs]def data_to_json(data, subsample_array = True): if type(data) is tuple: # convert to list return [data_to_json(elem, subsample_array) for elem in data] if type(data) is list: return [data_to_json(elem, subsample_array) for elem in data] elif type(data) is dict: return {key: data_to_json(data[key], subsample_array) for key in data} elif isinstance(data, np.ndarray): return ndarray_to_json(data, subsample_array) elif type(data) is scipy.sparse.csr_matrix: return ndarray_to_json(data.toarray(), subsample_array) elif isinstance(data, pd.DataFrame) or isinstance(data, pd.Series): np_array = data.values return ndarray_to_json(np_array, subsample_array) elif isinstance(data, torch.Tensor): np_array = data.detach().numpy() return ndarray_to_json(np_array, subsample_array) else: return data
[docs]def dict_without(orig_dict, key): return {k: orig_dict[k] for k in orig_dict if k != key}
[docs]def load_yaml(dir_name, file_name, meta_dir=True): current_dir = os.path.dirname(__file__) parent_dir = os.path.dirname(current_dir) if meta_dir: op_dir = os.path.join(parent_dir, 'meta_data', dir_name) else: op_dir = os.path.join(parent_dir, dir_name) schema_file = os.path.join(op_dir, file_name) with open(schema_file, 'r') as f: result = yaml.load(f) return result
[docs]def ndarray_to_json(arr, subsample_array=True): #sample 10 rows and no limit on columns if subsample_array: num_subsamples = [10, np.iinfo(np.int).max, np.iinfo(np.int).max] else: num_subsamples = [np.iinfo(np.int).max, np.iinfo(np.int).max, np.iinfo(np.int).max] def subarray_to_json(indices): if len(indices) == len(arr.shape): if isinstance(arr[indices], bool) or \ isinstance(arr[indices], int) or \ isinstance(arr[indices], float) or \ isinstance(arr[indices], str): return arr[indices] elif np.issubdtype(arr.dtype, np.bool_): return bool(arr[indices]) elif np.issubdtype(arr.dtype, np.integer): return int(arr[indices]) elif np.issubdtype(arr.dtype, np.number): return float(arr[indices]) elif arr.dtype.kind in ['U', 'S', 'O']: return str(arr[indices]) else: raise ValueError(f'Unexpected dtype {arr.dtype}, ' f'kind {arr.dtype.kind}, ' f'type {type(arr[indices])}.') else: assert len(indices) < len(arr.shape) return [subarray_to_json(indices + (i,)) for i in range(min(num_subsamples[len(indices)], arr.shape[len(indices)]))] return subarray_to_json(())
[docs]def validate_schema(value, schema, subsample_array=True): json_value = data_to_json(value, subsample_array) jsonschema.validate(json_value, schema)
JSON_META_SCHEMA_URL = 'http://json-schema.org/draft-04/schema#'
[docs]def json_meta_schema(): return jsonschema.Draft4Validator.META_SCHEMA
[docs]def validate_is_schema(value): if '$schema' in value: assert value['$schema'] == JSON_META_SCHEMA_URL jsonschema.validate(value, json_meta_schema())
[docs]def is_schema(value): if isinstance(value, dict): try: jsonschema.validate(value, json_meta_schema()) except: return False return True return False
[docs]class SubschemaError(Exception): def __init__(self, sub, sup, sub_name='sub', sup_name='super'): self.sub = sub self.sup = sup self.sub_name = sub_name self.sup_name = sup_name def __str__(self): summary = f'Expected {self.sub_name} to be a subschema of {self.sup_name}.' import lale.pretty_print sub = lale.pretty_print.schema_to_string(self.sub) sup = lale.pretty_print.schema_to_string(self.sup) details = f'\n{self.sub_name} = {sub}\n{self.sup_name} = {sup}' return summary + details
[docs]def validate_subschema(sub, sup, sub_name='sub', sup_name='super'): if not jsonsubschema.isSubschema(sub, sup): raise SubschemaError(sub, sup, sub_name, sup_name)
[docs]def cross_val_score_track_trials(estimator, X, y=None, scoring=accuracy_score, cv=5): """ Use the given estimator to perform fit and predict for splits defined by 'cv' and compute the given score on each of the splits. Parameters ---------- estimator: A valid sklearn_wrapper estimator X, y: Valid data and target values that work with the estimator scoring: string or a scorer object created using https://scikit-learn.org/stable/modules/generated/sklearn.metrics.make_scorer.html#sklearn.metrics.make_scorer. A string from sklearn.metrics.SCORERS.keys() can be used or a scorer created from one of sklearn.metrics (https://scikit-learn.org/stable/modules/classes.html#module-sklearn.metrics). A completely custom scorer object can be created from a python function following the example at https://scikit-learn.org/stable/modules/model_evaluation.html The metric has to return a scalar value, cv: an integer or an object that has a split function as a generator yielding (train, test) splits as arrays of indices. Integer value is used as number of folds in sklearn.model_selection.StratifiedKFold, default is 5. Note that any of the iterators from https://scikit-learn.org/stable/modules/cross_validation.html#cross-validation-iterators can be used here. :return: cv_results: a list of scores corresponding to each cross validation fold """ if isinstance(cv, int): cv = StratifiedKFold(cv) scorer = check_scoring(estimator, scoring=scoring) cv_results:List[float] = [] log_loss_results = [] time_results = [] for train, test in cv.split(X, y): X_train, y_train = _safe_split(estimator, X, y, train) X_test, y_test = _safe_split(estimator, X, y, test, train) start = time.time() trained = estimator.fit(X_train, y_train) score_value = scorer(trained, X_test, y_test) execution_time = time.time() - start # not all estimators have predict probability try: y_pred_proba = trained.predict_proba(X_test) logloss = log_loss(y_true=y_test, y_pred=y_pred_proba) log_loss_results.append(logloss) except BaseException: logger.debug("Warning, log loss cannot be computed") cv_results.append(score_value) time_results.append(execution_time) return np.array(cv_results).mean(), np.array(log_loss_results).mean(), np.array(execution_time).mean()
[docs]def cross_val_score(estimator, X, y=None, scoring=accuracy_score, cv=5): """ Use the given estimator to perform fit and predict for splits defined by 'cv' and compute the given score on each of the splits. :param estimator: A valid sklearn_wrapper estimator :param X, y: Valid data and target values that work with the estimator :param scoring: a scorer object from sklearn.metrics (https://scikit-learn.org/stable/modules/classes.html#module-sklearn.metrics) Default value is accuracy_score. :param cv: an integer or an object that has a split function as a generator yielding (train, test) splits as arrays of indices. Integer value is used as number of folds in sklearn.model_selection.StratifiedKFold, default is 5. Note that any of the iterators from https://scikit-learn.org/stable/modules/cross_validation.html#cross-validation-iterators can be used here. :return: cv_results: a list of scores corresponding to each cross validation fold """ if isinstance(cv, int): cv = StratifiedKFold(cv) cv_results = [] for train, test in cv.split(X, y): X_train, y_train = _safe_split(estimator, X, y, train) X_test, y_test = _safe_split(estimator, X, y, test, train) trained_estimator = estimator.fit(X_train, y_train) predicted_values = trained_estimator.predict(X_test) cv_results.append(scoring(y_test, predicted_values)) return cv_results
[docs]def create_operator_using_reflection(class_name, operator_name, param_dict): instance = None if class_name is not None: class_name_parts = class_name.split(".") assert(len(class_name_parts)) >1, "The class name needs to be fully qualified, i.e. module name + class name" module_name = ".".join(class_name_parts[0:-1]) class_name = class_name_parts[-1] module = importlib.import_module(module_name) class_ = getattr(module, class_name) if param_dict is None: instance = class_.create(name=operator_name) else: instance = class_.create(name=operator_name, kwargs = param_dict) return instance
[docs]def create_individual_op_using_reflection(class_name, operator_name, param_dict): instance = None if class_name is not None: class_name_parts = class_name.split(".") assert(len(class_name_parts)) >1, "The class name needs to be fully qualified, i.e. module name + class name" module_name = ".".join(class_name_parts[0:-1]) class_name = class_name_parts[-1] module = importlib.import_module(module_name) class_ = getattr(module, class_name) if param_dict is None: instance = class_() else: instance = class_(**param_dict) return instance
[docs]def to_graphviz(lale_operator, **dot_graph_attr): from lale.operators import Operator, Pipeline from lale.pretty_print import hyperparams_to_string if not isinstance(lale_operator, Operator): raise ValueError("The input to to_graphviz needs to be a valid LALE operator.") jsn = lale_operator.to_json() dot = graphviz.Digraph() dot.attr('graph', {**dot_graph_attr, 'rankdir': 'LR'}) dot.attr('node', fontsize='11', margin='0.06,0.03') if isinstance(lale_operator, Pipeline): nodes = jsn['steps'] else: nodes = [jsn] #figure out what things were called in the caller cls2sym, name2cls = {}, {} import inspect symtab = inspect.stack()[1][0].f_globals for sym, val in symtab.items(): if isinstance(val, Operator): cls = val.class_name() cls2sym[cls] = '?' if (cls in cls2sym) else sym cls2sym = {cls: cls2sym[cls] for cls in cls2sym if cls2sym[cls] != '?'} for node in nodes: name, cls = node['operator'], node['class'] if '|' in name: for step in node['steps']: name2cls[step['operator']] = step['class'] else: name2cls[name] = cls def name2sym(name): if '|' in name: step_syms = [name2sym(n.strip()) for n in name.split(sep='|')] return ' | '.join(step_syms) if name in name2cls: cls = name2cls[name] if cls in cls2sym: return cls2sym[cls] return name def name2label(name): sym = name2sym(name) if sym.find('|') == -1: l1 = re.sub('(.)([A-Z][a-z]+)', r'\1-\n\2', sym) l2 = re.sub('([a-z0-9])([A-Z])', r'\1-\n\2', l1) label = re.sub(r'([^_\n-]_)([^_\n-])', r'\1-\n\2', l2) else: label = sym return label #do the actual visualizations try: edges = jsn['edges'] except KeyError: edges = [] def make_tooltip(node): sym = name2sym(node['operator']) if 'hyperparams' in node: hps = node['hyperparams'] if hps: result = f'{sym}({hyperparams_to_string(hps)})' if len(result) > 255: #too long for graphviz result = result[:252] + '...' return result return sym for i, node in enumerate(nodes): state2color = { 'trained': 'white', 'trainable': 'lightskyblue1', 'planned': 'skyblue2'} attrs = { 'style' :'filled', 'fillcolor': state2color[node['state']], 'tooltip': make_tooltip(node)} if 'documentation_url' in node: attrs = {**attrs, 'URL': node['documentation_url']} dot.node(str(i), name2label(node['operator']), **attrs) for edge in edges: dot.edge(str(edge[0]), str(edge[1])) return dot
[docs]def println_pos(message, out_file=sys.stdout): tb = traceback.extract_stack()[-2] match = re.search(r'<ipython-input-([0-9]+)-', tb[0]) if match: pos = 'notebook cell [{}] line {}'.format(match[1], tb[1]) else: pos = '{}:{}'.format(tb[0], tb[1]) strtime = time.strftime('%Y-%m-%d_%H-%M-%S') to_log = '{}: {} {}'.format(pos, strtime, message) print(to_log, file=out_file) if match: os.system('echo {}'.format(to_log))
[docs]def create_instance_from_hyperopt_search_space(lale_object, hyperparams): ''' Hyperparams is a n-tuple of dictionaries of hyper-parameters, each dictionary corresponds to an operator in the pipeline ''' #lale_object can either be an individual operator, a pipeline or an operatorchoice #Validate that the number of elements in the n-tuple is the same #as the number of steps in the current pipeline from lale.operators import IndividualOp from lale.operators import Pipeline from lale.operators import TrainablePipeline from lale.operators import OperatorChoice if isinstance(lale_object, IndividualOp): return lale_object(**dict_without(hyperparams, 'name')) elif isinstance(lale_object, Pipeline): if len(hyperparams) != len(lale_object.steps()): raise ValueError('The number of steps in the hyper-parameter space does not match the number of steps in the pipeline.') op_instances = [] edges = lale_object.edges() #op_map:Dict[PlannedOpType, TrainableOperator] = {} op_map = {} for op_index in range(len(hyperparams)): state_params = hyperparams[op_index] #TODO: Should ideally check if the class_name is the same as the class name of the op from self.operators() at op_index op_instance = create_instance_from_hyperopt_search_space(lale_object.steps()[op_index], state_params) op_instances.append(op_instance) orig_op = lale_object._steps[op_index] op_map[orig_op] = op_instance #trainable_edges:List[Tuple[TrainableOperator, TrainableOperator]] try: trainable_edges = [(op_map[x], op_map[y]) for (x, y) in edges] except KeyError as e: raise ValueError("An edge was found with an endpoint that is not a step (" + str(e) + ")") return TrainablePipeline(op_instances, trainable_edges, ordered=True) elif isinstance(lale_object, OperatorChoice): #Hyperopt search space for an OperatorChoice is generated as a dictionary with a single element #corresponding to the choice made, the only key is the index of the step and the value is #the params corresponding to that step. step_index, hyperparams = list(hyperparams.items())[0] step_object = lale_object.steps()[step_index] return create_instance_from_hyperopt_search_space(step_object, hyperparams)
[docs]def import_from_sklearn_pipeline(sklearn_pipeline): #For all pipeline steps, identify equivalent lale wrappers if present, #if not, call make operator on sklearn classes and create a lale pipeline. def get_equivalent_lale_op(sklearn_obj): module_name = "lale.lib.sklearn" from sklearn.base import clone from lale.operators import make_operator class_name = sklearn_obj.__class__.__name__ module = importlib.import_module(module_name) try: class_ = getattr(module, class_name) except AttributeError: class_ = make_operator(sklearn_obj, name=class_name) class_ = class_(**sklearn_obj.get_params()) class_._impl._sklearn_model = copy.deepcopy(sklearn_obj) return class_ from sklearn.pipeline import FeatureUnion, Pipeline from sklearn.base import BaseEstimator from lale.operators import make_pipeline, make_union if isinstance(sklearn_pipeline, Pipeline): nested_pipeline_steps = sklearn_pipeline.named_steps.values() nested_pipeline_lale_objects = [import_from_sklearn_pipeline(nested_pipeline_step) for nested_pipeline_step in nested_pipeline_steps] lale_op_obj = make_pipeline(*nested_pipeline_lale_objects) elif isinstance(sklearn_pipeline, FeatureUnion): transformer_list = sklearn_pipeline.transformer_list concat_predecessors = [import_from_sklearn_pipeline(transformer[1]) for transformer in transformer_list] lale_op_obj = make_union(*concat_predecessors) else: lale_op_obj = get_equivalent_lale_op(sklearn_pipeline) return lale_op_obj
[docs]def get_hyperparam_names(op): import inspect if op._impl.__module__.startswith('lale'): hp_schema = op.hyperparam_schema() params = next(iter(hp_schema.get('allOf', []))).get('properties', {}) return list(params.keys()) else: return inspect.getargspec(op._impl.__class__.__init__).args
[docs]def validate_method(op, m): if op._impl.__module__.startswith('lale'): assert (m in op._schemas['properties'].keys()) else: a = '' if m.startswith('input_'): a = m[len('input_'):] elif m.startswith('output_'): a = m[len('output_'):] if a: assert (hasattr(op._impl, a))
[docs]def caml_to_snake(name): s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name) return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
[docs]def get_lib_schema(impl): if impl.__module__.startswith('lale.lib'): m = importlib.import_module(impl.__module__) return m._combined_schemas module_name = impl.__module__.split('.')[0] class_name = caml_to_snake(impl.__class__.__name__) try: lib_name = '.'.join(['lale.lib', module_name, class_name]) m = importlib.import_module(lib_name) return m._combined_schemas except (ModuleNotFoundError, AttributeError): try: lib_name = '.'.join(['lale.lib.autogen', class_name]) m = importlib.import_module(lib_name) return m._combined_schemas except: return None
[docs]def signature_to_schema(sig): sig_schema = {'type': 'object', 'properties': {}} for name, param in sig.parameters.items(): ignored_kinds = [inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD] if param.kind not in ignored_kinds: param_schema = {} if param.default != inspect.Parameter.empty: param_schema['default'] = param.default sig_schema['properties'][name] = param_schema return sig_schema
[docs]def get_default_schema(impl): if hasattr(impl, '__init__'): sig = inspect.signature(impl.__init__) arg_schemas = signature_to_schema(sig) else: arg_schemas = {'type': 'object', 'properties': {}} return { '$schema': 'http://json-schema.org/draft-04/schema#', 'description': 'Combined schema for expected data and hyperparameters.', 'type': 'object', 'properties': { 'input_fit': {}, 'input_predict': {}, 'output': {}, 'hyperparams': { 'allOf': [arg_schemas]}}}
logger = logging.getLogger(__name__)
[docs]def wrap_imported_operators(): import lale.lib from lale.operators import Operator, make_operator calling_frame = inspect.stack()[1][0] symtab = calling_frame.f_globals lib_modules = [p.name for p in pkgutil.iter_modules(lale.lib.__path__)] for name, impl in symtab.items(): if inspect.isclass(impl) and not isinstance(impl, Operator): module = impl.__module__.split('.')[0] looks_like_op = hasattr(impl, 'fit') and ( hasattr(impl, 'predict') or hasattr(impl, 'transform')) if module in lib_modules or looks_like_op: logger.info(f'Lale:Wrapped operator:{name}') symtab[name] = make_operator(impl=impl, name=name)
[docs]class val_wrapper(): """This is used to wrap values that cause problems for hyper-optimizer backends lale will unwrap these when given them as the value of a hyper-parameter""" def __init__(self, base): self._base = base
[docs] def unwrap_self(self): return self._base
[docs] @classmethod def unwrap(cls, obj): if isinstance(obj, cls): return cls.unwrap(obj.unwrap_self()) else: return obj
[docs]def append_batch(data, batch_data): if data is None: return batch_data elif isinstance(data, np.ndarray): if isinstance(batch_data, np.ndarray): if len(data.shape) == 1 and len(batch_data.shape) == 1: return np.concatenate([data, batch_data]) else: return np.vstack((data, batch_data)) elif isinstance(data, tuple): X, y = data if isinstance(batch_data, tuple): batch_X, batch_y = batch_data X = append_batch(X, batch_X) y = append_batch(y, batch_y) return X, y elif isinstance(data, torch.Tensor): if isinstance(batch_data, torch.Tensor): return torch.cat((data, batch_data)) elif isinstance(data, h5py.File): if isinstance(batch_data, tuple): batch_X, batch_y = batch_data
#TODO:Handle dataframes
[docs]def create_data_loader(X, y = None, batch_size = 1): if isinstance(X, pd.DataFrame): X = X.to_numpy() if isinstance(y, pd.Series): y = y.to_numpy() elif isinstance(X, np.ndarray): dataset = NumpyTorchDataset(X, y) elif isinstance(X, str):#Assume that this is path to hdf5 file dataset = HDF5TorchDataset(X) elif isinstance(X, dict): #Assumed that it is data indexed by batch number #dataset = BatchDataDictDataset(X) return X.values() elif isinstance(X, torch.Tensor): dataset = TensorDataset(X, y) else: raise TypeError("Can not create a data loader for a dataset with type {}".format(type(X))) return DataLoader(dataset, batch_size=batch_size)
[docs]def write_batch_output_to_file(file_obj, file_path, total_len, batch_idx, batch_X, batch_y, batch_out_X, batch_out_y): if file_obj is None and file_path is None: raise ValueError("Only one of the file object or file path can be None.") if file_obj is None: file_obj = h5py.File(file_path, 'w') #estimate the size of the dataset based on the first batch output size transform_ratio = int(len(batch_out_X)/len(batch_X)) if len(batch_out_X.shape) == 1: h5_data_shape = (transform_ratio*total_len, ) if len(batch_out_X.shape) == 2: h5_data_shape = (transform_ratio*total_len, batch_out_X.shape[1]) elif len(batch_out_X.shape) == 3: h5_data_shape = (transform_ratio*total_len, batch_out_X.shape[1], batch_out_X.shape[2]) dataset = file_obj.create_dataset(name='X', shape=h5_data_shape, chunks=True, compression="gzip") if batch_out_y is None and batch_y is not None: batch_out_y = batch_y if batch_out_y is not None: if len(batch_out_y.shape) == 1: h5_labels_shape = (transform_ratio*total_len, ) elif len(batch_out_y.shape) == 2: h5_labels_shape = (transform_ratio*total_len, batch_out_y.shape[1]) dataset = file_obj.create_dataset(name='y', shape=h5_labels_shape, chunks=True, compression="gzip") dataset = file_obj['X'] dataset[batch_idx*len(batch_out_X):(batch_idx+1)*len(batch_out_X)] = batch_out_X if batch_out_y is not None or batch_y is not None: labels = file_obj['y'] if batch_out_y is not None: labels[batch_idx*len(batch_out_y):(batch_idx+1)*len(batch_out_y)] = batch_out_y else: labels[batch_idx*len(batch_y):(batch_idx+1)*len(batch_y)] = batch_y return file_obj
[docs]def best_estimator(obj): if hasattr(obj, '_impl'): #TrainedIndividualOp if hasattr(obj._impl, 'best_estimator'): #HyperoptRegressorImpl return obj._impl.best_estimator return None if hasattr(obj, 'best_estimator_'): #GridSearchCV if hasattr(obj.best_estimator_, 'to_lale'): #SKlearnCompatWrapper return obj.best_estimator_.to_lale() return obj.best_estimator_ return obj