Skip to content

Commit 31fcbc1

Browse files
authored
feat: add on_assign, on_revoke, on_lost callbacks for Confluent subscriber (#2789)
Add rebalance callback parameters to the Confluent subscriber, allowing users to specify on_assign, on_revoke, and on_lost callbacks that are passed through to confluent_kafka.Consumer.subscribe(). This enables monitoring partition assignment/revocation events without monkey-patching AsyncConfluentConsumer. Fixes #1676
1 parent 446b58c commit 31fcbc1

File tree

4 files changed

+85
-2
lines changed

4 files changed

+85
-2
lines changed

faststream/confluent/broker/registrator.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from collections.abc import Iterable, Sequence
1+
from collections.abc import Callable, Iterable, Sequence
22
from typing import (
33
TYPE_CHECKING,
44
Annotated,
@@ -78,6 +78,10 @@ def subscriber(
7878
"read_uncommitted",
7979
"read_committed",
8080
] = "read_uncommitted",
81+
# rebalance callbacks
82+
on_assign: Callable[..., None] | None = None,
83+
on_revoke: Callable[..., None] | None = None,
84+
on_lost: Callable[..., None] | None = None,
8185
batch: Literal[False] = False,
8286
max_records: int | None = None,
8387
# broker args
@@ -138,6 +142,10 @@ def subscriber(
138142
"read_uncommitted",
139143
"read_committed",
140144
] = "read_uncommitted",
145+
# rebalance callbacks
146+
on_assign: Callable[..., None] | None = None,
147+
on_revoke: Callable[..., None] | None = None,
148+
on_lost: Callable[..., None] | None = None,
141149
batch: Literal[True] = ...,
142150
max_records: int | None = None,
143151
# broker args
@@ -198,6 +206,10 @@ def subscriber(
198206
"read_uncommitted",
199207
"read_committed",
200208
] = "read_uncommitted",
209+
# rebalance callbacks
210+
on_assign: Callable[..., None] | None = None,
211+
on_revoke: Callable[..., None] | None = None,
212+
on_lost: Callable[..., None] | None = None,
201213
batch: Literal[False] = False,
202214
max_records: int | None = None,
203215
# broker args
@@ -258,6 +270,10 @@ def subscriber(
258270
"read_uncommitted",
259271
"read_committed",
260272
] = "read_uncommitted",
273+
# rebalance callbacks
274+
on_assign: Callable[..., None] | None = None,
275+
on_revoke: Callable[..., None] | None = None,
276+
on_lost: Callable[..., None] | None = None,
261277
batch: bool = False,
262278
max_records: int | None = None,
263279
# broker args
@@ -322,6 +338,10 @@ def subscriber(
322338
"read_uncommitted",
323339
"read_committed",
324340
] = "read_uncommitted",
341+
# rebalance callbacks
342+
on_assign: Callable[..., None] | None = None,
343+
on_revoke: Callable[..., None] | None = None,
344+
on_lost: Callable[..., None] | None = None,
325345
batch: bool = False,
326346
max_records: int | None = None,
327347
# broker args
@@ -464,6 +484,12 @@ def subscriber(
464484
return the ALSO. See method docs below.
465485
batch: Whether to consume messages in batches or not.
466486
max_records: Number of messages to consume as one batch.
487+
on_assign: Callback called when partitions are assigned to the consumer
488+
during a rebalance. Receives ``(consumer, partitions)`` arguments.
489+
on_revoke: Callback called when partitions are revoked from the consumer
490+
during a rebalance. Receives ``(consumer, partitions)`` arguments.
491+
on_lost: Callback called when partitions are lost (e.g., due to session
492+
timeout). Receives ``(consumer, partitions)`` arguments.
467493
dependencies: Dependencies list (`[Dependant(),]`) to apply to the subscriber.
468494
parser: Parser to map original **Message** object to FastStream one.
469495
decoder: Function to decode FastStream msg bytes body to python objects.
@@ -506,6 +532,9 @@ def subscriber(
506532
"session_timeout_ms": session_timeout_ms,
507533
"heartbeat_interval_ms": heartbeat_interval_ms,
508534
"isolation_level": isolation_level,
535+
"on_assign": on_assign,
536+
"on_revoke": on_revoke,
537+
"on_lost": on_lost,
509538
},
510539
auto_commit=auto_commit,
511540
# subscriber args

faststream/confluent/broker/router.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,10 @@ def __init__(
135135
"read_uncommitted",
136136
"read_committed",
137137
] = "read_uncommitted",
138+
# rebalance callbacks
139+
on_assign: Callable[..., None] | None = None,
140+
on_revoke: Callable[..., None] | None = None,
141+
on_lost: Callable[..., None] | None = None,
138142
batch: bool = False,
139143
max_records: int | None = None,
140144
# broker args
@@ -274,6 +278,9 @@ def __init__(
274278
return the ALSO. See method docs below.
275279
batch: Whether to consume messages in batches or not.
276280
max_records: Number of messages to consume as one batch.
281+
on_assign: Callback called when partitions are assigned to the consumer.
282+
on_revoke: Callback called when partitions are revoked from the consumer.
283+
on_lost: Callback called when partitions are lost.
277284
dependencies: Dependencies list (`[Dependant(),]`) to apply to the subscriber.
278285
parser: Parser to map original **Message** object to FastStream one.
279286
decoder: Function to decode FastStream msg bytes body to python objects.
@@ -309,6 +316,9 @@ def __init__(
309316
session_timeout_ms=session_timeout_ms,
310317
heartbeat_interval_ms=heartbeat_interval_ms,
311318
isolation_level=isolation_level,
319+
on_assign=on_assign,
320+
on_revoke=on_revoke,
321+
on_lost=on_lost,
312322
max_records=max_records,
313323
batch=batch,
314324
# basic args

faststream/confluent/fastapi/fastapi.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,10 @@ def subscriber(
430430
"read_uncommitted",
431431
"read_committed",
432432
] = "read_uncommitted",
433+
# rebalance callbacks
434+
on_assign: Callable[..., None] | None = None,
435+
on_revoke: Callable[..., None] | None = None,
436+
on_lost: Callable[..., None] | None = None,
433437
batch: Literal[False] = False,
434438
max_records: int | None = None,
435439
# broker args
@@ -497,6 +501,10 @@ def subscriber(
497501
"read_uncommitted",
498502
"read_committed",
499503
] = "read_uncommitted",
504+
# rebalance callbacks
505+
on_assign: Callable[..., None] | None = None,
506+
on_revoke: Callable[..., None] | None = None,
507+
on_lost: Callable[..., None] | None = None,
500508
batch: Literal[False] = False,
501509
max_records: int | None = None,
502510
# broker args
@@ -564,6 +572,10 @@ def subscriber(
564572
"read_uncommitted",
565573
"read_committed",
566574
] = "read_uncommitted",
575+
# rebalance callbacks
576+
on_assign: Callable[..., None] | None = None,
577+
on_revoke: Callable[..., None] | None = None,
578+
on_lost: Callable[..., None] | None = None,
567579
batch: Literal[True] = ...,
568580
max_records: int | None = None,
569581
# broker args
@@ -622,6 +634,10 @@ def subscriber(
622634
"read_uncommitted",
623635
"read_committed",
624636
] = "read_uncommitted",
637+
# rebalance callbacks
638+
on_assign: Callable[..., None] | None = None,
639+
on_revoke: Callable[..., None] | None = None,
640+
on_lost: Callable[..., None] | None = None,
625641
batch: bool = False,
626642
max_records: int | None = None,
627643
# broker args
@@ -693,6 +709,10 @@ def subscriber(
693709
"read_uncommitted",
694710
"read_committed",
695711
] = "read_uncommitted",
712+
# rebalance callbacks
713+
on_assign: Callable[..., None] | None = None,
714+
on_revoke: Callable[..., None] | None = None,
715+
on_lost: Callable[..., None] | None = None,
696716
batch: bool = False,
697717
max_records: int | None = None,
698718
# broker args
@@ -838,6 +858,12 @@ def subscriber(
838858
return the ALSO. See method docs below.
839859
batch: Whether to consume messages in batches or not.
840860
max_records: Number of messages to consume as one batch.
861+
on_assign: Callback called when partitions are assigned to the consumer
862+
during a rebalance. Receives ``(consumer, partitions)`` arguments.
863+
on_revoke: Callback called when partitions are revoked from the consumer
864+
during a rebalance. Receives ``(consumer, partitions)`` arguments.
865+
on_lost: Callback called when partitions are lost (e.g., due to session
866+
timeout). Receives ``(consumer, partitions)`` arguments.
841867
dependencies: Dependencies list (`[Dependant(),]`) to apply to the subscriber.
842868
parser: Parser to map original **Message** object to FastStream one.
843869
decoder: Function to decode FastStream msg bytes body to python objects.
@@ -950,6 +976,9 @@ def subscriber(
950976
session_timeout_ms=session_timeout_ms,
951977
heartbeat_interval_ms=heartbeat_interval_ms,
952978
isolation_level=isolation_level,
979+
on_assign=on_assign,
980+
on_revoke=on_revoke,
981+
on_lost=on_lost,
953982
batch=batch,
954983
max_records=max_records,
955984
# broker args

faststream/confluent/helpers/client.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,10 +215,18 @@ def __init__(
215215
connections_max_idle_ms: int = 540000,
216216
isolation_level: str = "read_uncommitted",
217217
allow_auto_create_topics: bool = True,
218+
# rebalance callbacks
219+
on_assign: Callable[..., None] | None = None,
220+
on_revoke: Callable[..., None] | None = None,
221+
on_lost: Callable[..., None] | None = None,
218222
) -> None:
219223
self.admin_client = admin_service
220224
self.logger_state = logger
221225

226+
self._on_assign = on_assign
227+
self._on_revoke = on_revoke
228+
self._on_lost = on_lost
229+
222230
self.topics = list(topics)
223231
self.partitions = partitions
224232

@@ -292,10 +300,17 @@ async def start(self) -> None:
292300
)
293301

294302
if self.topics:
303+
subscribe_kwargs: dict[str, Any] = {"topics": self.topics}
304+
if self._on_assign is not None:
305+
subscribe_kwargs["on_assign"] = self._on_assign
306+
if self._on_revoke is not None:
307+
subscribe_kwargs["on_revoke"] = self._on_revoke
308+
if self._on_lost is not None:
309+
subscribe_kwargs["on_lost"] = self._on_lost
295310
await run_in_executor(
296311
self._thread_pool,
297312
self.consumer.subscribe,
298-
topics=self.topics,
313+
**subscribe_kwargs,
299314
)
300315

301316
elif self.partitions:

0 commit comments

Comments
 (0)