Source code for lale.pretty_print

# Copyright 2019 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ast
import astunparse
import importlib
import inspect
import json
import pprint
import re

import lale.helpers
import lale.operators

def hyperparams_to_string(hps, op=None):
    """Render hyperparameters as a comma-separated ``key=value`` string.

    Each value is formatted with ``pprint.pformat`` using a very large width
    so that it stays on a single line.

    Args:
        hps: Mapping from hyperparameter name to value. ``None`` is treated
            like an empty mapping.
        op: Operator the hyperparameters belong to. Currently unused; kept
            in the signature for the planned enum rendering.

    Returns:
        A string such as ``"a=1, b='x'"``; empty when there are no
        hyperparameters.
    """
    # TODO: use enums where possible (this is what ``op`` is reserved for)
    if not hps:
        return ''
    return ', '.join(
        f'{key}={pprint.pformat(value, width=10000, compact=True)}'
        for key, value in hps.items())
def to_camel_case(name):
    """Lower-case ``name``, inserting ``_`` at lower/digit-to-upper boundaries.

    Despite the function's name, the result is snake_case, for example
    ``'LogisticRegression'`` becomes ``'logistic_regression'``. Runs of
    upper-case letters are not split (``'PCA'`` becomes ``'pca'``).
    """
    # An underscore goes at every position that follows [a-z0-9] and
    # precedes [A-Z]; no character is consumed by the match.
    with_separators = re.sub(r'(?<=[a-z0-9])(?=[A-Z])', '_', name)
    return with_separators.lower()
def indiv_op_to_string(op, name=None, module_name=None):
    """Pretty-print a single individual operator.

    Args:
        op: The ``lale.operators.IndividualOp`` to print.
        name: Name to use for the operator in the generated expression;
            defaults to ``op.name()``.
        module_name: Module the operator should be imported from. When
            given, an import statement is generated as well.

    Returns:
        The operator expression alone when ``module_name`` is ``None``,
        otherwise the tuple ``(import_stmt, op_expr)``.
    """
    assert isinstance(op, lale.operators.IndividualOp)
    if name is None:
        name = op.name()

    with_import = module_name is not None
    import_stmt = ''
    if with_import:
        # lale modules export the operator under its lale name; for any
        # other module the last component of the class path is imported.
        if module_name.startswith('lale.'):
            imported = op.name()
        else:
            imported = op.class_name().split('.')[-1]
        alias = '' if name == imported else f' as {name}'
        import_stmt = f'from {module_name} import {imported}{alias}'

    # Only trainable operators whose implementation has ``fit`` are printed
    # as a call with hyperparameters; everything else is the bare name.
    is_call = hasattr(op._impl, "fit") and isinstance(
        op, lale.operators.TrainableIndividualOp)
    if is_call:
        hps = hyperparams_to_string(op.hyperparams(), op)
        op_expr = f'{name}({hps})'
    else:
        op_expr = name

    return (import_stmt, op_expr) if with_import else op_expr
def pipeline_to_string(pipeline, cls2name, show_imports):
    """Generate Python source code that rebuilds ``pipeline``.

    The operator graph is first reduced to nested sequential (``>>``) and
    parallel (``&``) combinations wherever possible. Whatever cannot be
    reduced to a single expression is emitted as an explicit
    ``get_pipeline_of_applicable_type(steps=..., edges=...)`` call.

    Args:
        pipeline: The ``lale.operators.Pipeline`` to print.
        cls2name: Mapping from ``op.class_name()`` to the name to use for
            that operator in the generated code.
        show_imports: Whether to include the import statements in the
            result.

    Returns:
        The generated code as one newline-joined string: optional imports,
        assignments for configured operators, assignments for irreducible
        sub-graphs, and finally ``pipeline = ...``.
    """
    assert isinstance(pipeline, lale.operators.Pipeline)

    def shallow_copy_graph(pipeline):
        """Return ``(steps, preds, succs)`` adjacency lists for the graph."""
        if isinstance(pipeline, lale.operators.OperatorChoice):
            return [pipeline], {pipeline: []}, {pipeline: []}
        steps = [*pipeline.steps()]
        preds = {step: [] for step in steps}
        succs = {step: [] for step in steps}
        for (src, dst) in pipeline.edges():
            preds[dst].append(src)
            succs[src].append(dst)
        return steps, preds, succs

    class Seq:
        """Sequential composition of two graph nodes (printed as ``>>``)."""

        def __init__(self, src, dst):
            self._src = src
            self._dst = dst

        def src(self):
            return self._src

        def dst(self):
            return self._dst

    class Par:
        """Parallel composition of two graph nodes (printed as ``&``)."""

        def __init__(self, s0, s1):
            self._s0 = s0
            self._s1 = s1

        def s0(self):
            return self._s0

        def s1(self):
            return self._s1

    def find_seq(steps, preds, succs):
        """Find a node whose only successor has it as its only predecessor."""
        for src in steps:
            if len(succs[src]) == 1:
                dst = succs[src][0]
                if len(preds[dst]) == 1:
                    return Seq(src, dst)
        return None

    def find_par(steps, preds, succs):
        """Find two nodes that share the same predecessors and successors."""
        for i0 in range(len(steps)):
            for i1 in range(i0 + 1, len(steps)):
                s0, s1 = steps[i0], steps[i1]
                preds0, preds1 = preds[s0], preds[s1]
                if len(preds0) == len(preds1) and set(preds0) == set(preds1):
                    succs0, succs1 = succs[s0], succs[s1]
                    if (len(succs0) == len(succs1)
                            and set(succs0) == set(succs1)):
                        return Par(s0, s1)
        return None

    def replace_seq(old_steps, old_preds, old_succs, seq):
        """Return a new graph in which ``seq`` replaces its two members."""
        def map_step(step):
            # Edges that pointed at either member now point at the Seq node.
            if step in [seq.src(), seq.dst()]:
                return seq
            return step

        new_steps, new_preds, new_succs = [], {}, {}
        for step in old_steps:  # careful to keep topological order
            if step is seq.src():
                new_steps.append(seq)
                new_preds[seq] = old_preds[seq.src()]
                new_succs[seq] = old_succs[seq.dst()]
            elif step is not seq.dst():
                new_steps.append(step)
                new_preds[step] = [map_step(pred) for pred in old_preds[step]]
                new_succs[step] = [map_step(succ) for succ in old_succs[step]]
        return new_steps, new_preds, new_succs

    def replace_par(old_steps, old_preds, old_succs, par):
        """Return a new graph in which ``par`` replaces its two members."""
        new_steps, new_preds, new_succs = [], {}, {}
        for step in old_steps:  # careful to keep topological order
            if step is par.s0():
                new_steps.append(par)
                new_preds[par] = old_preds[step]
                new_succs[par] = old_succs[step]
            elif step is not par.s1():
                new_steps.append(step)
                # Both members share their neighbours, so each neighbour
                # keeps one edge (rewritten from s0) and drops the s1 edge.
                new_preds[step] = []
                for pred in old_preds[step]:
                    if pred is par.s0():
                        new_preds[step].append(par)
                    elif pred is not par.s1():
                        new_preds[step].append(pred)
                new_succs[step] = []
                for succ in old_succs[step]:
                    if succ is par.s0():
                        new_succs[step].append(par)
                    elif succ is not par.s1():
                        new_succs[step].append(succ)
        return new_steps, new_preds, new_succs

    def introduce_structure(steps, preds, succs):
        """Collapse Seq/Par patterns until none is left.

        Returns the single remaining node when the graph was fully reduced,
        otherwise the remaining ``(steps, preds, succs)`` tuple.
        """
        progress = True
        while progress:
            seq = find_seq(steps, preds, succs)
            if seq:
                steps, preds, succs = replace_seq(steps, preds, succs, seq)
            par = find_par(steps, preds, succs)
            if par:
                steps, preds, succs = replace_par(steps, preds, succs, par)
            progress = seq or par
        if len(steps) == 1:
            return steps[0]
        else:
            return steps, preds, succs

    def get_module(op):
        """Pick the module name from which ``op`` should be imported.

        Tries the grandparent and then the parent module of the class path,
        first looking for ``op.name()`` and then for the bare class name.
        """
        class_name = op.class_name()

        def has_op(module_name, op_name):
            module = importlib.import_module(module_name)
            if hasattr(module, op_name):
                candidate = getattr(module, op_name)
                if isinstance(candidate, lale.operators.IndividualOp):
                    return candidate.class_name() == class_name
                else:
                    # Not a lale operator: accept anything that has ``fit``
                    # plus ``predict`` or ``transform``.
                    return (hasattr(candidate, '__init__')
                            and hasattr(candidate, 'fit')
                            and (hasattr(candidate, 'predict')
                                 or hasattr(candidate, 'transform')))
            return False

        mod_name_1 = class_name[:class_name.rfind('.')]
        mod_name_2 = mod_name_1[:mod_name_1.rfind('.')]
        if has_op(mod_name_2, op.name()):
            return mod_name_2
        elif has_op(mod_name_1, op.name()):
            return mod_name_1
        op_name = class_name[class_name.rfind('.')+1:]
        if has_op(mod_name_2, op_name):
            return mod_name_2
        assert has_op(mod_name_1, op_name)
        return mod_name_1

    class CodeGenState:
        """Accumulates generated code fragments and the names already used."""

        def __init__(self):
            self.imports = []
            self.assigns = []
            self.irreducibles = []
            self.pipeline = []
            # Names that generated variables must not collide with.
            self._names = {'lale', 'pipeline',
                           'get_pipeline_of_applicable_type'}
            self.op2import = {}
            self.op2assign = {}

        def gensym(self, prefix):
            """Return ``prefix``, or ``prefix_N`` if it is taken, and reserve it."""
            if prefix in self._names:
                suffix = 1
                while f'{prefix}_{suffix}' in self._names:
                    suffix += 1
                result = f'{prefix}_{suffix}'
            else:
                result = prefix
            self._names |= {result}
            return result

    gen = CodeGenState()

    def code_gen_rec(graph):
        """Emit code for ``graph`` into ``gen``.

        Returns the expression string for the node, or ``None`` for an
        irreducible graph tuple (which sets ``gen.pipeline`` directly).
        """
        if type(graph) is tuple:
            steps, preds, succs = graph
            # Reserve the bare name 'step' so that every generated variable
            # below gets a numbered name (step_1, step_2, ...).
            _ = gen.gensym('step')
            step2name = {}
            for step in steps:
                if isinstance(step, lale.operators.IndividualOp):
                    step2name[step] = code_gen_rec(step)
                else:
                    name = gen.gensym('step')
                    expr = code_gen_rec(step)
                    gen.irreducibles.append(f'{name} = {expr}')
                    step2name[step] = name
            make_pipeline = 'get_pipeline_of_applicable_type'
            gen.imports.append(f'from lale.operators import {make_pipeline}')
            # NOTE(review): the whitespace after each '\n' in this template
            # may have been wider in the upstream file -- confirm.
            gen.pipeline = 'pipeline = {}(\n steps=[{}],\n edges=[{}])' \
                .format(make_pipeline,
                        ', '.join([step2name[step] for step in steps]),
                        ', '.join([f'({step2name[src]},{step2name[tgt]})'
                                   for src in steps for tgt in succs[src]]))
            return None
        elif isinstance(graph, Seq):
            def parens(op):
                result = code_gen_rec(op)
                if isinstance(op, Par) or isinstance(
                        op, lale.operators.OperatorChoice):
                    return f'({result})'
                return result
            return f'{parens(graph.src())} >> {parens(graph.dst())}'
        elif isinstance(graph, Par):
            def parens(op):
                result = code_gen_rec(op)
                if isinstance(op, Seq) or isinstance(
                        op, lale.operators.OperatorChoice):
                    return f'({result})'
                return result
            return f'{parens(graph.s0())} & {parens(graph.s1())}'
        elif isinstance(graph, lale.operators.OperatorChoice):
            def parens(op):
                result = code_gen_rec(op)
                if isinstance(op, Seq) or isinstance(op, Par):
                    return f'({result})'
                return result
            printed_steps = [parens(step) for step in graph.steps()]
            return ' | '.join(printed_steps)
        elif isinstance(graph, lale.operators.IndividualOp):
            name = gen.gensym(cls2name[graph.class_name()])
            module_name = get_module(graph)
            import_stmt, op_expr = indiv_op_to_string(graph, name, module_name)
            gen.imports.append(import_stmt)
            # A call expression with arguments gets its own variable so the
            # pipeline expression itself stays short.
            if re.fullmatch(r'.+\(.+\)', op_expr):
                new_name = gen.gensym(to_camel_case(name))
                gen.assigns.append(f'{new_name} = {op_expr}')
                return new_name
            else:
                return name
        else:
            # Report the type of the offending node, not the builtin `type`.
            assert False, f'unexpected type {type(graph)} of graph {graph}'

    def code_gen_top(graph):
        """Generate all code for ``graph`` and join it into one string."""
        expr = code_gen_rec(graph)
        if expr:
            gen.pipeline = f'pipeline = {expr}'
        code = gen.imports if show_imports else []
        code = code + gen.assigns + gen.irreducibles + [gen.pipeline]
        result = '\n'.join(code)
        return result

    steps, preds, succs = shallow_copy_graph(pipeline)
    graph = introduce_structure(steps, preds, succs)
    return code_gen_top(graph)
def schema_to_string(schema):
    """Pretty-print a JSON schema as a Python-style literal.

    The schema is dumped to JSON, parsed as Python source and unparsed with
    ``astunparse``. The resulting text is then tidied with regular
    expressions.
    """
    # NOTE(review): runs of spaces inside these patterns/replacements may
    # have been wider in the upstream file -- confirm.
    tree = ast.parse(json.dumps(schema))
    text = astunparse.unparse(tree).strip()
    # Start each subsequent dict of a list on its own line.
    text = re.sub(r'}, {\n (\s+)', r'},\n\1{ ', text)
    # Same for the first dict directly after an opening bracket.
    text = re.sub(r'\[{\n (\s+)', r'[\n\1{ ', text)
    # Drop every line that carries a '$schema' entry.
    text = re.sub(r"[^\n]+'\$schema':[^\n]+\n", "", text)
    # Pull closing brackets up onto the previous line, dropping the trailing
    # comma; repeat until the text no longer changes.
    previous = None
    while previous != text:
        previous = text
        text = re.sub(r',\n\s*([\]}])', r'\1', text)
    # Collapse braces that contain only whitespace.
    return re.sub(r'{\s+}', r'{}', text)
def to_string(arg, show_imports=True, call_depth=2):
    """Pretty-print a schema, an individual operator, or a pipeline.

    Args:
        arg: A JSON schema, a ``lale.operators.IndividualOp`` or a
            ``lale.operators.Pipeline``.
        show_imports: For pipelines, whether to include import statements.
        call_depth: Index into ``inspect.stack()``, counted from the nested
            helper below, of the frame whose variables supply the operator
            names. The default of 2 selects the caller of ``to_string``.

    Returns:
        The pretty-printed string.

    Raises:
        ValueError: If ``arg`` is none of the supported kinds.
    """
    def get_cls2name():
        # This must remain a nested function called directly from
        # to_string, because call_depth counts this frame as well.
        caller = inspect.stack()[call_depth][0]
        cls2name = {}
        # Locals are scanned before globals; the first name found for a
        # class wins.
        for namespace in (caller.f_locals, caller.f_globals):
            for var_name, value in list(namespace.items()):
                if not isinstance(value, lale.operators.IndividualOp):
                    continue
                if not var_name[0].isupper():
                    continue
                cls2name.setdefault(value.class_name(), var_name)
        return cls2name

    if lale.helpers.is_schema(arg):
        return schema_to_string(arg)
    if isinstance(arg, lale.operators.IndividualOp):
        return indiv_op_to_string(arg)
    if isinstance(arg, lale.operators.Pipeline):
        return pipeline_to_string(arg, get_cls2name(), show_imports)
    raise ValueError(f'Unexpected argument type {type(arg)} for {arg}')
def ipython_display(arg, show_imports=True):
    """Display the pretty-printed ``arg`` as a Python code block in IPython.

    Args:
        arg: Anything accepted by ``to_string``.
        show_imports: Forwarded to ``to_string``.
    """
    import IPython.display

    # to_string must be called directly from here: call_depth=3 accounts
    # for this extra frame so that names come from this function's caller.
    code = to_string(arg, show_imports, call_depth=3)
    rendered = IPython.display.Markdown(f'```python\n{code}\n```')
    IPython.display.display(rendered)