# Copyright 2019 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict, Generic, Iterator, Iterable, List, Optional, Sequence, Tuple, TypeVar, Union, overload
import json
import jsonschema
import random
import numpy as np
from enum import Enum
Freqs = Dict[str, int]
PGO = Dict[str, Dict[str, Freqs]]
[docs]class DefaultValue(Enum):
token = 0
_default_value = DefaultValue.token
Def = TypeVar('Def')
Defaultable = Union[DefaultValue, Def]
[docs]def remove_defaults(it):
return filter((lambda x: x[1] is not _default_value), it)
XDK = TypeVar('XDK')
XDV = TypeVar('XDV')
[docs]def remove_defaults_dict(d:Dict[XDK, Defaultable[XDV]])->Dict[XDK, XDV]:
return dict(remove_defaults(d.items()))
# utilites to load a pgo from json-ish
[docs]def load_pgo_file(filepath) -> PGO:
with open(filepath) as json_file:
json_data = json.load(json_file)
return load_pgo_data(json_data)
[docs]def load_pgo_data(json_data) -> PGO:
jsonschema.validate(json_data, _input_schema)
norm = normalize_pgo_type(json_data)
return norm
# TODO: Add support for falling back on an underlying distribution
# with some probability
T = TypeVar('T')
[docs]class FrequencyDistribution(Generic[T]):
""" Represents the distribution implied by a histogram
"""
freq_dist:np.array # Array[T,int]
vals:np.array # Array[T]
cumulative_freqs:np.array # Array[int]
[docs] @classmethod
def asIntegerValues(cls, freqs:Iterable[Tuple[Any, int]], inclusive_min:Optional[int]=None, inclusive_max:Optional[int]=None)->'FrequencyDistribution[int]':
freqs = freqsAsIntegerValues(freqs, inclusive_min=inclusive_min, inclusive_max=inclusive_max)
return cls(list(freqs), dtype=int)
[docs] @classmethod
def asFloatValues(cls, freqs:Iterable[Tuple[Any, int]], inclusive_min:Optional[int]=None, inclusive_max:Optional[int]=None)->'FrequencyDistribution[float]':
freqs = freqsAsFloatValues(freqs, inclusive_min=inclusive_min, inclusive_max=inclusive_max)
return cls(list(freqs), dtype=float)
[docs] @classmethod
def asEnumValues(cls, freqs:Iterable[Tuple[Any, int]], values:List[Any])->'FrequencyDistribution[Any]':
freqs = freqsAsEnumValues(freqs, values=values)
return cls(list(freqs), dtype=object)
def __init__(self, freqs:Iterable[Tuple[Defaultable[T], int]], dtype=object):
# we need them to be sorted for locality
sorted_freq_list = sorted(freqs, key = (lambda k: (k[0] is _default_value, None if k[0] is _default_value else k[0])))
freqs_array = np.array(sorted_freq_list, dtype=[('value',object),('frequency',int)])
# freqs_array.sort(order='value')
self.freq_dist = freqs_array
self.vals = freqs_array['value']
self.cumulative_freqs = np.cumsum(freqs_array['frequency'])
def __len__(self):
return np.int_(self.cumulative_freqs[-1])
@overload
def __getitem__(self, key: int) -> T: ...
@overload
def __getitem__(self, key: Sequence[int]) -> Sequence[T]: ...
@overload
def __getitem__(self, key: slice) -> Sequence[T]: ...
def __getitem__(self, key: Union[int, Sequence[int], slice]) -> Union[T, Sequence[T]]:
indices:Sequence[int]
single = False
if isinstance(key, (int, float)):
single = True
indices = [key]
elif isinstance(key, slice):
# TODO: this could be made more efficient
indices = range(key.start or 0, key.stop or len(self), key.step or 1)
else:
indices = key
val_indices:Sequence[int] = np.searchsorted(self.cumulative_freqs, indices, side='right')
values = self.vals[val_indices].tolist()
if single:
assert len(values) == 1
return values[0]
else:
return values
[docs] def sample(self)->T:
l = len(self)
i = random.randrange(l)
return self[i]
[docs] def samples(self, count:int)->Sequence[T]:
l = len(self)
i:Sequence[int] = [random.randrange(l) for _ in range(count)]
return self[i]
# utiltities to convert and sample from a PGO frequency distribution
DEFAULT_STR = "default"
[docs]def freqsAsIntegerValues(freqs:Iterable[Tuple[Any, int]], inclusive_min:Optional[int]=None, inclusive_max:Optional[int]=None)->Iterator[Tuple[Defaultable[int], int]]:
""" maps the str values to integers, and skips anything that does not look like an integer"""
for v,f in freqs:
try:
if v == DEFAULT_STR:
yield _default_value, f
continue
i = int(v)
if inclusive_min is not None and inclusive_min > i:
continue
if inclusive_max is not None and inclusive_max < i:
continue
yield i,f
except:
pass
[docs]def freqsAsFloatValues(freqs:Iterable[Tuple[Any, int]], inclusive_min:Optional[float]=None, inclusive_max:Optional[float]=None)->Iterator[Tuple[Defaultable[float], int]]:
""" maps the str values to integers, and skips anything that does not look like an integer"""
for v,f in freqs:
try:
if v == DEFAULT_STR:
yield _default_value, f
continue
i = float(v)
if inclusive_min is not None and inclusive_min > i:
continue
if inclusive_max is not None and inclusive_max < i:
continue
yield i,f
except:
pass
# TODO: we can get a dictionary from freqs (before items() was called)
# and then lookup values in it (since values is likely smaller then freqs)
# or, of course, check which one is smaller and iterate through it
[docs]def freqsAsEnumValues(freqs:Iterable[Tuple[Any, int]],values:List[Any])->Iterator[Tuple[Defaultable[Any], int]]:
""" only keeps things that match the string representation of values in the enumeration.
converts from the string to the value as represented in the enumeration.
"""
def as_str(v)->str:
""" There are some quirks in how the PGO files
encodes values relative to python's str method
"""
if v is None:
return "none"
elif v is True:
return "true"
elif v is False:
return "false"
else:
return str(v)
value_lookup = {as_str(k):k for k in values}
for v,f in freqs:
if v == DEFAULT_STR:
yield _default_value, f
continue
if v in value_lookup:
yield value_lookup[v],f
_input_type = Dict[str, Dict[str, Union[int, Dict[str, Union[str, int]]]]]
# For now, we skip things of the form
# alg -> {default: number}
# (i.e. without parameters)
[docs]def normalize_pgo_type(data: _input_type) -> PGO:
return {alg: {param_keys: {param_values: int(param_counts)
for param_values, param_counts in v2.items()}
for param_keys, v2 in v1.items() if isinstance(v2, dict)}
for alg, v1 in data.items()}
_input_schema: Any = {
'$schema': 'http://json-schema.org/draft-04/schema#',
'description': 'Input format for pgo files. Keys are the name of the algorithm',
'type': 'object',
'additionalProperties': {
"anyOf": [{
'description': 'Keys are the parameter names',
'type': 'object',
'additionalProperties': {
'description': 'Keys are value names',
'type': 'object',
'additionalProperties': {
'anyOf': [{
'description': 'the number of times this value was found',
'type': 'integer'
},
{
'description': 'the number of times this value was found',
'type': 'string'
}]
}
}
},
{
'description': 'default value for the optimizer',
'type': 'object',
'additionalProperties': False,
'required': ['default'],
'properties': {
'default': {'anyOf': [{
'description': 'the number of times the default was found',
'type': 'integer'
},
{
'description': 'the number of times the default was found',
'type': 'string'
}]}
}
}]
}
}