diff --git a/sdks/python/apache_beam/dataframe/expressions.py b/sdks/python/apache_beam/dataframe/expressions.py index 7e8b7828a833..3b85eae331cd 100644 --- a/sdks/python/apache_beam/dataframe/expressions.py +++ b/sdks/python/apache_beam/dataframe/expressions.py @@ -64,7 +64,7 @@ def __init__( self._name = name self._proxy = proxy # Store for preservation through pickling. - self._id = _id or '%s_%s' % (name, id(self)) + self._id = _id or '%s_%s_%s' % (name, type(proxy).__name__, id(self)) def proxy(self): # type: () -> T return self._proxy @@ -255,9 +255,12 @@ def _get_allow_non_parallel(): @contextlib.contextmanager def allow_non_parallel_operations(allow=True): - old_value, _ALLOW_NON_PARALLEL.value = _ALLOW_NON_PARALLEL.value, allow - yield - _ALLOW_NON_PARALLEL.value = old_value + if allow is None: + yield + else: + old_value, _ALLOW_NON_PARALLEL.value = _ALLOW_NON_PARALLEL.value, allow + yield + _ALLOW_NON_PARALLEL.value = old_value class NonParallelOperation(Exception): diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py index 0b1c29637a59..2b2b1995cc08 100644 --- a/sdks/python/apache_beam/dataframe/frame_base.py +++ b/sdks/python/apache_beam/dataframe/frame_base.py @@ -218,11 +218,6 @@ def wrapper(self, *args, **kwargs): return wrapper -def _associative_agg_method(func): - # TODO(robertwb): Multi-level agg. - return _agg_method(func) - - def wont_implement_method(msg): def wrapper(self, *args, **kwargs): raise WontImplementError(msg) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 9e2e97aec48a..aacf4eea71f1 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -35,20 +35,41 @@ def __array__(self, dtype=None): transform = frame_base._elementwise_method( 'transform', restrictions={'axis': 0}) - def agg(self, *args, **kwargs): - return frame_base.DeferredFrame.wrap( - expressions.ComputedExpression( - 'agg', - lambda df: df.agg(*args, **kwargs), [self._expr], - preserves_partition_by=partitionings.Singleton(), - requires_partition_by=partitionings.Singleton())) + def aggregate(self, func, axis=0, *args, **kwargs): + if isinstance(func, list) and len(func) > 1: + # Aggregate each column separately, then stick them all together. + rows = [self.agg([f], *args, **kwargs) for f in func] + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'join_aggregate', + lambda *rows: pd.concat(rows), [row._expr for row in rows])) + else: + # We're only handling a single column. + base_func = func[0] if isinstance(func, list) else func + if _is_associative(base_func) and not args and not kwargs: + intermediate = expressions.elementwise_expression( + 'pre_aggregate', + lambda s: s.agg([base_func], *args, **kwargs), [self._expr]) + allow_nonparallel_final = True + else: + intermediate = self._expr + allow_nonparallel_final = None # i.e. don't change the value + with expressions.allow_non_parallel_operations(allow_nonparallel_final): + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'aggregate', + lambda s: s.agg(func, *args, **kwargs), [intermediate], + preserves_partition_by=partitionings.Singleton(), + requires_partition_by=partitionings.Singleton())) - all = frame_base._associative_agg_method('all') - any = frame_base._associative_agg_method('any') - min = frame_base._associative_agg_method('min') - max = frame_base._associative_agg_method('max') - prod = product = frame_base._associative_agg_method('prod') - sum = frame_base._associative_agg_method('sum') + agg = aggregate + + all = frame_base._agg_method('all') + any = frame_base._agg_method('any') + min = frame_base._agg_method('min') + max = frame_base._agg_method('max') + prod = product = frame_base._agg_method('prod') + sum = frame_base._agg_method('sum') cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method( 'order-sensitive') @@ -150,18 +171,62 @@ def at(self, *args, **kwargs): def loc(self): return _DeferredLoc(self) - @frame_base.args_to_kwargs(pd.DataFrame) - @frame_base.populate_defaults(pd.DataFrame) - def aggregate(self, axis, **kwargs): + def aggregate(self, func, axis=0, *args, **kwargs): if axis is None: - return self.agg(axis=1, **kwargs).agg(axis=0, **kwargs) - return frame_base.DeferredFrame.wrap( + # Aggregate across all elements by first aggregating across columns, + # then across rows. + return self.agg(func, *args, **dict(kwargs, axis=1)).agg( + func, *args, **dict(kwargs, axis=0)) + elif axis in (1, 'columns'): + # This is an easy elementwise aggregation. + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'aggregate', + lambda df: df.agg(func, axis=1, *args, **kwargs), + [self._expr], + requires_partition_by=partitionings.Nothing())) + elif len(self._expr.proxy().columns) == 0 or args or kwargs: + # For these corner cases, just colocate everything. + return frame_base.DeferredFrame.wrap( expressions.ComputedExpression( 'aggregate', - lambda df: df.agg(axis=axis, **kwargs), + lambda df: df.agg(func, *args, **kwargs), [self._expr], - # TODO(robertwb): Sub-aggregate when possible. requires_partition_by=partitionings.Singleton())) + else: + # In the general case, compute the aggregation of each column separately, + # then recombine. + if not isinstance(func, dict): + col_names = list(self._expr.proxy().columns) + func = {col: func for col in col_names} + else: + col_names = list(func.keys()) + aggregated_cols = [] + for col in col_names: + funcs = func[col] + if not isinstance(funcs, list): + funcs = [funcs] + aggregated_cols.append(self[col].agg(funcs, *args, **kwargs)) + # The final shape is different depending on whether any of the columns + # were aggregated by a list of aggregators. + with expressions.allow_non_parallel_operations(): + if any(isinstance(funcs, list) for funcs in func.values()): + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'join_aggregate', + lambda *cols: pd.DataFrame( + {col: value for col, value in zip(col_names, cols)}), + [col._expr for col in aggregated_cols], + requires_partition_by=partitionings.Singleton())) + else: + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'join_aggregate', + lambda *cols: pd.Series( + {col: value[0] for col, value in zip(col_names, cols)}), + [col._expr for col in aggregated_cols], + requires_partition_by=partitionings.Singleton(), + proxy=self._expr.proxy().agg(func, *args, **kwargs))) agg = aggregate @@ -169,16 +234,27 @@ def aggregate(self, axis, **kwargs): memory_usage = frame_base.wont_implement_method('non-deferred value') - all = frame_base._associative_agg_method('all') - any = frame_base._associative_agg_method('any') + all = frame_base._agg_method('all') + any = frame_base._agg_method('any') cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method( 'order-sensitive') diff = frame_base.wont_implement_method('order-sensitive') - max = frame_base._associative_agg_method('max') - min = frame_base._associative_agg_method('min') - mode = frame_base._agg_method('mode') + max = frame_base._agg_method('max') + min = frame_base._agg_method('min') + + def mode(self, axis=0, *args, **kwargs): + if axis == 1 or axis == 'columns': + raise frame_base.WontImplementError('non-deferred column values') + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'mode', + lambda df: df.mode(*args, **kwargs), + [self._expr], + #TODO(robertwb): Approximate? + requires_partition_by=partitionings.Singleton(), + preserves_partition_by=partitionings.Singleton())) @frame_base.args_to_kwargs(pd.DataFrame) @frame_base.populate_defaults(pd.DataFrame) @@ -203,7 +279,7 @@ def dropna(self, axis, **kwargs): isna = frame_base._elementwise_method('isna') notnull = notna = frame_base._elementwise_method('notna') - prod = product = frame_base._associative_agg_method('prod') + prod = product = frame_base._agg_method('prod') @frame_base.args_to_kwargs(pd.DataFrame) @frame_base.populate_defaults(pd.DataFrame) @@ -296,7 +372,7 @@ def sort_values(self, axis, **kwargs): stack = frame_base._elementwise_method('stack') - sum = frame_base._associative_agg_method('sum') + sum = frame_base._agg_method('sum') to_records = to_dict = to_numpy = to_string = ( frame_base.wont_implement_method('non-deferred value')) @@ -390,6 +466,12 @@ def wrapper(self, *args, **kargs): setattr(DeferredGroupBy, meth, _unliftable_agg(meth)) +def _is_associative(func): + return func in LIFTABLE_AGGREGATIONS or ( + getattr(func, '__name__', None) in LIFTABLE_AGGREGATIONS + and func.__module__ in ('numpy', 'builtins')) + + class _DeferredLoc(object): def __init__(self, frame): self._frame = frame diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index 773b3bafd559..7d975ebe8759 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -16,6 +16,7 @@ from __future__ import absolute_import +import sys import unittest import numpy as np @@ -36,7 +37,7 @@ def _run_test(self, func, *args): expected = func(*args) actual = expressions.Session({}).evaluate(func(*deferred_args)._expr) self.assertTrue( - expected.equals(actual), + getattr(expected, 'equals', expected.__eq__)(actual), 'Expected:\n\n%r\n\nActual:\n\n%r' % (expected, actual)) def test_series_arithmetic(self): @@ -81,6 +82,26 @@ def test_loc(self): self._run_test(lambda df: df.loc[df.A > 10], df) self._run_test(lambda df: df.loc[lambda df: df.A > 10], df) + def test_series_agg(self): + s = pd.Series(list(range(16))) + self._run_test(lambda s: s.agg('sum'), s) + self._run_test(lambda s: s.agg(['sum']), s) + with beam.dataframe.allow_non_parallel_operations(): + self._run_test(lambda s: s.agg(['sum', 'mean']), s) + self._run_test(lambda s: s.agg(['mean']), s) + self._run_test(lambda s: s.agg('mean'), s) + + @unittest.skipIf(sys.version_info < (3, 6), 'Nondeterministic dict ordering.') + def test_dataframe_agg(self): + df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]}) + self._run_test(lambda df: df.agg('sum'), df) + with beam.dataframe.allow_non_parallel_operations(): + self._run_test(lambda df: df.agg(['sum', 'mean']), df) + self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'sum'}), df) + self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'mean'}), df) + self._run_test(lambda df: df.agg({'A': ['sum', 'mean']}), df) + self._run_test(lambda df: df.agg({'A': ['sum', 'mean'], 'B': 'min'}), df) + class AllowNonParallelTest(unittest.TestCase): def _use_non_parallel_operation(self): diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py index a270458233df..ed497f673a04 100644 --- a/sdks/python/apache_beam/dataframe/transforms.py +++ b/sdks/python/apache_beam/dataframe/transforms.py @@ -125,6 +125,7 @@ def default_label(self): return '%s:%s' % (self.stage.ops, id(self)) def expand(self, pcolls): + scalar_inputs = [expr for expr in self.stage.inputs if is_scalar(expr)] tabular_inputs = [ expr for expr in self.stage.inputs if not is_scalar(expr) @@ -180,6 +181,22 @@ def __init__(self, inputs, partitioning): self.ops = [] self.outputs = set() + def __repr__(self, indent=0): + if indent: + sep = '\n' + ' ' * indent + else: + sep = '' + return ( + "Stage[%sinputs=%s, %spartitioning=%s, %sops=%s, %soutputs=%s]" % ( + sep, + self.inputs, + sep, + self.partitioning, + sep, + self.ops, + sep, + self.outputs)) + # First define some helper functions. def output_is_partitioned_by(expr, stage, partitioning): if partitioning == partitionings.Nothing(): @@ -244,6 +261,11 @@ def expr_to_stages(expr): # It also must be declared as an output of the producing stage. expr_to_stage(arg).outputs.add(arg) stage.ops.append(expr) + # Ensure that any inputs for the overall transform are added + # in downstream stages. + for arg in expr.args(): + if arg in inputs: + stage.inputs.add(arg) # This is a list as given expression may be available in many stages. return [stage]