diff --git a/src/coord/src/timestamp.rs b/src/coord/src/timestamp.rs index da02296b9753f..32ae51288f500 100644 --- a/src/coord/src/timestamp.rs +++ b/src/coord/src/timestamp.rs @@ -631,6 +631,9 @@ impl Timestamper { id: GlobalId, kc: KafkaSourceConnector, ) -> Option { + // These keys do not make sense for the timestamping connector, and will + // be filtered out (fixes #6313) + const CONF_DENYLIST: &'static [&'static str] = &["statistics.interval.ms"]; let mut config = ClientConfig::new(); config.set("bootstrap.servers", &kc.addrs.to_string()); @@ -639,7 +642,9 @@ impl Timestamper { } for (k, v) in &kc.config_options { - config.set(k, v); + if !CONF_DENYLIST.contains(&k.as_str()) { + config.set(k, v); + } } let consumer = match config.create::() { @@ -931,6 +936,9 @@ fn rt_kafka_metadata_fetch_loop(c: RtKafkaConnector, consumer: BaseConsumer, wai } } + // Poll once to clear any extraneous messages on this queue. + consumer.poll(Duration::from_secs(0)); + if current_partition_count > 0 { thread::sleep(wait); } else {