Fix WriteToPubSub to pass ordering_key to publish() method #37345

Open
nikitagrover19 wants to merge 6 commits into apache:master from nikitagrover19:fix-pubsub-ordering-key

@nikitagrover19

Fixes #36201

This PR fixes the WriteToPubSub transform in the Python SDK to properly pass the ordering_key parameter to the Google Cloud Pub/Sub publish() method.

The Problem
While the ordering_key field was correctly serialized into the PubsubMessage protobuf, _PubSubWriteDoFn._flush() never extracted it and passed it to PublisherClient.publish(). As a result, messages were published without an ordering key, so ordered delivery had no effect even when users explicitly set ordering_key.

The Solution
Modified _PubSubWriteDoFn._flush() to:

  • Extract ordering_key from the deserialized PubsubMessage
  • Pass it to publish() in both code paths (with attributes and data-only)
  • Handle empty/missing ordering_key for backward compatibility
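
The three steps above can be sketched as follows. This is a minimal, self-contained illustration and not the code in the PR: `Message`, `FakePublisher`, and `publish_message` are hypothetical stand-ins for the deserialized PubsubMessage, the publisher client, and the body of `_flush()`. The stub assumes a `publish(topic, data, ordering_key='', **attrs)` keyword layout.

```python
from dataclasses import dataclass, field


@dataclass
class Message:
  """Hypothetical stand-in for a deserialized PubsubMessage."""
  data: bytes
  attributes: dict = field(default_factory=dict)
  ordering_key: str = ''


class FakePublisher:
  """Hypothetical stub that records publish() calls instead of sending them."""
  def __init__(self):
    self.calls = []

  def publish(self, topic, data, ordering_key='', **attrs):
    self.calls.append((topic, data, ordering_key, attrs))


def publish_message(client, topic, msg, with_attributes):
  """Forwards ordering_key only when it is set, on both code paths."""
  kwargs = dict(msg.attributes) if with_attributes and msg.attributes else {}
  if msg.ordering_key:
    kwargs['ordering_key'] = msg.ordering_key
  client.publish(topic, msg.data, **kwargs)


client = FakePublisher()
# Message with attributes and an ordering key.
publish_message(client, 'topic', Message(b'a', {'k': 'v'}, 'key1'), True)
# Data-only message without an ordering key (the pre-existing behavior).
publish_message(client, 'topic', Message(b'b'), False)
```

In this sketch, a message without an ordering key results in exactly the same call as before the change, which is what the backward-compatibility bullet asks for.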

@gemini-code-assist
Contributor

Summary of Changes

Hello @nikitagrover19, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves a critical bug in the Apache Beam Python SDK's WriteToPubSub transform, which previously failed to correctly utilize the ordering_key for Google Cloud Pub/Sub messages. The fix ensures that messages published through Beam pipelines respect the specified ordering, which is crucial for applications requiring strict message sequence. The changes involve targeted modifications to the publishing logic and the addition of robust test cases to confirm the functionality and prevent regressions.

Highlights

  • Fix for Pub/Sub Ordering Key: The WriteToPubSub transform in the Python SDK now correctly passes the ordering_key to the Google Cloud Pub/Sub publish() method, resolving an issue where message ordering was not working as expected.
  • _PubSubWriteDoFn._flush() Modification: The _flush() method has been updated to extract the ordering_key from the deserialized PubsubMessage and include it in the publish() call, ensuring proper handling for both attribute-rich and data-only messages.
  • Backward Compatibility: The implementation includes logic to gracefully handle cases where ordering_key is empty or missing, maintaining backward compatibility for existing pipelines.
  • Comprehensive Testing: New integration and unit tests have been added to validate the correct propagation and usage of the ordering_key in various scenarios, including with and without attributes, and to ensure backward compatibility.

@nikitagrover19
Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request correctly adds support for ordering_key in WriteToPubSub. The changes to the _flush method are functional, but I've suggested a refactoring to improve code clarity and maintainability. I've also added suggestions to strengthen the new unit and integration tests by making the assertions more specific and robust. Overall, this is a good fix.

Comment on lines 662 to +676
  if self.with_attributes and pubsub_msg.attributes:
    future = self._pub_client.publish(
-       self._topic, pubsub_msg.data, **pubsub_msg.attributes)
+       self._topic,
+       pubsub_msg.data,
+       ordering_key=pubsub_msg.ordering_key
+       if pubsub_msg.ordering_key else '',
+       **pubsub_msg.attributes)
  else:
-   future = self._pub_client.publish(self._topic, pubsub_msg.data)
+   if pubsub_msg.ordering_key:
+     future = self._pub_client.publish(
+         self._topic,
+         pubsub_msg.data,
+         ordering_key=pubsub_msg.ordering_key)
+   else:
+     future = self._pub_client.publish(self._topic, pubsub_msg.data)

Contributor

medium

The logic for publishing messages can be simplified to improve readability and reduce code duplication. You can determine the attributes and ordering_key to use and then have a single call to self._pub_client.publish. This makes the code easier to maintain.

      attributes = {}
      if self.with_attributes and pubsub_msg.attributes:
        attributes = pubsub_msg.attributes

      if pubsub_msg.ordering_key:
        future = self._pub_client.publish(
            self._topic,
            pubsub_msg.data,
            ordering_key=pubsub_msg.ordering_key,
            **attributes)
      else:
        future = self._pub_client.publish(
            self._topic, pubsub_msg.data, **attributes)

Comment on lines +345 to +351
+ for received_message in response.received_messages:
+   self.assertIn('ordering_key', dir(received_message.message))
+   self.sub_client.acknowledge(
+       request={
+           "subscription": self.output_sub.name,
+           "ack_ids": [received_message.ack_id],
+       })

Contributor

medium

The current verification for ordering_key only checks for its presence. To make the test more robust, you should assert that the ordering_key has the expected value for each message.

Additionally, you can improve efficiency by acknowledging all messages in a single batch request after pulling them, rather than one by one inside the loop.

Suggested change
- for received_message in response.received_messages:
-   self.assertIn('ordering_key', dir(received_message.message))
-   self.sub_client.acknowledge(
-       request={
-           "subscription": self.output_sub.name,
-           "ack_ids": [received_message.ack_id],
-       })
+ received_map = {msg.message.data: msg for msg in response.received_messages}
+ self.assertEqual(received_map[b'order_data001'].message.ordering_key, 'key1')
+ self.assertEqual(received_map[b'order_data002'].message.ordering_key, 'key1')
+ self.assertEqual(received_map[b'order_data003'].message.ordering_key, 'key2')
+ ack_ids = [msg.ack_id for msg in response.received_messages]
+ if ack_ids:
+   self.sub_client.acknowledge(
+       request={
+           "subscription": self.output_sub.name,
+           "ack_ids": ack_ids,
+       })
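
To illustrate why the presence check is weak, here is a small sketch. It assumes the received message type always exposes an ordering_key field that defaults to an empty string (as string fields on protobuf messages do); `ReceivedMessage` is a hypothetical stand-in, not the Pub/Sub client type.

```python
from dataclasses import dataclass


@dataclass
class ReceivedMessage:
  """Hypothetical stand-in for a pulled message; ordering_key defaults to ''."""
  data: bytes
  ordering_key: str = ''


lost = ReceivedMessage(b'order_data001')          # key was dropped on publish
kept = ReceivedMessage(b'order_data001', 'key1')  # key was propagated

# The presence check passes in both cases, so it cannot catch the bug.
presence_lost = 'ordering_key' in dir(lost)
presence_kept = 'ordering_key' in dir(kept)

# Comparing values distinguishes the two cases.
value_lost = lost.ordering_key == 'key1'
value_kept = kept.ordering_key == 'key1'
```

Under that assumption, only the value comparison would have failed before the fix.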

Comment on lines +1162 to +1163
+ # Verify that publish was called
+ mock_pubsub.return_value.publish.assert_called()

Contributor

medium

To make this test more robust, you should also verify that ordering_key is not passed to the publish method when no ordering key is provided in the PubsubMessage. You can do this by checking the call_args of the mock.

Suggested change
- # Verify that publish was called
- mock_pubsub.return_value.publish.assert_called()
+ # Verify that publish was called
+ mock_pubsub.return_value.publish.assert_called()
+ call_args = mock_pubsub.return_value.publish.call_args
+ self.assertNotIn('ordering_key', call_args.kwargs)
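
The `call_args` check can be tried in isolation with a plain `unittest.mock.Mock`. This sketch uses a hypothetical `publisher` mock and topic string rather than the PR's test fixtures, and relies on `call_args.kwargs`, which is available in Python 3.8 and later.

```python
from unittest import mock

publisher = mock.Mock()

# Simulate a publish without an ordering key, as the code under test would.
publisher.publish('projects/p/topics/t', b'data')
call_args = publisher.publish.call_args
no_key_passed = 'ordering_key' not in call_args.kwargs

# Simulate a publish with an ordering key and check the exact call.
publisher.publish('projects/p/topics/t', b'data', ordering_key='key1')
publisher.publish.assert_called_with(
    'projects/p/topics/t', b'data', ordering_key='key1')
last_key = publisher.publish.call_args.kwargs['ordering_key']
```

Note that `call_args` reflects only the most recent call; `call_args_list` would be the choice when several publishes need to be inspected.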

@github-actions
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@nikitagrover19 nikitagrover19 marked this pull request as draft January 19, 2026 16:50
@nikitagrover19 nikitagrover19 marked this pull request as ready for review January 20, 2026 11:32
@nikitagrover19
Author

FYI: CI failures look flaky (PortableRunner gRPC DEADLINE_EXCEEDED / Socket closed).
I ran these locally and they pass:

  • apache_beam/yaml/yaml_transform_unit_test.py::ExpandPipelineTest::test_expand_pipeline_with_pipeline_key_only
  • apache_beam/dataframe/convert_test.py
    • ConvertTest::test_convert
    • ConvertTest::test_convert_memoization_clears_cache
    • (full file run also passes, including with --runner=PortableRunner)

@damccorm
Contributor

assign set of reviewers

@github-actions
Contributor

Assigning reviewers:

R: @tvalentyn for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@github-actions
Contributor

github-actions bot commented Feb 3, 2026

Reminder, please take a look at this pr: @tvalentyn

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request correctly adds support for ordering_key when writing to Pub/Sub, which is a valuable fix. The changes are logical and include new unit and integration tests to cover the new functionality. I have two suggestions for improvement: one to refactor the publishing logic in pubsub.py for better readability and maintainability, and another to strengthen the assertions in the new integration test to make it more robust.

Comment on lines 662 to +676
  if self.with_attributes and pubsub_msg.attributes:
    future = self._pub_client.publish(
-       self._topic, pubsub_msg.data, **pubsub_msg.attributes)
+       self._topic,
+       pubsub_msg.data,
+       ordering_key=pubsub_msg.ordering_key
+       if pubsub_msg.ordering_key else '',
+       **pubsub_msg.attributes)
  else:
-   future = self._pub_client.publish(self._topic, pubsub_msg.data)
+   if pubsub_msg.ordering_key:
+     future = self._pub_client.publish(
+         self._topic,
+         pubsub_msg.data,
+         ordering_key=pubsub_msg.ordering_key)
+   else:
+     future = self._pub_client.publish(self._topic, pubsub_msg.data)

Contributor

medium

The logic for publishing messages can be simplified to reduce code duplication and improve readability. Instead of separate code paths for messages with and without attributes, you can build a dictionary of keyword arguments for the publish call. This makes the code cleaner and easier to maintain.

      publish_kwargs = {}
      if self.with_attributes and pubsub_msg.attributes:
        publish_kwargs.update(pubsub_msg.attributes)

      if pubsub_msg.ordering_key:
        publish_kwargs['ordering_key'] = pubsub_msg.ordering_key

      future = self._pub_client.publish(
          self._topic, pubsub_msg.data, **publish_kwargs)
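
One difference between the dictionary-based form and an explicit `ordering_key=` keyword is worth checking before choosing: how each behaves if a user attribute happens to be named `ordering_key`. The sketch below uses a hypothetical `publish` stub with an assumed `(topic, data, ordering_key='', **attrs)` layout and a hypothetical `build_publish_kwargs` helper; it shows plain Python keyword semantics, not observed behavior of the Pub/Sub client.

```python
def publish(topic, data, ordering_key='', **attrs):
  """Hypothetical stub that echoes how its arguments were bound."""
  return {'ordering_key': ordering_key, 'attrs': attrs}


def build_publish_kwargs(attributes, ordering_key, with_attributes=True):
  """Builds keyword arguments in the style of the suggestion above."""
  publish_kwargs = {}
  if with_attributes and attributes:
    publish_kwargs.update(attributes)
  if ordering_key:
    publish_kwargs['ordering_key'] = ordering_key
  return publish_kwargs


# Usual case: attributes and the ordering key travel separately.
normal = publish('t', b'x', **build_publish_kwargs({'a': '1'}, 'key1'))

# Edge case: a user attribute that happens to be named 'ordering_key'.
clash = {'ordering_key': 'from-attribute'}

# Dict-based form: the message-level key silently overwrites the attribute.
merged = publish('t', b'x', **build_publish_kwargs(clash, 'key1'))

# Explicit keyword plus **attributes: Python rejects the duplicate keyword.
try:
  publish('t', b'x', ordering_key='key1', **clash)
  raised = False
except TypeError:
  raised = True
```

Whether such an attribute name can occur in practice is not established in this thread, so this is a consideration to verify rather than a known defect.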

Comment on lines +345 to +351
+ for received_message in response.received_messages:
+   self.assertIn('ordering_key', dir(received_message.message))
+   self.sub_client.acknowledge(
+       request={
+           "subscription": self.output_sub.name,
+           "ack_ids": [received_message.ack_id],
+       })

Contributor

medium

The assertion in this test is quite weak. It only checks for the presence of the ordering_key attribute on the message object using dir(), but doesn't verify its value. A more robust test would be to check that the received messages have the correct ordering keys and attributes corresponding to the messages that were sent. Also, acknowledging messages one by one in a loop is inefficient; it's better to collect all ack_ids and acknowledge them in a single call.

Suggested change
- for received_message in response.received_messages:
-   self.assertIn('ordering_key', dir(received_message.message))
-   self.sub_client.acknowledge(
-       request={
-           "subscription": self.output_sub.name,
-           "ack_ids": [received_message.ack_id],
-       })
+ # Verify ordering keys and attributes were preserved
+ received_msgs_map = {
+     msg.message.data: msg.message
+     for msg in response.received_messages
+ }
+ for sent_msg in test_messages:
+   self.assertIn(sent_msg.data, received_msgs_map)
+   received_msg = received_msgs_map[sent_msg.data]
+   self.assertEqual(received_msg.ordering_key, sent_msg.ordering_key)
+   self.assertEqual(dict(received_msg.attributes), sent_msg.attributes)
+ # Acknowledge all messages at once for efficiency
+ ack_ids = [msg.ack_id for msg in response.received_messages]
+ if ack_ids:
+   self.sub_client.acknowledge(
+       request={
+           "subscription": self.output_sub.name,
+           "ack_ids": ack_ids,
+       })

@github-actions
Contributor

Reminder, please take a look at this pr: @tvalentyn

@tvalentyn
Contributor

responded on the issue

@nikitagrover19 nikitagrover19 force-pushed the fix-pubsub-ordering-key branch from 4f5e0e9 to 34d536d Compare March 25, 2026 14:55
@nikitagrover19
Author

@tvalentyn

Following up on our earlier discussion - since there wasn’t further feedback on the issue, I went ahead and implemented the XLang-based approach you suggested.

The latest commit reflects this, wiring publish_with_ordering_key through the Python external transform to Java PubSub IO, along with a couple of related fixes discovered during testing (including pubsubRootUrl handling and a with_attributes issue).

The PR currently also retains the direct _flush fix in the Python SDK for completeness. Happy to scope this down if you'd prefer focusing solely on the XLang approach.

Would appreciate your feedback on whether this direction looks good.

    getTimestampAttribute(),
    null,
    c.getPipelineOptions().as(PubsubOptions.class),
    Write.this.getPubsubRootUrl());

Contributor

for my education, why was this necessary?

Contributor

Found it:

PubsubIO.java: fixed a pre-existing bug where PubsubBoundedWriter.startBundle ignored the transform-level pubsubRootUrl, always falling back to pipeline options

Contributor

@tvalentyn tvalentyn Mar 26, 2026

If Write.this.getPubsubRootUrl() somehow returned null, would newClient still initialize with the default (as per pipeline options)?

Author

@nikitagrover19 nikitagrover19 Mar 28, 2026

Both PubsubJsonClient and PubsubGrpcClient use MoreObjects.firstNonNull(rootUrlOverride, options.getPubsubRootUrl()), so if Write.this.getPubsubRootUrl() returns null, it falls back to options.getPubsubRootUrl(), which has @Default.String("https://pubsub.googleapis.com"). So yes, the default is preserved correctly in the null case. No change needed here unless you'd prefer an explicit null guard for clarity?
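
The fallback described in this reply can be modeled in a few lines. The following is a Python analogue for illustration only, not the Java code in PubsubIO: `first_non_null` and `resolve_root_url` are hypothetical names, and the `ValueError` stands in for the exception Guava's `firstNonNull` throws when both arguments are null.

```python
DEFAULT_ROOT_URL = 'https://pubsub.googleapis.com'


def first_non_null(first, second):
  """Returns the first argument unless it is None, else the second."""
  if first is not None:
    return first
  if second is not None:
    return second
  raise ValueError('both values are None')


def resolve_root_url(transform_override, options_root_url=DEFAULT_ROOT_URL):
  """Prefers the transform-level URL and falls back to pipeline options."""
  return first_non_null(transform_override, options_root_url)


from_default = resolve_root_url(None)
from_override = resolve_root_url('http://localhost:8085')
```

The emulator-style URL is only an example value for the override.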

      message with the given name and the message's publish time as the value.
    publish_with_ordering_key: If True, enables ordering key support when
      publishing messages. The ordering key must be set on each
      PubsubMessage via the ``ordering_key`` attribute. Requires

Contributor

"Requires messages to be routed to the same region."

What does this mean?

Author

That sentence was inaccurate - it was leftover from an earlier draft and doesn't apply here. Will remove it.

  def expand(self, pvalue):
    if self.with_attributes:
-     pcoll = pvalue | 'ToProto' >> Map(pubsub.WriteToPubSub.to_proto_str)
+     pcoll = pvalue | 'ToProto' >> Map(

Contributor

Re: Map(pubsub.WriteToPubSub.to_proto_str): was this a typo for Map(pubsub.WriteToPubSub.message_to_proto_str)? The latter seems to have some type checking, which probably wouldn't hurt.

Author

Yes, message_to_proto_str is the right choice - it has proper type checking. The original to_proto_str reference was a pre-existing bug (the method doesn't exist on WriteToPubSub), which is why with_attributes=True was broken before this PR. Will switch to message_to_proto_str.

"""Test WriteToPubSub in batch mode with attributes."""
self._test_batch_write(with_attributes=True)

@pytest.mark.it_postcommit
Contributor

This will work only on the Direct Runner, not on the Dataflow runner, correct?

Contributor

also wondering if we can have a warning for Dataflow that advises users to use the xlang version if they wish to use an ordering key. IIRC we have something like that in the Java SDK?

Author

Correct - the _flush fix in _PubSubWriteDoFn only applies to Direct Runner since Dataflow overrides this with its own implementation. Will add a comment to the integration test clarifying this scope. For the warning - I'll add a logging.warning in _PubSubWriteDoFn when ordering_key is set on any message, advising Dataflow users to use the WriteToPubSub XLang path instead. Does that approach sound right, or would you prefer it at pipeline construction time (e.g., in WriteToPubSub.expand())?
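
A runtime warning of the kind proposed here would probably need to fire at most once, since the check runs per message. The sketch below is one hypothetical shape for that logic; `OrderingKeyWarner`, the logger name, and the message wording are all assumptions and are not taken from the PR.

```python
import logging

_LOGGER = logging.getLogger('ordering_key_sketch')


class OrderingKeyWarner:
  """Hypothetical helper that warns at most once per instance."""
  def __init__(self):
    self._warned = False

  def observe(self, ordering_key):
    """Logs a warning the first time a non-empty ordering key is seen.

    Returns True if a warning was emitted by this call.
    """
    if ordering_key and not self._warned:
      _LOGGER.warning(
          'ordering_key is set; this write path may not apply on all '
          'runners. Consider the cross-language WriteToPubSub path.')
      self._warned = True
      return True
    return False


warner = OrderingKeyWarner()
# Only the first non-empty key triggers the warning.
results = [warner.observe(k) for k in ['', 'key1', 'key1', 'key2']]
```

A construction-time warning would instead have to key off a transform-level option, because individual messages and their ordering keys are not visible when the pipeline is built.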

@github-actions github-actions bot added java and removed java labels Mar 26, 2026

Development

Successfully merging this pull request may close these issues.

Beam Python PubSubIO Does Not Support Ordered Key Publishing

3 participants