Skip to content

Commit d2fb942

Browse files
scwhittle and pabloem authored
Merge pull request #17794 from [#21252] Enforce pubsub message publishing limits in the python SDK
* [#21252] Enforce pubsub message publishing limits in the python SDK This improves visibility as otherwise messages can become stuck publishing inside the dataflow runner. This also allows users to handle errors by catching them. Fixes issue #21252 for python sdk. * fixtest Co-authored-by: Pablo <pabloem@users.noreply.github.com>
1 parent 8a779c0 commit d2fb942

File tree

2 files changed

+57
-10
lines changed

2 files changed

+57
-10
lines changed

sdks/python/apache_beam/io/gcp/pubsub.py

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ def _from_proto_str(proto_msg):
135135
def _to_proto_str(self, for_publish=False):
136136
"""Get serialized form of ``PubsubMessage``.
137137
138+
The serialized message is validated against pubsub message limits specified
139+
at https://cloud.google.com/pubsub/quotas#resource_limits
140+
138141
Args:
139142
proto_msg: str containing a serialized protobuf.
140143
for_publish: bool, if True strip out message fields which cannot be
@@ -147,15 +150,39 @@ def _to_proto_str(self, for_publish=False):
147150
containing the payload of this object.
148151
"""
149152
msg = pubsub.types.PubsubMessage()
153+
if len(self.data) > (10 << 20):
154+
raise ValueError('A pubsub message data field must not exceed 10MB')
150155
msg.data = self.data
151-
for key, value in self.attributes.items():
152-
msg.attributes[key] = value
153-
if self.message_id and not for_publish:
154-
msg.message_id = self.message_id
155-
if self.publish_time and not for_publish:
156-
msg.publish_time = self.publish_time
156+
157+
if self.attributes:
158+
if len(self.attributes) > 100:
159+
raise ValueError(
160+
'A pubsub message must not have more than 100 attributes.')
161+
for key, value in self.attributes.items():
162+
if len(key) > 256:
163+
raise ValueError(
164+
'A pubsub message attribute key must not exceed 256 bytes.')
165+
if len(value) > 1024:
166+
raise ValueError(
167+
'A pubsub message attribute value must not exceed 1024 bytes')
168+
msg.attributes[key] = value
169+
170+
if not for_publish:
171+
if self.message_id:
172+
msg.message_id = self.message_id
173+
if self.publish_time:
174+
msg.publish_time = self.publish_time
175+
176+
if len(self.ordering_key) > 1024:
177+
raise ValueError(
178+
'A pubsub message ordering key must not exceed 1024 bytes.')
157179
msg.ordering_key = self.ordering_key
158-
return pubsub.types.PubsubMessage.serialize(msg)
180+
181+
serialized = pubsub.types.PubsubMessage.serialize(msg)
182+
if len(serialized) > (10 << 20):
183+
raise ValueError(
184+
'Serialized pubsub message exceeds the publish request limit of 10MB')
185+
return serialized
159186

160187
@staticmethod
161188
def _from_message(msg):
@@ -340,9 +367,8 @@ def message_to_proto_str(element):
340367
@staticmethod
341368
def bytes_to_proto_str(element):
342369
# type: (bytes) -> bytes
343-
msg = pubsub.types.PubsubMessage()
344-
msg.data = element
345-
return pubsub.types.PubsubMessage.serialize(msg)
370+
msg = PubsubMessage(element, {})
371+
return msg._to_proto_str(for_publish=True)
346372

347373
def expand(self, pcoll):
348374
if self.with_attributes:

sdks/python/apache_beam/io/gcp/pubsub_test.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,27 @@ def test_proto_conversion(self):
8787
self.assertEqual(m_converted.data, data)
8888
self.assertEqual(m_converted.attributes, attributes)
8989

90+
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
91+
def test_payload_publish_invalid(self):
92+
with self.assertRaisesRegex(ValueError, r'data field.*10MB'):
93+
msg = PubsubMessage(b'0' * 1024 * 1024 * 11, None)
94+
msg._to_proto_str(for_publish=True)
95+
with self.assertRaisesRegex(ValueError, 'attribute key'):
96+
msg = PubsubMessage(b'0', {'0' * 257: '0'})
97+
msg._to_proto_str(for_publish=True)
98+
with self.assertRaisesRegex(ValueError, 'attribute value'):
99+
msg = PubsubMessage(b'0', {'0' * 100: '0' * 1025})
100+
msg._to_proto_str(for_publish=True)
101+
with self.assertRaisesRegex(ValueError, '100 attributes'):
102+
attributes = {}
103+
for i in range(0, 101):
104+
attributes[str(i)] = str(i)
105+
msg = PubsubMessage(b'0', attributes)
106+
msg._to_proto_str(for_publish=True)
107+
with self.assertRaisesRegex(ValueError, 'ordering key'):
108+
msg = PubsubMessage(b'0', None, ordering_key='0' * 1301)
109+
msg._to_proto_str(for_publish=True)
110+
90111
def test_eq(self):
91112
a = PubsubMessage(b'abc', {1: 2, 3: 4})
92113
b = PubsubMessage(b'abc', {1: 2, 3: 4})

0 commit comments

Comments
 (0)