diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml index 80e4eb833e27..08b823d1767c 100644 --- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml +++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml @@ -347,7 +347,6 @@ coder: urn: "beam:coder:row:v1" # str: string, i32: int32, f64: float64, arr: array[string] payload: "\n\t\n\x03str\x1a\x02\x10\x07\n\t\n\x03i32\x1a\x02\x10\x03\n\t\n\x03f64\x1a\x02\x10\x06\n\r\n\x03arr\x1a\x06\x1a\x04\n\x02\x10\x07\x12$4e5e554c-d4c1-4a5d-b5e1-f3293a6b9f05" -nested: false examples: "\u0004\u0000\u0003foo\u00a9\u0046\u003f\u00b9\u0099\u0099\u0099\u0099\u0099\u009a\0\0\0\u0003\u0003foo\u0003bar\u0003baz": {str: "foo", i32: 9001, f64: "0.1", arr: ["foo", "bar", "baz"]} @@ -357,7 +356,6 @@ coder: urn: "beam:coder:row:v1" # str: nullable string, i32: nullable int32, f64: nullable float64 payload: "\n\x0b\n\x03str\x1a\x04\x08\x01\x10\x07\n\x0b\n\x03i32\x1a\x04\x08\x01\x10\x03\n\x0b\n\x03f64\x1a\x04\x08\x01\x10\x06\x12$b20c6545-57af-4bc8-b2a9-51ace21c7393" -nested: false examples: "\u0003\u0001\u0007": {str: null, i32: null, f64: null} "\u0003\u0001\u0004\u0003foo\u00a9\u0046": {str: "foo", i32: 9001, f64: null} @@ -380,7 +378,33 @@ coder: urn: "beam:coder:row:v1" # f_bool: boolean, f_bytes: nullable bytes payload: "\n\x0c\n\x06f_bool\x1a\x02\x10\x08\n\x0f\n\x07f_bytes\x1a\x04\x08\x01\x10\t\x12$eea1b747-7571-43d3-aafa-9255afdceafb" -nested: false examples: "\x02\x01\x02\x01": {f_bool: True, f_bytes: null} "\x02\x00\x00\x04ab\x00c": {f_bool: False, f_bytes: "ab\0c"} + +--- + +# Binary data generated with the python SDK: +# +# import typing +# import apache_beam as beam +# class Test(typing.NamedTuple): +# f_map: typing.Mapping[str,int] +# schema = beam.typehints.schemas.named_tuple_to_schema(Test) +# coder = beam.coders.row_coder.RowCoder(schema) +# print("payload = %s" % schema.SerializeToString()) +# examples = (Test(f_map={}), +# Test(f_map={"foo": 9001, "bar": 9223372036854775807}), +# Test(f_map={"everything": None, "is": None, "null!": None, "¯\_(ツ)_/¯": None})) +# for example in examples: +# print("example = %s" % coder.encode(example)) +coder: + urn: "beam:coder:row:v1" + # f_map: map + payload: "\n\x15\n\x05f_map\x1a\x0c*\n\n\x02\x10\x07\x12\x04\x08\x01\x10\x04\x12$d8c8f969-14e6-457f-a8b5-62a1aec7f1cd" + # map ordering is non-deterministic + non_deterministic: True +examples: + "\x01\x00\x00\x00\x00\x00": {f_map: {}} + "\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f": {f_map: {"foo": 9001, "bar": 9223372036854775807}} + "\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00": {f_map: {"everything":null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}} diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 36237903a668..8b1ce6b301d4 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -855,10 +855,21 @@ message StandardCoders { // BOOLEAN: beam:coder:bool:v1 // BYTES: beam:coder:bytes:v1 // ArrayType: beam:coder:iterable:v1 (always has a known length) - // MapType: not yet a standard coder (BEAM-7996) + // MapType: not a standard coder, specification defined below. // RowType: beam:coder:row:v1 // LogicalType: Uses the coder for its representation. // + // The MapType is encoded by: + // - An INT32 representing the size of the map (N) + // - Followed by N interleaved keys and values, encoded with their + // corresponding coder. + // + // Nullable types in container types (ArrayType, MapType) are encoded by: + // - A one byte null indicator, 0x00 for null values, or 0x01 for present + // values. + // - For present values the null indicator is followed by the value + // encoded with it's corresponding coder. + // // The payload for RowCoder is an instance of Schema. // Components: None // Experimental. diff --git a/model/pipeline/src/main/proto/schema.proto b/model/pipeline/src/main/proto/schema.proto index dcf75ca4a5ee..bffa5f1decbb 100644 --- a/model/pipeline/src/main/proto/schema.proto +++ b/model/pipeline/src/main/proto/schema.proto @@ -19,6 +19,9 @@ // ** Experimental ** // Protocol Buffers describing Beam Schemas, a portable representation for // complex types. +// +// The primary application of Schema is as the payload for the standard coder +// "beam:coder:row:v1", defined in beam_runner_api.proto syntax = "proto3"; diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java index 10221b83ae76..2c4090e8bf11 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java @@ -21,7 +21,6 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.toImmutableList; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.toImmutableMap; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; @@ -43,6 +42,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -340,6 +340,10 @@ private static Object convertValue(Object value, CommonCoder coderSpec, Coder co } private static Object parseField(Object value, Schema.FieldType fieldType) { + if (value == null) { + return null; + } + switch (fieldType.getTypeName()) { case BYTE: return ((Number) value).byteValue(); @@ -366,14 +370,18 @@ private static Object parseField(Object value, Schema.FieldType fieldType) { .map((element) -> parseField(element, fieldType.getCollectionElementType())) .collect(toImmutableList()); case MAP: - Map kvMap = (Map) value; - return kvMap.entrySet().stream() - .collect( - toImmutableMap( - (pair) -> parseField(pair.getKey(), fieldType.getMapKeyType()), - (pair) -> parseField(pair.getValue(), fieldType.getMapValueType()))); + Map kvMap = new HashMap<>(); + ((Map) value) + .entrySet().stream() + .forEach( + (entry) -> + kvMap.put( + parseField(entry.getKey(), fieldType.getMapKeyType()), + parseField(entry.getValue(), fieldType.getMapValueType()))); + return kvMap; case ROW: - Map rowMap = (Map) value; + // Clone map so we don't mutate the underlying value + Map rowMap = new HashMap<>((Map) value); Schema schema = fieldType.getRowSchema(); Row.Builder row = Row.withSchema(schema); for (Schema.Field field : schema.getFields()) { diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 8fdc4a42e64b..54c63e77dd03 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -530,6 +530,92 @@ def estimate_size(self, unused_value, nested=False): return 1 +class MapCoderImpl(StreamCoderImpl): + """For internal use only; no backwards-compatibility guarantees. + + Note this implementation always uses nested context when encoding keys + and values. This differs from Java's MapCoder, which uses + nested=False if possible for the last value encoded. + + This difference is acceptable because MapCoder is not standard. It is only + used in a standard context by RowCoder which always uses nested context for + attribute values. + + A coder for typing.Mapping objects.""" + def __init__( + self, + key_coder, # type: CoderImpl + value_coder # type: CoderImpl + ): + self._key_coder = key_coder + self._value_coder = value_coder + + def encode_to_stream(self, dict_value, out, nested): + out.write_bigendian_int32(len(dict_value)) + for key, value in dict_value.items(): + # Note this implementation always uses nested context when encoding keys + # and values which differs from Java. See note in docstring. + self._key_coder.encode_to_stream(key, out, True) + self._value_coder.encode_to_stream(value, out, True) + + def decode_from_stream(self, in_stream, nested): + size = in_stream.read_bigendian_int32() + result = {} + for _ in range(size): + # Note this implementation always uses nested context when encoding keys + # and values which differs from Java. See note in docstring. + key = self._key_coder.decode_from_stream(in_stream, True) + value = self._value_coder.decode_from_stream(in_stream, True) + result[key] = value + + return result + + def estimate_size(self, unused_value, nested=False): + estimate = 4 # 4 bytes for int32 size prefix + for key, value in unused_value.items(): + estimate += self._key_coder.estimate_size(key, True) + estimate += self._value_coder.estimate_size(value, True) + return estimate + + +class NullableCoderImpl(StreamCoderImpl): + """For internal use only; no backwards-compatibility guarantees. + + A coder for typing.Optional objects.""" + + ENCODE_NULL = 0 + ENCODE_PRESENT = 1 + + def __init__( + self, + value_coder # type: CoderImpl + ): + self._value_coder = value_coder + + def encode_to_stream(self, value, out, nested): + if value is None: + out.write_byte(self.ENCODE_NULL) + else: + out.write_byte(self.ENCODE_PRESENT) + self._value_coder.encode_to_stream(value, out, nested) + + def decode_from_stream(self, in_stream, nested): + null_indicator = in_stream.read_byte() + if null_indicator == self.ENCODE_NULL: + return None + elif null_indicator == self.ENCODE_PRESENT: + return self._value_coder.decode_from_stream(in_stream, nested) + else: + raise ValueError( + "Encountered unexpected value for null indicator: '%s'" % + null_indicator) + + def estimate_size(self, unused_value, nested=False): + return 1 + ( + self._value_coder.estimate_size(unused_value) + if unused_value is not None else 0) + + class FloatCoderImpl(StreamCoderImpl): """For internal use only; no backwards-compatibility guarantees.""" def encode_to_stream(self, value, out, nested): diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 399a46dfd7a4..b6aca0afc9ae 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -83,6 +83,8 @@ 'FastPrimitivesCoder', 'FloatCoder', 'IterableCoder', + 'MapCoder', + 'NullableCoder', 'PickleCoder', 'ProtoCoder', 'SingletonCoder', @@ -520,6 +522,57 @@ def __hash__(self): Coder.register_structured_urn(common_urns.coders.BOOL.urn, BooleanCoder) +class MapCoder(FastCoder): + def __init__(self, key_coder, value_coder): + # type: (Coder, Coder) -> None + self._key_coder = key_coder + self._value_coder = value_coder + + def _create_impl(self): + return coder_impl.MapCoderImpl( + self._key_coder.get_impl(), self._value_coder.get_impl()) + + def to_type_hint(self): + return typehints.Dict[self._key_coder.to_type_hint(), + self._value_coder.to_type_hint()] + + def is_deterministic(self): + # () -> bool + # Map ordering is non-deterministic + return False + + def __eq__(self, other): + return ( + type(self) == type(other) and self._key_coder == other._key_coder and + self._value_coder == other._value_coder) + + def __hash__(self): + return hash(type(self)) + hash(self._key_coder) + hash(self._value_coder) + + +class NullableCoder(FastCoder): + def __init__(self, value_coder): + # type: (Coder) -> None + self._value_coder = value_coder + + def _create_impl(self): + return coder_impl.NullableCoderImpl(self._value_coder.get_impl()) + + def to_type_hint(self): + return typehints.Optional[self._value_coder.to_type_hint()] + + def is_deterministic(self): + # () -> bool + return self._value_coder.is_deterministic() + + def __eq__(self, other): + return ( + type(self) == type(other) and self._value_coder == other._value_coder) + + def __hash__(self): + return hash(type(self)) + hash(self._value_coder) + + class VarIntCoder(FastCoder): """Variable-length integer coder.""" def _create_impl(self): diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index f1a771a78568..e1ce23b1271c 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -82,8 +82,8 @@ def tearDownClass(cls): coders.ToBytesCoder ]) cls.seen_nested -= set([coders.ProtoCoder, CustomCoder]) - assert not standard - cls.seen - assert not cls.seen_nested - standard + assert not standard - cls.seen, str(standard - cls.seen) + assert not cls.seen_nested - standard, str(cls.seen_nested - standard) @classmethod def _observe(cls, coder): @@ -560,6 +560,16 @@ def iterable_state_read(token, element_coder_impl): context=context, test_size_estimation=False) + def test_nullable_coder(self): + self.check_coder(coders.NullableCoder(coders.VarIntCoder()), None, 2 * 64) + + def test_map_coder(self): + self.check_coder( + coders.MapCoder(coders.VarIntCoder(), coders.StrUtf8Coder()), { + 1: "one", 300: "three hundred" + }, {}, {i: str(i) + for i in range(5000)}) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py index 3ad880f7e900..02c7f0629817 100644 --- a/sdks/python/apache_beam/coders/row_coder.py +++ b/sdks/python/apache_beam/coders/row_coder.py @@ -30,6 +30,8 @@ from apache_beam.coders.coders import FastCoder from apache_beam.coders.coders import FloatCoder from apache_beam.coders.coders import IterableCoder +from apache_beam.coders.coders import MapCoder +from apache_beam.coders.coders import NullableCoder from apache_beam.coders.coders import StrUtf8Coder from apache_beam.coders.coders import TupleCoder from apache_beam.coders.coders import VarIntCoder @@ -58,8 +60,9 @@ def __init__(self, schema): to encode/decode. """ self.schema = schema + # Use non-null coders because null values are represented separately self.components = [ - RowCoder.coder_from_type(field.type) for field in self.schema.fields + _nonnull_coder_from_type(field.type) for field in self.schema.fields ] def _create_impl(self): @@ -102,32 +105,6 @@ def from_payload(payload): # type: (bytes) -> RowCoder return RowCoder(proto_utils.parse_Bytes(payload, schema_pb2.Schema)) - @staticmethod - def coder_from_type(field_type): - type_info = field_type.WhichOneof("type_info") - if type_info == "atomic_type": - if field_type.atomic_type in (schema_pb2.INT32, schema_pb2.INT64): - return VarIntCoder() - elif field_type.atomic_type == schema_pb2.DOUBLE: - return FloatCoder() - elif field_type.atomic_type == schema_pb2.STRING: - return StrUtf8Coder() - elif field_type.atomic_type == schema_pb2.BOOLEAN: - return BooleanCoder() - elif field_type.atomic_type == schema_pb2.BYTES: - return BytesCoder() - elif type_info == "array_type": - return IterableCoder( - RowCoder.coder_from_type(field_type.array_type.element_type)) - elif type_info == "row_type": - return RowCoder(field_type.row_type.schema) - - # The Java SDK supports several more types, but the coders are not yet - # standard, and are not implemented in Python. - raise ValueError( - "Encountered a type that is not currently supported by RowCoder: %s" % - field_type) - def __reduce__(self): # when pickling, use bytes representation of the schema. schema_pb2.Schema # objects cannot be pickled. @@ -137,6 +114,43 @@ def __reduce__(self): typecoders.registry.register_coder(row_type.RowTypeConstraint, RowCoder) +def _coder_from_type(field_type): + coder = _nonnull_coder_from_type(field_type) + if field_type.nullable: + return NullableCoder(coder) + else: + return coder + + +def _nonnull_coder_from_type(field_type): + type_info = field_type.WhichOneof("type_info") + if type_info == "atomic_type": + if field_type.atomic_type in (schema_pb2.INT32, schema_pb2.INT64): + return VarIntCoder() + elif field_type.atomic_type == schema_pb2.DOUBLE: + return FloatCoder() + elif field_type.atomic_type == schema_pb2.STRING: + return StrUtf8Coder() + elif field_type.atomic_type == schema_pb2.BOOLEAN: + return BooleanCoder() + elif field_type.atomic_type == schema_pb2.BYTES: + return BytesCoder() + elif type_info == "array_type": + return IterableCoder(_coder_from_type(field_type.array_type.element_type)) + elif type_info == "map_type": + return MapCoder( + _coder_from_type(field_type.map_type.key_type), + _coder_from_type(field_type.map_type.value_type)) + elif type_info == "row_type": + return RowCoder(field_type.row_type.schema) + + # The Java SDK supports several more types, but the coders are not yet + # standard, and are not implemented in Python. + raise ValueError( + "Encountered a type that is not currently supported by RowCoder: %s" % + field_type) + + class RowCoderImpl(StreamCoderImpl): """For internal use only; no backwards-compatibility guarantees.""" SIZE_CODER = VarIntCoder().get_impl() diff --git a/sdks/python/apache_beam/coders/row_coder_test.py b/sdks/python/apache_beam/coders/row_coder_test.py index 65b102446b56..4277e58c8fa6 100644 --- a/sdks/python/apache_beam/coders/row_coder_test.py +++ b/sdks/python/apache_beam/coders/row_coder_test.py @@ -45,14 +45,15 @@ ("aliases", typing.List[unicode]), ("knows_javascript", bool), # TODO(BEAM-7372): Use bytes instead of ByteString - ("payload", typing.Optional[typing.ByteString]) + ("payload", typing.Optional[typing.ByteString]), + ("custom_metadata", typing.Mapping[unicode, int]) ]) coders_registry.register_coder(Person, RowCoder) class RowCoderTest(unittest.TestCase): - JON_SNOW = Person("Jon Snow", 23, None, ["crow", "wildling"], False, None) + JON_SNOW = Person("Jon Snow", 23, None, ["crow", "wildling"], False, None, {}) PEOPLE = [ JON_SNOW, Person( @@ -60,8 +61,9 @@ class RowCoderTest(unittest.TestCase): 25, "Westeros", ["Mother of Dragons"], False, - None), - Person("Michael Bluth", 30, None, [], True, b"I've made a huge mistake") + None, {"dragons": 3}), + Person( + "Michael Bluth", 30, None, [], True, b"I've made a huge mistake", {}) ] def test_create_row_coder_from_named_tuple(self): @@ -102,6 +104,15 @@ def test_create_row_coder_from_schema(self): name="payload", type=schema_pb2.FieldType( atomic_type=schema_pb2.BYTES, nullable=True)), + schema_pb2.Field( + name="custom_metadata", + type=schema_pb2.FieldType( + map_type=schema_pb2.MapType( + key_type=schema_pb2.FieldType( + atomic_type=schema_pb2.STRING), + value_type=schema_pb2.FieldType( + atomic_type=schema_pb2.INT64), + ))), ]) coder = RowCoder(schema) diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py index 4a1389581128..df889785bc2d 100644 --- a/sdks/python/apache_beam/coders/standard_coders_test.py +++ b/sdks/python/apache_beam/coders/standard_coders_test.py @@ -29,6 +29,7 @@ import sys import unittest from builtins import map +from copy import deepcopy from typing import Dict from typing import Tuple @@ -78,6 +79,13 @@ def parse_float(s): def value_parser_from_schema(schema): def attribute_parser_from_type(type_): + parser = nonnull_attribute_parser_from_type(type_) + if type_.nullable: + return lambda x: None if x is None else parser(x) + else: + return parser + + def nonnull_attribute_parser_from_type(type_): # TODO: This should be exhaustive type_info = type_.WhichOneof("type_info") if type_info == "atomic_type": @@ -89,8 +97,8 @@ def attribute_parser_from_type(type_): element_parser = attribute_parser_from_type(type_.array_type.element_type) return lambda x: list(map(element_parser, x)) elif type_info == "map_type": - key_parser = attribute_parser_from_type(type_.array_type.key_type) - value_parser = attribute_parser_from_type(type_.array_type.value_type) + key_parser = attribute_parser_from_type(type_.map_type.key_type) + value_parser = attribute_parser_from_type(type_.map_type.value_type) return lambda x: dict( (key_parser(k), value_parser(v)) for k, v in x.items()) @@ -101,6 +109,7 @@ def attribute_parser_from_type(type_): def value_parser(x): result = [] + x = deepcopy(x) for name, parser in parsers: value = x.pop(name) result.append(None if value is None else parser(value)) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 0cc513ffeda2..cb4cf0114263 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -178,6 +178,11 @@ def typing_to_runner_api(type_): return schema_pb2.FieldType( array_type=schema_pb2.ArrayType(element_type=element_type)) + elif _safe_issubclass(type_, Mapping): + key_type, value_type = map(typing_to_runner_api, _get_args(type_)) + return schema_pb2.FieldType( + map_type=schema_pb2.MapType(key_type=key_type, value_type=value_type)) + raise ValueError("Unsupported type: %s" % type_)