diff --git a/CHANGES.md b/CHANGES.md index 5db1b11c595a..ef9cde03d474 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -80,6 +80,7 @@ * Fixed "Desired bundle size 0 bytes must be greater than 0" in Java SDK's BigtableIO.BigtableSource when you have more cores than bytes to read (Java) [#28793](https://github.com/apache/beam/issues/28793). * `watch_file_pattern` arg of the [RunInference](https://github.com/apache/beam/blob/104c10b3ee536a9a3ea52b4dbf62d86b669da5d9/sdks/python/apache_beam/ml/inference/base.py#L997) arg had no effect prior to 2.52.0. To use the behavior of arg `watch_file_pattern` prior to 2.52.0, follow the documentation at https://beam.apache.org/documentation/ml/side-input-updates/ and use `WatchFilePattern` PTransform as a SideInput. ([#28948](https://github.com/apache/beam/pulls/28948)) +* `MLTransform` doesn't output artifacts such as min, max and quantiles. Instead, `MLTransform` will add a feature to output these artifacts as human readable format - [#29017](https://github.com/apache/beam/issues/29017). For now, to use the artifacts such as min and max that were produced by the eariler `MLTransform`, use `read_artifact_location` of `MLTransform`, which reads artifacts that were produced earlier in a different `MLTransform` ([#29016](https://github.com/apache/beam/pull/29016/)) ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py index 1d2197e35e4e..0db10718295b 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py @@ -31,7 +31,7 @@ import tensorflow_transform as tft # pylint: disable=unused-import from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_scale_to_0_1 from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary - from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary_with_non_columnar_data + from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary_with_scalar except ImportError: raise unittest.SkipTest('tensorflow_transform is not installed.') @@ -46,8 +46,8 @@ def check_mltransform_compute_and_apply_vocab(): def check_mltransform_scale_to_0_1(): expected = '''[START mltransform_scale_to_0_1] -Row(x=array([0. , 0.5714286, 0.2857143], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32)) -Row(x=array([0.42857143, 0.14285715, 1. ], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32)) +Row(x=array([0. , 0.5714286, 0.2857143], dtype=float32)) +Row(x=array([0.42857143, 0.14285715, 1. ], dtype=float32)) [END mltransform_scale_to_0_1] '''.splitlines()[1:-1] return expected @@ -80,7 +80,7 @@ def test_mltransform_scale_to_0_1(self, mock_stdout): self.assertEqual(predicted, expected) def test_mltransform_compute_and_apply_vocab_scalar(self, mock_stdout): - mltransform_compute_and_apply_vocabulary_with_non_columnar_data() + mltransform_compute_and_apply_vocabulary_with_scalar() predicted = mock_stdout.getvalue().splitlines() expected = check_mltransform_compute_and_apply_vocabulary_with_scalar() self.assertEqual(predicted, expected) diff --git a/sdks/python/apache_beam/ml/transforms/base.py b/sdks/python/apache_beam/ml/transforms/base.py index 49ce6e9ec1e0..dc70aebf93fd 100644 --- a/sdks/python/apache_beam/ml/transforms/base.py +++ b/sdks/python/apache_beam/ml/transforms/base.py @@ -67,16 +67,6 @@ def apply_transform(self, data: OperationInputT, inputs: input data. """ - @abc.abstractmethod - def get_artifacts( - self, data: OperationInputT, - output_column_prefix: str) -> Optional[Dict[str, OperationOutputT]]: - """ - If the operation generates any artifacts, they can be returned from this - method. - """ - pass - def __call__(self, data: OperationInputT, output_column_name: str) -> Dict[str, OperationOutputT]: """ @@ -84,9 +74,6 @@ def __call__(self, data: OperationInputT, This method will invoke the apply() method of the class. """ transformed_data = self.apply_transform(data, output_column_name) - artifacts = self.get_artifacts(data, output_column_name) - if artifacts: - transformed_data = {**transformed_data, **artifacts} return transformed_data def get_counter(self): diff --git a/sdks/python/apache_beam/ml/transforms/handlers_test.py b/sdks/python/apache_beam/ml/transforms/handlers_test.py index 3342ec76cae5..327c8c76c0e9 100644 --- a/sdks/python/apache_beam/ml/transforms/handlers_test.py +++ b/sdks/python/apache_beam/ml/transforms/handlers_test.py @@ -58,14 +58,6 @@ def apply_transform(self, inputs, output_column_name, **kwargs): return {output_column_name: inputs * 10} -class _FakeOperationWithArtifacts(TFTOperation): - def apply_transform(self, inputs, output_column_name, **kwargs): - return {output_column_name: inputs} - - def get_artifacts(self, data, col_name): - return {'artifact': tf.convert_to_tensor([1])} - - class IntType(NamedTuple): x: int @@ -106,16 +98,6 @@ def test_tft_operation_preprocessing_fn( actual_result = process_handler.process_data_fn(inputs) self.assertDictEqual(actual_result, expected_result) - def test_preprocessing_fn_with_artifacts(self): - process_handler = handlers.TFTProcessHandler( - transforms=[_FakeOperationWithArtifacts(columns=['x'])], - artifact_location=self.artifact_location) - inputs = {'x': [1, 2, 3]} - preprocessing_fn = process_handler.process_data_fn - actual_result = preprocessing_fn(inputs) - expected_result = {'x': [1, 2, 3], 'artifact': tf.convert_to_tensor([1])} - self.assertDictEqual(actual_result, expected_result) - def test_input_type_from_schema_named_tuple_pcoll(self): data = [{'x': 1}] with beam.Pipeline() as p: diff --git a/sdks/python/apache_beam/ml/transforms/tft.py b/sdks/python/apache_beam/ml/transforms/tft.py index 1d492642cd60..c7b8ff015324 100644 --- a/sdks/python/apache_beam/ml/transforms/tft.py +++ b/sdks/python/apache_beam/ml/transforms/tft.py @@ -45,9 +45,7 @@ import tensorflow as tf import tensorflow_transform as tft from apache_beam.ml.transforms.base import BaseOperation -from tensorflow_transform import analyzers from tensorflow_transform import common_types -from tensorflow_transform import tf_utils __all__ = [ 'ComputeAndApplyVocabulary', @@ -77,6 +75,8 @@ def wrapper(fn): return wrapper +# TODO: https://github.com/apache/beam/pull/29016 +# Add support for outputting artifacts to a text file in human readable form. class TFTOperation(BaseOperation[common_types.TensorType, common_types.TensorType]): def __init__(self, columns: List[str]) -> None: @@ -95,13 +95,6 @@ def __init__(self, columns: List[str]) -> None: "Columns are not specified. Please specify the column for the " " op %s" % self.__class__.__name__) - def get_artifacts(self, data: common_types.TensorType, - col_name: str) -> Dict[str, common_types.TensorType]: - """ - Returns the artifacts generated by the operation. - """ - return {} - @tf.function def _split_string_with_delimiter(self, data, delimiter): """ @@ -240,15 +233,6 @@ def apply_transform( } return output_dict - def get_artifacts(self, data: common_types.TensorType, - col_name: str) -> Dict[str, common_types.TensorType]: - mean_var = tft.analyzers._mean_and_var(data) - shape = [tf.shape(data)[0], 1] - return { - col_name + '_mean': tf.broadcast_to(mean_var[0], shape), - col_name + '_var': tf.broadcast_to(mean_var[1], shape), - } - @register_input_dtype(float) class ScaleTo01(TFTOperation): @@ -280,14 +264,6 @@ def __init__( self.elementwise = elementwise self.name = name - def get_artifacts(self, data: common_types.TensorType, - col_name: str) -> Dict[str, common_types.TensorType]: - shape = [tf.shape(data)[0], 1] - return { - col_name + '_min': tf.broadcast_to(tft.min(data), shape), - col_name + '_max': tf.broadcast_to(tft.max(data), shape) - } - def apply_transform( self, data: common_types.TensorType, output_column_name: str) -> Dict[str, common_types.TensorType]: @@ -368,34 +344,6 @@ def __init__( self.elementwise = elementwise self.name = name - def get_artifacts(self, data: common_types.TensorType, - col_name: str) -> Dict[str, common_types.TensorType]: - num_buckets = self.num_buckets - epsilon = self.epsilon - elementwise = self.elementwise - - if num_buckets < 1: - raise ValueError('Invalid num_buckets %d' % num_buckets) - - if isinstance(data, (tf.SparseTensor, tf.RaggedTensor)) and elementwise: - raise ValueError( - 'bucketize requires `x` to be dense if `elementwise=True`') - - x_values = tf_utils.get_values(data) - - if epsilon is None: - # See explanation in args documentation for epsilon. - epsilon = min(1.0 / num_buckets, 0.01) - - quantiles = analyzers.quantiles( - x_values, num_buckets, epsilon, reduce_instance_dims=not elementwise) - shape = [ - tf.shape(data)[0], num_buckets - 1 if num_buckets > 1 else num_buckets - ] - # These quantiles are used as the bucket boundaries in the later stages. - # Should we change the prefix _quantiles to _bucket_boundaries? - return {col_name + '_quantiles': tf.broadcast_to(quantiles, shape)} - def apply_transform( self, data: common_types.TensorType, output_column_name: str) -> Dict[str, common_types.TensorType]: @@ -572,6 +520,7 @@ def __init__( ngram_range: Tuple[int, int] = (1, 1), ngrams_separator: Optional[str] = None, compute_word_count: bool = False, + key_vocab_filename: str = 'key_vocab_mapping', name: Optional[str] = None, ): """ @@ -592,9 +541,9 @@ def __init__( n-gram sizes. seperator: A string that will be inserted between each ngram. compute_word_count: A boolean that specifies whether to compute - the unique word count and add it as an artifact to the output. - Note that the count will be computed over the entire dataset so - it will be the same value for all inputs. + the unique word count over the entire dataset. Defaults to False. + key_vocab_filename: The file name for the key vocabulary file when + compute_word_count is True. name: A name for the operation (optional). Note that original order of the input may not be preserved. @@ -605,33 +554,26 @@ def __init__( self.ngrams_separator = ngrams_separator self.name = name self.split_string_by_delimiter = split_string_by_delimiter + self.key_vocab_filename = key_vocab_filename if compute_word_count: self.compute_word_count_fn = count_unqiue_words else: - self.compute_word_count_fn = lambda *args, **kwargs: {} + self.compute_word_count_fn = lambda *args, **kwargs: None if ngram_range != (1, 1) and not ngrams_separator: raise ValueError( 'ngrams_separator must be specified when ngram_range is not (1, 1)') - def get_artifacts(self, data: tf.SparseTensor, - col_name: str) -> Dict[str, tf.Tensor]: - return self.compute_word_count_fn(data, col_name) - def apply_transform(self, data: tf.SparseTensor, output_col_name: str): if self.split_string_by_delimiter: data = self._split_string_with_delimiter( data, self.split_string_by_delimiter) output = tft.bag_of_words( data, self.ngram_range, self.ngrams_separator, self.name) + # word counts are written to the key_vocab_filename + self.compute_word_count_fn(data, self.key_vocab_filename) return {output_col_name: output} -def count_unqiue_words(data: tf.SparseTensor, - output_col_name: str) -> Dict[str, tf.Tensor]: - keys, count = tft.count_per_key(data) - shape = [tf.shape(data)[0], tf.shape(keys)[0]] - return { - output_col_name + '_unique_elements': tf.broadcast_to(keys, shape), - output_col_name + '_counts': tf.broadcast_to(count, shape) - } +def count_unqiue_words(data: tf.SparseTensor, output_vocab_name: str) -> None: + tft.count_per_key(data, key_vocabulary_filename=output_vocab_name) diff --git a/sdks/python/apache_beam/ml/transforms/tft_test.py b/sdks/python/apache_beam/ml/transforms/tft_test.py index 41f59c868c3b..38ded6a809af 100644 --- a/sdks/python/apache_beam/ml/transforms/tft_test.py +++ b/sdks/python/apache_beam/ml/transforms/tft_test.py @@ -17,6 +17,7 @@ # pytype: skip-file +import os import shutil import tempfile import unittest @@ -38,31 +39,6 @@ if not tft: raise unittest.SkipTest('tensorflow_transform is not installed.') -z_score_expected = {'x_mean': 3.5, 'x_var': 2.9166666666666665} - - -def assert_z_score_artifacts(element): - element = element.as_dict() - assert 'x_mean' in element - assert 'x_var' in element - assert element['x_mean'] == z_score_expected['x_mean'] - assert element['x_var'] == z_score_expected['x_var'] - - -def assert_ScaleTo01_artifacts(element): - element = element.as_dict() - assert 'x_min' in element - assert 'x_max' in element - assert element['x_min'] == 1 - assert element['x_max'] == 6 - - -def assert_bucketize_artifacts(element): - element = element.as_dict() - assert 'x_quantiles' in element - assert np.array_equal( - element['x_quantiles'], np.array([3, 5], dtype=np.float32)) - class ScaleZScoreTest(unittest.TestCase): def setUp(self) -> None: @@ -100,7 +76,18 @@ def test_z_score(self): | "MLTransform" >> base.MLTransform( write_artifact_location=self.artifact_location).with_transform( tft.ScaleToZScore(columns=['x']))) - _ = (result | beam.Map(assert_z_score_artifacts)) + expected_data = [ + np.array([-1.46385], dtype=np.float32), + np.array([-0.87831], dtype=np.float32), + np.array([-0.29277], dtype=np.float32), + np.array([0.29277], dtype=np.float32), + np.array([0.87831], dtype=np.float32), + np.array([1.46385], dtype=np.float32), + ] + + actual_data = (result | beam.Map(lambda x: x.x)) + assert_that( + actual_data, equal_to(expected_data, equals_fn=np.array_equal)) def test_z_score_list_data(self): list_data = [{'x': [1, 2, 3]}, {'x': [4, 5, 6]}] @@ -111,7 +98,14 @@ def test_z_score_list_data(self): | "listMLTransform" >> base.MLTransform( write_artifact_location=self.artifact_location).with_transform( tft.ScaleToZScore(columns=['x']))) - _ = (list_result | beam.Map(assert_z_score_artifacts)) + + expected_data = [ + np.array([-1.46385, -0.87831, -0.29277], dtype=np.float32), + np.array([0.29277, 0.87831, 1.46385], dtype=np.float32) + ] + actual_data = (list_result | beam.Map(lambda x: x.x)) + assert_that( + actual_data, equal_to(expected_data, equals_fn=np.array_equal)) class ScaleTo01Test(unittest.TestCase): @@ -130,7 +124,6 @@ def test_ScaleTo01_list(self): | "MLTransform" >> base.MLTransform( write_artifact_location=self.artifact_location).with_transform( tft.ScaleTo01(columns=['x']))) - _ = (list_result | beam.Map(assert_ScaleTo01_artifacts)) expected_output = [ np.array([0, 0.2, 0.4], dtype=np.float32), @@ -150,7 +143,6 @@ def test_ScaleTo01(self): write_artifact_location=self.artifact_location).with_transform( tft.ScaleTo01(columns=['x']))) - _ = (result | beam.Map(assert_ScaleTo01_artifacts)) expected_output = ( np.array([0], dtype=np.float32), np.array([0.2], dtype=np.float32), @@ -179,7 +171,6 @@ def test_bucketize(self): | "MLTransform" >> base.MLTransform( write_artifact_location=self.artifact_location).with_transform( tft.Bucketize(columns=['x'], num_buckets=3))) - _ = (result | beam.Map(assert_bucketize_artifacts)) transformed_data = (result | beam.Map(lambda x: x.x)) expected_data = [ @@ -202,8 +193,6 @@ def test_bucketize_list(self): | "MLTransform" >> base.MLTransform( write_artifact_location=self.artifact_location).with_transform( tft.Bucketize(columns=['x'], num_buckets=3))) - _ = (list_result | beam.Map(assert_bucketize_artifacts)) - transformed_data = ( list_result | "TransformedColumnX" >> beam.Map(lambda ele: ele.x)) @@ -214,36 +203,6 @@ def test_bucketize_list(self): assert_that( transformed_data, equal_to(expected_data, equals_fn=np.array_equal)) - @parameterized.expand([ - (range(1, 10), [4, 7]), - (range(9, 0, -1), [4, 7]), - (range(19, 0, -1), [10]), - (range(1, 100), [25, 50, 75]), - # similar to the above but with odd number of elements - (range(1, 100, 2), [25, 51, 75]), - (range(99, 0, -1), range(10, 100, 10)) - ]) - def test_bucketize_boundaries(self, test_input, expected_boundaries): - # boundaries are outputted as artifacts for the Bucketize transform. - data = [{'x': [i]} for i in test_input] - num_buckets = len(expected_boundaries) + 1 - with beam.Pipeline() as p: - result = ( - p - | "Create" >> beam.Create(data) - | "MLTransform" >> base.MLTransform( - write_artifact_location=self.artifact_location).with_transform( - tft.Bucketize(columns=['x'], num_buckets=num_buckets))) - actual_boundaries = ( - result - | beam.Map(lambda x: x.as_dict()) - | beam.Map(lambda x: x['x_quantiles'])) - - def assert_boundaries(actual_boundaries): - assert np.array_equal(actual_boundaries, expected_boundaries) - - _ = (actual_boundaries | beam.Map(assert_boundaries)) - class ApplyBucketsTest(unittest.TestCase): def setUp(self) -> None: @@ -731,10 +690,6 @@ def test_bag_of_words_on_by_splitting_input_text(self): assert_that(result, equal_to(expected_data, equals_fn=np.array_equal)) def test_count_per_key_on_list(self): - def map_element_to_count(elements, counts): - d = {elements[i]: counts[i] for i in range(len(elements))} - return d - data = [{ 'x': ['I', 'like', 'pie', 'pie', 'pie'], }, { @@ -743,25 +698,28 @@ def map_element_to_count(elements, counts): 'x': ['Banana', 'Banana', 'Apple', 'Apple', 'Apple', 'Apple'] }] with beam.Pipeline() as p: - result = ( + _ = ( p | "Create" >> beam.Create(data) | "MLTransform" >> base.MLTransform( write_artifact_location=self.artifact_location, transforms=[ - tft.BagOfWords(columns=['x'], compute_word_count=True) + tft.BagOfWords( + columns=['x'], + compute_word_count=True, + key_vocab_filename='my_vocab') ])) - # the unique elements and counts are artifacts and will be - # stored in the result and same for all the elements in the - # PCollection. - result = result | beam.Map( - lambda x: map_element_to_count(x.x_unique_elements, x.x_counts)) + def validate_count_per_key(key_vocab_filename): + key_vocab_location = os.path.join( + self.artifact_location, 'transform_fn/assets', key_vocab_filename) + with open(key_vocab_location, 'r') as f: + key_vocab_list = [line.strip() for line in f] + return key_vocab_list - expected_data = [{ - b'Apple': 4, b'Banana': 2, b'I': 1, b'like': 1, b'pie': 4, b'yum': 2 - }] * 3 # since there are 3 elements in input. - assert_that(result, equal_to(expected_data)) + expected_data = ['2 yum', '4 Apple', '1 like', '1 I', '4 pie', '2 Banana'] + actual_data = validate_count_per_key('my_vocab') + self.assertEqual(expected_data, actual_data) if __name__ == '__main__': diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 1e797d96074f..e4cf09cacba4 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -162,7 +162,7 @@ deps = holdup==1.8.0 extras = gcp -allowlist_externals = +allowlist_externals = bash echo sleep @@ -194,7 +194,7 @@ deps = extras = azure passenv = REQUESTS_CA_BUNDLE -allowlist_externals = +allowlist_externals = wget az bash @@ -311,11 +311,12 @@ commands = # Run all DataFrame API unit tests bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/dataframe' -[testenv:py{38,39}-tft-113] +[testenv:py{38,39}-tft-{113,114}] deps = 113: tensorflow_transform>=1.13.0,<1.14.0 + 114: tensorflow_transform>=1.14.0,<1.15.0 commands = - bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/ml/transforms' + bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/ml/transforms apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py' [testenv:py{38,39,310,311}-pytorch-{19,110,111,112,113}] deps = diff --git a/website/www/site/content/en/documentation/ml/preprocess-data.md b/website/www/site/content/en/documentation/ml/preprocess-data.md index cb79afff6036..2b291b9c75a5 100644 --- a/website/www/site/content/en/documentation/ml/preprocess-data.md +++ b/website/www/site/content/en/documentation/ml/preprocess-data.md @@ -105,7 +105,7 @@ artifacts. When you use the `write_artifact_location` parameter, the `MLTransform` class runs the specified transformations on the dataset and then creates artifacts from these transformations. The artifacts are stored in the location that you specify in -the `write_artifact_location` parameter or in the `MLTransform` output. +the `write_artifact_location` parameter. Write mode is useful when you want to store the results of your transformations for future use. For example, if you apply the same transformations on a @@ -120,8 +120,7 @@ The following examples demonstrate how write mode works. The `ComputeAndApplyVocabulary` transform outputs the indices of the vocabulary to the vocabulary file. - The `ScaleToZScore` transform calculates the mean and variance over the entire dataset - and then normalizes the entire dataset using the mean and variance. The - mean and variance are outputted by the `MLTransform` operation. + and then normalizes the entire dataset using the mean and variance. When you use the `write_artifact_location` parameter, these values are stored as a `tensorflow` graph in the location specified by the `write_artifact_location` parameter value. You can reuse the values in read mode