[pkg/kafka/configkafka] Add conn_idle_timeout for franz-go clients#45321
[pkg/kafka/configkafka] Add conn_idle_timeout for franz-go clients#45321ChrsMark merged 19 commits intoopen-telemetry:mainfrom
Conversation
Did you consider internally dividing the configured default by 2, and using that to set ConnIdleTimeout in franz-go? i.e. ensure the configured timeout is an upper bound. Then we could set the default to 9m. @twmb do you have any advice? Do you recall why ConnIdleTimeout is a rough timeout, rather than a strict bound? |
Thanks, that's a great suggestion.
The reap connection is done in a background loop periodically in franz-go. So instead of having a precise tracker of connection idle time, e.g. implemented by having a goroutine checking for idle timeout for each connection, this is currently implemented by checking all O(N) connections every 20 seconds by default, to close connections that have been idle for 20s, which IMO is quite aggressive in comparison to other client implementation default (e.g. 9m in java). The current design is simple to reason about, and is more lightweight than having a goroutine per conn or a heap to monitor for idle connections, but the TLS handshake + network round trip overhead may add up very quickly due to the 20s default. |
|
^^Accurate I can't rely on just SetDeadline on individual connections, because that kills the underlying connection but doesn't notify what is using the connection. Something in the application needs to monitor if these inactive connections -- and the goroutines managing reading from them / writing to them -- should be closed. Rather than complicated logic that constantly shuffles a slice based on idleness or not, I've found it easier to just have a periodic loop that ranges over everything. Something can be considered "too idle" immediately after the loop runs - say it was at 19s, loop runs, and now is idle 1s later. 20s later the connection will be reaped, for a total of 39s. It's much easier internally to reason about and implement, but it does have this liiiitle bit of end user confusion. |
|
I'd probably still on not dividing by two though. If I want to have connections eligible to be closed after 20s, I'd be surprised if they're being closed at 10s. |
Fair point. Thanks @twmb! |
|
This PR was marked stale due to lack of activity. It will be closed in 14 days. |
|
I'll try to improve the franz-go implementation of idle connection handling before plumbing this config through. Closing the PR for now. |
|
Waiting for twmb/franz-go#1245 to be released and its bump incorporated into this PR, to avoid 2 * 9m connIdleTimeout overshooting the server idle timeout, running into edge cases where server closes idle connections first. |
pkg/kafka/configkafka/config.go
Outdated
| } | ||
| } | ||
| if c.ConnectionIdleTimeout <= 0 { | ||
| return fmt.Errorf("connection_idle_timeout (%s) must be positive", c.ConnectionIdleTimeout) |
There was a problem hiding this comment.
Should we check that it's >= 100ms?
There was a problem hiding this comment.
thanks, nice catch. However, I don't think ClientConfig.Validate is called anywhere at all. In that case,
- this line serves as a sanity check at otel level. It is here for consistency, not for function, because it isn't called.
- I've created [internal/kafka] Validate franz-go configuration options early in Config.Validate #46024 to properly invoke franz-go config validation at config validation time now that sarama has been removed in many components.
There was a problem hiding this comment.
this line serves as a sanity check at otel level. It is here for consistency, not for function, because it isn't called.
It should be called by confmap. Did you observe that not to be the case?
There was a problem hiding this comment.
Assuming a diff
diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go
index 87dc786cd5..27dbe66acd 100644
--- a/receiver/kafkareceiver/config_test.go
+++ b/receiver/kafkareceiver/config_test.go
@@ -471,6 +471,15 @@ func TestConfigValidate(t *testing.T) {
},
expectedErr: "profiles.exclude_topics contains empty string",
},
+ {
+ name: "invalid config with negative connection idle timeout",
+ config: &Config{
+ ClientConfig: configkafka.ClientConfig{
+ ConnectionIdleTimeout: -1,
+ },
+ },
+ expectedErr: "connection_idle_timeout (-1) must be positive",
+ },
}
for _, tt := range tests {
output
=== RUN TestConfigValidate
=== RUN TestConfigValidate/invalid_config_with_negative_connection_idle_timeout
config_test.go:496:
Error Trace: /home/carson/projects/opentelemetry-collector-contrib/receiver/kafkareceiver/config_test.go:496
Error: An error is expected but got nil.
Test: TestConfigValidate/invalid_config_with_negative_connection_idle_timeout
--- FAIL: TestConfigValidate/invalid_config_with_negative_connection_idle_timeout (0.00s)
--- FAIL: TestConfigValidate (0.00s)
There was a problem hiding this comment.
I'm slightly inclined to delete this conn idle timeout validation in configkafka and to 100% rely on franz-go validation in #46024 . wdyt?
There was a problem hiding this comment.
I think the test isn't failing because we're calling Config.Validate, which (intentionally) doesn't call the Validate method of fields. IIANM we should use xconfmap.Validate instead.
As for relying on franz-go for validation... I don't know, but my instinct is that we should validate in configkafka, and only ever pass through valid options to franz-go.
There was a problem hiding this comment.
TIL config.Validate vs xconfmap.Validate, thanks for the pointer. I'll update the tests to use xconfmap.Validate in a separate PR because it requires fixing existing tests if that's desired.
As for relying on franz-go for validation... I don't know, but my instinct is that we should validate in configkafka, and only ever pass through valid options to franz-go.
There are a few options:
- A: no check in config.Validate & rely on franz-go validation
- B: fully align configkafka validation and franz-go validation values. But it also means when franz-go validation rules changes, it requires a corresponding change in configkafka
- C: middle ground between A and B. configkafka does basic sanity check to reject clearly invalid values, assuming this is also written for other kafka clients e.g. sarama. Then rely on franz-go for actual min/max checks.
The drawback of B is demonstrated in twmb/franz-go#1245 where we changed an arbitrary min conn idle timeout from 1s to 100ms. It is not hard to imagine these minor tweaks to happen over time.
With C it implies inconsistent error message if configured conn idle timeout is 0ms vs 1ms, if configkafka rejects 0ms and franz-go rejects >0ms and <100ms.
A helps us avoid the maintenance with no apparent loss in user experience.
The PR is currently doing B which is also fine IMO
There was a problem hiding this comment.
I would prefer option B, though we don't necessarily have to make the exactly the same; the collector could also be more strict (but not less).
This would align well with open-telemetry/opentelemetry-collector#14543
Align the Kafka client config option name with the ConnIdleTimeout field to keep naming consistent across code and YAML. Update schema, docs, and config-loading tests to use the new key. Co-authored-by: Cursor <cursoragent@cursor.com>
Align the conn_idle_timeout description with franz-go semantics: idle connections are not reused after the timeout and may be closed later. Co-authored-by: Cursor <cursoragent@cursor.com>
Keep release notes aligned with the final config key and franz-go semantics by describing that idle connections are not reused and may be closed. Co-authored-by: Cursor <cursoragent@cursor.com>
…pen-telemetry#45321) #### Description This PR adds a configurable Kafka client idle-connection timeout for franz-go based components. Key changes included in this PR branch (relative to `main`): - Add `conn_idle_timeout` to `configkafka.ClientConfig` with default `9m`. - Validate the option as a positive duration. - Pass the setting into franz-go via `kgo.ConnIdleTimeout(...)` in shared client setup. - Expose the option in schema and component documentation. - Clarify the option description to match franz-go 1.20.7 semantics: idle connections are not reused after the timeout and may be closed. - Add/update config-loading tests and testdata for `configkafka`, `kafkareceiver`, and related observer coverage. - Add changelog entries for Kafka receiver and exporter. #### Link to tracking issue N/A #### Testing - `go test ./...` (run in `pkg/kafka/configkafka`) - `go test ./...` (run in `internal/kafka`) - `go test ./...` (run in `receiver/kafkareceiver`) - `go test ./...` (run in `exporter/kafkaexporter`) #### Documentation Updated configuration docs in: - `receiver/kafkareceiver/README.md` - `exporter/kafkaexporter/README.md` --------- Co-authored-by: Cursor <cursoragent@cursor.com>
Description
This PR adds a configurable Kafka client idle-connection timeout for franz-go based components.
Key changes included in this PR branch (relative to
main):conn_idle_timeouttoconfigkafka.ClientConfigwith default9m.kgo.ConnIdleTimeout(...)in shared client setup.configkafka,kafkareceiver, and related observer coverage.Link to tracking issue
N/A
Testing
go test ./...(run inpkg/kafka/configkafka)go test ./...(run ininternal/kafka)go test ./...(run inreceiver/kafkareceiver)go test ./...(run inexporter/kafkaexporter)Documentation
Updated configuration docs in:
receiver/kafkareceiver/README.mdexporter/kafkaexporter/README.md