#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2014 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
'''A best-effort attempt to build computation dependency-graphs from methods working with dict-like objects (such as pandas),
inspired by XForms: http://lib.tkk.fi/Diss/2007/isbn9789512285662/article3.pdf
.. seealso::
Dependencies
@calculation
@calculation_factory [TODO]
execute_funcs_map()
'''
from collections import OrderedDict, defaultdict
from collections.abc import Mapping, Iterable
import inspect
import logging
import re
from networkx.exception import NetworkXError
import itertools as it
import networkx as nx
import pandas as pd
from .mymock import MagicMock
## When true, `Dependencies.build_plan()` also computes the plan's missing input nodes.
DEBUG= False
## Name given to the root mock; harvested dotted-paths are expected to start with it.
_root_name = 'R'
## Length of the "<root>." prefix that gets sliced-off the harvested paths.
_root_len = len(_root_name)+1
log = logging.getLogger(__name__)
class DependenciesError(Exception):
    '''Raised when relations cannot be harvested, validated, planned or executed.

    :param msg:  the error message
    :param item: an optional object related to the failure, stored as-is in `self.item`
    '''

    def __init__(self, msg, item=None):
        ## Zero-arg super() starts the lookup right after this class;
        #  the former `super(Exception, self)` skipped `Exception` itself.
        super().__init__(msg)
        self.item = item
def _make_mock(*args, **kwargs):
    '''Create a new `MagicMock`, forwarding all arguments to its constructor.'''
    mock = MagicMock(*args, **kwargs)
    return mock
def harvest_funcs_factory(funcs_factory, root=None, func_rels=None):
    '''Harvest the relation-tuples of all child-functions produced by a functions-factory.

    The factory is wrapped and invoked with mocks attached to `root`, then each
    returned child-function is invoked in turn and the mock-calls recorded on `root`
    are turned into relations.

    :param funcs_factory: a callable returning a sequence of no-arg functions
    :param root:          the root mock to attach argument-mocks to (a new one is made if missing)
    :param func_rels:     an optional list to be extended with the harvested relations
    :return: the list of the newly harvested ``(item, deps, func)`` relations
    '''
    new_func_rels = []

    ## Wrap and invoke funcs_factory with "rooted" mockups as args
    #  to collect mock_calls.
    #
    funcs_factory = _wrap_funcs_factory(funcs_factory)
    (root, mocks) = funcs_factory.mockup_func_args(root=root)

    cfuncs = funcs_factory(*mocks)  ## The cfuncs are now wrapped children.

    ## Harvest cfunc deps as a list of 3-tuple (item, deps, func)
    #  by inspecting root after each cfunc-call.
    #
    for cfunc in cfuncs:
        root.reset_mock()
        tmp = cfunc()
        try:
            tmp += 2  ## Force dependencies from return values despite compiler-optimizations.
        except Exception:
            ## Best-effort only; but do not swallow KeyboardInterrupt/SystemExit.
            pass
        _harvest_mock_calls(root.mock_calls, cfunc, new_func_rels)
    ## Children fetch their code from the factory, so reset it only after all of them have run.
    funcs_factory.reset()

    _validate_func_relations(new_func_rels)

    if func_rels is not None:
        func_rels.extend(new_func_rels)

    return new_func_rels
def harvest_func(func, root=None, func_rels=None):
    '''Harvest the relation-tuples of a standalone function.

    The function is wrapped and invoked with mocks attached to `root`, and
    the mock-calls recorded on `root` are turned into relations.

    :param func:      the function to explore
    :param root:      the root mock to attach argument-mocks to (a new one is made if missing)
    :param func_rels: an optional list to be extended with the harvested relations
    :return: the list of the newly harvested ``(item, deps, func)`` relations
    '''
    new_func_rels = []

    func = _wrap_standalone_func(func)
    (root, mocks) = func.mockup_func_args(root=root)

    tmp = func(*mocks)
    try:
        tmp += 2  ## Force dependencies from return values despite compiler-optimizations.
    except Exception:
        ## Best-effort only; but do not swallow KeyboardInterrupt/SystemExit.
        pass
    func.reset()  ## Only acts on factories; kept for symmetry with harvest_funcs_factory().
    _harvest_mock_calls(root.mock_calls, func, new_func_rels)

    _validate_func_relations(new_func_rels)

    if func_rels is not None:
        func_rels.extend(new_func_rels)

    return new_func_rels
def _harvest_mock_calls(mock_calls, func, func_rels):
    '''Feed every recorded mock-call to `_harvest_mock_call()` and flush any left-over deps.

    :param mock_calls: the calls recorded on the root mock
    :param func:       the function the calls originated from
    :param func_rels:  the list receiving the ``(item, deps, func)`` relations
    '''
    ## A map from 'pure.dot.paths' --> call-paths,
    #  filled-in and consumed (mostly) by _harvest_mock_call().
    pending = OrderedDict()

    for mock_call in mock_calls:
        last_path = _harvest_mock_call(mock_call, func, pending, func_rels)

    if not pending:
        return

    ## Any remaining deps came from a last not-assignment (a statement) in func.
    #  Add them as non-dependent items.
    #
    pending[last_path] = None  ## We don't care about call dep-subprefixes (the value) anymore.
    for leaf in _filter_common_prefixes(pending.keys()):
        _append_func_relation(leaf, [], func, func_rels)
def _parse_mock_arg(mock):
    '''Return a ``(path_without_magic_tail, full_path)`` pair for the mock's name.'''
    name_and_id = _parse_mock_str(mock)
    full_path = name_and_id[0]
    stripped = _strip_magic_tail(full_path)
    return (stripped, full_path)
def _harvest_mock_call(mock_call, func, deps_set, func_rels):
    '''Process a single recorded mock-call, collecting deps or emitting relations.

    * Any mock passed as (keyword-)argument of the call is added into `deps_set`.
    * On `__getitem__`, the indexed paths are added into `deps_set`.
    * On `__setitem__`, a relation ``(indexed_path, [all deps collected so far], func)``
      is appended into `func_rels` for each indexed item, and `deps_set` is cleared.

    :param mock_call: a ``(call_name, args, kwargs)`` triple that also has a `parent` attribute
    :param func:      the function the call originated from
    :param deps_set:  an ordered map ``{dotted.path --> call-path}``, updated in place
    :param func_rels: the list receiving the relations
    :return: the dotted path (without magic tail) of the call's parent mock
    '''

    def parse_mock_path(mock):
        mpath = _parse_mock_str(mock)[0]
        try:
            ## Hack to consolidate 'dot.__getitem__.com' --> 'dot.item.com' attributes.
            #  Just search if previous call is subprefix of this one.
            prev_path = next(reversed(deps_set))
            prev_call = deps_set[prev_path]
            if (prev_call+'()' == call[:len(prev_call)+2]):
                ## Skip the root-prefix and the '()' of the previous call.
                mpath = prev_path + mpath[len(prev_call)+_root_len+2:]
        except (KeyError, StopIteration):
            ## No previous dep to consolidate with.
            pass
        return _strip_magic_tail(mpath)

    (call, args, kw) = mock_call

    deps_set.update((_parse_mock_arg(arg) for arg in args if isinstance(arg, MagicMock)))
    deps_set.update((_parse_mock_arg(arg) for arg in kw.values() if isinstance(arg, MagicMock)))

    path = parse_mock_path(mock_call.parent)

    tail = call.split('.')[-1]
    if (tail == '__getitem__'):
        for item in _harvest_indexing(args[0]):
            new_path = '%s.%s'%(path, item)
            deps_set[new_path] = call
    elif (tail == '__setitem__'):
        ## The assignment consumes all deps collected so far.
        deps = list(deps_set.keys())
        deps_set.clear()

        for item in _harvest_indexing(args[0]):
            new_path ='%s.%s'%(path, item)
            _append_func_relation(new_path, deps, func, func_rels)

    return path
def _append_func_relation(item, deps, func, func_rels):
func_rels.append((item, deps, func))
[docs]def _harvest_indexing(index):
'''Harvest any strings, slices, etc, assuming to be DF's indices. '''
if isinstance(index, slice):
deps = _harvest_indexing(index.start) + _harvest_indexing(index.stop) + _harvest_indexing(index.step)
elif isinstance(index, str):
deps = [index]
elif isinstance(index, Iterable):
deps = [ii for i in index for ii in _harvest_indexing(i)]
else:
deps = []
return deps
[docs]def _strip_magic_tail(path):
''' some.path___with_.__magics__ --> some.path '''
pa_th = path.split('.')
while (pa_th[-1].startswith('__')):
del pa_th[-1]
if (not pa_th):
return []
path = '.'.join(pa_th)
return path[:-2] if path.endswith('()') else path
_mock_id_regex = re.compile(r"name='([^']+)' id='(\d+)'")
def _parse_mock_str(m):
return _mock_id_regex.search(m.__repr__()).groups()
def _validate_func_relations(func_rels):
    '''Check that all items and deps of the relation-tuples start with the root-name.

    :param func_rels: a sequence of ``(item, deps, func)`` tuples
    :raise DependenciesError: if any item or dep is not root-prefixed,
                              or if the relations cannot be inspected at all
    '''
    try:
        bad_items = []
        for (item, _deps, _func) in func_rels:
            if not item.startswith(_root_name):
                bad_items.append(item)
        if bad_items:
            raise DependenciesError("Not all dependency-items are prefixed with root(%s): %s"%(_root_name, bad_items), bad_items)

        bad_deps = []
        for (_item, deps, _func) in func_rels:
            for dep in deps:
                if not dep.startswith(_root_name):
                    bad_deps.append(dep)
        if bad_deps:
            raise DependenciesError("Not all dependency-data are prefixed with root(%s): %s"%(_root_name, bad_deps), bad_deps)

        ## The funcs are deliberately not checked; only algo-generated errors are searched for.
    except DependenciesError:
        raise
    except Exception as ex:
        raise DependenciesError("Bad explicit func_relations(%s) (item not a string, or deps not a tuple, etc) due to: %s"%(func_rels, ex), func_rels) from ex
def _build_func_dependencies_graph(func_rels, graph = None):
    '''Turn relation-tuples into a directed graph with ``item --> dep`` edges.

    Each edge carries a ``funcs`` attribute with the set of functions of its item.

    :param func_rels: a sequence of ``(item, deps, func)`` tuples
    :param graph:     an optional graph to add edges into (a new `DiGraph` if missing)
    :return: the graph
    '''
    if graph is None:
        graph = nx.DiGraph()

    consolidated = _consolidate_relations(func_rels)

    for (path, (deps, funcs)) in consolidated.items():
        if not deps:
            continue
        edges = [(path, dep, {'funcs': funcs}) for dep in _filter_common_prefixes(deps)]
        graph.add_edges_from(edges)

    cycles = list(nx.simple_cycles(graph))
    if cycles:
        log.warning('Cyclic dependencies! %s', cycles)

    return graph
def _consolidate_relations(relations):
    '''(item1, deps, func), (item1, ...) --> {item1: (set(deps), set(funcs))}

    Joins the deps & funcs of all relations with the same item,
    slicing-off the root-prefix from both items and deps.

    :param relations: a sequence of ``(item, deps, func)`` tuples with root-prefixed paths
    :return: a `defaultdict` mapping each unprefixed item to a ``(deps, funcs)`` pair of sets
    '''
    rels = defaultdict(lambda: (set(), set()))  # (deps, funcs)

    ## Join all item's deps & funcs, and strip root-name.
    #
    for (item, deps, func) in relations:
        (pdeps, pfuncs) = rels[item[_root_len:]]
        pdeps.update(d[_root_len:] for d in deps)
        pfuncs.add(func)

    return rels
[docs]def _filter_common_prefixes(deps):
'''deps: not-empty set
example::
deps = ['a', 'a.b', 'b.cc', 'a.d', 'b', 'ac', 'a.c']
res = _filter_common_prefixes(deps)
assert res == ['a.b', 'a.c', 'a.d', 'ac', 'b.cc']
'''
deps = sorted(deps)
(it1, it2) = it.tee(deps)
s2 = next(it2)
ndeps = []
try:
while(True):
s1=next(it1)
s2=next(it2)
if s1+'.' != s2[:len(s1)+1]:
ndeps.append(s1)
except StopIteration:
ndeps.append(s2)
return ndeps
def _research_calculation_routes(graph, sources, dests):
    '''Find nodes reaching 'dests' but not 'sources'.

    :param graph:   the full dependencies graph (it is not modified, copies are used)
    :param sources: a list of nodes (existent or not) to search for all paths originating from them
    :param dests:   a list of nodes to search for all paths leading to them
    :return: a 4-tuple ``(calc_inp_nodes, calc_out_nodes, calc_nodes, deps_graph)`` where:

             - `calc_inp_nodes`: the `sources` actually present in the graph,
             - `calc_out_nodes`: the `dests` not contained in `sources`,
             - `calc_nodes`: the (unordered) set of out-nodes plus all nodes found
               by `_all_predecessors()` on the graph having its input-nodes erased,
             - `deps_graph`: a copy of the graph with the out-edges of all input-nodes removed.
    :raise DependenciesError: if the search from the out-nodes fails
    '''
    ## Remove unrelated dests already present in sources.
    #
    calc_out_nodes = set(dests)
    calc_out_nodes -= set(sources)

    calc_inp_nodes = set(graph.nbunch_iter(sources))

    ## Deps graph: all INPUT's deps broken
    #  To be used for planning functions_to_run.
    #
    deps_graph = graph.copy()
    ## Materialize the edges first, as they may be a live view of the graph being modified.
    deps_graph.remove_edges_from(list(deps_graph.out_edges(calc_inp_nodes)))

    ## Data_to_be_calced: all INPUTs erased
    #  To be used for topological-sorting.
    #
    data_graph = graph.copy()
    data_graph.remove_nodes_from(calc_inp_nodes)

    ## FIXME: IS it NEEDED?
    ## Materialize the isolates, since they may be produced lazily from the graph being modified.
    isolates = list(nx.isolates(data_graph))
    log.debug('Isolates: %s', isolates)
    data_graph.remove_nodes_from(isolates)

    try:
        calc_nodes = set(list(calc_out_nodes) + _all_predecessors(data_graph, calc_out_nodes))
    except (KeyError, NetworkXError) as ex:
        unknown = [node for node in calc_out_nodes if node not in graph]
        raise DependenciesError('Unknown OUT-args(%s)!' % unknown, (graph, unknown)) from ex
    else:
        return (calc_inp_nodes, calc_out_nodes, calc_nodes, deps_graph)
def _all_predecessors(graph, nodes):
    '''Collect the keys of `nx.bfs_predecessors()` for a search started from each of the `nodes`.

    :return: a list of nodes (possibly with duplicates)
    '''
    ## `dict()` accepts both a mapping and an iterator of (node, predecessor) pairs,
    #  whichever the installed networkx returns.
    return [k for node in nodes for k in dict(nx.bfs_predecessors(graph, node))]
def _find_calculation_order(graph, calc_nodes):
    '''Return the `calc_nodes` in the reverse of their topological order within `graph`.'''
    subgraph = graph.subgraph(calc_nodes)
    ## Materialize first, since the topological-sort may be a generator that cannot be `reversed()`.
    ordered_calc_nodes = list(nx.topological_sort(subgraph))
    ordered_calc_nodes.reverse()

    return ordered_calc_nodes
def _extract_funcs_from_edges(graph, ordered_nodes):
# f=list(fs[0]['funcs'])[0]
funcs = [f for (_, _, d) in graph.edges_iter(ordered_nodes, True) if d
for f in d['funcs']] # a list of sets
## Remove duplicates and No-funcs whilist preserving order.
#
od = OrderedDict.fromkeys(funcs)
od.pop(None, None)
funcs = list(od)
return funcs
def _wrap_standalone_func(func):
    '''Wrap `func` into a standalone-function `_DepFunc`.'''
    wrapped = _DepFunc(func=func, is_funcs_factory=False)
    return wrapped
def _wrap_funcs_factory(funcs_factory):
    '''Wrap `funcs_factory` into a functions-factory `_DepFunc`.'''
    wrapped = _DepFunc(func=funcs_factory, is_funcs_factory=True)
    return wrapped
def _wrap_child_func(funcs_factory, child_index):
    '''Build the child-func `_DepFunc` at `child_index` of a factory, wrapping the factory if needed.'''
    if isinstance(funcs_factory, _DepFunc):
        parent = funcs_factory
    else:
        parent = _wrap_funcs_factory(funcs_factory)
    return _DepFunc(func=parent, _child_index=child_index)
[docs]class _DepFunc:
'''A wrapper for functions explored for relations, optionally allowing them to form a hierarchy of factories and produced functions.
It can be in 3 types:
* 0, standalone function: args given to function invocation are used immediatelt,
* 10, functions-factory: args are stored and will be used by the child-funcs returned,
* 20, child-func: created internally, and no args given when invoked, will use parent-factory's args.
Use factory methods to create one of the first 2 types:
* pdcalc._wrap_standalone_func()
* pdcalc._wrap_funcs_factory()
'''
TYPES = ['standalone', 'funcs_fact', 'child']
def __init__(self, func, is_funcs_factory=False, _child_index=None):
self.func = func
if is_funcs_factory: ## Factory
self._type = 1
self.child_funcs = None
assert _child_index == None, self
elif _child_index is not None: ## Child
self._type = 2
self.child_index = _child_index
assert func.is_funcs_factory(), self
else: ## Standalone
self._type = 0
assert _child_index == None, self
if not callable(func):
raise DependenciesError('Cannot create a _DepFunc for a non-callable(%s)!'%func, func)
def get_type(self):
return _DepFunc.TYPES[self._type]
def is_standalone_func(self):
return self._type == 0
def is_funcs_factory(self):
return self._type == 1
def is_child_func(self):
return self._type == 2
def reset(self):
if self.is_funcs_factory():
self.child_funcs = None
def is_reset(self):
assert self.is_funcs_factory(), self
return self.child_funcs is not None
def mockup_func_args(self, root=None):
assert not self.is_child_func(), self
if not root:
root = _make_mock(name=_root_name)
sig = inspect.signature(self.func)
mocks = []
for (name, param) in sig.parameters.items():
if param.kind == inspect.Parameter.VAR_KEYWORD:
log.warning('Any dependencies from **%s will be ignored for %s!', name, self)
break
mock = _make_mock()
mocks.append(mock)
root.attach_mock(mock, name)
return (root, mocks)
def __call__(self, *args, **kwargs):
if self.is_standalone_func(): ## Standalone
return self.func(*args, **kwargs)
elif self.is_funcs_factory(): ## Factory
cfuncs = self.func(*args, **kwargs)
if not cfuncs:
raise DependenciesError('%s returned %s as child-functions!'%(self.func, cfuncs), self.func)
self.child_funcs = cfuncs
return [_DepFunc(func=self, _child_index=i) for i in range(len(cfuncs))]
else: ## Child
parent_fact = self.func
assert parent_fact.is_funcs_factory(), self
## Use new args only if parent has previously been reset.
#
if (args or kwargs) and not parent_fact.is_reset():
parent_fact(*args, **kwargs) ## Ignore returned depfuncs, we are the children!
cfunc = parent_fact.child_funcs[self.child_index]
return cfunc()
def __repr__(self):
return self.__str__()
def __str__(self):
try:
if (self.is_child_func()):
return '_DepFunc<child>(%s, %s)'%(self.func.func, self.child_index)
return '_DepFunc<%s>(%s)'%(self.get_type(), self.func)
except:
return '_DepFunc<BAD_STR>(%s)'%self.func
##############################
## User Utilities
##############################
##
def tell_paths_from_named_args(named_args, arg_paths_extractor_func=default_arg_paths_extractor, paths=None):
    '''
    Builds the `sources` or the `dests` sequences params of :meth:`Dependencies.build_plan()` from a map of function-arguments

    :param named_args: an ordered map ``{param_name --> param_value}`` similar to that returned by
                       ``inspect.signature(func).bind(*args).arguments: BoundArguments``.
                       Use the utility :func:`name_all_func_args()` to generate such a map for some function.
    :param arg_paths_extractor_func:
                       a callable invoked as ``(name, arg, paths)`` for each named-arg,
                       expected to add the paths it finds into `paths`
    :param paths:      an optional list to collect the paths into (a new one is made if missing)
    :return: the `paths` updated with all ''dotted.vars'' found
    '''
    if paths is None:
        paths = []
    for (name, arg) in named_args.items():
        arg_paths_extractor_func(name, arg, paths)

    return paths
def name_all_func_args(func, *args, **kwargs):
    '''Bind the given args to the signature of `func`, returning the ordered ``{param_name --> value}`` map.'''
    return inspect.signature(func).bind(*args, **kwargs).arguments
##############################
## Classes
##############################
##
class Dependencies:
    '''
    Discovers and stores the rough functions-relationships needed to produce an execution-plan

    The relation-tuples are "rough" in the sense that they may contain duplicates etc
    requiring cleanup by _consolidate_relations().

    **Usage:**

    Use the `harvest_{XXX}()` methods or the `add_func_rel()` method
    to gather dependencies from functions and function-factories
    and then use the `build_plan()` method to freeze them into a `plan`.
    '''

    @classmethod
    def from_funcs_map(cls, funcs_map, deps = None):
        '''
        Factory method for building `Dependencies` by harvesting multiple functions and func_factories, at once

        :param funcs_map: a mapping or a sequence of pairs ``(what --> bool_or_null)`` with values:

                True
                    when `what` is a funcs_factory, a function returning a sequence of functions processing the data
                False
                    when `what` is a standalone_function, or
                None
                    when `what` is an explicit relation 3-tuple (item, deps, func) to be fed
                    directly to :func:`Dependencies.add_func_rel()`, where:

                    item
                        a ''dotted.varname'' string,
                    deps
                        a string or a sequence of ''dotted.varname'' strings, and
                    func
                        a standalone func or a funcs_factory as a 2-tuple (func, index)

                    Both items and deps are dot-separated sequence of varnames, such as 'foo.bar'

                .. Note:: When `funcs_map` is a mapping, its explicit relation entries
                   are popped out of it while processed.

        :param deps: if none, a new `Dependencies` instance is created to harvest relations
        :return: a new (even inherited) or an updated `Dependencies` instance
        :raise DependenciesError: if harvesting fails for any entry

        .. Important:: All functions must accept exactly the same args, or else the results
            will be undetermined and it will most-likely scream on execute_funcs_map.
        '''
        if deps is None:
            deps = cls()

        is_mapping = isinstance(funcs_map, Mapping)
        pairs = funcs_map.items() if is_mapping else funcs_map

        ## Iterate on a snapshot, since mappings get modified within the loop.
        for (func, is_factory) in list(pairs):
            try:
                if is_factory is None:          ## relation-tuple
                    ## Sequences of pairs have no key to pop.
                    if is_mapping:
                        funcs_map.pop(func)
                    deps.add_func_rel(*func)
                elif is_factory:                ## funcs_factory
                    deps.harvest_funcs_factory(func)
                else:                           ## standalone-function
                    deps.harvest_func(func)
            except DependenciesError:
                raise
            except Exception as ex:
                raise DependenciesError("Failed harvesting dependencies for %s due to: %s"%(func, ex), func) from ex

        return deps

    def __init__(self):
        ## The "rough" list of all (item, deps, func) relations gathered so far.
        self._relation_tuples = []

    def harvest_funcs_factory(self, funcs_factory):
        '''Harvest and store the relations of all child-functions of a functions-factory.'''
        root = _make_mock(name=_root_name)
        harvest_funcs_factory(funcs_factory, root=root, func_rels=self._relation_tuples)
        log.debug('DEPS collected(%i): %s', len(self._relation_tuples), self._relation_tuples)

    def harvest_func(self, func):
        '''Harvest and store the relations of a standalone function.'''
        root = _make_mock(name=_root_name)
        harvest_func(func, root=root, func_rels=self._relation_tuples)
        log.debug('DEPS collected(%i): %s', len(self._relation_tuples), self._relation_tuples)

    def add_func_rel(self, item, deps=None, func=None):
        '''
        Stores an explicit relation, after prefixing its item and deps with the root-name.

        :param str item: a dotted.var
        :param deps: a string or a sequence of dotted.vars
        :param func: a standalone func, or a funcs_factory as a 2-tuple (func, index)
        :raise DependenciesError: if the arguments are invalid
        '''
        ## Prepare args and do some basic argument-checking.
        #
        try:
            if func is not None:
                if isinstance(func, tuple):
                    (funcs_factory, child_index) = func
                    func = _wrap_child_func(funcs_factory, child_index)
                else:
                    func = _wrap_standalone_func(func)
        except (TypeError, ValueError) as ex:
            raise DependenciesError("Failed adding explicit func_relation(%s) due to bad functions\n Function must either be a standalone-function or a 2-tuple(funcS_factory, child_index)."%item) from ex

        try:
            item = _root_name + '.' + item
            if deps is None:
                deps = []
            else:
                if isinstance(deps, str):
                    deps = [deps]
                deps = [_root_name + '.' + dep for dep in deps]
        except TypeError as ex:
            raise DependenciesError("Failed adding explicit func_relation(%s) due to bad item or dependencies strings!"%item) from ex

        ## Validate in a temporary list, so that nothing bad ever gets stored.
        new_func_rels = []
        _append_func_relation(item, deps, func, new_func_rels)
        _validate_func_relations(new_func_rels)
        self._relation_tuples.extend(new_func_rels)

    def _build_deps_graph(self):
        '''Build the dependencies graph out of all relations stored so far.'''
        graph = _build_func_dependencies_graph(self._relation_tuples)
        return graph

    def _make_empty_plan(self):
        '''Create the `pd.Series` that `build_plan()` fills-in.'''
        return pd.Series(dict(calc_inp_nodes=[], calc_out_nodes=[], calc_nodes=[],
                              missing_data=[] if DEBUG else None, deps_graph=[]))

    def build_plan(self, sources, dests):
        '''
        Builds an execution-plan that when executed with :func:`execute_plan()` it will produce `dests` from `sources`.

        Turns any stored-relations into the full dependencies-graph then trims it
        only to those ''dotted.varname'' nodes reaching from `sources` to `dests`.

        :param sequence dests: a list of ''dotted.varname''s that will be the outcome of the calculations
        :param sources: a list of ''dotted.varname''s (existent or not) that are assumed to exist
                        when the execution will start
        :return: the new `plan` that can be fed to :func:`execute_plan()`
        :rtype: pd.Series
        '''
        log.debug('EXISTING data(%i): %s', len(sources), sources)
        log.debug('REQUESTED data(%i): %s', len(dests), dests)

        plan = self._make_empty_plan()
        plan.dests = dests

        graph = self._build_deps_graph()
        log.debug('GRAPH constructed(%i): %s', graph.size(), graph.edges(data=True))

        (calc_inp_nodes, calc_out_nodes, unordered_calc_nodes, deps_graph) = \
            _research_calculation_routes(graph, sources, dests)
        plan.deps_graph = deps_graph
        plan.calc_out_nodes = calc_out_nodes
        plan.calc_inp_nodes = calc_inp_nodes

        calc_nodes = _find_calculation_order(graph, unordered_calc_nodes)
        plan.calc_nodes = calc_nodes
        plan.deps = deps_graph.edges(calc_nodes, data=True)

        if DEBUG:
            missing_inp_nodes = _find_missing_input(calc_inp_nodes, deps_graph)
            plan.missing_inp_nodes = missing_inp_nodes
        else:
            plan.missing_inp_nodes = None

        funcs = _extract_funcs_from_edges(deps_graph, calc_nodes)
        plan.funcs = funcs

        return plan
##############################
## Top-level methods
##############################
##
## Module-wide `Dependencies` instance,
## used if no default-dependencies specified in the `calculation` decorator, below.
calculation_deps = Dependencies()
def calculation(deps = None):
    '''A decorator that extracts dependencies from a function.

    It can be applied both with parentheses (``@calculation()``, ``@calculation(deps)``)
    and without them (``@calculation``); the decorated function is returned unchanged.

    :param deps: the `Dependencies` instance to harvest the function into;
                 if None, any extracted deps are added in the `calculation_deps` module-variable.

    Example::

        @calculation
        def foo(a, r):
            a['bc'] = a.aa
            r['r1'] = a.bb

        @calculation
        def bar(a, r):
            r['r2'] = a.bc

        plan = calculation_deps.build_plan(['a.ab', 'a.bc', 'r'], ['r.r2'])
        execute_plan(plan, 1, 2)

    '''
    if deps is None:
        deps = calculation_deps
    elif callable(deps) and not isinstance(deps, Dependencies):
        ## Applied without parentheses: `deps` is actually the decorated function.
        func = deps
        calculation_deps.harvest_func(func)
        return func

    def decorator(func):
        deps.harvest_func(func)
        return func

    return decorator
def execute_plan(plan, *args, **kwargs):
    '''Invoke all functions of the `plan`, in order, with the given args.

    :param plan: an object with a `funcs` attribute holding the callables to run
    :return: a list with the results of all functions
    :raise DependenciesError: if any function fails
    '''
    def run(func):
        try:
            return func(*args, **kwargs)
        except Exception as ex:
            raise DependenciesError("Failed executing %s due to: %s"%(func, ex), func) from ex

    return [run(func) for func in plan.funcs]
def execute_funcs_factory(funcs_fact, dests, *args, **kwargs):
    '''A one-off way to run calculations from a functions-factory (see :func:`execute_funcs_map()`)

    :param funcs_fact: a function returning a sequence of functions
    :param dests:      what is required as the final outcome of the execution
    :param args:       the actual args to invoke the functions with
    :return: whatever :func:`execute_funcs_map()` returns
    '''
    return execute_funcs_map({funcs_fact: True}, dests, *args, **kwargs)
def execute_funcs_map(funcs_map, dests, *args, **kwargs):
    '''
    A one-off way to run calculations from a funcs_map as defined in :meth:`Dependencies.from_funcs_map()`.

    :param funcs_map: a dictionary similar to the one used by :meth:`Dependencies.from_funcs_map()`
    :param seq dests: what is required as the final outcome of the execution,
                    ie: for the funcs-factory::

                        def some_funcs(foo, bar):
                            def func_1():
                                foo['a1'] = ...
                            ...
                            def func_n():
                                foo['a2'] = ...
                                bar['b1'] = ...

                            return [func_1, ..., func_n]

                    the `dests` may be::

                        ['foo.a1', 'foo.a2', 'bar.b1']

    :param args: the actual args to use for invoking the functions in the map,
                    and the names of these args would come from the first function
                    to be invoked (which ever that might be!).
    :return: the list of results returned by :func:`execute_plan()`
    :raise DependenciesError: if the map contains no callable key

    .. Note:: Do not use it to run the calculations repeatedly.  Prefer to cache the function-relations
        into an intermediate :class:`Dependencies` instance, instead.
    '''
    ## Find the first func in the map and
    #  use it for naming the args.
    #
    for func in funcs_map.keys():
        if callable(func):
            a_func = func
            break
    else:
        ## The loop ended without finding any callable key.
        raise DependenciesError('No function found in funcs_map(%s)!'%funcs_map, funcs_map)
    named_args = name_all_func_args(a_func, *args, **kwargs)

    sources = tell_paths_from_named_args(named_args)
    deps = Dependencies.from_funcs_map(funcs_map)
    plan = deps.build_plan(sources, dests)

    return execute_plan(plan, *args, **kwargs)