Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions sdks/python/apache_beam/dataframe/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __init__(
self._name = name
self._proxy = proxy
# Store for preservation through pickling.
self._id = _id or '%s_%s' % (name, id(self))
self._id = _id or '%s_%s_%s' % (name, type(proxy).__name__, id(self))

  def proxy(self):  # type: () -> T
    """Returns the proxy value this expression was constructed with."""
    return self._proxy
Expand Down Expand Up @@ -255,9 +255,12 @@ def _get_allow_non_parallel():

@contextlib.contextmanager
def allow_non_parallel_operations(allow=True):
  """Context manager overriding the thread-local non-parallel flag.

  Args:
    allow: The value to set while the context is active. If None, the
      current value is left unchanged, which lets callers conditionally
      enable the override without duplicating the `with` statement.
  """
  if allow is None:
    yield
  else:
    old_value, _ALLOW_NON_PARALLEL.value = _ALLOW_NON_PARALLEL.value, allow
    try:
      yield
    finally:
      # Restore even if the body raises; otherwise an exception inside the
      # `with` block would leak the overridden value to unrelated code on
      # this thread.
      _ALLOW_NON_PARALLEL.value = old_value


class NonParallelOperation(Exception):
Expand Down
5 changes: 0 additions & 5 deletions sdks/python/apache_beam/dataframe/frame_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,6 @@ def wrapper(self, *args, **kwargs):
return wrapper


def _associative_agg_method(func):
  """Returns a deferred aggregation method for `func`.

  Currently identical to `_agg_method`; the separate name records the intent
  that associative aggregations could eventually be computed as a multi-level
  (partial, then final) aggregation.
  """
  # TODO(robertwb): Multi-level agg.
  return _agg_method(func)


def wont_implement_method(msg):
def wrapper(self, *args, **kwargs):
raise WontImplementError(msg)
Expand Down
136 changes: 109 additions & 27 deletions sdks/python/apache_beam/dataframe/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,41 @@ def __array__(self, dtype=None):
transform = frame_base._elementwise_method(
'transform', restrictions={'axis': 0})

def agg(self, *args, **kwargs):
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'agg',
lambda df: df.agg(*args, **kwargs), [self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Singleton()))
  def aggregate(self, func, axis=0, *args, **kwargs):
    """Deferred implementation of `pd.Series.aggregate`.

    Args:
      func: An aggregation function (or its name), or a list of them.
      axis: Present for pandas signature compatibility. NOTE(review): the
        value is never read below — confirm this is intentional (a Series
        has only one axis).
      *args, **kwargs: Forwarded to `Series.agg`.

    Returns:
      A `DeferredFrame` wrapping the aggregation expression.
    """
    if isinstance(func, list) and len(func) > 1:
      # Compute each aggregation separately, then stick them all together.
      rows = [self.agg([f], *args, **kwargs) for f in func]
      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'join_aggregate',
              lambda *rows: pd.concat(rows), [row._expr for row in rows]))
    else:
      # We're only handling a single aggregation function here (possibly
      # wrapped in a one-element list).
      base_func = func[0] if isinstance(func, list) else func
      if _is_associative(base_func) and not args and not kwargs:
        # Associative aggregations can be partially applied elementwise (in
        # parallel) first; only the final combine over the pre-aggregated
        # intermediate is then permitted to run non-parallel.
        intermediate = expressions.elementwise_expression(
            'pre_aggregate',
            lambda s: s.agg([base_func], *args, **kwargs), [self._expr])
        allow_nonparallel_final = True
      else:
        intermediate = self._expr
        allow_nonparallel_final = None  # i.e. don't change the value
      with expressions.allow_non_parallel_operations(allow_nonparallel_final):
        # The final aggregation requires all data to be colocated.
        return frame_base.DeferredFrame.wrap(
            expressions.ComputedExpression(
                'aggregate',
                lambda s: s.agg(func, *args, **kwargs), [intermediate],
                preserves_partition_by=partitionings.Singleton(),
                requires_partition_by=partitionings.Singleton()))

all = frame_base._associative_agg_method('all')
any = frame_base._associative_agg_method('any')
min = frame_base._associative_agg_method('min')
max = frame_base._associative_agg_method('max')
prod = product = frame_base._associative_agg_method('prod')
sum = frame_base._associative_agg_method('sum')
agg = aggregate

all = frame_base._agg_method('all')
any = frame_base._agg_method('any')
min = frame_base._agg_method('min')
max = frame_base._agg_method('max')
prod = product = frame_base._agg_method('prod')
sum = frame_base._agg_method('sum')

cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
'order-sensitive')
Expand Down Expand Up @@ -150,35 +171,90 @@ def at(self, *args, **kwargs):
def loc(self):
return _DeferredLoc(self)

@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def aggregate(self, axis, **kwargs):
def aggregate(self, func, axis=0, *args, **kwargs):
if axis is None:
return self.agg(axis=1, **kwargs).agg(axis=0, **kwargs)
return frame_base.DeferredFrame.wrap(
# Aggregate across all elements by first aggregating across columns,
# then across rows.
return self.agg(func, *args, **dict(kwargs, axis=1)).agg(
func, *args, **dict(kwargs, axis=0))
elif axis in (1, 'columns'):
# This is an easy elementwise aggregation.
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
lambda df: df.agg(func, axis=1, *args, **kwargs),
[self._expr],
requires_partition_by=partitionings.Nothing()))
elif len(self._expr.proxy().columns) == 0 or args or kwargs:
# For these corner cases, just colocate everything.
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
lambda df: df.agg(axis=axis, **kwargs),
lambda df: df.agg(func, *args, **kwargs),
[self._expr],
# TODO(robertwb): Sub-aggregate when possible.
requires_partition_by=partitionings.Singleton()))
else:
# In the general case, compute the aggregation of each column separately,
# then recombine.
if not isinstance(func, dict):
col_names = list(self._expr.proxy().columns)
func = {col: func for col in col_names}
else:
col_names = list(func.keys())
aggregated_cols = []
for col in col_names:
funcs = func[col]
if not isinstance(funcs, list):
funcs = [funcs]
aggregated_cols.append(self[col].agg(funcs, *args, **kwargs))
# The final shape is different depending on whether any of the columns
# were aggregated by a list of aggregators.
with expressions.allow_non_parallel_operations():
if any(isinstance(funcs, list) for funcs in func.values()):
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'join_aggregate',
lambda *cols: pd.DataFrame(
{col: value for col, value in zip(col_names, cols)}),
[col._expr for col in aggregated_cols],
requires_partition_by=partitionings.Singleton()))
else:
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'join_aggregate',
lambda *cols: pd.Series(
{col: value[0] for col, value in zip(col_names, cols)}),
[col._expr for col in aggregated_cols],
requires_partition_by=partitionings.Singleton(),
proxy=self._expr.proxy().agg(func, *args, **kwargs)))

agg = aggregate
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're missing this alias in Series

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. Done.


applymap = frame_base._elementwise_method('applymap')

memory_usage = frame_base.wont_implement_method('non-deferred value')

all = frame_base._associative_agg_method('all')
any = frame_base._associative_agg_method('any')
all = frame_base._agg_method('all')
any = frame_base._agg_method('any')

cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
'order-sensitive')
diff = frame_base.wont_implement_method('order-sensitive')

max = frame_base._associative_agg_method('max')
min = frame_base._associative_agg_method('min')
mode = frame_base._agg_method('mode')
max = frame_base._agg_method('max')
min = frame_base._agg_method('min')

def mode(self, axis=0, *args, **kwargs):
if axis == 1 or axis == 'columns':
raise frame_base.WontImplementError('non-deferred column values')
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'mode',
lambda df: df.mode(*args, **kwargs),
[self._expr],
#TODO(robertwb): Approximate?
requires_partition_by=partitionings.Singleton(),
preserves_partition_by=partitionings.Singleton()))

@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
Expand All @@ -203,7 +279,7 @@ def dropna(self, axis, **kwargs):
isna = frame_base._elementwise_method('isna')
notnull = notna = frame_base._elementwise_method('notna')

prod = product = frame_base._associative_agg_method('prod')
prod = product = frame_base._agg_method('prod')

@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
Expand Down Expand Up @@ -296,7 +372,7 @@ def sort_values(self, axis, **kwargs):

stack = frame_base._elementwise_method('stack')

sum = frame_base._associative_agg_method('sum')
sum = frame_base._agg_method('sum')

to_records = to_dict = to_numpy = to_string = (
frame_base.wont_implement_method('non-deferred value'))
Expand Down Expand Up @@ -390,6 +466,12 @@ def wrapper(self, *args, **kargs):
setattr(DeferredGroupBy, meth, _unliftable_agg(meth))


def _is_associative(func):
  """Returns True if `func` identifies a liftable (associative) aggregation.

  `func` may be either a string naming an entry of LIFTABLE_AGGREGATIONS, or
  a callable (e.g. ``numpy.sum`` or the builtin ``sum``) whose name matches
  such an entry.
  """
  if func in LIFTABLE_AGGREGATIONS:
    return True
  # Recognize numpy/builtin callables by name. Use getattr for __module__ as
  # well as __name__: not every callable defines both attributes (e.g. some
  # callable objects), and this predicate should never raise.
  return (
      getattr(func, '__name__', None) in LIFTABLE_AGGREGATIONS
      and getattr(func, '__module__', None) in ('numpy', 'builtins'))


class _DeferredLoc(object):
def __init__(self, frame):
self._frame = frame
Expand Down
23 changes: 22 additions & 1 deletion sdks/python/apache_beam/dataframe/frames_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from __future__ import absolute_import

import sys
import unittest

import numpy as np
Expand All @@ -36,7 +37,7 @@ def _run_test(self, func, *args):
expected = func(*args)
actual = expressions.Session({}).evaluate(func(*deferred_args)._expr)
self.assertTrue(
expected.equals(actual),
getattr(expected, 'equals', expected.__eq__)(actual),
'Expected:\n\n%r\n\nActual:\n\n%r' % (expected, actual))

def test_series_arithmetic(self):
Expand Down Expand Up @@ -81,6 +82,26 @@ def test_loc(self):
self._run_test(lambda df: df.loc[df.A > 10], df)
self._run_test(lambda df: df.loc[lambda df: df.A > 10], df)

  def test_series_agg(self):
    """Tests Series.agg against pandas on a simple integer series."""
    s = pd.Series(list(range(16)))
    # A single associative aggregation (bare or in a one-element list) runs
    # without the non-parallel context.
    self._run_test(lambda s: s.agg('sum'), s)
    self._run_test(lambda s: s.agg(['sum']), s)
    with beam.dataframe.allow_non_parallel_operations():
      # NOTE(review): these presumably need the non-parallel context because
      # 'mean' is not an associative/liftable aggregation — confirm against
      # LIFTABLE_AGGREGATIONS.
      self._run_test(lambda s: s.agg(['sum', 'mean']), s)
      self._run_test(lambda s: s.agg(['mean']), s)
      self._run_test(lambda s: s.agg('mean'), s)

@unittest.skipIf(sys.version_info < (3, 6), 'Nondeterministic dict ordering.')
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be reasonable to re-order the columns by name when asserting equality?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Column ordering seems to be a fairly fundamental property of dataframes that I'd prefer to check in general, and 3.5 won't be supported for long.

  def test_dataframe_agg(self):
    """Tests DataFrame.agg with scalar, list, and dict aggregation specs."""
    df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})
    # A single associative aggregation runs without the non-parallel context.
    self._run_test(lambda df: df.agg('sum'), df)
    with beam.dataframe.allow_non_parallel_operations():
      # Multi-function and dict-valued specs recombine per-column results,
      # which requires allowing non-parallel operations.
      self._run_test(lambda df: df.agg(['sum', 'mean']), df)
      self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'sum'}), df)
      self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'mean'}), df)
      self._run_test(lambda df: df.agg({'A': ['sum', 'mean']}), df)
      self._run_test(lambda df: df.agg({'A': ['sum', 'mean'], 'B': 'min'}), df)


class AllowNonParallelTest(unittest.TestCase):
def _use_non_parallel_operation(self):
Expand Down
22 changes: 22 additions & 0 deletions sdks/python/apache_beam/dataframe/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def default_label(self):
return '%s:%s' % (self.stage.ops, id(self))

def expand(self, pcolls):

scalar_inputs = [expr for expr in self.stage.inputs if is_scalar(expr)]
tabular_inputs = [
expr for expr in self.stage.inputs if not is_scalar(expr)
Expand Down Expand Up @@ -180,6 +181,22 @@ def __init__(self, inputs, partitioning):
self.ops = []
self.outputs = set()

def __repr__(self, indent=0):
if indent:
sep = '\n' + ' ' * indent
else:
sep = ''
return (
"Stage[%sinputs=%s, %spartitioning=%s, %sops=%s, %soutputs=%s]" % (
sep,
self.inputs,
sep,
self.partitioning,
sep,
self.ops,
sep,
self.outputs))

# First define some helper functions.
def output_is_partitioned_by(expr, stage, partitioning):
if partitioning == partitionings.Nothing():
Expand Down Expand Up @@ -244,6 +261,11 @@ def expr_to_stages(expr):
# It also must be declared as an output of the producing stage.
expr_to_stage(arg).outputs.add(arg)
stage.ops.append(expr)
# Ensure that any inputs for the overall transform are added
# in downstream stages.
for arg in expr.args():
if arg in inputs:
stage.inputs.add(arg)
Comment on lines +266 to +268
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for arg in expr.args():
if arg in inputs:
stage.inputs.add(arg)
# Ensure that any inputs for the overall transform are added in downstream stages
for arg in expr.args():
if arg in inputs:
stage.inputs.add(arg)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

# This is a list as given expression may be available in many stages.
return [stage]

Expand Down