Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,6 @@ coder:
urn: "beam:coder:row:v1"
# str: string, i32: int32, f64: float64, arr: array[string]
payload: "\n\t\n\x03str\x1a\x02\x10\x07\n\t\n\x03i32\x1a\x02\x10\x03\n\t\n\x03f64\x1a\x02\x10\x06\n\r\n\x03arr\x1a\x06\x1a\x04\n\x02\x10\x07\x12$4e5e554c-d4c1-4a5d-b5e1-f3293a6b9f05"
nested: false
examples:
"\u0004\u0000\u0003foo\u00a9\u0046\u003f\u00b9\u0099\u0099\u0099\u0099\u0099\u009a\0\0\0\u0003\u0003foo\u0003bar\u0003baz": {str: "foo", i32: 9001, f64: "0.1", arr: ["foo", "bar", "baz"]}

Expand All @@ -357,7 +356,6 @@ coder:
urn: "beam:coder:row:v1"
# str: nullable string, i32: nullable int32, f64: nullable float64
payload: "\n\x0b\n\x03str\x1a\x04\x08\x01\x10\x07\n\x0b\n\x03i32\x1a\x04\x08\x01\x10\x03\n\x0b\n\x03f64\x1a\x04\x08\x01\x10\x06\x12$b20c6545-57af-4bc8-b2a9-51ace21c7393"
nested: false
examples:
"\u0003\u0001\u0007": {str: null, i32: null, f64: null}
"\u0003\u0001\u0004\u0003foo\u00a9\u0046": {str: "foo", i32: 9001, f64: null}
Expand All @@ -380,7 +378,33 @@ coder:
urn: "beam:coder:row:v1"
# f_bool: boolean, f_bytes: nullable bytes
payload: "\n\x0c\n\x06f_bool\x1a\x02\x10\x08\n\x0f\n\x07f_bytes\x1a\x04\x08\x01\x10\t\x12$eea1b747-7571-43d3-aafa-9255afdceafb"
nested: false
examples:
"\x02\x01\x02\x01": {f_bool: True, f_bytes: null}
"\x02\x00\x00\x04ab\x00c": {f_bool: False, f_bytes: "ab\0c"}

---

# Binary data generated with the python SDK:
#
# import typing
# import apache_beam as beam
# class Test(typing.NamedTuple):
# f_map: typing.Mapping[str,int]
# schema = beam.typehints.schemas.named_tuple_to_schema(Test)
# coder = beam.coders.row_coder.RowCoder(schema)
# print("payload = %s" % schema.SerializeToString())
# examples = (Test(f_map={}),
# Test(f_map={"foo": 9001, "bar": 9223372036854775807}),
# Test(f_map={"everything": None, "is": None, "null!": None, "¯\_(ツ)_/¯": None}))
# for example in examples:
# print("example = %s" % coder.encode(example))
coder:
urn: "beam:coder:row:v1"
# f_map: map<str, nullable int64>
payload: "\n\x15\n\x05f_map\x1a\x0c*\n\n\x02\x10\x07\x12\x04\x08\x01\x10\x04\x12$d8c8f969-14e6-457f-a8b5-62a1aec7f1cd"
# map ordering is non-deterministic
non_deterministic: True
examples:
"\x01\x00\x00\x00\x00\x00": {f_map: {}}
"\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f": {f_map: {"foo": 9001, "bar": 9223372036854775807}}
"\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00": {f_map: {"everything":null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
13 changes: 12 additions & 1 deletion model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -855,10 +855,21 @@ message StandardCoders {
// BOOLEAN: beam:coder:bool:v1
// BYTES: beam:coder:bytes:v1
// ArrayType: beam:coder:iterable:v1 (always has a known length)
// MapType: not yet a standard coder (BEAM-7996)
// MapType: not a standard coder, specification defined below.
// RowType: beam:coder:row:v1
// LogicalType: Uses the coder for its representation.
//
// The MapType is encoded by:
// - An INT32 representing the size of the map (N)
// - Followed by N interleaved keys and values, encoded with their
// corresponding coder.
//
// Nullable types in container types (ArrayType, MapType) are encoded by:
// - A one byte null indicator, 0x00 for null values, or 0x01 for present
// values.
// - For present values the null indicator is followed by the value
// encoded with it's corresponding coder.
//
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI @lostluck - previously the encoding for MapType and for nullable types within containers was not documented.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack! Thanks!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just the encoding of a nullable type? (Why does it have to be called out specially?)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For maps specifically, do we want to allow null keys? Is it valuable to have null values (as distinct from just not present)? I might lean towards disallowing nulls and then possibly allowing it in the future if we have good reason to, which will be forward compatible.

Copy link
Copy Markdown
Member Author

@TheNeuralBit TheNeuralBit Jul 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is specifically for nullable types that are elements of an Array or keys/values of a Map. For rows we encode nulls in a separate bitmask:

// - A byte array representing a packed bitset indicating null fields (a
// 1 indicating a null) encoded with beam:coder:bytes:v1. The unused
// bits in the last byte must be set to 0. If there are no nulls an
// empty byte array is encoded.
// The two-byte bitset (not including the lenghth-prefix) for the row
// [NULL, 0, 0, 0, NULL, 0, 0, NULL, 0, NULL] would be
// [0b10010001, 0b00000010]

Java schemas will already let you use nullable types for keys and values in Maps. I doubt anyone is relying on it.. but there's a risk if we disallow it now.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the verdict here is to ignore the nullability field of a FieldType when it's nested in a Map or Array?

That's... unfortunate. Must we bend over backwards to maintain compatibility with Java's previous encoding, which wasn't officially a schema encoding until this PR? It seems very strange that we're already relying on unspecified behaviour.

Copy link
Copy Markdown
Contributor

@lostluck lostluck Aug 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this requirement means that all values must be pointers (or reference types) in Go as ordinary primitives cannot be nullable. That feels very strange.

EDIT: Please disregard my last comments, I misread that this only applies to when the field is specified as nullable.
I misunderstood that the discussion is orthogonal to that (whether to allow nullable map components at all.)

Copy link
Copy Markdown
Contributor

@lostluck lostluck Aug 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robertwb Your comment fed my own misunderstanding. It is possible to declare a map in schemas as not having null/keys values, but not necessarily on the SDK side.
Technically, there's no reason that the SDK can't use a non-nullable containing version of the container if the Key and Value components are not themselves marked as nillable. IIRC, the Java SDK could converted ImmutableMaps or similar into just non-nullable Key and non-nullable Value types.
The issue as I'm understanding it is that the limitation is on the SDK Language side, rather than the schema specification side, as discussed the schemas fields can individually have their nullable bits set.
Eg. Go doesn't have this ambiguity for map types.
On the other hand, in Go, Iterable/array types which will be represented by slices will have this ambiguity when used as a field, as they can be nil, and could also still be pointers to said reference types. That ambiguity is well known enough that pointers to reference types (maps, slices, chans..) are strongly discouraged in idiomatic Go.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lostluck The issue here is what kinds of objects the SDK must accept when receiving these from a foreign SDK. If nulls are allowed, you cannot let the element type be (say) a map[str, int] in go.

It looks like there is a way to specify in the schema whether the keys/values could be null (or explicitly disallow it for either or both). The encoding, on the other hand, always has this leading bit set (regardless of whether the schema allows null values), but is harder to change for backwards compatibility reasons. If this is the case, I'm OK with that.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, we set this prefix depending on the bit in the schema. All is good.

// The payload for RowCoder is an instance of Schema.
// Components: None
// Experimental.
Expand Down
3 changes: 3 additions & 0 deletions model/pipeline/src/main/proto/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
// ** Experimental **
// Protocol Buffers describing Beam Schemas, a portable representation for
// complex types.
//
// The primary application of Schema is as the payload for the standard coder
// "beam:coder:row:v1", defined in beam_runner_api.proto

syntax = "proto3";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.toImmutableList;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -43,6 +42,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -340,6 +340,10 @@ private static Object convertValue(Object value, CommonCoder coderSpec, Coder co
}

private static Object parseField(Object value, Schema.FieldType fieldType) {
if (value == null) {
return null;
}

switch (fieldType.getTypeName()) {
case BYTE:
return ((Number) value).byteValue();
Expand All @@ -366,14 +370,18 @@ private static Object parseField(Object value, Schema.FieldType fieldType) {
.map((element) -> parseField(element, fieldType.getCollectionElementType()))
.collect(toImmutableList());
case MAP:
Map<Object, Object> kvMap = (Map<Object, Object>) value;
return kvMap.entrySet().stream()
.collect(
toImmutableMap(
(pair) -> parseField(pair.getKey(), fieldType.getMapKeyType()),
(pair) -> parseField(pair.getValue(), fieldType.getMapValueType())));
Map<Object, Object> kvMap = new HashMap<>();
((Map<Object, Object>) value)
.entrySet().stream()
.forEach(
(entry) ->
kvMap.put(
parseField(entry.getKey(), fieldType.getMapKeyType()),
parseField(entry.getValue(), fieldType.getMapValueType())));
return kvMap;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is necessary because ImmutableMap (as well as Collectors.toMap) does not allow null values, so it errors on the new test cases.

case ROW:
Map<String, Object> rowMap = (Map<String, Object>) value;
// Clone map so we don't mutate the underlying value
Map<String, Object> rowMap = new HashMap<>((Map<String, Object>) value);
Schema schema = fieldType.getRowSchema();
Row.Builder row = Row.withSchema(schema);
for (Schema.Field field : schema.getFields()) {
Expand Down
86 changes: 86 additions & 0 deletions sdks/python/apache_beam/coders/coder_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,92 @@ def estimate_size(self, unused_value, nested=False):
return 1


class MapCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

Note this implementation always uses nested context when encoding keys
and values. This differs from Java's MapCoder, which uses
nested=False if possible for the last value encoded.

This difference is acceptable because MapCoder is not standard. It is only
used in a standard context by RowCoder which always uses nested context for
attribute values.

A coder for typing.Mapping objects."""
def __init__(
self,
key_coder, # type: CoderImpl
value_coder # type: CoderImpl
):
self._key_coder = key_coder
self._value_coder = value_coder

def encode_to_stream(self, dict_value, out, nested):
out.write_bigendian_int32(len(dict_value))
for key, value in dict_value.items():
# Note this implementation always uses nested context when encoding keys
# and values which differs from Java. See note in docstring.
self._key_coder.encode_to_stream(key, out, True)
self._value_coder.encode_to_stream(value, out, True)

def decode_from_stream(self, in_stream, nested):
size = in_stream.read_bigendian_int32()
result = {}
for _ in range(size):
# Note this implementation always uses nested context when encoding keys
# and values which differs from Java. See note in docstring.
key = self._key_coder.decode_from_stream(in_stream, True)
value = self._value_coder.decode_from_stream(in_stream, True)
result[key] = value

return result

def estimate_size(self, unused_value, nested=False):
estimate = 4 # 4 bytes for int32 size prefix
for key, value in unused_value.items():
estimate += self._key_coder.estimate_size(key, True)
estimate += self._value_coder.estimate_size(value, True)
return estimate


class NullableCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

A coder for typing.Optional objects."""

ENCODE_NULL = 0
ENCODE_PRESENT = 1

def __init__(
self,
value_coder # type: CoderImpl
):
self._value_coder = value_coder

def encode_to_stream(self, value, out, nested):
if value is None:
out.write_byte(self.ENCODE_NULL)
else:
out.write_byte(self.ENCODE_PRESENT)
self._value_coder.encode_to_stream(value, out, nested)

def decode_from_stream(self, in_stream, nested):
null_indicator = in_stream.read_byte()
if null_indicator == self.ENCODE_NULL:
return None
elif null_indicator == self.ENCODE_PRESENT:
return self._value_coder.decode_from_stream(in_stream, nested)
else:
raise ValueError(
"Encountered unexpected value for null indicator: '%s'" %
null_indicator)

def estimate_size(self, unused_value, nested=False):
return 1 + (
self._value_coder.estimate_size(unused_value)
if unused_value is not None else 0)


class FloatCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""
def encode_to_stream(self, value, out, nested):
Expand Down
53 changes: 53 additions & 0 deletions sdks/python/apache_beam/coders/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@
'FastPrimitivesCoder',
'FloatCoder',
'IterableCoder',
'MapCoder',
'NullableCoder',
'PickleCoder',
'ProtoCoder',
'SingletonCoder',
Expand Down Expand Up @@ -520,6 +522,57 @@ def __hash__(self):
Coder.register_structured_urn(common_urns.coders.BOOL.urn, BooleanCoder)


class MapCoder(FastCoder):
def __init__(self, key_coder, value_coder):
# type: (Coder, Coder) -> None
self._key_coder = key_coder
self._value_coder = value_coder

def _create_impl(self):
return coder_impl.MapCoderImpl(
self._key_coder.get_impl(), self._value_coder.get_impl())

def to_type_hint(self):
return typehints.Dict[self._key_coder.to_type_hint(),
self._value_coder.to_type_hint()]

def is_deterministic(self):
# () -> bool
# Map ordering is non-deterministic
return False

def __eq__(self, other):
return (
type(self) == type(other) and self._key_coder == other._key_coder and
self._value_coder == other._value_coder)

def __hash__(self):
return hash(type(self)) + hash(self._key_coder) + hash(self._value_coder)


class NullableCoder(FastCoder):
def __init__(self, value_coder):
# type: (Coder) -> None
self._value_coder = value_coder

def _create_impl(self):
return coder_impl.NullableCoderImpl(self._value_coder.get_impl())

def to_type_hint(self):
return typehints.Optional[self._value_coder.to_type_hint()]

def is_deterministic(self):
# () -> bool
return self._value_coder.is_deterministic()

def __eq__(self, other):
return (
type(self) == type(other) and self._value_coder == other._value_coder)

def __hash__(self):
return hash(type(self)) + hash(self._value_coder)


class VarIntCoder(FastCoder):
"""Variable-length integer coder."""
def _create_impl(self):
Expand Down
14 changes: 12 additions & 2 deletions sdks/python/apache_beam/coders/coders_test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ def tearDownClass(cls):
coders.ToBytesCoder
])
cls.seen_nested -= set([coders.ProtoCoder, CustomCoder])
assert not standard - cls.seen
assert not cls.seen_nested - standard
assert not standard - cls.seen, str(standard - cls.seen)
assert not cls.seen_nested - standard, str(cls.seen_nested - standard)

@classmethod
def _observe(cls, coder):
Expand Down Expand Up @@ -560,6 +560,16 @@ def iterable_state_read(token, element_coder_impl):
context=context,
test_size_estimation=False)

def test_nullable_coder(self):
self.check_coder(coders.NullableCoder(coders.VarIntCoder()), None, 2 * 64)

def test_map_coder(self):
self.check_coder(
coders.MapCoder(coders.VarIntCoder(), coders.StrUtf8Coder()), {
1: "one", 300: "three hundred"
}, {}, {i: str(i)
for i in range(5000)})


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
Loading