From 0858f29df681b6e3538b5793bd29739effdcabed Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 4 Aug 2020 11:08:49 -0700 Subject: [PATCH 1/5] [BEAM-9547] Lift associative aggregations. Also fix issue with inputs getting used in downstream stages. --- .../apache_beam/dataframe/expressions.py | 2 +- .../apache_beam/dataframe/frame_base.py | 5 - sdks/python/apache_beam/dataframe/frames.py | 119 +++++++++++++----- .../apache_beam/dataframe/frames_test.py | 19 ++- .../apache_beam/dataframe/transforms.py | 20 +++ 5 files changed, 130 insertions(+), 35 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/expressions.py b/sdks/python/apache_beam/dataframe/expressions.py index 376efb870478..aaff10db032b 100644 --- a/sdks/python/apache_beam/dataframe/expressions.py +++ b/sdks/python/apache_beam/dataframe/expressions.py @@ -62,7 +62,7 @@ def __init__( self._name = name self._proxy = proxy # Store for preservation through pickling. - self._id = _id or '%s_%s' % (name, id(self)) + self._id = _id or '%s_%s_%s' % (name, type(proxy).__name__, id(self)) def proxy(self): # type: () -> T return self._proxy diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py index 7780cd3cee5a..deace2fbb96b 100644 --- a/sdks/python/apache_beam/dataframe/frame_base.py +++ b/sdks/python/apache_beam/dataframe/frame_base.py @@ -205,11 +205,6 @@ def wrapper(self, *args, **kwargs): return wrapper -def _associative_agg_method(func): - # TODO(robertwb): Multi-level agg. - return _agg_method(func) - - def wont_implement_method(msg): def wrapper(self, *args, **kwargs): raise WontImplementError(msg) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 89e9154ab16a..79fe9dc3376a 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -35,20 +35,34 @@ def __array__(self, dtype=None): transform = frame_base._elementwise_method( 'transform', restrictions={'axis': 0}) - def agg(self, *args, **kwargs): - return frame_base.DeferredFrame.wrap( - expressions.ComputedExpression( - 'agg', - lambda df: df.agg(*args, **kwargs), [self._expr], - preserves_partition_by=partitionings.Singleton(), - requires_partition_by=partitionings.Singleton())) - - all = frame_base._associative_agg_method('all') - any = frame_base._associative_agg_method('any') - min = frame_base._associative_agg_method('min') - max = frame_base._associative_agg_method('max') - prod = product = frame_base._associative_agg_method('prod') - sum = frame_base._associative_agg_method('sum') + def agg(self, func, axis=0, *args, **kwargs): + if isinstance(func, list) and len(func) > 1: + rows = [self.agg([f], *args, **kwargs) for f in func] + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'join_aggregate', + lambda *rows: pd.concat(rows), [row._expr for row in rows])) + else: + base_func = func[0] if isinstance(func, list) else func + if _is_associative(base_func) and not args and not kwargs: + intermediate = expressions.elementwise_expression( + 'pre_agg', + lambda s: s.agg([base_func], *args, **kwargs), [self._expr]) + else: + intermediate = self._expr + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'agg', + lambda s: s.agg(func, *args, **kwargs), [intermediate], + preserves_partition_by=partitionings.Singleton(), + requires_partition_by=partitionings.Singleton())) + + all = frame_base._agg_method('all') + any = frame_base._agg_method('any') + min = frame_base._agg_method('min') + max = frame_base._agg_method('max') + prod = product = frame_base._agg_method('prod') + sum = frame_base._agg_method('sum') cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method( 'order-sensitive') @@ -159,17 +173,49 @@ def at(self, *args, **kwargs): def loc(self): return _DeferredLoc(self) - def aggregate(self, *args, **kwargs): - if 'axis' in kwargs and kwargs['axis'] is None: - return self.agg(*args, **dict(kwargs, axis=1)).agg( - *args, **dict(kwargs, axis=0)) - return frame_base.DeferredFrame.wrap( + def aggregate(self, func, axis=0, *args, **kwargs): + if axis is None: + return self.agg(func, *args, **dict(kwargs, axis=1)).agg( + func, *args, **dict(kwargs, axis=0)) + elif axis in (1, 'columns'): + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'aggregate', + lambda df: df.agg(func, axis=1, *args, **kwargs), + [self._expr], + requires_partition_by=partitionings.Nothing())) + elif len(self._expr.proxy().columns) == 0 or args or kwargs: + return frame_base.DeferredFrame.wrap( expressions.ComputedExpression( 'aggregate', - lambda df: df.agg(*args, **kwargs), + lambda df: df.agg(func, *args, **kwargs), [self._expr], - # TODO(robertwb): Sub-aggregate when possible. requires_partition_by=partitionings.Singleton())) + else: + if not isinstance(func, dict): + func = {col: func for col in self._expr.proxy().columns} + aggregated_cols = [] + col_names = list(func.keys()) # Reify the order. + for col in col_names: + funcs = func[col] + if not isinstance(funcs, list): + funcs = [funcs] + aggregated_cols.append(self[col].agg(funcs, *args, **kwargs)) + if any(isinstance(funcs, list) for funcs in func.values()): + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'join_aggregate', + lambda *cols: pd.DataFrame( + {col: value for col, value in zip(col_names, cols)}), + [col._expr for col in aggregated_cols])) + else: + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'join_aggregate', + lambda *cols: pd.Series( + {col: value[0] for col, value in zip(col_names, cols)}), + [col._expr for col in aggregated_cols], + proxy=self._expr.proxy().agg(func, *args, **kwargs))) agg = aggregate @@ -177,16 +223,27 @@ def aggregate(self, *args, **kwargs): memory_usage = frame_base.wont_implement_method('non-deferred value') - all = frame_base._associative_agg_method('all') - any = frame_base._associative_agg_method('any') + all = frame_base._agg_method('all') + any = frame_base._agg_method('any') cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method( 'order-sensitive') diff = frame_base.wont_implement_method('order-sensitive') - max = frame_base._associative_agg_method('max') - min = frame_base._associative_agg_method('min') - mode = frame_base._agg_method('mode') + max = frame_base._agg_method('max') + min = frame_base._agg_method('min') + + def mode(self, axis=0, *args, **kwargs): + if axis == 1 or axis == 'columns': + raise frame_base.WontImplementError('non-deferred column values') + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'mode', + lambda df: df.mode(*args, **kwargs), + [self._expr], + #TODO(robertwb): Approximate? + requires_partition_by=partitionings.Singleton(), + preserves_partition_by=partitionings.Singleton())) def dropna( self, @@ -221,7 +278,7 @@ def dropna( isna = frame_base._elementwise_method('isna') notnull = notna = frame_base._elementwise_method('notna') - prod = product = frame_base._associative_agg_method('prod') + prod = product = frame_base._agg_method('prod') def quantile(self, q=0.5, axis=0, *args, **kwargs): if axis != 0: @@ -319,7 +376,7 @@ def sort_values( stack = frame_base._elementwise_method('stack') - sum = frame_base._associative_agg_method('sum') + sum = frame_base._agg_method('sum') to_records = to_dict = to_numpy = to_string = ( frame_base.wont_implement_method('non-deferred value')) @@ -413,6 +470,12 @@ def wrapper(self, *args, **kargs): setattr(DeferredGroupBy, meth, _unliftable_agg(meth)) +def _is_associative(func): + return func in LIFTABLE_AGGREGATIONS or ( + getattr(func, '__name__', None) in LIFTABLE_AGGREGATIONS + and func.__module__ in ('numpy', 'builtins')) + + class _DeferredLoc(object): def __init__(self, frame): self._frame = frame diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index 1383a529f5c7..bd3182723560 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -35,7 +35,7 @@ def _run_test(self, func, *args): expected = func(*args) actual = expressions.Session({}).evaluate(func(*deferred_args)._expr) self.assertTrue( - expected.equals(actual), + getattr(expected, 'equals', expected.__eq__)(actual), 'Expected:\n\n%r\n\nActual:\n\n%r' % (expected, actual)) def test_series_arithmetic(self): @@ -80,6 +80,23 @@ def test_loc(self): self._run_test(lambda df: df.loc[df.A > 10], df) self._run_test(lambda df: df.loc[lambda df: df.A > 10], df) + def test_series_agg(self): + s = pd.Series(range(16)) + self._run_test(lambda s: s.agg('sum'), s) + self._run_test(lambda s: s.agg(['sum']), s) + self._run_test(lambda s: s.agg(['sum', 'mean']), s) + self._run_test(lambda s: s.agg(['mean']), s) + self._run_test(lambda s: s.agg('mean'), s) + + def test_dataframe_agg(self): + df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]}) + self._run_test(lambda df: df.agg('sum'), df) + self._run_test(lambda df: df.agg(['sum', 'mean']), df) + self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'sum'}), df) + self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'mean'}), df) + self._run_test(lambda df: df.agg({'A': ['sum', 'mean']}), df) + self._run_test(lambda df: df.agg({'A': ['sum', 'mean'], 'B': 'min'}), df) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py index a270458233df..7900c1dcf256 100644 --- a/sdks/python/apache_beam/dataframe/transforms.py +++ b/sdks/python/apache_beam/dataframe/transforms.py @@ -125,6 +125,7 @@ def default_label(self): return '%s:%s' % (self.stage.ops, id(self)) def expand(self, pcolls): + scalar_inputs = [expr for expr in self.stage.inputs if is_scalar(expr)] tabular_inputs = [ expr for expr in self.stage.inputs if not is_scalar(expr) @@ -180,6 +181,22 @@ def __init__(self, inputs, partitioning): self.ops = [] self.outputs = set() + def __repr__(self, indent=0): + if indent: + sep = '\n' + ' ' * indent + else: + sep = '' + return ( + "Stage[%sinputs=%s, %spartitioning=%s, %sops=%s, %soutputs=%s]" % ( + sep, + self.inputs, + sep, + self.partitioning, + sep, + self.ops, + sep, + self.outputs)) + # First define some helper functions. def output_is_partitioned_by(expr, stage, partitioning): if partitioning == partitionings.Nothing(): @@ -244,6 +261,9 @@ def expr_to_stages(expr): # It also must be declared as an output of the producing stage. expr_to_stage(arg).outputs.add(arg) stage.ops.append(expr) + for arg in expr.args(): + if arg in inputs: + stage.inputs.add(arg) # This is a list as given expression may be available in many stages. return [stage] From 7a169fd059670561fe925c79b77d06b4e40c33d5 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 5 Aug 2020 16:38:23 -0700 Subject: [PATCH 2/5] fix lint, py 3.5 issue --- sdks/python/apache_beam/dataframe/frames_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index bd3182723560..ec770370ddf3 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -16,6 +16,7 @@ from __future__ import absolute_import +import sys import unittest import numpy as np @@ -81,13 +82,14 @@ def test_loc(self): self._run_test(lambda df: df.loc[lambda df: df.A > 10], df) def test_series_agg(self): - s = pd.Series(range(16)) + s = pd.Series(list(range(16))) self._run_test(lambda s: s.agg('sum'), s) self._run_test(lambda s: s.agg(['sum']), s) self._run_test(lambda s: s.agg(['sum', 'mean']), s) self._run_test(lambda s: s.agg(['mean']), s) self._run_test(lambda s: s.agg('mean'), s) + @unittest.skipIf(sys.version_info < (3, 6), 'Nondeterministic dict ordering.') def test_dataframe_agg(self): df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]}) self._run_test(lambda df: df.agg('sum'), df) From c5ddbe63b3ad77f1cf5f769b5ce59178118e4ff6 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 5 Aug 2020 16:43:01 -0700 Subject: [PATCH 3/5] cleanup --- sdks/python/apache_beam/dataframe/frames.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index d3a800b75f55..3cb0c5a36806 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -184,9 +184,11 @@ def aggregate(self, func, axis=0, *args, **kwargs): requires_partition_by=partitionings.Singleton())) else: if not isinstance(func, dict): - func = {col: func for col in self._expr.proxy().columns} + col_names = list(self._expr.proxy().columns) + func = {col: func for col in col_names} + else: + col_names = list(func.keys()) aggregated_cols = [] - col_names = list(func.keys()) # Reify the order. for col in col_names: funcs = func[col] if not isinstance(funcs, list): From afaf3e67be30b0324d40dc85e78cc69b9608365d Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 12 Aug 2020 17:17:29 -0700 Subject: [PATCH 4/5] fixup --- sdks/python/apache_beam/dataframe/frames.py | 22 +++++++++++++++---- .../apache_beam/dataframe/transforms.py | 2 ++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 3cb0c5a36806..1216d657134c 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -35,28 +35,32 @@ def __array__(self, dtype=None): transform = frame_base._elementwise_method( 'transform', restrictions={'axis': 0}) - def agg(self, func, axis=0, *args, **kwargs): + def aggregate(self, func, axis=0, *args, **kwargs): if isinstance(func, list) and len(func) > 1: + # Aggregate each column separately, then stick them all together. rows = [self.agg([f], *args, **kwargs) for f in func] return frame_base.DeferredFrame.wrap( expressions.ComputedExpression( 'join_aggregate', lambda *rows: pd.concat(rows), [row._expr for row in rows])) else: + # We're only handling a single column. base_func = func[0] if isinstance(func, list) else func if _is_associative(base_func) and not args and not kwargs: intermediate = expressions.elementwise_expression( - 'pre_agg', + 'pre_aggregate', lambda s: s.agg([base_func], *args, **kwargs), [self._expr]) else: intermediate = self._expr return frame_base.DeferredFrame.wrap( expressions.ComputedExpression( - 'agg', + 'aggregate', lambda s: s.agg(func, *args, **kwargs), [intermediate], preserves_partition_by=partitionings.Singleton(), requires_partition_by=partitionings.Singleton())) + agg = aggregate + all = frame_base._agg_method('all') any = frame_base._agg_method('any') min = frame_base._agg_method('min') @@ -166,9 +170,12 @@ def loc(self): def aggregate(self, func, axis=0, *args, **kwargs): if axis is None: + # Aggregate across all elements by first aggregating across columns, + # then across rows. return self.agg(func, *args, **dict(kwargs, axis=1)).agg( func, *args, **dict(kwargs, axis=0)) elif axis in (1, 'columns'): + # This is an easy elementwise aggregation. return frame_base.DeferredFrame.wrap( expressions.ComputedExpression( 'aggregate', @@ -176,6 +183,7 @@ def aggregate(self, func, axis=0, *args, **kwargs): [self._expr], requires_partition_by=partitionings.Nothing())) elif len(self._expr.proxy().columns) == 0 or args or kwargs: + # For these corner cases, just colocate everything. return frame_base.DeferredFrame.wrap( expressions.ComputedExpression( 'aggregate', @@ -183,6 +191,8 @@ def aggregate(self, func, axis=0, *args, **kwargs): [self._expr], requires_partition_by=partitionings.Singleton())) else: + # In the general case, compute the aggregation of each column separately, + # then recombine. if not isinstance(func, dict): col_names = list(self._expr.proxy().columns) func = {col: func for col in col_names} @@ -194,13 +204,16 @@ def aggregate(self, func, axis=0, *args, **kwargs): if not isinstance(funcs, list): funcs = [funcs] aggregated_cols.append(self[col].agg(funcs, *args, **kwargs)) + # The final shape is different depending on whether any of the columns + # were aggregated by a list of aggregators. if any(isinstance(funcs, list) for funcs in func.values()): return frame_base.DeferredFrame.wrap( expressions.ComputedExpression( 'join_aggregate', lambda *cols: pd.DataFrame( {col: value for col, value in zip(col_names, cols)}), - [col._expr for col in aggregated_cols])) + [col._expr for col in aggregated_cols], + requires_partition_by=partitionings.Singleton())) else: return frame_base.DeferredFrame.wrap( expressions.ComputedExpression( @@ -208,6 +221,7 @@ def aggregate(self, func, axis=0, *args, **kwargs): lambda *cols: pd.Series( {col: value[0] for col, value in zip(col_names, cols)}), [col._expr for col in aggregated_cols], + requires_partition_by=partitionings.Singleton(), proxy=self._expr.proxy().agg(func, *args, **kwargs))) agg = aggregate diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py index 7900c1dcf256..ed497f673a04 100644 --- a/sdks/python/apache_beam/dataframe/transforms.py +++ b/sdks/python/apache_beam/dataframe/transforms.py @@ -261,6 +261,8 @@ def expr_to_stages(expr): # It also must be declared as an output of the producing stage. expr_to_stage(arg).outputs.add(arg) stage.ops.append(expr) + # Ensure that any inputs for the overall transform are added + # in downstream stages. for arg in expr.args(): if arg in inputs: stage.inputs.add(arg) From 238d1e55343b384972e852c1b58c008ffa30740a Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 13 Aug 2020 09:35:05 -0700 Subject: [PATCH 5/5] merge and update wrt non-parallel-operations decorator and check --- .../apache_beam/dataframe/expressions.py | 9 ++-- sdks/python/apache_beam/dataframe/frames.py | 44 ++++++++++--------- .../apache_beam/dataframe/frames_test.py | 18 ++++---- 3 files changed, 40 insertions(+), 31 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/expressions.py b/sdks/python/apache_beam/dataframe/expressions.py index 183ade912c04..3b85eae331cd 100644 --- a/sdks/python/apache_beam/dataframe/expressions.py +++ b/sdks/python/apache_beam/dataframe/expressions.py @@ -255,9 +255,12 @@ def _get_allow_non_parallel(): @contextlib.contextmanager def allow_non_parallel_operations(allow=True): - old_value, _ALLOW_NON_PARALLEL.value = _ALLOW_NON_PARALLEL.value, allow - yield - _ALLOW_NON_PARALLEL.value = old_value + if allow is None: + yield + else: + old_value, _ALLOW_NON_PARALLEL.value = _ALLOW_NON_PARALLEL.value, allow + yield + _ALLOW_NON_PARALLEL.value = old_value class NonParallelOperation(Exception): diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 1216d657134c..aacf4eea71f1 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -50,14 +50,17 @@ def aggregate(self, func, axis=0, *args, **kwargs): intermediate = expressions.elementwise_expression( 'pre_aggregate', lambda s: s.agg([base_func], *args, **kwargs), [self._expr]) + allow_nonparallel_final = True else: intermediate = self._expr - return frame_base.DeferredFrame.wrap( - expressions.ComputedExpression( - 'aggregate', - lambda s: s.agg(func, *args, **kwargs), [intermediate], - preserves_partition_by=partitionings.Singleton(), - requires_partition_by=partitionings.Singleton())) + allow_nonparallel_final = None # i.e. don't change the value + with expressions.allow_non_parallel_operations(allow_nonparallel_final): + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'aggregate', + lambda s: s.agg(func, *args, **kwargs), [intermediate], + preserves_partition_by=partitionings.Singleton(), + requires_partition_by=partitionings.Singleton())) agg = aggregate @@ -206,23 +209,24 @@ def aggregate(self, func, axis=0, *args, **kwargs): aggregated_cols.append(self[col].agg(funcs, *args, **kwargs)) # The final shape is different depending on whether any of the columns # were aggregated by a list of aggregators. - if any(isinstance(funcs, list) for funcs in func.values()): - return frame_base.DeferredFrame.wrap( + with expressions.allow_non_parallel_operations(): + if any(isinstance(funcs, list) for funcs in func.values()): + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'join_aggregate', + lambda *cols: pd.DataFrame( + {col: value for col, value in zip(col_names, cols)}), + [col._expr for col in aggregated_cols], + requires_partition_by=partitionings.Singleton())) + else: + return frame_base.DeferredFrame.wrap( expressions.ComputedExpression( 'join_aggregate', - lambda *cols: pd.DataFrame( - {col: value for col, value in zip(col_names, cols)}), + lambda *cols: pd.Series( + {col: value[0] for col, value in zip(col_names, cols)}), [col._expr for col in aggregated_cols], - requires_partition_by=partitionings.Singleton())) - else: - return frame_base.DeferredFrame.wrap( - expressions.ComputedExpression( - 'join_aggregate', - lambda *cols: pd.Series( - {col: value[0] for col, value in zip(col_names, cols)}), - [col._expr for col in aggregated_cols], - requires_partition_by=partitionings.Singleton(), - proxy=self._expr.proxy().agg(func, *args, **kwargs))) + requires_partition_by=partitionings.Singleton(), + proxy=self._expr.proxy().agg(func, *args, **kwargs))) agg = aggregate diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index 5b84d58cb28d..7d975ebe8759 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -86,19 +86,21 @@ def test_series_agg(self): s = pd.Series(list(range(16))) self._run_test(lambda s: s.agg('sum'), s) self._run_test(lambda s: s.agg(['sum']), s) - self._run_test(lambda s: s.agg(['sum', 'mean']), s) - self._run_test(lambda s: s.agg(['mean']), s) - self._run_test(lambda s: s.agg('mean'), s) + with beam.dataframe.allow_non_parallel_operations(): + self._run_test(lambda s: s.agg(['sum', 'mean']), s) + self._run_test(lambda s: s.agg(['mean']), s) + self._run_test(lambda s: s.agg('mean'), s) @unittest.skipIf(sys.version_info < (3, 6), 'Nondeterministic dict ordering.') def test_dataframe_agg(self): df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]}) self._run_test(lambda df: df.agg('sum'), df) - self._run_test(lambda df: df.agg(['sum', 'mean']), df) - self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'sum'}), df) - self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'mean'}), df) - self._run_test(lambda df: df.agg({'A': ['sum', 'mean']}), df) - self._run_test(lambda df: df.agg({'A': ['sum', 'mean'], 'B': 'min'}), df) + with beam.dataframe.allow_non_parallel_operations(): + self._run_test(lambda df: df.agg(['sum', 'mean']), df) + self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'sum'}), df) + self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'mean'}), df) + self._run_test(lambda df: df.agg({'A': ['sum', 'mean']}), df) + self._run_test(lambda df: df.agg({'A': ['sum', 'mean'], 'B': 'min'}), df) class AllowNonParallelTest(unittest.TestCase):