Skip to content

Add hook for amazonmsk#69000

Draft
arnoldmr01 wants to merge 3 commits into
apache:mainfrom
arnoldmr01:add-hook-for-amazonmsk
Draft

Add hook for amazonmsk#69000
arnoldmr01 wants to merge 3 commits into
apache:mainfrom
arnoldmr01:add-hook-for-amazonmsk

Conversation

@arnoldmr01

@arnoldmr01 arnoldmr01 commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

What is this PR for?

Integrate Amazon MSK for user to have variety way to implement Event-Driven Scheduling feature which has been introduced in Airflow 3.0
related: #52712

What is changed in this PR?

This PR only add Amazon MSK Hook first, coving the following functions:

  • Create cluster
  • Get cluster
  • Update cluster configuration
  • Delete cluster
  • List cluster
  • CRUD topic

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)
  • Codex

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@arnoldmr01 arnoldmr01 marked this pull request as ready for review June 30, 2026 07:32
@arnoldmr01 arnoldmr01 requested a review from o-nikolas as a code owner June 30, 2026 07:32

@o-nikolas o-nikolas left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the methods look like single line wrappers around the boto API. It seems like we don't have much of a usecase for this? Curious to hear what @vincbeck and @ferruzzi have to say about this one.

# in the `devel-dependencies` section to be the same minimum version.
"boto3>=1.41.0",
"botocore>=1.41.0",
"aws-msk-iam-sasl-signer-python>=1.0.2",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usage of this is optional in the code as you've written it, but it's included as a mandatory dependency here. Let's move this below to be an optional dependency

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will modify it later

@o-nikolas o-nikolas left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd probably have to bump the botocore minimum since these are new APIs?

@github-actions

github-actions Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

uv.lock on main just moved via #68933 ("[main] Upgrade important CI environment"), commit e0a1726 and this PR currently conflicts.

Quickest fix:

git fetch upstream main && git rebase upstream/main
rm uv.lock && uv lock
git add uv.lock && git rebase --continue
git push --force-with-lease

Automated nudge — ignore if you're not ready to rebase. This comment is updated in place on future uv.lock bumps.

@arnoldmr01 arnoldmr01 marked this pull request as draft July 1, 2026 03:44
@ferruzzi

ferruzzi commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

Agreed with Niko. Looks like the hooks are generally replacing one-liners with functions, which we generally avoid.

@jason810496 jason810496 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, LGTM overall.

Most of the methods look like single line wrappers around the boto API. It seems like we don't have much of a usecase for this?

I agreed with this point as well. We could make the MSK Hook itself just the interface to retrieve the authenticated MSK client with given conn_id.

Same as the AwaitMessageTrigger (the Kafka concusmer in Airlfow trigger) implementation, we will leverage the SDK client that retrieved via connection directly without additional wrapper.

async def run(self):
consumer_hook = KafkaConsumerHook(topics=self.topics, kafka_config_id=self.kafka_config_id)
async_get_consumer = sync_to_async(consumer_hook.get_consumer)
self._consumer = await async_get_consumer()
async_poll = sync_to_async(self._consumer.poll)
async_commit = sync_to_async(self._consumer.commit)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants