Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add split/rsplit; Need to refactor regex
  • Loading branch information
yeandy committed Feb 12, 2022
commit 6b382a310ff8719285c892fc33872bcb9a249c1a
83 changes: 77 additions & 6 deletions sdks/python/apache_beam/dataframe/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -4721,13 +4721,84 @@ def repeat(self, repeats):
pd.core.strings.StringMethods, 'get_dummies',
reason='non-deferred-columns')

split = frame_base.wont_implement_method(
pd.core.strings.StringMethods, 'split',
reason='non-deferred-columns')
# TODO: Add for pandas 1.4.0
# def _split_helper(
# self,
# rsplit=False,
# pat=None,
# expand=False,
# regex=None,
# **kwargs
# ):
def _split_helper(self, rsplit=False, pat=None, expand=False, **kwargs):
  """Shared implementation behind ``split`` and ``rsplit``.

  Args:
    rsplit: If true, apply ``Series.str.rsplit``; otherwise apply
      ``Series.str.split``.
    pat: Separator forwarded to pandas. ``None`` splits on whitespace.
    expand: If true, the result is a DataFrame with one column per piece.
      This is only supported for categorical input, because the number of
      output columns is derived from the declared categories.
    **kwargs: Remaining keyword arguments (for example ``n``), forwarded to
      pandas unchanged.

  Returns:
    The deferred result produced by ``frame_base.DeferredFrame.wrap``.

  Raises:
    frame_base.WontImplementError: If ``expand`` is true and the input does
      not have a categorical dtype.
  """
  method_name = 'rsplit' if rsplit else 'split'

  if not expand:
    # Not creating separate columns, so the input proxy is reused as-is.
    proxy = self._expr.proxy()
  else:
    # Creating separate columns, so data type is more strict: the column
    # count must be known up front and is computed from the categories.
    dtype = self._expr.proxy().dtype
    if not isinstance(dtype, pd.CategoricalDtype):
      raise frame_base.WontImplementError(
          method_name + "() of non-categorical type is not supported because "
          "the type of the output column depends on the data. Please use "
          "pd.CategoricalDtype with explicit categories.",
          reason="non-deferred-columns")

    # pandas interprets n=None and n=0 like n=-1 (return all splits), while
    # str.split rejects None and performs no split at all for 0.
    n = kwargs.get('n', -1)
    maxsplit = -1 if n is None or n == 0 else n

    # `pat` is a named parameter of this helper, so it never appears in
    # kwargs; it has to be passed explicitly or every category would be
    # split on whitespace regardless of the requested separator.
    # NOTE(review): pat is applied as a literal string here. If the installed
    # pandas interprets it as a regular expression the column count can
    # differ - see the TODO below.
    split_cats = [
        getattr(cat, method_name)(sep=pat, maxsplit=maxsplit)
        for cat in dtype.categories
    ]

    # TODO: Replace for pandas 1.4.0
    # Treat pat as literal string
    # if not regex or (regex is None and len(pat) == 1):
    #   split_cats = [
    #     cat.split(
    #       sep=pat,
    #       maxsplit=maxsplit
    #     ) for cat in dtype.categories
    #   ]
    # Treat pat as regex
    # else:
    #   split_cats = [
    #     re.split(
    #       pattern=pat,
    #       string=cat,
    #       maxsplit=kwargs.get('n', 0)
    #     ) for cat in dtype.categories
    #   ]

    # The widest split among the categories fixes the number of columns.
    max_splits = len(max(split_cats, key=len))
    proxy = pd.DataFrame(columns=range(max_splits))

  # combine_first against the proxy keeps every proxy column in the output
  # even when a given partition produces fewer pieces.
  func = lambda s: proxy.combine_first(
      getattr(s.str, method_name)(pat=pat, expand=expand, **kwargs))

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'split',
          func,
          [self._expr],
          proxy=proxy,
          requires_partition_by=partitionings.Arbitrary(),
          preserves_partition_by=partitionings.Arbitrary()))

@frame_base.with_docs_from(pd.core.strings.StringMethods)
@frame_base.args_to_kwargs(pd.core.strings.StringMethods)
# TODO: Add for pandas 1.4.0
# def split(self, pat=None, expand=False, regex=None, **kwargs):
def split(self, pat=None, expand=False, **kwargs):
  # Delegate to the shared helper with rsplit disabled, passing every
  # argument through unchanged.
  forwarded = dict(kwargs, pat=pat, expand=expand)
  return self._split_helper(rsplit=False, **forwarded)

@frame_base.with_docs_from(pd.core.strings.StringMethods)
@frame_base.args_to_kwargs(pd.core.strings.StringMethods)
# TODO: Add for pandas 1.4.0
# def rsplit(self, pat=None, expand=False, regex=None, **kwargs):
def rsplit(self, pat=None, expand=False, **kwargs):
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rsplit API does not accept regex as an argument even though its documentation lists it. This is probably not a bug, but rather a documentation inconsistency. I can file an issue with pandas.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah let's file an issue with pandas about this, if you want to get a commit in pandas I think it's a valid use of your time to contribute a fix too. If not I'm sure someone will pick it up, it's a very active community.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it was filed a while ago, but no one has picked it up. I'll assign it to myself. In the meantime, I can add a subtask (under the non-deferred ticket?) to remind us to update the function API once the rsplit fix in pandas is released.

return self._split_helper(rsplit=True, pat=pat, expand=expand, **kwargs)


ELEMENTWISE_STRING_METHODS = [
Expand Down
12 changes: 10 additions & 2 deletions sdks/python/apache_beam/dataframe/pandas_doctests_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,16 @@ def test_string_tests(self):
f'{module_name}.StringMethods.get_dummies': ['*'],
f'{module_name}.str_get_dummies': ['*'],
f'{module_name}.StringMethods': ['s.str.split("_")'],
f'{module_name}.StringMethods.rsplit': ['*'],
f'{module_name}.StringMethods.split': ['*'],
f'{module_name}.StringMethods.rsplit': [
"s.str.split(expand=True)",
"""s.str.split(r"\\+|=", expand=True)""",
"""s.str.rsplit("/", n=1, expand=True)""",
],
f'{module_name}.StringMethods.split': [
"s.str.split(expand=True)",
"""s.str.split(r"\\+|=", expand=True)""",
"""s.str.rsplit("/", n=1, expand=True)"""
],
},
skip={
# count() on Series with a NaN produces mismatched type if we
Expand Down
53 changes: 52 additions & 1 deletion sdks/python/apache_beam/dataframe/transforms_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
import typing
import unittest

import numpy as np
import pandas as pd
# TODO: Add for pandas 1.4.0
# import re

import apache_beam as beam
from apache_beam import coders
Expand Down Expand Up @@ -52,6 +55,7 @@ def check_correct(expected, actual):


def concat(parts):
# import pdb; pdb.set_trace()
if len(parts) > 1:
return pd.concat(parts)
elif len(parts) == 1:
Expand Down Expand Up @@ -81,7 +85,7 @@ def run_scenario(self, input, func):
input_deferred = frame_base.DeferredFrame.wrap(input_placeholder)
actual_deferred = func(input_deferred)._expr.evaluate_at(
expressions.Session({input_placeholder: input}))

# import pdb; pdb.set_trace()
check_correct(expected, actual_deferred)

with beam.Pipeline() as p:
Expand Down Expand Up @@ -349,6 +353,53 @@ def test_rename(self):
0: 2, 2: 0
}, errors='raise'))

def test_split(self):
  # Non-categorical input: a sentence, a URL and a missing value.
  s = pd.Series([
      "this is a regular sentence",
      "https://docs.python.org/3/tutorial/index.html",
      np.nan
  ])

  # Without expand=True both split and rsplit are supported directly.
  unexpanded_cases = (
      lambda s: s.str.split(),
      lambda s: s.str.rsplit(),
      lambda s: s.str.split(n=2),
      lambda s: s.str.rsplit(n=2),
      lambda s: s.str.split(pat="/"),
  )
  for case in unexpanded_cases:
    self.run_scenario(s, case)

  # When expand=True, there is exception because series is not categorical
  rejected_cases = (
      (
          r"split\(\) of non-categorical type is not supported",
          lambda s: s.str.split(expand=True)),
      (
          r"rsplit\(\) of non-categorical type is not supported",
          lambda s: s.str.rsplit(expand=True)),
  )
  for message_regex, case in rejected_cases:
    with self.assertRaisesRegex(frame_base.WontImplementError,
                                message_regex):
      self.run_scenario(s, case)

  # When expand=True, and series is categorical type
  categorical = s.astype('category')
  self.run_scenario(categorical, lambda s: s.str.split(expand=True))
  self.run_scenario(
      categorical, lambda s: s.str.rsplit("/", n=1, expand=True))

  # TODO: Remove this example for pandas 1.4.0
  single_category = pd.Series(["1+1=2"]).astype('category')
  self.run_scenario(
      single_category, lambda s: s.str.rsplit(r"\+|=", expand=True))

  # TODO: Test below examples for pandas 1.4.0
  # When expand=True and testing regex
  # s = pd.Series(["foo and bar plus baz"]).astype('category')
  # self.run_scenario(s, lambda s: s.str.split(r"and|plus", expand=True))

  # s = pd.Series(['foojpgbar.jpg']).astype('category')
  # self.run_scenario(s, lambda s: s.str.split(r".", expand=True))

  # self.run_scenario(s,
  #   lambda s: s.str.split(r"\.jpg", regex=True, expand=True))
  # self.run_scenario(s,
  #   lambda s: s.str.split(re.compile(r"\.jpg"), expand=True))
  # self.run_scenario(s,
  #   lambda s: s.str.split(r"\.jpg", regex=False, expand=True))


class FusionTest(unittest.TestCase):
@staticmethod
Expand Down