Skip to content

Commit 7922379

Browse files
committed
fix: bind Kafka producer error handler
1 parent d971ddb commit 7922379

File tree

15 files changed

+221
-100
lines changed

15 files changed

+221
-100
lines changed

Directory.Build.props

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project>
22
<PropertyGroup Label="Package information">
3-
<BaseVersionSuffix></BaseVersionSuffix>
4-
<BaseVersion>5.0.0$(BaseVersionSuffix)</BaseVersion>
3+
<BaseVersionSuffix>-beta.1</BaseVersionSuffix>
4+
<BaseVersion>5.1.0$(BaseVersionSuffix)</BaseVersion>
55
<DatabasePackagesRevision>1</DatabasePackagesRevision>
66
<DatabasePackagesVersionSuffix>$(BaseVersionSuffix)</DatabasePackagesVersionSuffix>
77
<AnalysisMode>All</AnalysisMode>

docs/changelog/500.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,19 @@ uid: releases-500
44

55
# Release Notes v5.x.x
66

7+
## [5.1.0](https://github.com/BEagle1984/silverback/releases/tag/v5.1.0)
8+
9+
### Fixes
10+
11+
* Correctly bind the error handler to the Kafka producer to prevent uncontrolled logging to stderr
12+
713
## [5.0.0](https://github.com/BEagle1984/silverback/releases/tag/v5.0.0)
814

915
Silverback 5.0.0 is the most significant update in the library’s history, built on years of user feedback and real-world experience. This release is more than just an upgrade, it’s a **complete refactoring** that enhances flexibility, performance, and maintainability while laying a strong foundation for future innovations.
1016

1117
### Highlights
1218

13-
* An **improved configuration API** that makes it easier to set up Silverback exactly the way you want, with a more intuitive and consistent syntax. The new fluent API is designed to be more readable and straightforward and to giving you more control over the Kafka and MQTT clients that are used under the hood.
19+
* An **improved configuration API** that makes it easier to set up Silverback exactly the way you want, with a more intuitive and consistent syntax. The new fluent API is designed to be more readable and straightforward and to give you more control over the Kafka and MQTT clients that are used under the hood.
1420
```csharp
1521
services.AddSilverback()
1622
.WithConnectionToMessageBroker(options => options.AddKafka())

src/Silverback.Integration.Kafka.Testing/Messaging/Broker/Kafka/MockedConfluentConsumerBuilder.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ public class MockedConfluentConsumerBuilder : IConfluentConsumerBuilder
2626

2727
private Action<IConsumer<byte[]?, byte[]?>, string>? _statisticsHandler;
2828

29-
private Action<IConsumer<byte[]?, byte[]?>, Error>? _errorHandler;
30-
3129
private Func<IConsumer<byte[]?, byte[]?>, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>? _partitionsAssignedHandler;
3230

3331
private Func<IConsumer<byte[]?, byte[]?>, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? _partitionsRevokedHandler;
3432

3533
private Action<IConsumer<byte[]?, byte[]?>, CommittedOffsets>? _offsetsCommittedHandler;
3634

35+
private Action<IConsumer<byte[]?, byte[]?>, Error>? _errorHandler;
36+
3737
/// <summary>
3838
/// Initializes a new instance of the <see cref="MockedConfluentConsumerBuilder" /> class.
3939
/// </summary>
@@ -70,13 +70,6 @@ public IConfluentConsumerBuilder SetStatisticsHandler(Action<IConsumer<byte[]?,
7070
return this;
7171
}
7272

73-
/// <inheritdoc cref="IConfluentConsumerBuilder.SetErrorHandler" />
74-
public IConfluentConsumerBuilder SetErrorHandler(Action<IConsumer<byte[]?, byte[]?>, Error> errorHandler)
75-
{
76-
_errorHandler = errorHandler;
77-
return this;
78-
}
79-
8073
/// <inheritdoc cref="IConfluentConsumerBuilder.SetPartitionsAssignedHandler(Func{IConsumer{byte[],byte[]},List{TopicPartition},IEnumerable{TopicPartitionOffset}})" />
8174
public IConfluentConsumerBuilder SetPartitionsAssignedHandler(
8275
Func<IConsumer<byte[]?, byte[]?>, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>
@@ -128,6 +121,13 @@ public IConfluentConsumerBuilder SetOffsetsCommittedHandler(Action<IConsumer<byt
128121
return this;
129122
}
130123

124+
/// <inheritdoc cref="IConfluentConsumerBuilder.SetErrorHandler" />
125+
public IConfluentConsumerBuilder SetErrorHandler(Action<IConsumer<byte[]?, byte[]?>, Error> errorHandler)
126+
{
127+
_errorHandler = errorHandler;
128+
return this;
129+
}
130+
131131
/// <inheritdoc cref="IConfluentConsumerBuilder.SetLogHandler" />
132132
// Not yet implemented / not needed
133133
public IConfluentConsumerBuilder SetLogHandler(Action<IConsumer<byte[]?, byte[]?>, LogMessage> logHandler) => this;

src/Silverback.Integration.Kafka.Testing/Messaging/Broker/Kafka/MockedConfluentProducerBuilder.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ public IConfluentProducerBuilder SetStatisticsHandler(Action<IProducer<byte[]?,
5050
return this;
5151
}
5252

53+
/// <inheritdoc cref="IConfluentProducerBuilder.SetErrorHandler" />
54+
public IConfluentProducerBuilder SetErrorHandler(Action<IProducer<byte[]?, byte[]?>, Error> errorHandler) => this;
55+
5356
/// <inheritdoc cref="IConfluentProducerBuilder.SetLogHandler" />
5457
// Not yet implemented / not needed
5558
public IConfluentProducerBuilder SetLogHandler(Action<IProducer<byte[]?, byte[]?>, LogMessage> logHandler) => this;

src/Silverback.Integration.Kafka/Diagnostics/KafkaLogEvents.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,30 @@ public static class KafkaLogEvents
265265
GetEventId(74, nameof(OffsetSentToTransaction)),
266266
"Offset {Topic}[{Partition}]@{Offset} sent to transaction | ProducerName: {ProducerName}, TransactionalId: {TransactionalId}");
267267

268+
/// <summary>
269+
/// Gets the <see cref="LogEvent" /> representing the log that is written when a non-fatal error is reported
270+
/// by the <see cref="Confluent.Kafka.IProducer{TKey,TValue}" />.
271+
/// </summary>
272+
/// <remarks>
273+
/// Fatal errors are reported with a different event id.
274+
/// </remarks>
275+
public static LogEvent ConfluentProducerError { get; } = new(
276+
LogLevel.Warning,
277+
GetEventId(199, nameof(ConfluentProducerError)),
278+
"Error in Kafka producer: '{ErrorReason}' ({ErrorCode}) | ProducerName: {ProducerName}");
279+
280+
/// <summary>
281+
/// Gets the <see cref="LogEvent" /> representing the log that is written when a fatal error is reported by
282+
/// the <see cref="Confluent.Kafka.IProducer{TKey,TValue}" />.
283+
/// </summary>
284+
/// <remarks>
285+
/// Non-fatal errors are reported with a different event id.
286+
/// </remarks>
287+
public static LogEvent ConfluentProducerFatalError { get; } = new(
288+
LogLevel.Error,
289+
GetEventId(200, nameof(ConfluentProducerFatalError)),
290+
"Fatal error in Kafka producer: '{ErrorReason}' ({ErrorCode}) | ProducerName: {ProducerName}");
291+
268292
/// <summary>
269293
/// Gets the <see cref="LogEvent" /> representing the log that is written when a log event is received from
270294
/// the underlying <see cref="Confluent.Kafka.IProducer{TKey,TValue}" />.

src/Silverback.Integration.Kafka/Diagnostics/KafkaLoggerExtensions.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ internal static class KafkaLoggerExtensions
9292
private static readonly Action<ILogger, string, int, long, string, string?, Exception?> OffsetSentToTransaction =
9393
SilverbackLoggerMessage.Define<string, int, long, string, string?>(KafkaLogEvents.OffsetSentToTransaction);
9494

95+
private static readonly Action<ILogger, string, int, string, Exception?> ConfluentProducerError =
96+
SilverbackLoggerMessage.Define<string, int, string>(KafkaLogEvents.ConfluentProducerError);
97+
98+
private static readonly Action<ILogger, string, int, string, Exception?> ConfluentProducerFatalError =
99+
SilverbackLoggerMessage.Define<string, int, string>(KafkaLogEvents.ConfluentProducerFatalError);
100+
95101
private static readonly Action<ILogger, string, string, string, Exception?> ConfluentProducerLogCritical =
96102
SilverbackLoggerMessage.Define<string, string, string>(KafkaLogEvents.ConfluentProducerLogCritical);
97103

@@ -320,6 +326,32 @@ public static void LogConfluentProducerLogCritical(this ISilverbackLogger logger
320326
ConfluentProducerLogCritical(logger.InnerLogger, logMessage.Level.ToString(), logMessage.Message, producerWrapper.DisplayName, null);
321327
}
322328

329+
public static void LogConfluentProducerError(this ISilverbackLogger logger, Error error, KafkaProducer producer)
330+
{
331+
if (!logger.IsEnabled(KafkaLogEvents.ConfluentProducerError))
332+
return;
333+
334+
ConfluentProducerError(
335+
logger.InnerLogger,
336+
GetErrorReason(error),
337+
(int)error.Code,
338+
producer.DisplayName,
339+
null);
340+
}
341+
342+
public static void LogConfluentProducerFatalError(this ISilverbackLogger logger, Error error, KafkaProducer producer)
343+
{
344+
if (!logger.IsEnabled(KafkaLogEvents.ConfluentProducerFatalError))
345+
return;
346+
347+
ConfluentProducerFatalError(
348+
logger.InnerLogger,
349+
GetErrorReason(error),
350+
(int)error.Code,
351+
producer.DisplayName,
352+
null);
353+
}
354+
323355
public static void LogConfluentProducerLogError(this ISilverbackLogger logger, LogMessage logMessage, IConfluentProducerWrapper producerWrapper)
324356
{
325357
if (!logger.IsEnabled(KafkaLogEvents.ConfluentProducerLogError))

src/Silverback.Integration.Kafka/Messaging/Broker/Callbacks/KafkaEventsBinder.cs

Lines changed: 47 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public static IConfluentProducerBuilder SetEventsHandlers(
2121
ISilverbackLogger logger) =>
2222
producerBuilder
2323
.SetStatisticsHandler((_, statistics) => OnStatistics(statistics, producerWrapper, callbacksInvoker, logger))
24+
.SetErrorHandler((_, error) => OnError(error, producerWrapper, logger))
2425
.SetLogHandler((_, logMessage) => OnLog(logMessage, producerWrapper, callbacksInvoker, logger));
2526

2627
public static IConfluentConsumerBuilder SetEventsHandlers(
@@ -32,12 +33,11 @@ public static IConfluentConsumerBuilder SetEventsHandlers(
3233
{
3334
consumerBuilder
3435
.SetStatisticsHandler((_, statistics) => OnStatistics(statistics, consumerWrapper, callbacksInvoker, logger))
35-
.SetPartitionsAssignedHandler(
36-
(_, topicPartitions) => OnPartitionsAssigned(
37-
topicPartitions,
38-
consumerWrapper,
39-
callbacksInvoker,
40-
logger))
36+
.SetPartitionsAssignedHandler((_, topicPartitions) => OnPartitionsAssigned(
37+
topicPartitions,
38+
consumerWrapper,
39+
callbacksInvoker,
40+
logger))
4141
.SetOffsetsCommittedHandler((_, offsets) => OnOffsetsCommitted(offsets, consumerWrapper, callbacksInvoker, logger))
4242
.SetErrorHandler((_, error) => OnError(error, consumerWrapper, callbacksInvoker, logger))
4343
.SetLogHandler((_, logMessage) => OnLog(logMessage, consumerWrapper, callbacksInvoker, logger));
@@ -49,18 +49,16 @@ public static IConfluentConsumerBuilder SetEventsHandlers(
4949
// overload with a Func<> results in an exception in the internal rebalance method since you are
5050
// not supposed to alter the partitions being revoked with the cooperative sticky strategy.
5151
consumerBuilder
52-
.SetPartitionsRevokedHandler(
53-
(_, topicPartitionOffsets) =>
54-
{
55-
OnPartitionsRevoked(topicPartitionOffsets, consumerWrapper, callbacksInvoker, logger);
56-
});
52+
.SetPartitionsRevokedHandler((_, topicPartitionOffsets) =>
53+
{
54+
OnPartitionsRevoked(topicPartitionOffsets, consumerWrapper, callbacksInvoker, logger);
55+
});
5756
}
5857
else
5958
{
6059
consumerBuilder
61-
.SetPartitionsRevokedHandler(
62-
(_, topicPartitionOffsets) =>
63-
OnPartitionsRevoked(topicPartitionOffsets, consumerWrapper, callbacksInvoker, logger));
60+
.SetPartitionsRevokedHandler((_, topicPartitionOffsets) =>
61+
OnPartitionsRevoked(topicPartitionOffsets, consumerWrapper, callbacksInvoker, logger));
6462
}
6563

6664
return consumerBuilder;
@@ -109,14 +107,13 @@ private static IEnumerable<TopicPartitionOffset> OnPartitionsAssigned(
109107

110108
List<TopicPartitionOffset>? topicPartitionOffsets = null;
111109

112-
callbacksInvoker.Invoke<IKafkaPartitionsAssignedCallback>(
113-
callback =>
114-
{
115-
IEnumerable<TopicPartitionOffset>? result = callback.OnPartitionsAssigned(topicPartitions, consumerWrapper.Consumer);
110+
callbacksInvoker.Invoke<IKafkaPartitionsAssignedCallback>(callback =>
111+
{
112+
IEnumerable<TopicPartitionOffset>? result = callback.OnPartitionsAssigned(topicPartitions, consumerWrapper.Consumer);
116113

117-
if (result != null)
118-
topicPartitionOffsets = [.. result];
119-
});
114+
if (result != null)
115+
topicPartitionOffsets = [.. result];
116+
});
120117

121118
topicPartitionOffsets ??= [.. topicPartitions.Select(partition => new TopicPartitionOffset(partition, Offset.Unset))];
122119

@@ -139,9 +136,8 @@ private static void OnPartitionsRevoked(
139136

140137
topicPartitionOffsets.ForEach(topicPartitionOffset => logger.LogPartitionRevoked(topicPartitionOffset, consumerWrapper.Consumer));
141138

142-
callbacksInvoker.Invoke<IKafkaPartitionsRevokedCallback>(
143-
callback =>
144-
callback.OnPartitionsRevoked(topicPartitionOffsets, consumerWrapper.Consumer));
139+
callbacksInvoker.Invoke<IKafkaPartitionsRevokedCallback>(callback =>
140+
callback.OnPartitionsRevoked(topicPartitionOffsets, consumerWrapper.Consumer));
145141
}
146142

147143
private static void OnOffsetsCommitted(
@@ -169,6 +165,21 @@ private static void OnOffsetsCommitted(
169165
callbacksInvoker.Invoke<IKafkaOffsetCommittedCallback>(callback => callback.OnOffsetsCommitted(offsets, consumerWrapper.Consumer));
170166
}
171167

168+
[SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Exception logged")]
169+
private static void OnError(
170+
Error error,
171+
IConfluentProducerWrapper producerWrapper,
172+
ISilverbackLogger logger)
173+
{
174+
if (producerWrapper.Status is not (ClientStatus.Initialized or ClientStatus.Initializing))
175+
return;
176+
177+
if (error.IsFatal)
178+
logger.LogConfluentProducerFatalError(error, producerWrapper.Producer);
179+
else
180+
logger.LogConfluentProducerError(error, producerWrapper.Producer);
181+
}
182+
172183
[SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Exception logged")]
173184
private static void OnError(
174185
Error error,
@@ -183,11 +194,10 @@ private static void OnError(
183194

184195
bool handled = false;
185196

186-
callbacksInvoker.Invoke<IKafkaConsumerErrorCallback>(
187-
handler =>
188-
{
189-
handled &= handler.OnConsumerError(error, consumerWrapper.Consumer);
190-
});
197+
callbacksInvoker.Invoke<IKafkaConsumerErrorCallback>(handler =>
198+
{
199+
handled &= handler.OnConsumerError(error, consumerWrapper.Consumer);
200+
});
191201

192202
if (handled)
193203
return;
@@ -207,11 +217,10 @@ private static void OnLog(
207217
{
208218
bool handled = false;
209219

210-
callbacksInvoker.Invoke<IKafkaProducerLogCallback>(
211-
handler =>
212-
{
213-
handled &= handler.OnProducerLog(logMessage, producerWrapper);
214-
});
220+
callbacksInvoker.Invoke<IKafkaProducerLogCallback>(handler =>
221+
{
222+
handled &= handler.OnProducerLog(logMessage, producerWrapper);
223+
});
215224

216225
if (handled)
217226
return;
@@ -248,11 +257,10 @@ private static void OnLog(
248257
{
249258
bool handled = false;
250259

251-
callbacksInvoker.Invoke<IKafkaConsumerLogCallback>(
252-
handler =>
253-
{
254-
handled &= handler.OnConsumerLog(logMessage, consumerWrapper.Consumer);
255-
});
260+
callbacksInvoker.Invoke<IKafkaConsumerLogCallback>(handler =>
261+
{
262+
handled &= handler.OnConsumerLog(logMessage, consumerWrapper.Consumer);
263+
});
256264

257265
if (handled)
258266
return;

src/Silverback.Integration.Kafka/Messaging/Broker/Kafka/ConfluentConsumerBuilder.cs

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,18 @@ public class ConfluentConsumerBuilder : IConfluentConsumerBuilder
1717

1818
private Action<IConsumer<byte[]?, byte[]?>, string>? _statisticsHandler;
1919

20-
private Action<IConsumer<byte[]?, byte[]?>, Error>? _errorHandler;
21-
22-
private Func<IConsumer<byte[]?, byte[]?>, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>?
23-
_partitionsAssignedHandler;
20+
private Func<IConsumer<byte[]?, byte[]?>, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>? _partitionsAssignedHandler;
2421

25-
private Func<IConsumer<byte[]?, byte[]?>, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>?
26-
_partitionsRevokedHandlerFunc;
22+
private Func<IConsumer<byte[]?, byte[]?>, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>? _partitionsRevokedHandlerFunc;
2723

28-
private Action<IConsumer<byte[]?, byte[]?>, List<TopicPartitionOffset>>?
29-
_partitionsRevokedHandlerAction;
24+
private Action<IConsumer<byte[]?, byte[]?>, List<TopicPartitionOffset>>? _partitionsRevokedHandlerAction;
3025

3126
private Action<IConsumer<byte[]?, byte[]?>, CommittedOffsets>? _offsetsCommittedHandler;
3227

3328
private Action<IConsumer<byte[]?, byte[]?>, LogMessage>? _logHandler;
3429

30+
private Action<IConsumer<byte[]?, byte[]?>, Error>? _errorHandler;
31+
3532
/// <inheritdoc cref="IConfluentConsumerBuilder.SetConfig" />
3633
public IConfluentConsumerBuilder SetConfig(ConsumerConfig config)
3734
{
@@ -46,13 +43,6 @@ public IConfluentConsumerBuilder SetStatisticsHandler(Action<IConsumer<byte[]?,
4643
return this;
4744
}
4845

49-
/// <inheritdoc cref="IConfluentConsumerBuilder.SetErrorHandler" />
50-
public IConfluentConsumerBuilder SetErrorHandler(Action<IConsumer<byte[]?, byte[]?>, Error> errorHandler)
51-
{
52-
_errorHandler = errorHandler;
53-
return this;
54-
}
55-
5646
/// <inheritdoc cref="IConfluentConsumerBuilder.SetPartitionsAssignedHandler(Func{IConsumer{byte[],byte[]},List{TopicPartition},IEnumerable{TopicPartitionOffset}})" />
5747
public IConfluentConsumerBuilder SetPartitionsAssignedHandler(
5848
Func<IConsumer<byte[]?, byte[]?>, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>
@@ -108,6 +98,13 @@ public IConfluentConsumerBuilder SetLogHandler(Action<IConsumer<byte[]?, byte[]?
10898
return this;
10999
}
110100

101+
/// <inheritdoc cref="IConfluentConsumerBuilder.SetErrorHandler" />
102+
public IConfluentConsumerBuilder SetErrorHandler(Action<IConsumer<byte[]?, byte[]?>, Error> errorHandler)
103+
{
104+
_errorHandler = errorHandler;
105+
return this;
106+
}
107+
111108
/// <inheritdoc cref="IConfluentConsumerBuilder.Build" />
112109
public IConsumer<byte[]?, byte[]?> Build()
113110
{
@@ -119,9 +116,6 @@ public IConfluentConsumerBuilder SetLogHandler(Action<IConsumer<byte[]?, byte[]?
119116
if (_statisticsHandler != null)
120117
builder.SetStatisticsHandler(_statisticsHandler);
121118

122-
if (_errorHandler != null)
123-
builder.SetErrorHandler(_errorHandler);
124-
125119
if (_partitionsAssignedHandler != null)
126120
builder.SetPartitionsAssignedHandler(_partitionsAssignedHandler);
127121

@@ -136,6 +130,9 @@ public IConfluentConsumerBuilder SetLogHandler(Action<IConsumer<byte[]?, byte[]?
136130
if (_logHandler != null)
137131
builder.SetLogHandler(_logHandler);
138132

133+
if (_errorHandler != null)
134+
builder.SetErrorHandler(_errorHandler);
135+
139136
return builder.Build();
140137
}
141138
}

0 commit comments

Comments
 (0)