Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions src/sql/src/kafka_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ impl Config {
}

// Allows for returning a default value for this configuration option
fn set_default(mut self, d: Option<String>) -> Self {
self.default = d;
self
}
// fn set_default(mut self, d: Option<String>) -> Self {
// self.default = d;
// self
// }

// Get the appropriate String to use as the Kafka config key.
fn get_key(&self) -> String {
Expand Down Expand Up @@ -148,10 +148,11 @@ pub fn extract_config(
// The range of values comes from `statistics.interval.ms` in
// https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
ValType::Number(0, 86_400_000),
)
.set_default(Some(
chrono::Duration::seconds(1).num_milliseconds().to_string(),
)),
),
// TODO - stats can be reenabled by default when we figure out why it's leaking memory.
// .set_default(Some(
// chrono::Duration::seconds(1).num_milliseconds().to_string(),
// )),
Config::new(
"topic_metadata_refresh_interval_ms",
// The range of values comes from `topic.metadata.refresh.interval.ms` in
Expand Down
9 changes: 5 additions & 4 deletions test/catalog-compat/catcompatck/catcompatck
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,12 @@ a b
3 1
1 2

# TODO: Uncomment out test once kafka stats are enabled by default once again
# Kafka metrics for the real_time_src should be enabled now
# Count should be 2 because there are two materialized views on real_time_src
# If real_time_src_no_stats were also emitting stats, there would be 3 rows
> SELECT count(*) FROM mz_kafka_consumer_statistics;
count
-----
2
# > SELECT count(*) FROM mz_kafka_consumer_statistics;
# count
# -----
# 2
EOF
56 changes: 29 additions & 27 deletions test/testdrive/avro-sources.td
Original file line number Diff line number Diff line change
Expand Up @@ -333,20 +333,21 @@ a b
5 6
9 10

# TODO: Uncomment out test once kafka stats are enabled by default once again
# There should be two partitions for the last created source / consumer (non_dbz_data_varying_partition_2)
> SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
count
-----
1
1
1
1
2
2
2
2
2
2
# > SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
# count
# -----
# 1
# 1
# 1
# 1
# 2
# 2
# 2
# 2
# 2
# 2

> CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_3
FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}'
Expand All @@ -367,21 +368,22 @@ a b
9 10
11 12

# TODO: Uncomment out test once kafka stats are enabled by default once again
# There should three partitions for the last three sources / consumers (non_dbz_data_varying_partition_[123])
> SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
count
-----
1
1
1
1
2
2
2
2
3
3
3
# > SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
# count
# -----
# 1
# 1
# 1
# 1
# 2
# 2
# 2
# 2
# 3
# 3
# 3

$ set-sql-timeout duration=12.7s

Expand Down
1 change: 1 addition & 0 deletions test/testdrive/kafka-stats.td
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ $ kafka-create-topic topic=data

> CREATE SOURCE data
FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-data-${testdrive.seed}'
WITH (statistics_interval_ms = 1000)
FORMAT AVRO USING SCHEMA '${schema}'

> CREATE MATERIALIZED VIEW test1 AS
Expand Down