From e850957ae3b7e6990cea32f76ce3645f5834a55e Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Thu, 30 Jul 2020 11:19:00 -0700 Subject: [PATCH 01/10] Add support for encoding Maps and Nulls (in container types) in Python RowCoder --- .../model/fnexecution/v1/standard_coders.yaml | 28 +++++++ .../src/main/proto/beam_runner_api.proto | 13 ++- model/pipeline/src/main/proto/schema.proto | 3 + .../core/construction/CommonCoderTest.java | 19 +++-- sdks/python/apache_beam/coders/coder_impl.py | 81 +++++++++++++++++++ sdks/python/apache_beam/coders/coders.py | 53 ++++++++++++ sdks/python/apache_beam/coders/row_coder.py | 68 +++++++++------- .../apache_beam/coders/row_coder_test.py | 19 ++++- .../coders/standard_coders_test.py | 11 ++- sdks/python/apache_beam/typehints/schemas.py | 5 ++ 10 files changed, 259 insertions(+), 41 deletions(-) diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml index 80e4eb833e27..506bb8ec022b 100644 --- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml +++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml @@ -384,3 +384,31 @@ nested: false examples: "\x02\x01\x02\x01": {f_bool: True, f_bytes: null} "\x02\x00\x00\x04ab\x00c": {f_bool: False, f_bytes: "ab\0c"} + +--- + +# Binary data generated with the python SDK: +# +# import typing +# import apache_beam as beam +# class Test(typing.NamedTuple): +# f_map: typing.Mapping[str,int] +# schema = beam.typehints.schemas.named_tuple_to_schema(Test) +# coder = beam.coders.row_coder.RowCoder(schema) +# print("payload = %s" % schema.SerializeToString()) +# examples = (Test(f_map={}), +# Test(f_map={"foo": 9001, "bar": 9223372036854775807}), +# Test(f_map={"everything": None, "is": None, "null!": None, "¯\_(ツ)_/¯": None})) +# for example in examples: +# print("example = %s" % coder.encode(example)) +coder: + urn: "beam:coder:row:v1" + # f_map: map + payload: "\n\x15\n\x05f_map\x1a\x0c*\n\n\x02\x10\x07\x12\x04\x08\x01\x10\x04\x12$d8c8f969-14e6-457f-a8b5-62a1aec7f1cd" + # map ordering is non-deterministic + non_deterministic: True +nested: false +examples: + "\x01\x00\x00\x00\x00\x00": {f_map:{}} + "\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f": {f_map:{"foo": 9001, "bar": 9223372036854775807}} + "\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00": {f_map:{"everything":null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}} diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 36237903a668..8b1ce6b301d4 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -855,10 +855,21 @@ message StandardCoders { // BOOLEAN: beam:coder:bool:v1 // BYTES: beam:coder:bytes:v1 // ArrayType: beam:coder:iterable:v1 (always has a known length) - // MapType: not yet a standard coder (BEAM-7996) + // MapType: not a standard coder, specification defined below. // RowType: beam:coder:row:v1 // LogicalType: Uses the coder for its representation. // + // The MapType is encoded by: + // - An INT32 representing the size of the map (N) + // - Followed by N interleaved keys and values, encoded with their + // corresponding coder. + // + // Nullable types in container types (ArrayType, MapType) are encoded by: + // - A one byte null indicator, 0x00 for null values, or 0x01 for present + // values. + // - For present values the null indicator is followed by the value + // encoded with it's corresponding coder. + // // The payload for RowCoder is an instance of Schema. // Components: None // Experimental. diff --git a/model/pipeline/src/main/proto/schema.proto b/model/pipeline/src/main/proto/schema.proto index dcf75ca4a5ee..bffa5f1decbb 100644 --- a/model/pipeline/src/main/proto/schema.proto +++ b/model/pipeline/src/main/proto/schema.proto @@ -19,6 +19,9 @@ // ** Experimental ** // Protocol Buffers describing Beam Schemas, a portable representation for // complex types. +// +// The primary application of Schema is as the payload for the standard coder +// "beam:coder:row:v1", defined in beam_runner_api.proto syntax = "proto3"; diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java index 10221b83ae76..7226eee83f14 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java @@ -21,7 +21,6 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.toImmutableList; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.toImmutableMap; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; @@ -43,6 +42,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -340,6 +340,8 @@ private static Object convertValue(Object value, CommonCoder coderSpec, Coder co } private static Object parseField(Object value, Schema.FieldType fieldType) { + if (value == null) return null; + switch (fieldType.getTypeName()) { case BYTE: return ((Number) value).byteValue(); @@ -366,12 +368,15 @@ private static Object parseField(Object value, Schema.FieldType fieldType) { .map((element) -> parseField(element, fieldType.getCollectionElementType())) .collect(toImmutableList()); case MAP: - Map kvMap = (Map) value; - return kvMap.entrySet().stream() - .collect( - toImmutableMap( - (pair) -> parseField(pair.getKey(), fieldType.getMapKeyType()), - (pair) -> parseField(pair.getValue(), fieldType.getMapValueType()))); + Map kvMap = new HashMap<>(); + ((Map) value) + .entrySet().stream() + .forEach( + (entry) -> + kvMap.put( + parseField(entry.getKey(), fieldType.getMapKeyType()), + parseField(entry.getValue(), fieldType.getMapValueType()))); + return kvMap; case ROW: Map rowMap = (Map) value; Schema schema = fieldType.getRowSchema(); diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 8fdc4a42e64b..fd3126ac99dc 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -530,6 +530,87 @@ def estimate_size(self, unused_value, nested=False): return 1 +class MapCoderImpl(CoderImpl): + """For internal use only; no backwards-compatibility guarantees. + + A coder for typing.Mapping objects.""" + def __init__( + self, + key_coder, # type: CoderImpl + value_coder # type: CoderImpl + ): + self._key_coder = key_coder + self._value_coder = value_coder + + def encode_to_stream(self, value, out, nested): + size = len(value) + out.write_bigendian_int32(size) + for i, kv in enumerate(value.items()): + key, value = kv + last = i == size - 1 + self._key_coder.encode_to_stream(key, out, True) + self._value_coder.encode_to_stream(value, out, not (last and not nested)) + + def decode_from_stream(self, in_stream, nested): + size = in_stream.read_bigendian_int32() + result = {} + for i in range(size): + last = i == size - 1 + key = self._key_coder.decode_from_stream(in_stream, True) + value = self._value_coder.decode_from_stream( + in_stream, not (last and not nested)) + result[key] = value + + return result + + def estimate_size(self, unused_value, nested=False): + estimate = 4 # 4 bytes for int32 size prefix + for i, kv in enumerate(unused_value.items()): + key, value = kv + last = i == len(unused_value) - 1 + estimate += self._key_coder.estimate_size(key, True) + estimate += self._value_coder.estimate_size( + value, not (last and not nested)) + + +class NullableCoderImpl(CoderImpl): + """For internal use only; no backwards-compatibility guarantees. + + A coder for typing.Optional objects.""" + + ENCODE_NULL = 0 + ENCODE_PRESENT = 1 + + def __init__( + self, + value_coder # type: CoderImpl + ): + self._value_coder = value_coder + + def encode_to_stream(self, value, out, nested): + if value is None: + out.write_byte(self.ENCODE_NULL) + else: + out.write_byte(self.ENCODE_PRESENT) + self._value_coder.encode_to_stream(value, out, nested) + + def decode_from_stream(self, in_stream, nested): + null_indicator = in_stream.read_byte() + if null_indicator == self.ENCODE_NULL: + return None + elif null_indicator == self.ENCODE_PRESENT: + return self._value_coder.decode_from_stream(in_stream, nested) + else: + raise ValueError( + "Encountered unexpected value for null indicator: '%s'" % + null_indicator) + + def estimate_size(self, unused_value, nested=False): + return 1 + ( + self._value_coder.estimate_size(unused_value) + if unused_value is not None else 0) + + class FloatCoderImpl(StreamCoderImpl): """For internal use only; no backwards-compatibility guarantees.""" def encode_to_stream(self, value, out, nested): diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 399a46dfd7a4..b6aca0afc9ae 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -83,6 +83,8 @@ 'FastPrimitivesCoder', 'FloatCoder', 'IterableCoder', + 'MapCoder', + 'NullableCoder', 'PickleCoder', 'ProtoCoder', 'SingletonCoder', @@ -520,6 +522,57 @@ def __hash__(self): Coder.register_structured_urn(common_urns.coders.BOOL.urn, BooleanCoder) +class MapCoder(FastCoder): + def __init__(self, key_coder, value_coder): + # type: (Coder, Coder) -> None + self._key_coder = key_coder + self._value_coder = value_coder + + def _create_impl(self): + return coder_impl.MapCoderImpl( + self._key_coder.get_impl(), self._value_coder.get_impl()) + + def to_type_hint(self): + return typehints.Dict[self._key_coder.to_type_hint(), + self._value_coder.to_type_hint()] + + def is_deterministic(self): + # () -> bool + # Map ordering is non-deterministic + return False + + def __eq__(self, other): + return ( + type(self) == type(other) and self._key_coder == other._key_coder and + self._value_coder == other._value_coder) + + def __hash__(self): + return hash(type(self)) + hash(self._key_coder) + hash(self._value_coder) + + +class NullableCoder(FastCoder): + def __init__(self, value_coder): + # type: (Coder) -> None + self._value_coder = value_coder + + def _create_impl(self): + return coder_impl.NullableCoderImpl(self._value_coder.get_impl()) + + def to_type_hint(self): + return typehints.Optional[self._value_coder.to_type_hint()] + + def is_deterministic(self): + # () -> bool + return self._value_coder.is_deterministic() + + def __eq__(self, other): + return ( + type(self) == type(other) and self._value_coder == other._value_coder) + + def __hash__(self): + return hash(type(self)) + hash(self._value_coder) + + class VarIntCoder(FastCoder): """Variable-length integer coder.""" def _create_impl(self): diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py index 3ad880f7e900..02c7f0629817 100644 --- a/sdks/python/apache_beam/coders/row_coder.py +++ b/sdks/python/apache_beam/coders/row_coder.py @@ -30,6 +30,8 @@ from apache_beam.coders.coders import FastCoder from apache_beam.coders.coders import FloatCoder from apache_beam.coders.coders import IterableCoder +from apache_beam.coders.coders import MapCoder +from apache_beam.coders.coders import NullableCoder from apache_beam.coders.coders import StrUtf8Coder from apache_beam.coders.coders import TupleCoder from apache_beam.coders.coders import VarIntCoder @@ -58,8 +60,9 @@ def __init__(self, schema): to encode/decode. """ self.schema = schema + # Use non-null coders because null values are represented separately self.components = [ - RowCoder.coder_from_type(field.type) for field in self.schema.fields + _nonnull_coder_from_type(field.type) for field in self.schema.fields ] def _create_impl(self): @@ -102,32 +105,6 @@ def from_payload(payload): # type: (bytes) -> RowCoder return RowCoder(proto_utils.parse_Bytes(payload, schema_pb2.Schema)) - @staticmethod - def coder_from_type(field_type): - type_info = field_type.WhichOneof("type_info") - if type_info == "atomic_type": - if field_type.atomic_type in (schema_pb2.INT32, schema_pb2.INT64): - return VarIntCoder() - elif field_type.atomic_type == schema_pb2.DOUBLE: - return FloatCoder() - elif field_type.atomic_type == schema_pb2.STRING: - return StrUtf8Coder() - elif field_type.atomic_type == schema_pb2.BOOLEAN: - return BooleanCoder() - elif field_type.atomic_type == schema_pb2.BYTES: - return BytesCoder() - elif type_info == "array_type": - return IterableCoder( - RowCoder.coder_from_type(field_type.array_type.element_type)) - elif type_info == "row_type": - return RowCoder(field_type.row_type.schema) - - # The Java SDK supports several more types, but the coders are not yet - # standard, and are not implemented in Python. - raise ValueError( - "Encountered a type that is not currently supported by RowCoder: %s" % - field_type) - def __reduce__(self): # when pickling, use bytes representation of the schema. schema_pb2.Schema # objects cannot be pickled. @@ -137,6 +114,43 @@ def __reduce__(self): typecoders.registry.register_coder(row_type.RowTypeConstraint, RowCoder) +def _coder_from_type(field_type): + coder = _nonnull_coder_from_type(field_type) + if field_type.nullable: + return NullableCoder(coder) + else: + return coder + + +def _nonnull_coder_from_type(field_type): + type_info = field_type.WhichOneof("type_info") + if type_info == "atomic_type": + if field_type.atomic_type in (schema_pb2.INT32, schema_pb2.INT64): + return VarIntCoder() + elif field_type.atomic_type == schema_pb2.DOUBLE: + return FloatCoder() + elif field_type.atomic_type == schema_pb2.STRING: + return StrUtf8Coder() + elif field_type.atomic_type == schema_pb2.BOOLEAN: + return BooleanCoder() + elif field_type.atomic_type == schema_pb2.BYTES: + return BytesCoder() + elif type_info == "array_type": + return IterableCoder(_coder_from_type(field_type.array_type.element_type)) + elif type_info == "map_type": + return MapCoder( + _coder_from_type(field_type.map_type.key_type), + _coder_from_type(field_type.map_type.value_type)) + elif type_info == "row_type": + return RowCoder(field_type.row_type.schema) + + # The Java SDK supports several more types, but the coders are not yet + # standard, and are not implemented in Python. + raise ValueError( + "Encountered a type that is not currently supported by RowCoder: %s" % + field_type) + + class RowCoderImpl(StreamCoderImpl): """For internal use only; no backwards-compatibility guarantees.""" SIZE_CODER = VarIntCoder().get_impl() diff --git a/sdks/python/apache_beam/coders/row_coder_test.py b/sdks/python/apache_beam/coders/row_coder_test.py index 65b102446b56..4277e58c8fa6 100644 --- a/sdks/python/apache_beam/coders/row_coder_test.py +++ b/sdks/python/apache_beam/coders/row_coder_test.py @@ -45,14 +45,15 @@ ("aliases", typing.List[unicode]), ("knows_javascript", bool), # TODO(BEAM-7372): Use bytes instead of ByteString - ("payload", typing.Optional[typing.ByteString]) + ("payload", typing.Optional[typing.ByteString]), + ("custom_metadata", typing.Mapping[unicode, int]) ]) coders_registry.register_coder(Person, RowCoder) class RowCoderTest(unittest.TestCase): - JON_SNOW = Person("Jon Snow", 23, None, ["crow", "wildling"], False, None) + JON_SNOW = Person("Jon Snow", 23, None, ["crow", "wildling"], False, None, {}) PEOPLE = [ JON_SNOW, Person( @@ -60,8 +61,9 @@ class RowCoderTest(unittest.TestCase): 25, "Westeros", ["Mother of Dragons"], False, - None), - Person("Michael Bluth", 30, None, [], True, b"I've made a huge mistake") + None, {"dragons": 3}), + Person( + "Michael Bluth", 30, None, [], True, b"I've made a huge mistake", {}) ] def test_create_row_coder_from_named_tuple(self): @@ -102,6 +104,15 @@ def test_create_row_coder_from_schema(self): name="payload", type=schema_pb2.FieldType( atomic_type=schema_pb2.BYTES, nullable=True)), + schema_pb2.Field( + name="custom_metadata", + type=schema_pb2.FieldType( + map_type=schema_pb2.MapType( + key_type=schema_pb2.FieldType( + atomic_type=schema_pb2.STRING), + value_type=schema_pb2.FieldType( + atomic_type=schema_pb2.INT64), + ))), ]) coder = RowCoder(schema) diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py index 4a1389581128..49c1741cb4db 100644 --- a/sdks/python/apache_beam/coders/standard_coders_test.py +++ b/sdks/python/apache_beam/coders/standard_coders_test.py @@ -78,6 +78,13 @@ def parse_float(s): def value_parser_from_schema(schema): def attribute_parser_from_type(type_): + parser = nonnull_attribute_parser_from_type(type_) + if type_.nullable: + return lambda x: None if x is None else parser(x) + else: + return parser + + def nonnull_attribute_parser_from_type(type_): # TODO: This should be exhaustive type_info = type_.WhichOneof("type_info") if type_info == "atomic_type": @@ -89,8 +96,8 @@ def attribute_parser_from_type(type_): element_parser = attribute_parser_from_type(type_.array_type.element_type) return lambda x: list(map(element_parser, x)) elif type_info == "map_type": - key_parser = attribute_parser_from_type(type_.array_type.key_type) - value_parser = attribute_parser_from_type(type_.array_type.value_type) + key_parser = attribute_parser_from_type(type_.map_type.key_type) + value_parser = attribute_parser_from_type(type_.map_type.value_type) return lambda x: dict( (key_parser(k), value_parser(v)) for k, v in x.items()) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 0cc513ffeda2..cb4cf0114263 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -178,6 +178,11 @@ def typing_to_runner_api(type_): return schema_pb2.FieldType( array_type=schema_pb2.ArrayType(element_type=element_type)) + elif _safe_issubclass(type_, Mapping): + key_type, value_type = map(typing_to_runner_api, _get_args(type_)) + return schema_pb2.FieldType( + map_type=schema_pb2.MapType(key_type=key_type, value_type=value_type)) + raise ValueError("Unsupported type: %s" % type_) From 587dde57cbb2b0095a1fa04b59798d1b62c66f18 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Thu, 30 Jul 2020 13:56:42 -0700 Subject: [PATCH 02/10] fixup! Add support for encoding Maps and Nulls (in container types) in Python RowCoder --- .../runners/core/construction/CommonCoderTest.java | 4 +++- sdks/python/apache_beam/coders/coder_impl.py | 5 +++-- .../apache_beam/coders/coders_test_common.py | 14 ++++++++++++-- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java index 7226eee83f14..3f70736ec51a 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java @@ -340,7 +340,9 @@ private static Object convertValue(Object value, CommonCoder coderSpec, Coder co } private static Object parseField(Object value, Schema.FieldType fieldType) { - if (value == null) return null; + if (value == null) { + return null; + } switch (fieldType.getTypeName()) { case BYTE: diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index fd3126ac99dc..1ee6dc43bcdc 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -530,7 +530,7 @@ def estimate_size(self, unused_value, nested=False): return 1 -class MapCoderImpl(CoderImpl): +class MapCoderImpl(StreamCoderImpl): """For internal use only; no backwards-compatibility guarantees. A coder for typing.Mapping objects.""" @@ -571,9 +571,10 @@ def estimate_size(self, unused_value, nested=False): estimate += self._key_coder.estimate_size(key, True) estimate += self._value_coder.estimate_size( value, not (last and not nested)) + return estimate -class NullableCoderImpl(CoderImpl): +class NullableCoderImpl(StreamCoderImpl): """For internal use only; no backwards-compatibility guarantees. A coder for typing.Optional objects.""" diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index f1a771a78568..e1ce23b1271c 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -82,8 +82,8 @@ def tearDownClass(cls): coders.ToBytesCoder ]) cls.seen_nested -= set([coders.ProtoCoder, CustomCoder]) - assert not standard - cls.seen - assert not cls.seen_nested - standard + assert not standard - cls.seen, str(standard - cls.seen) + assert not cls.seen_nested - standard, str(cls.seen_nested - standard) @classmethod def _observe(cls, coder): @@ -560,6 +560,16 @@ def iterable_state_read(token, element_coder_impl): context=context, test_size_estimation=False) + def test_nullable_coder(self): + self.check_coder(coders.NullableCoder(coders.VarIntCoder()), None, 2 * 64) + + def test_map_coder(self): + self.check_coder( + coders.MapCoder(coders.VarIntCoder(), coders.StrUtf8Coder()), { + 1: "one", 300: "three hundred" + }, {}, {i: str(i) + for i in range(5000)}) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From 0dfff978855e2e12087cd37f854eeffb6b2f2d91 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Tue, 4 Aug 2020 09:56:00 -0700 Subject: [PATCH 03/10] fixup! Add support for encoding Maps and Nulls (in container types) in Python RowCoder --- sdks/python/apache_beam/coders/coder_impl.py | 23 +++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 1ee6dc43bcdc..501bc5e7eb6e 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -533,6 +533,14 @@ def estimate_size(self, unused_value, nested=False): class MapCoderImpl(StreamCoderImpl): """For internal use only; no backwards-compatibility guarantees. + Note this implementation always uses nested context when encoding keys + and values. This differs from Java's MapCoder, which uses + nested=False if possible for the last value encoded. + + This difference is acceptable because MapCoder is not standard. It is only + used in a standard context by RowCoder which always uses nested context for + attribute values. + A coder for typing.Mapping objects.""" def __init__( self, @@ -547,18 +555,19 @@ def encode_to_stream(self, value, out, nested): out.write_bigendian_int32(size) for i, kv in enumerate(value.items()): key, value = kv - last = i == size - 1 + # Note this implementation always uses nested context when encoding keys + # and values which differs from Java. See note in docstring. self._key_coder.encode_to_stream(key, out, True) - self._value_coder.encode_to_stream(value, out, not (last and not nested)) + self._value_coder.encode_to_stream(value, out, True) def decode_from_stream(self, in_stream, nested): size = in_stream.read_bigendian_int32() result = {} for i in range(size): - last = i == size - 1 + # Note this implementation always uses nested context when encoding keys + # and values which differs from Java. See note in docstring. key = self._key_coder.decode_from_stream(in_stream, True) - value = self._value_coder.decode_from_stream( - in_stream, not (last and not nested)) + value = self._value_coder.decode_from_stream(in_stream, True) result[key] = value return result @@ -567,10 +576,8 @@ def estimate_size(self, unused_value, nested=False): estimate = 4 # 4 bytes for int32 size prefix for i, kv in enumerate(unused_value.items()): key, value = kv - last = i == len(unused_value) - 1 estimate += self._key_coder.estimate_size(key, True) - estimate += self._value_coder.estimate_size( - value, not (last and not nested)) + estimate += self._value_coder.estimate_size(value, True) return estimate From 16c06ea87222f3a35bb38dda66d71361722270d8 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Tue, 4 Aug 2020 11:07:30 -0700 Subject: [PATCH 04/10] Don't specify nested in row coder tests --- .../org/apache/beam/model/fnexecution/v1/standard_coders.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml index 506bb8ec022b..f42fd7833390 100644 --- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml +++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml @@ -347,7 +347,6 @@ coder: urn: "beam:coder:row:v1" # str: string, i32: int32, f64: float64, arr: array[string] payload: "\n\t\n\x03str\x1a\x02\x10\x07\n\t\n\x03i32\x1a\x02\x10\x03\n\t\n\x03f64\x1a\x02\x10\x06\n\r\n\x03arr\x1a\x06\x1a\x04\n\x02\x10\x07\x12$4e5e554c-d4c1-4a5d-b5e1-f3293a6b9f05" -nested: false examples: "\u0004\u0000\u0003foo\u00a9\u0046\u003f\u00b9\u0099\u0099\u0099\u0099\u0099\u009a\0\0\0\u0003\u0003foo\u0003bar\u0003baz": {str: "foo", i32: 9001, f64: "0.1", arr: ["foo", "bar", "baz"]} @@ -357,7 +356,6 @@ coder: urn: "beam:coder:row:v1" # str: nullable string, i32: nullable int32, f64: nullable float64 payload: "\n\x0b\n\x03str\x1a\x04\x08\x01\x10\x07\n\x0b\n\x03i32\x1a\x04\x08\x01\x10\x03\n\x0b\n\x03f64\x1a\x04\x08\x01\x10\x06\x12$b20c6545-57af-4bc8-b2a9-51ace21c7393" -nested: false examples: "\u0003\u0001\u0007": {str: null, i32: null, f64: null} "\u0003\u0001\u0004\u0003foo\u00a9\u0046": {str: "foo", i32: 9001, f64: null} @@ -380,7 +378,6 @@ coder: urn: "beam:coder:row:v1" # f_bool: boolean, f_bytes: nullable bytes payload: "\n\x0c\n\x06f_bool\x1a\x02\x10\x08\n\x0f\n\x07f_bytes\x1a\x04\x08\x01\x10\t\x12$eea1b747-7571-43d3-aafa-9255afdceafb" -nested: false examples: "\x02\x01\x02\x01": {f_bool: True, f_bytes: null} "\x02\x00\x00\x04ab\x00c": {f_bool: False, f_bytes: "ab\0c"} From 33354403a0e22dd20646175e34f12b379a5ef75e Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Tue, 4 Aug 2020 11:07:52 -0700 Subject: [PATCH 05/10] fixup! Add support for encoding Maps and Nulls (in container types) in Python RowCoder --- .../apache/beam/model/fnexecution/v1/standard_coders.yaml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml index f42fd7833390..08b823d1767c 100644 --- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml +++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml @@ -404,8 +404,7 @@ coder: payload: "\n\x15\n\x05f_map\x1a\x0c*\n\n\x02\x10\x07\x12\x04\x08\x01\x10\x04\x12$d8c8f969-14e6-457f-a8b5-62a1aec7f1cd" # map ordering is non-deterministic non_deterministic: True -nested: false examples: - "\x01\x00\x00\x00\x00\x00": {f_map:{}} - "\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f": {f_map:{"foo": 9001, "bar": 9223372036854775807}} - "\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00": {f_map:{"everything":null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}} + "\x01\x00\x00\x00\x00\x00": {f_map: {}} + "\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f": {f_map: {"foo": 9001, "bar": 9223372036854775807}} + "\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00": {f_map: {"everything":null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}} From 0416e2ed65e5e859c4db1466254758a30d2200b7 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Tue, 4 Aug 2020 11:08:24 -0700 Subject: [PATCH 06/10] Don't mutate value when reading rows --- .../apache/beam/runners/core/construction/CommonCoderTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java index 3f70736ec51a..2c4090e8bf11 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java @@ -380,7 +380,8 @@ private static Object parseField(Object value, Schema.FieldType fieldType) { parseField(entry.getValue(), fieldType.getMapValueType()))); return kvMap; case ROW: - Map rowMap = (Map) value; + // Clone map so we don't mutate the underlying value + Map rowMap = new HashMap<>((Map) value); Schema schema = fieldType.getRowSchema(); Row.Builder row = Row.withSchema(schema); for (Schema.Field field : schema.getFields()) { From 9bb591d9a8238a87bd8e44a82f3be7a84ca4b4e7 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Tue, 4 Aug 2020 11:11:07 -0700 Subject: [PATCH 07/10] fixup! Add support for encoding Maps and Nulls (in container types) in Python RowCoder --- sdks/python/apache_beam/coders/coder_impl.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 501bc5e7eb6e..aeb3e6b11b90 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -553,8 +553,7 @@ def __init__( def encode_to_stream(self, value, out, nested): size = len(value) out.write_bigendian_int32(size) - for i, kv in enumerate(value.items()): - key, value = kv + for key, value in value.items(): # Note this implementation always uses nested context when encoding keys # and values which differs from Java. See note in docstring. self._key_coder.encode_to_stream(key, out, True) @@ -563,7 +562,7 @@ def encode_to_stream(self, value, out, nested): def decode_from_stream(self, in_stream, nested): size = in_stream.read_bigendian_int32() result = {} - for i in range(size): + for _ in range(size): # Note this implementation always uses nested context when encoding keys # and values which differs from Java. See note in docstring. key = self._key_coder.decode_from_stream(in_stream, True) From d4f55982291aa23dd4ea2284ee74194dc65ec9cc Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Tue, 4 Aug 2020 15:21:25 -0700 Subject: [PATCH 08/10] fixup! Add support for encoding Maps and Nulls (in container types) in Python RowCoder --- sdks/python/apache_beam/coders/coder_impl.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index aeb3e6b11b90..86da51f7a6a4 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -550,10 +550,10 @@ def __init__( self._key_coder = key_coder self._value_coder = value_coder - def encode_to_stream(self, value, out, nested): - size = len(value) + def encode_to_stream(self, dict_value, out, nested): + size = len(dict_value) out.write_bigendian_int32(size) - for key, value in value.items(): + for key, value in dict_value.items(): # Note this implementation always uses nested context when encoding keys # and values which differs from Java. See note in docstring. self._key_coder.encode_to_stream(key, out, True) @@ -573,8 +573,7 @@ def decode_from_stream(self, in_stream, nested): def estimate_size(self, unused_value, nested=False): estimate = 4 # 4 bytes for int32 size prefix - for i, kv in enumerate(unused_value.items()): - key, value = kv + for key, value in unused_value.items(): estimate += self._key_coder.estimate_size(key, True) estimate += self._value_coder.estimate_size(value, True) return estimate From 73f5602c9eb2fc4a7971fdea12faa0badf0c26b0 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Tue, 4 Aug 2020 15:22:10 -0700 Subject: [PATCH 09/10] Python: Don't mutate value --- sdks/python/apache_beam/coders/standard_coders_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py index 49c1741cb4db..df889785bc2d 100644 --- a/sdks/python/apache_beam/coders/standard_coders_test.py +++ b/sdks/python/apache_beam/coders/standard_coders_test.py @@ -29,6 +29,7 @@ import sys import unittest from builtins import map +from copy import deepcopy from typing import Dict from typing import Tuple @@ -108,6 +109,7 @@ def nonnull_attribute_parser_from_type(type_): def value_parser(x): result = [] + x = deepcopy(x) for name, parser in parsers: value = x.pop(name) result.append(None if value is None else parser(value)) From 31f246e62056ecf9c1acb0a816b2feef555020c2 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Wed, 5 Aug 2020 14:45:20 -0700 Subject: [PATCH 10/10] fixup! Add support for encoding Maps and Nulls (in container types) in Python RowCoder --- sdks/python/apache_beam/coders/coder_impl.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 86da51f7a6a4..54c63e77dd03 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -551,8 +551,7 @@ def __init__( self._value_coder = value_coder def encode_to_stream(self, dict_value, out, nested): - size = len(dict_value) - out.write_bigendian_int32(size) + out.write_bigendian_int32(len(dict_value)) for key, value in dict_value.items(): # Note this implementation always uses nested context when encoding keys # and values which differs from Java. See note in docstring.