From a5c4b2a077c83032d605728c7c2b02f351fef50f Mon Sep 17 00:00:00 2001 From: shin Date: Thu, 19 Feb 2026 22:39:07 +0900 Subject: [PATCH 01/22] refactor(connectors): add iggy_clients and state_path to RuntimeContext Extend RuntimeContext with Arc and state_path fields to enable individual connector restart without full runtime restart. Signed-off-by: seokjin0414 Signed-off-by: shin --- core/connectors/runtime/src/context.rs | 7 +++++++ core/connectors/runtime/src/main.rs | 4 +++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs index 292ea226c0..12199339f0 100644 --- a/core/connectors/runtime/src/context.rs +++ b/core/connectors/runtime/src/context.rs @@ -19,6 +19,7 @@ use crate::configs::connectors::{ConnectorsConfigProvider, SinkConfig, SourceConfig}; use crate::configs::runtime::ConnectorsRuntimeConfig; use crate::metrics::Metrics; +use crate::stream::IggyClients; use crate::{ SinkConnectorWrapper, SourceConnectorWrapper, manager::{ @@ -40,6 +41,8 @@ pub struct RuntimeContext { pub config_provider: Arc, pub metrics: Arc, pub start_time: IggyTimestamp, + pub iggy_clients: Arc, + pub state_path: String, } pub fn init( @@ -49,6 +52,8 @@ pub fn init( sink_wrappers: &[SinkConnectorWrapper], source_wrappers: &[SourceConnectorWrapper], config_provider: Box, + iggy_clients: Arc, + state_path: String, ) -> RuntimeContext { let metrics = Arc::new(Metrics::init()); let sinks = SinkManager::new(map_sinks(sinks_config, sink_wrappers)); @@ -64,6 +69,8 @@ pub fn init( config_provider: Arc::from(config_provider), metrics, start_time: IggyTimestamp::now(), + iggy_clients, + state_path, } } diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index 1d53e71cfd..a0ebdff416 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -143,7 +143,7 @@ async fn main() -> Result<(), RuntimeError> { info!("State will be stored in: {}", config.state.path); - let iggy_clients = stream::init(config.iggy.clone()).await?; + let iggy_clients = Arc::new(stream::init(config.iggy.clone()).await?); let connectors_config_provider: Box = create_connectors_config_provider(&config.connectors).await?; @@ -216,6 +216,8 @@ async fn main() -> Result<(), RuntimeError> { &sink_wrappers, &source_wrappers, connectors_config_provider, + iggy_clients.clone(), + config.state.path.clone(), ); let context = Arc::new(context); api::init(&config.http, context.clone()).await; From a9f20095ce23f1c4bb6173d8de4202a857ed710a Mon Sep 17 00:00:00 2001 From: shin Date: Thu, 19 Feb 2026 22:41:07 +0900 Subject: [PATCH 02/22] refactor(connectors): add lifecycle fields to SinkDetails and SourceDetails Add shutdown_tx (watch::Sender) and task_handles/handler_tasks (Vec) to SinkDetails and SourceDetails for per-connector lifecycle management during restart. Signed-off-by: seokjin0414 Signed-off-by: shin --- core/connectors/runtime/src/context.rs | 4 ++++ core/connectors/runtime/src/manager/sink.rs | 5 ++++- core/connectors/runtime/src/manager/source.rs | 5 ++++- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs index 12199339f0..d583cf32c7 100644 --- a/core/connectors/runtime/src/context.rs +++ b/core/connectors/runtime/src/context.rs @@ -110,6 +110,8 @@ fn map_sinks( plugin_config_format: sink_plugin.config_format, }, config: sink_config.clone(), + shutdown_tx: None, + task_handles: vec![], }); } } @@ -152,6 +154,8 @@ fn map_sources( plugin_config_format: source_plugin.config_format, }, config: source_config.clone(), + shutdown_tx: None, + handler_tasks: vec![], }); } } diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs index 0bc058201d..c6709d3174 100644 --- a/core/connectors/runtime/src/manager/sink.rs +++ b/core/connectors/runtime/src/manager/sink.rs @@ -22,7 +22,8 @@ use dashmap::DashMap; use iggy_connector_sdk::api::{ConnectorError, ConnectorStatus}; use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, watch}; +use tokio::task::JoinHandle; #[derive(Debug)] pub struct SinkManager { @@ -115,4 +116,6 @@ pub struct SinkInfo { pub struct SinkDetails { pub info: SinkInfo, pub config: SinkConfig, + pub shutdown_tx: Option>, + pub task_handles: Vec>, } diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs index b259fd8cf4..75d78f0328 100644 --- a/core/connectors/runtime/src/manager/source.rs +++ b/core/connectors/runtime/src/manager/source.rs @@ -22,7 +22,8 @@ use dashmap::DashMap; use iggy_connector_sdk::api::{ConnectorError, ConnectorStatus}; use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, watch}; +use tokio::task::JoinHandle; #[derive(Debug)] pub struct SourceManager { @@ -115,4 +116,6 @@ pub struct SourceInfo { pub struct SourceDetails { pub info: SourceInfo, pub config: SourceConfig, + pub shutdown_tx: Option>, + pub handler_tasks: Vec>, } From 524a62b2be3a38e002dec3d5da0c7ce968306001 Mon Sep 17 00:00:00 2001 From: shin Date: Thu, 19 Feb 2026 22:42:23 +0900 Subject: [PATCH 03/22] feat(connectors): add shutdown channel to sink consume loop Add watch::Receiver to consume_messages() with tokio::select! for cooperative cancellation. Store shutdown_tx and task JoinHandles in SinkDetails for per-connector lifecycle management. Signed-off-by: seokjin0414 Signed-off-by: shin --- core/connectors/runtime/src/sink.rs | 35 ++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index 1e82703fa6..4500195c78 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -42,6 +42,7 @@ use std::{ sync::{Arc, atomic::Ordering}, time::Instant, }; +use tokio::sync::watch; use tracing::{debug, error, info, warn}; pub async fn init( @@ -206,11 +207,15 @@ pub fn consume(sinks: Vec, context: Arc) { continue; } info!("Starting consume for sink with ID: {}...", plugin.id); + let (shutdown_tx, shutdown_rx) = watch::channel(()); + let mut task_handles = Vec::new(); + for consumer in plugin.consumers { let plugin_key = plugin.key.clone(); let context = context.clone(); + let shutdown_rx = shutdown_rx.clone(); - tokio::spawn(async move { + let handle = tokio::spawn(async move { context .sinks .update_status( @@ -230,6 +235,7 @@ pub fn consume(sinks: Vec, context: Arc) { plugin.verbose, &plugin_key, &context.metrics, + shutdown_rx, ) .await { @@ -245,11 +251,22 @@ pub fn consume(sinks: Vec, context: Arc) { return; } info!( - "Consume messages for sink connector with ID: {} started successfully.", + "Consume messages for sink connector with ID: {} completed.", plugin.id ); }); + task_handles.push(handle); } + + let plugin_key = plugin.key.clone(); + let context_clone = context.clone(); + tokio::spawn(async move { + if let Some(details) = context_clone.sinks.get(&plugin_key).await { + let mut details = details.lock().await; + details.shutdown_tx = Some(shutdown_tx); + details.task_handles = task_handles; + } + }); } } } @@ -265,6 +282,7 @@ async fn consume_messages( verbose: bool, plugin_key: &str, metrics: &Arc, + mut shutdown_rx: watch::Receiver<()>, ) -> Result<(), RuntimeError> { info!("Started consuming messages for sink connector with ID: {plugin_id}"); let batch_size = batch_size as usize; @@ -274,7 +292,18 @@ async fn consume_messages( topic: consumer.topic().to_string(), }; - while let Some(message) = consumer.next().await { + loop { + let message = tokio::select! { + _ = shutdown_rx.changed() => { + info!("Sink connector with ID: {plugin_id} received shutdown signal"); + break; + } + msg = consumer.next() => msg, + }; + + let Some(message) = message else { + break; + }; let Ok(message) = message else { error!("Failed to receive message."); continue; From 4437792483545aa50368863346060b2bbc139580 Mon Sep 17 00:00:00 2001 From: shin Date: Thu, 19 Feb 2026 22:47:05 +0900 Subject: [PATCH 04/22] refactor(connectors): store Arc in Manager for restart support - Wrap SinkApi/SourceApi containers in Arc for shared ownership - Store container references in SinkDetails/SourceDetails via Manager - Add manual Debug impl for Details structs (dlopen2 Container lacks Debug) - Make SinkApi/SourceApi pub(crate) for cross-module access Signed-off-by: shin --- core/connectors/runtime/src/context.rs | 2 + core/connectors/runtime/src/main.rs | 53 ++++++++++++++----- core/connectors/runtime/src/manager/sink.rs | 15 +++++- core/connectors/runtime/src/manager/source.rs | 15 +++++- 4 files changed, 69 insertions(+), 16 deletions(-) diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs index d583cf32c7..8ecca45742 100644 --- a/core/connectors/runtime/src/context.rs +++ b/core/connectors/runtime/src/context.rs @@ -112,6 +112,7 @@ fn map_sinks( config: sink_config.clone(), shutdown_tx: None, task_handles: vec![], + container: None, }); } } @@ -156,6 +157,7 @@ fn map_sources( config: source_config.clone(), shutdown_tx: None, handler_tasks: vec![], + container: None, }); } } diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index a0ebdff416..4a18078aef 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -69,8 +69,8 @@ static PLUGIN_ID: AtomicU32 = AtomicU32::new(1); const ALLOWED_PLUGIN_EXTENSIONS: [&str; 3] = ["so", "dylib", "dll"]; const DEFAULT_CONFIG_PATH: &str = "core/connectors/runtime/config.toml"; -#[derive(WrapperApi)] -struct SourceApi { +#[derive(WrapperApi, Debug)] +pub(crate) struct SourceApi { iggy_source_open: extern "C" fn( id: u32, config_ptr: *const u8, @@ -84,8 +84,8 @@ struct SourceApi { iggy_source_version: extern "C" fn() -> *const std::ffi::c_char, } -#[derive(WrapperApi)] -struct SinkApi { +#[derive(WrapperApi, Debug)] +pub(crate) struct SinkApi { iggy_sink_open: extern "C" fn( id: u32, config_ptr: *const u8, @@ -167,21 +167,27 @@ async fn main() -> Result<(), RuntimeError> { let mut sink_wrappers = vec![]; let mut sink_with_plugins = HashMap::new(); - for (key, sink) in sinks { + let mut sink_containers_by_key: HashMap>> = HashMap::new(); + for (path, sink) in sinks { + let container = Arc::new(sink.container); + let callback = container.iggy_sink_consume; let plugin_ids = sink .plugins .iter() .filter(|plugin| plugin.error.is_none()) .map(|plugin| plugin.id) .collect(); + for plugin in &sink.plugins { + sink_containers_by_key.insert(plugin.key.clone(), container.clone()); + } sink_wrappers.push(SinkConnectorWrapper { - callback: sink.container.iggy_sink_consume, + callback, plugins: sink.plugins, }); sink_with_plugins.insert( - key, + path, SinkWithPlugins { - container: sink.container, + container, plugin_ids, }, ); @@ -189,21 +195,27 @@ async fn main() -> Result<(), RuntimeError> { let mut source_wrappers = vec![]; let mut source_with_plugins = HashMap::new(); - for (key, source) in sources { + let mut source_containers_by_key: HashMap>> = HashMap::new(); + for (path, source) in sources { + let container = Arc::new(source.container); + let callback = container.iggy_source_handle; let plugin_ids = source .plugins .iter() .filter(|plugin| plugin.error.is_none()) .map(|plugin| plugin.id) .collect(); + for plugin in &source.plugins { + source_containers_by_key.insert(plugin.key.clone(), container.clone()); + } source_wrappers.push(SourceConnectorWrapper { - callback: source.container.iggy_source_handle, + callback, plugins: source.plugins, }); source_with_plugins.insert( - key, + path, SourceWithPlugins { - container: source.container, + container, plugin_ids, }, ); @@ -219,6 +231,19 @@ async fn main() -> Result<(), RuntimeError> { iggy_clients.clone(), config.state.path.clone(), ); + for (key, container) in sink_containers_by_key { + if let Some(details) = context.sinks.get(&key).await { + let mut details = details.lock().await; + details.container = Some(container); + } + } + for (key, container) in source_containers_by_key { + if let Some(details) = context.sources.get(&key).await { + let mut details = details.lock().await; + details.container = Some(container); + } + } + let context = Arc::new(context); api::init(&config.http, context.clone()).await; @@ -403,7 +428,7 @@ struct SinkConnectorWrapper { } struct SinkWithPlugins { - container: Container, + container: Arc>, plugin_ids: Vec, } @@ -432,7 +457,7 @@ struct SourceConnectorProducer { } struct SourceWithPlugins { - container: Container, + container: Arc>, plugin_ids: Vec, } diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs index c6709d3174..87e199143c 100644 --- a/core/connectors/runtime/src/manager/sink.rs +++ b/core/connectors/runtime/src/manager/sink.rs @@ -16,11 +16,14 @@ * specific language governing permissions and limitations * under the License. */ +use crate::SinkApi; use crate::configs::connectors::{ConfigFormat, SinkConfig}; use crate::metrics::Metrics; use dashmap::DashMap; +use dlopen2::wrapper::Container; use iggy_connector_sdk::api::{ConnectorError, ConnectorStatus}; use std::collections::HashMap; +use std::fmt; use std::sync::Arc; use tokio::sync::{Mutex, watch}; use tokio::task::JoinHandle; @@ -112,10 +115,20 @@ pub struct SinkInfo { pub plugin_config_format: Option, } -#[derive(Debug)] pub struct SinkDetails { pub info: SinkInfo, pub config: SinkConfig, pub shutdown_tx: Option>, pub task_handles: Vec>, + pub container: Option>>, +} + +impl fmt::Debug for SinkDetails { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SinkDetails") + .field("info", &self.info) + .field("config", &self.config) + .field("container", &self.container.as_ref().map(|_| "...")) + .finish() + } } diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs index 75d78f0328..30e3a10f63 100644 --- a/core/connectors/runtime/src/manager/source.rs +++ b/core/connectors/runtime/src/manager/source.rs @@ -16,11 +16,14 @@ * specific language governing permissions and limitations * under the License. */ +use crate::SourceApi; use crate::configs::connectors::{ConfigFormat, SourceConfig}; use crate::metrics::Metrics; use dashmap::DashMap; +use dlopen2::wrapper::Container; use iggy_connector_sdk::api::{ConnectorError, ConnectorStatus}; use std::collections::HashMap; +use std::fmt; use std::sync::Arc; use tokio::sync::{Mutex, watch}; use tokio::task::JoinHandle; @@ -112,10 +115,20 @@ pub struct SourceInfo { pub plugin_config_format: Option, } -#[derive(Debug)] pub struct SourceDetails { pub info: SourceInfo, pub config: SourceConfig, pub shutdown_tx: Option>, pub handler_tasks: Vec>, + pub container: Option>>, +} + +impl fmt::Debug for SourceDetails { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SourceDetails") + .field("info", &self.info) + .field("config", &self.config) + .field("container", &self.container.as_ref().map(|_| "...")) + .finish() + } } From c0207048f9525ebe13b50d393b28a669b12e1f06 Mon Sep 17 00:00:00 2001 From: shin Date: Thu, 19 Feb 2026 22:51:35 +0900 Subject: [PATCH 05/22] feat(connectors): add stop/start/restart methods to SinkManager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - SinkManager.stop_connector: shutdown signal → await tasks → FFI close - SinkManager.start_connector: FFI open → create consumers → spawn tasks - SinkManager.restart_connector: stop → load active config → start - Make init_sink and consume_messages pub(crate) for reuse - Add setup_sink_consumers helper for consumer creation Signed-off-by: shin --- core/connectors/runtime/src/manager/sink.rs | 159 +++++++++++++++++++- core/connectors/runtime/src/sink.rs | 61 +++++++- 2 files changed, 217 insertions(+), 3 deletions(-) diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs index 87e199143c..58c2c2cd82 100644 --- a/core/connectors/runtime/src/manager/sink.rs +++ b/core/connectors/runtime/src/manager/sink.rs @@ -16,17 +16,24 @@ * specific language governing permissions and limitations * under the License. */ +use crate::PLUGIN_ID; use crate::SinkApi; -use crate::configs::connectors::{ConfigFormat, SinkConfig}; +use crate::configs::connectors::{ConfigFormat, ConnectorsConfigProvider, SinkConfig}; +use crate::error::RuntimeError; use crate::metrics::Metrics; +use crate::sink; use dashmap::DashMap; use dlopen2::wrapper::Container; +use iggy::prelude::IggyClient; use iggy_connector_sdk::api::{ConnectorError, ConnectorStatus}; use std::collections::HashMap; use std::fmt; use std::sync::Arc; +use std::sync::atomic::Ordering; +use std::time::Duration; use tokio::sync::{Mutex, watch}; use tokio::task::JoinHandle; +use tracing::{error, info}; #[derive(Debug)] pub struct SinkManager { @@ -100,6 +107,156 @@ impl SinkManager { sink.info.last_error = Some(ConnectorError::new(error_message)); } } + + pub async fn stop_connector( + &self, + key: &str, + metrics: &Arc, + ) -> Result<(), RuntimeError> { + let details_arc = self + .sinks + .get(key) + .map(|e| e.value().clone()) + .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?; + + let (shutdown_tx, task_handles, plugin_id, container) = { + let mut details = details_arc.lock().await; + ( + details.shutdown_tx.take(), + std::mem::take(&mut details.task_handles), + details.info.id, + details.container.clone(), + ) + }; + + if let Some(tx) = shutdown_tx { + let _ = tx.send(()); + } + + for handle in task_handles { + let _ = tokio::time::timeout(Duration::from_secs(5), handle).await; + } + + if let Some(container) = &container { + info!("Closing sink connector with ID: {plugin_id} for plugin: {key}"); + (container.iggy_sink_close)(plugin_id); + info!("Closed sink connector with ID: {plugin_id} for plugin: {key}"); + } + + { + let mut details = details_arc.lock().await; + let old_status = details.info.status; + details.info.status = ConnectorStatus::Stopped; + details.info.last_error = None; + if old_status == ConnectorStatus::Running { + metrics.decrement_sinks_running(); + } + } + + Ok(()) + } + + pub async fn start_connector( + &self, + key: &str, + config: &SinkConfig, + iggy_client: &IggyClient, + metrics: &Arc, + ) -> Result<(), RuntimeError> { + let details_arc = self + .sinks + .get(key) + .map(|e| e.value().clone()) + .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?; + + let container = { + let details = details_arc.lock().await; + details.container.clone().ok_or_else(|| { + RuntimeError::InvalidConfiguration(format!( + "No container loaded for sink: {key}" + )) + })? + }; + + let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::SeqCst); + + sink::init_sink( + &container, + &config.plugin_config.clone().unwrap_or_default(), + plugin_id, + )?; + info!("Sink connector with ID: {plugin_id} for plugin: {key} initialized successfully."); + + let consumers = sink::setup_sink_consumers(key, config, iggy_client).await?; + + let (shutdown_tx, shutdown_rx) = watch::channel(()); + let callback = container.iggy_sink_consume; + let verbose = config.verbose; + let mut task_handles = Vec::new(); + + for (consumer, decoder, batch_size, transforms) in consumers { + let plugin_key = key.to_string(); + let metrics_clone = metrics.clone(); + let shutdown_rx = shutdown_rx.clone(); + + let handle = tokio::spawn(async move { + if let Err(error) = sink::consume_messages( + plugin_id, + decoder, + batch_size, + callback, + transforms, + consumer, + verbose, + &plugin_key, + &metrics_clone, + shutdown_rx, + ) + .await + { + error!( + "Failed to consume messages for sink connector with ID: {plugin_id}: {error}" + ); + } + }); + task_handles.push(handle); + } + + { + let mut details = details_arc.lock().await; + details.info.id = plugin_id; + details.info.status = ConnectorStatus::Running; + details.info.last_error = None; + details.config = config.clone(); + details.shutdown_tx = Some(shutdown_tx); + details.task_handles = task_handles; + metrics.increment_sinks_running(); + } + + Ok(()) + } + + pub async fn restart_connector( + &self, + key: &str, + config_provider: &dyn ConnectorsConfigProvider, + iggy_client: &IggyClient, + metrics: &Arc, + ) -> Result<(), RuntimeError> { + info!("Restarting sink connector: {key}"); + self.stop_connector(key, metrics).await?; + + let config = config_provider + .get_sink_config(key, None) + .await + .map_err(|e| RuntimeError::InvalidConfiguration(e.to_string()))? + .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?; + + self.start_connector(key, &config, iggy_client, metrics) + .await?; + info!("Sink connector: {key} restarted successfully."); + Ok(()) + } } #[derive(Debug, Clone)] diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index 4500195c78..320e090456 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -272,7 +272,7 @@ pub fn consume(sinks: Vec, context: Arc) { } #[allow(clippy::too_many_arguments)] -async fn consume_messages( +pub(crate) async fn consume_messages( plugin_id: u32, decoder: Arc, batch_size: u32, @@ -385,7 +385,7 @@ fn get_plugin_version(container: &Container) -> String { } } -fn init_sink( +pub(crate) fn init_sink( container: &Container, plugin_config: &serde_json::Value, id: u32, @@ -406,6 +406,63 @@ fn init_sink( } } +pub(crate) async fn setup_sink_consumers( + key: &str, + config: &SinkConfig, + iggy_client: &IggyClient, +) -> Result< + Vec<( + IggyConsumer, + Arc, + u32, + Vec>, + )>, + RuntimeError, +> { + let transforms = if let Some(transforms_config) = &config.transforms { + transform::load(transforms_config).map_err(|error| { + RuntimeError::InvalidConfiguration(format!("Failed to load transforms: {error}")) + })? + } else { + vec![] + }; + + let mut consumers = Vec::new(); + for stream in config.streams.iter() { + let poll_interval = + IggyDuration::from_str(stream.poll_interval.as_deref().unwrap_or("5ms")).map_err( + |error| { + RuntimeError::InvalidConfiguration(format!("Invalid poll interval: {error}")) + }, + )?; + let default_consumer_group = format!("iggy-connect-sink-{key}"); + let consumer_group = stream + .consumer_group + .as_deref() + .unwrap_or(&default_consumer_group); + let batch_length = stream.batch_length.unwrap_or(1000); + for topic in stream.topics.iter() { + let mut consumer = iggy_client + .consumer_group(consumer_group, &stream.stream, topic)? + .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) + .create_consumer_group_if_not_exists() + .auto_join_consumer_group() + .polling_strategy(PollingStrategy::next()) + .poll_interval(poll_interval) + .batch_length(batch_length) + .build(); + consumer.init().await?; + consumers.push(( + consumer, + stream.schema.decoder(), + batch_length, + transforms.clone(), + )); + } + } + Ok(consumers) +} + async fn process_messages( plugin_id: u32, messages_metadata: MessagesMetadata, From dc4425d2de2c5dda8242be0457cba3efbec79bd4 Mon Sep 17 00:00:00 2001 From: shin Date: Thu, 19 Feb 2026 23:04:13 +0900 Subject: [PATCH 06/22] feat(connectors): add stop/start/restart methods to SourceManager Signed-off-by: shin --- core/connectors/runtime/src/manager/source.rs | 161 +++++++++++++- core/connectors/runtime/src/source.rs | 198 +++++++++++++++++- 2 files changed, 355 insertions(+), 4 deletions(-) diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs index 30e3a10f63..2d0939631f 100644 --- a/core/connectors/runtime/src/manager/source.rs +++ b/core/connectors/runtime/src/manager/source.rs @@ -16,17 +16,26 @@ * specific language governing permissions and limitations * under the License. */ +use crate::PLUGIN_ID; use crate::SourceApi; -use crate::configs::connectors::{ConfigFormat, SourceConfig}; +use crate::configs::connectors::{ConfigFormat, ConnectorsConfigProvider, SourceConfig}; +use crate::context::RuntimeContext; +use crate::error::RuntimeError; use crate::metrics::Metrics; +use crate::source; +use crate::state::{StateProvider, StateStorage}; use dashmap::DashMap; use dlopen2::wrapper::Container; +use iggy::prelude::IggyClient; use iggy_connector_sdk::api::{ConnectorError, ConnectorStatus}; use std::collections::HashMap; use std::fmt; use std::sync::Arc; +use std::sync::atomic::Ordering; +use std::time::Duration; use tokio::sync::{Mutex, watch}; use tokio::task::JoinHandle; +use tracing::info; #[derive(Debug)] pub struct SourceManager { @@ -100,6 +109,156 @@ impl SourceManager { source.info.last_error = Some(ConnectorError::new(error_message)); } } + + pub async fn stop_connector( + &self, + key: &str, + metrics: &Arc, + ) -> Result<(), RuntimeError> { + let details_arc = self + .sources + .get(key) + .map(|e| e.value().clone()) + .ok_or_else(|| RuntimeError::SourceNotFound(key.to_string()))?; + + let (task_handles, plugin_id, container) = { + let mut details = details_arc.lock().await; + ( + std::mem::take(&mut details.handler_tasks), + details.info.id, + details.container.clone(), + ) + }; + + source::cleanup_sender(plugin_id); + + for handle in task_handles { + let _ = tokio::time::timeout(Duration::from_secs(5), handle).await; + } + + if let Some(container) = &container { + info!("Closing source connector with ID: {plugin_id} for plugin: {key}"); + (container.iggy_source_close)(plugin_id); + info!("Closed source connector with ID: {plugin_id} for plugin: {key}"); + } + + { + let mut details = details_arc.lock().await; + let old_status = details.info.status; + details.info.status = ConnectorStatus::Stopped; + details.info.last_error = None; + if old_status == ConnectorStatus::Running { + metrics.decrement_sources_running(); + } + } + + Ok(()) + } + + pub async fn start_connector( + &self, + key: &str, + config: &SourceConfig, + iggy_client: &IggyClient, + metrics: &Arc, + state_path: &str, + context: &Arc, + ) -> Result<(), RuntimeError> { + let details_arc = self + .sources + .get(key) + .map(|e| e.value().clone()) + .ok_or_else(|| RuntimeError::SourceNotFound(key.to_string()))?; + + let container = { + let details = details_arc.lock().await; + details.container.clone().ok_or_else(|| { + RuntimeError::InvalidConfiguration(format!( + "No container loaded for source: {key}" + )) + })? + }; + + let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::SeqCst); + + let state_storage = source::get_state_storage(state_path, key); + let state = match &state_storage { + StateStorage::File(file) => file.load().await?, + }; + + source::init_source( + &container, + &config.plugin_config.clone().unwrap_or_default(), + plugin_id, + state, + )?; + info!("Source connector with ID: {plugin_id} for plugin: {key} initialized successfully."); + + let (producer, encoder, transforms) = + source::setup_source_producer(config, iggy_client).await?; + + let (sender, receiver) = flume::unbounded(); + source::SOURCE_SENDERS.insert(plugin_id, sender); + + let callback = container.iggy_source_handle; + tokio::task::spawn_blocking(move || { + callback(plugin_id, source::handle_produced_messages); + }); + + let plugin_key = key.to_string(); + let verbose = config.verbose; + let context_clone = context.clone(); + let handler_task = tokio::spawn(async move { + source::source_forwarding_loop( + plugin_id, + plugin_key, + verbose, + producer, + encoder, + transforms, + state_storage, + receiver, + context_clone, + ) + .await; + }); + + { + let mut details = details_arc.lock().await; + details.info.id = plugin_id; + details.info.status = ConnectorStatus::Running; + details.info.last_error = None; + details.config = config.clone(); + details.handler_tasks = vec![handler_task]; + metrics.increment_sources_running(); + } + + Ok(()) + } + + pub async fn restart_connector( + &self, + key: &str, + config_provider: &dyn ConnectorsConfigProvider, + iggy_client: &IggyClient, + metrics: &Arc, + state_path: &str, + context: &Arc, + ) -> Result<(), RuntimeError> { + info!("Restarting source connector: {key}"); + self.stop_connector(key, metrics).await?; + + let config = config_provider + .get_source_config(key, None) + .await + .map_err(|e| RuntimeError::InvalidConfiguration(e.to_string()))? + .ok_or_else(|| RuntimeError::SourceNotFound(key.to_string()))?; + + self.start_connector(key, &config, iggy_client, metrics, state_path, context) + .await?; + info!("Source connector: {key} restarted successfully."); + Ok(()) + } } #[derive(Debug, Clone)] diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index 8fe4225100..8708f24e50 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -22,6 +22,7 @@ use dlopen2::wrapper::Container; use flume::{Receiver, Sender}; use iggy::prelude::{ DirectConfig, HeaderKey, HeaderValue, IggyClient, IggyDuration, IggyError, IggyMessage, + IggyProducer, }; use iggy_connector_sdk::{ ConnectorState, DecodedMessage, Error, ProducedMessages, StreamEncoder, TopicMetadata, @@ -218,7 +219,7 @@ fn get_plugin_version(container: &Container) -> String { } } -fn init_source( +pub(crate) fn init_source( container: &Container, plugin_config: &serde_json::Value, id: u32, @@ -246,11 +247,202 @@ fn init_source( } } -fn get_state_storage(state_path: &str, key: &str) -> StateStorage { +pub(crate) fn get_state_storage(state_path: &str, key: &str) -> StateStorage { let path = format!("{state_path}/source_{key}.state"); StateStorage::File(FileStateProvider::new(path)) } +pub(crate) async fn setup_source_producer( + config: &SourceConfig, + iggy_client: &IggyClient, +) -> Result<(IggyProducer, Arc, Vec>), RuntimeError> { + let transforms = if let Some(transforms_config) = &config.transforms { + transform::load(transforms_config).map_err(|error| { + RuntimeError::InvalidConfiguration(format!("Failed to load transforms: {error}")) + })? + } else { + vec![] + }; + + let mut last_producer = None; + let mut last_encoder = None; + for stream in config.streams.iter() { + let linger_time = IggyDuration::from_str( + stream.linger_time.as_deref().unwrap_or("5ms"), + ) + .map_err(|error| { + RuntimeError::InvalidConfiguration(format!("Invalid linger time: {error}")) + })?; + let batch_length = stream.batch_length.unwrap_or(1000); + let producer = iggy_client + .producer(&stream.stream, &stream.topic)? + .direct( + DirectConfig::builder() + .batch_length(batch_length) + .linger_time(linger_time) + .build(), + ) + .build(); + producer.init().await?; + last_encoder = Some(stream.schema.encoder()); + last_producer = Some(producer); + } + + let producer = last_producer.ok_or_else(|| { + RuntimeError::InvalidConfiguration("No streams configured for source".to_string()) + })?; + let encoder = last_encoder.ok_or_else(|| { + RuntimeError::InvalidConfiguration("No encoder configured for source".to_string()) + })?; + + Ok((producer, encoder, transforms)) +} + +pub(crate) async fn source_forwarding_loop( + plugin_id: u32, + plugin_key: String, + verbose: bool, + producer: IggyProducer, + encoder: Arc, + transforms: Vec>, + state_storage: StateStorage, + receiver: Receiver, + context: Arc, +) { + info!("Source connector with ID: {plugin_id} started."); + context + .sources + .update_status( + &plugin_key, + ConnectorStatus::Running, + Some(&context.metrics), + ) + .await; + + let mut number = 1u64; + let topic_metadata = TopicMetadata { + stream: producer.stream().to_string(), + topic: producer.topic().to_string(), + }; + + while let Ok(produced_messages) = receiver.recv_async().await { + let count = produced_messages.messages.len(); + context + .metrics + .increment_messages_produced(&plugin_key, count as u64); + if verbose { + info!("Source connector with ID: {plugin_id} received {count} messages"); + } else { + debug!("Source connector with ID: {plugin_id} received {count} messages"); + } + let schema = produced_messages.schema; + let mut messages: Vec = Vec::with_capacity(count); + for message in produced_messages.messages { + let Ok(payload) = schema.try_into_payload(message.payload) else { + error!( + "Failed to decode message payload with schema: {schema} for source connector with ID: {plugin_id}", + ); + continue; + }; + + debug!( + "Source connector with ID: {plugin_id}] received message: {number} | schema: {schema} | payload: {payload}" + ); + messages.push(DecodedMessage { + id: message.id, + offset: None, + headers: message.headers, + checksum: message.checksum, + timestamp: message.timestamp, + origin_timestamp: message.origin_timestamp, + payload, + }); + number += 1; + } + + let Ok(iggy_messages) = process_messages( + plugin_id, + &encoder, + &topic_metadata, + messages, + &transforms, + ) else { + let error_msg = format!( + "Failed to process {count} messages by source connector with ID: {plugin_id} before sending them to stream: {}, topic: {}.", + producer.stream(), + producer.topic() + ); + error!("{error_msg}"); + context + .metrics + .increment_errors(&plugin_key, ConnectorType::Source); + context.sources.set_error(&plugin_key, &error_msg).await; + continue; + }; + + if let Err(error) = producer.send(iggy_messages).await { + let error_msg = format!( + "Failed to send {count} messages to stream: {}, topic: {} by source connector with ID: {plugin_id}. {error}", + producer.stream(), + producer.topic(), + ); + error!("{error_msg}"); + context + .metrics + .increment_errors(&plugin_key, ConnectorType::Source); + context.sources.set_error(&plugin_key, &error_msg).await; + continue; + } + + context + .metrics + .increment_messages_sent(&plugin_key, count as u64); + + if verbose { + info!( + "Sent {count} messages to stream: {}, topic: {} by source connector with ID: {plugin_id}", + producer.stream(), + producer.topic() + ); + } else { + debug!( + "Sent {count} messages to stream: {}, topic: {} by source connector with ID: {plugin_id}", + producer.stream(), + producer.topic() + ); + } + + let Some(state) = produced_messages.state else { + debug!("No state provided for source connector with ID: {plugin_id}"); + continue; + }; + + match &state_storage { + StateStorage::File(file) => { + if let Err(error) = file.save(state).await { + let error_msg = format!( + "Failed to save state for source connector with ID: {plugin_id}. {error}" + ); + error!("{error_msg}"); + context.sources.set_error(&plugin_key, &error_msg).await; + continue; + } + debug!("State saved for source connector with ID: {plugin_id}"); + } + } + } + + info!("Source connector with ID: {plugin_id} stopped."); + context + .sources + .update_status( + &plugin_key, + ConnectorStatus::Stopped, + Some(&context.metrics), + ) + .await; +} + pub fn handle( sources: Vec, context: Arc, @@ -475,7 +667,7 @@ fn process_messages( Ok(iggy_messages) } -extern "C" fn handle_produced_messages( +pub(crate) extern "C" fn handle_produced_messages( plugin_id: u32, messages_ptr: *const u8, messages_len: usize, From d17e202acd57863a3bfae3187f8921f32edd6df6 Mon Sep 17 00:00:00 2001 From: shin Date: Thu, 19 Feb 2026 23:06:58 +0900 Subject: [PATCH 07/22] feat(connectors): add POST /restart API endpoints for sink and source connectors Signed-off-by: seokjin0414 Signed-off-by: shin --- core/connectors/runtime/src/api/sink.rs | 19 ++++++++++++++++++- core/connectors/runtime/src/api/source.rs | 21 ++++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/core/connectors/runtime/src/api/sink.rs b/core/connectors/runtime/src/api/sink.rs index 0a2c8791bc..0b20425ff5 100644 --- a/core/connectors/runtime/src/api/sink.rs +++ b/core/connectors/runtime/src/api/sink.rs @@ -30,7 +30,7 @@ use axum::{ extract::{Path, Query, State}, http::{HeaderMap, StatusCode, header}, response::IntoResponse, - routing::get, + routing::{get, post}, }; use serde::Deserialize; use std::sync::Arc; @@ -52,6 +52,7 @@ pub fn router(state: Arc) -> Router { "/sinks/{key}/configs/active", get(get_sink_active_config).put(update_sink_active_config), ) + .route("/sinks/{key}/restart", post(restart_sink)) .with_state(state) } @@ -246,3 +247,19 @@ async fn delete_sink_config( .await?; Ok(StatusCode::NO_CONTENT) } + +async fn restart_sink( + State(context): State>, + Path(key): Path, +) -> Result { + context + .sinks + .restart_connector( + &key, + context.config_provider.as_ref(), + &context.iggy_clients.consumer, + &context.metrics, + ) + .await?; + Ok(StatusCode::NO_CONTENT) +} diff --git a/core/connectors/runtime/src/api/source.rs b/core/connectors/runtime/src/api/source.rs index 7db1c9974f..0387421fb4 100644 --- a/core/connectors/runtime/src/api/source.rs +++ b/core/connectors/runtime/src/api/source.rs @@ -30,7 +30,7 @@ use axum::{ extract::{Path, Query, State}, http::{HeaderMap, StatusCode, header}, response::IntoResponse, - routing::get, + routing::{get, post}, }; use serde::Deserialize; use std::sync::Arc; @@ -55,6 +55,7 @@ pub fn router(state: Arc) -> Router { "/sources/{key}/configs/active", get(get_source_active_config).put(update_source_active_config), ) + .route("/sources/{key}/restart", post(restart_source)) .with_state(state) } @@ -249,3 +250,21 @@ async fn delete_source_config( .await?; Ok(StatusCode::NO_CONTENT) } + +async fn restart_source( + State(context): State>, + Path(key): Path, +) -> Result { + context + .sources + .restart_connector( + &key, + context.config_provider.as_ref(), + &context.iggy_clients.producer, + &context.metrics, + &context.state_path, + &context, + ) + .await?; + Ok(StatusCode::NO_CONTENT) +} From d46727eefb25305cc984b2aa5b0196cb2faebd2d Mon Sep 17 00:00:00 2001 From: shin Date: Thu, 19 Feb 2026 23:14:45 +0900 Subject: [PATCH 08/22] refactor(connectors): replace direct shutdown with Manager-based stops - source::handle() now stores handler_tasks in SourceManager via async spawn - main.rs shutdown uses Manager's stop_connector() for both sources and sinks - Remove SinkWithPlugins and SourceWithPlugins structs (no longer needed) - Remove unused shutdown_tx field from SourceDetails Signed-off-by: shin --- core/connectors/runtime/src/context.rs | 1 - core/connectors/runtime/src/main.rs | 83 ++++++------------- core/connectors/runtime/src/manager/source.rs | 3 +- core/connectors/runtime/src/source.rs | 16 ++-- 4 files changed, 34 insertions(+), 69 deletions(-) diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs index 8ecca45742..3347d3a6cc 100644 --- a/core/connectors/runtime/src/context.rs +++ b/core/connectors/runtime/src/context.rs @@ -155,7 +155,6 @@ fn map_sources( plugin_config_format: source_plugin.config_format, }, config: source_config.clone(), - shutdown_tx: None, handler_tasks: vec![], container: None, }); diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index 4a18078aef..ba85169657 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -40,7 +40,7 @@ use std::{ env, sync::{Arc, atomic::AtomicU32}, }; -use tracing::info; +use tracing::{error, info}; mod api; pub(crate) mod configs; @@ -166,17 +166,10 @@ async fn main() -> Result<(), RuntimeError> { let sinks = sink::init(sinks_config.clone(), &iggy_clients.consumer).await?; let mut sink_wrappers = vec![]; - let mut sink_with_plugins = HashMap::new(); let mut sink_containers_by_key: HashMap>> = HashMap::new(); - for (path, sink) in sinks { + for (_path, sink) in sinks { let container = Arc::new(sink.container); let callback = container.iggy_sink_consume; - let plugin_ids = sink - .plugins - .iter() - .filter(|plugin| plugin.error.is_none()) - .map(|plugin| plugin.id) - .collect(); for plugin in &sink.plugins { sink_containers_by_key.insert(plugin.key.clone(), container.clone()); } @@ -184,27 +177,13 @@ async fn main() -> Result<(), RuntimeError> { callback, plugins: sink.plugins, }); - sink_with_plugins.insert( - path, - SinkWithPlugins { - container, - plugin_ids, - }, - ); } let mut source_wrappers = vec![]; - let mut source_with_plugins = HashMap::new(); let mut source_containers_by_key: HashMap>> = HashMap::new(); - for (path, source) in sources { + for (_path, source) in sources { let container = Arc::new(source.container); let callback = container.iggy_source_handle; - let plugin_ids = source - .plugins - .iter() - .filter(|plugin| plugin.error.is_none()) - .map(|plugin| plugin.id) - .collect(); for plugin in &source.plugins { source_containers_by_key.insert(plugin.key.clone(), container.clone()); } @@ -212,13 +191,6 @@ async fn main() -> Result<(), RuntimeError> { callback, plugins: source.plugins, }); - source_with_plugins.insert( - path, - SourceWithPlugins { - container, - plugin_ids, - }, - ); } let context = context::init( @@ -247,7 +219,7 @@ async fn main() -> Result<(), RuntimeError> { let context = Arc::new(context); api::init(&config.http, context.clone()).await; - let source_handler_tasks = source::handle(source_wrappers, context.clone()); + source::handle(source_wrappers, context.clone()); sink::consume(sink_wrappers, context.clone()); info!("All sources and sinks spawned."); @@ -270,26 +242,29 @@ async fn main() -> Result<(), RuntimeError> { } } - for (key, source) in source_with_plugins { - for id in source.plugin_ids { - info!("Closing source connector with ID: {id} for plugin: {key}"); - source.container.iggy_source_close(id); - source::cleanup_sender(id); - info!("Closed source connector with ID: {id} for plugin: {key}"); + let source_keys: Vec = context + .sources + .get_all() + .await + .into_iter() + .map(|s| s.key) + .collect(); + for key in &source_keys { + if let Err(err) = context.sources.stop_connector(key, &context.metrics).await { + error!("Failed to stop source connector: {key}. {err}"); } } - // Wait for source handler tasks to drain remaining messages and persist state - // before shutting down the Iggy clients they depend on. - for handle in source_handler_tasks { - let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await; - } - - for (key, sink) in sink_with_plugins { - for id in sink.plugin_ids { - info!("Closing sink connector with ID: {id} for plugin: {key}"); - sink.container.iggy_sink_close(id); - info!("Closed sink connector with ID: {id} for plugin: {key}"); + let sink_keys: Vec = context + .sinks + .get_all() + .await + .into_iter() + .map(|s| s.key) + .collect(); + for key in &sink_keys { + if let Err(err) = context.sinks.stop_connector(key, &context.metrics).await { + error!("Failed to stop sink connector: {key}. {err}"); } } @@ -427,11 +402,6 @@ struct SinkConnectorWrapper { plugins: Vec, } -struct SinkWithPlugins { - container: Arc>, - plugin_ids: Vec, -} - struct SourceConnector { container: Container, plugins: Vec, @@ -456,11 +426,6 @@ struct SourceConnectorProducer { producer: IggyProducer, } -struct SourceWithPlugins { - container: Arc>, - plugin_ids: Vec, -} - struct SourceConnectorWrapper { callback: HandleCallback, plugins: Vec, diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs index 2d0939631f..23780041e6 100644 --- a/core/connectors/runtime/src/manager/source.rs +++ b/core/connectors/runtime/src/manager/source.rs @@ -33,7 +33,7 @@ use std::fmt; use std::sync::Arc; use std::sync::atomic::Ordering; use std::time::Duration; -use tokio::sync::{Mutex, watch}; +use tokio::sync::Mutex; use tokio::task::JoinHandle; use tracing::info; @@ -277,7 +277,6 @@ pub struct SourceInfo { pub struct SourceDetails { pub info: SourceInfo, pub config: SourceConfig, - pub shutdown_tx: Option>, pub handler_tasks: Vec>, pub container: Option>>, } diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index 8708f24e50..f09878abdf 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -443,14 +443,12 @@ pub(crate) async fn source_forwarding_loop( .await; } -pub fn handle( - sources: Vec, - context: Arc, -) -> Vec> { - let mut handler_tasks = Vec::new(); +pub fn handle(sources: Vec, context: Arc) { for source in sources { for plugin in source.plugins { let plugin_id = plugin.id; + let plugin_key_for_store = plugin.key.clone(); + let context_for_store = context.clone(); let plugin_key = plugin.key.clone(); let context = context.clone(); @@ -617,10 +615,14 @@ pub fn handle( ) .await; }); - handler_tasks.push(handler_task); + tokio::spawn(async move { + if let Some(details) = context_for_store.sources.get(&plugin_key_for_store).await { + let mut details = details.lock().await; + details.handler_tasks = vec![handler_task]; + } + }); } } - handler_tasks } fn process_messages( From 3d681cdcdf2bc4707769fc0cb55f7736900e218c Mon Sep 17 00:00:00 2001 From: shin Date: Thu, 19 Feb 2026 23:15:52 +0900 Subject: [PATCH 09/22] refactor(connectors): fmt and clippy fixes for restart support Signed-off-by: shin --- core/connectors/runtime/src/context.rs | 1 + core/connectors/runtime/src/manager/sink.rs | 4 +-- core/connectors/runtime/src/manager/source.rs | 4 +-- core/connectors/runtime/src/sink.rs | 12 ++++---- core/connectors/runtime/src/source.rs | 30 ++++++++++--------- 5 files changed, 25 insertions(+), 26 deletions(-) diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs index 3347d3a6cc..c5a6279299 100644 --- a/core/connectors/runtime/src/context.rs +++ b/core/connectors/runtime/src/context.rs @@ -45,6 +45,7 @@ pub struct RuntimeContext { pub state_path: String, } +#[allow(clippy::too_many_arguments)] pub fn init( config: &ConnectorsRuntimeConfig, sinks_config: &HashMap, diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs index 58c2c2cd82..8eea202a8b 100644 --- a/core/connectors/runtime/src/manager/sink.rs +++ b/core/connectors/runtime/src/manager/sink.rs @@ -172,9 +172,7 @@ impl SinkManager { let container = { let details = details_arc.lock().await; details.container.clone().ok_or_else(|| { - RuntimeError::InvalidConfiguration(format!( - "No container loaded for sink: {key}" - )) + RuntimeError::InvalidConfiguration(format!("No container loaded for sink: {key}")) })? }; diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs index 23780041e6..53cd8775fa 100644 --- a/core/connectors/runtime/src/manager/source.rs +++ b/core/connectors/runtime/src/manager/source.rs @@ -173,9 +173,7 @@ impl SourceManager { let container = { let details = details_arc.lock().await; details.container.clone().ok_or_else(|| { - RuntimeError::InvalidConfiguration(format!( - "No container loaded for source: {key}" - )) + RuntimeError::InvalidConfiguration(format!("No container loaded for source: {key}")) })? }; diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index 320e090456..2b3dbaed9e 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -429,12 +429,12 @@ pub(crate) async fn setup_sink_consumers( let mut consumers = Vec::new(); for stream in config.streams.iter() { - let poll_interval = - IggyDuration::from_str(stream.poll_interval.as_deref().unwrap_or("5ms")).map_err( - |error| { - RuntimeError::InvalidConfiguration(format!("Invalid poll interval: {error}")) - }, - )?; + let poll_interval = IggyDuration::from_str( + stream.poll_interval.as_deref().unwrap_or("5ms"), + ) + .map_err(|error| { + RuntimeError::InvalidConfiguration(format!("Invalid poll interval: {error}")) + })?; let default_consumer_group = format!("iggy-connect-sink-{key}"); let consumer_group = stream .consumer_group diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index f09878abdf..ad8e7c8023 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -255,7 +255,14 @@ pub(crate) fn get_state_storage(state_path: &str, key: &str) -> StateStorage { pub(crate) async fn setup_source_producer( config: &SourceConfig, iggy_client: &IggyClient, -) -> Result<(IggyProducer, Arc, Vec>), RuntimeError> { +) -> Result< + ( + IggyProducer, + Arc, + Vec>, + ), + RuntimeError, +> { let transforms = if let Some(transforms_config) = &config.transforms { transform::load(transforms_config).map_err(|error| { RuntimeError::InvalidConfiguration(format!("Failed to load transforms: {error}")) @@ -267,12 +274,10 @@ pub(crate) async fn setup_source_producer( let mut last_producer = None; let mut last_encoder = None; for stream in config.streams.iter() { - let linger_time = IggyDuration::from_str( - stream.linger_time.as_deref().unwrap_or("5ms"), - ) - .map_err(|error| { - RuntimeError::InvalidConfiguration(format!("Invalid linger time: {error}")) - })?; + let linger_time = IggyDuration::from_str(stream.linger_time.as_deref().unwrap_or("5ms")) + .map_err(|error| { + RuntimeError::InvalidConfiguration(format!("Invalid linger time: {error}")) + })?; let batch_length = stream.batch_length.unwrap_or(1000); let producer = iggy_client .producer(&stream.stream, &stream.topic)? @@ -298,6 +303,7 @@ pub(crate) async fn setup_source_producer( Ok((producer, encoder, transforms)) } +#[allow(clippy::too_many_arguments)] pub(crate) async fn source_forwarding_loop( plugin_id: u32, plugin_key: String, @@ -360,13 +366,9 @@ pub(crate) async fn source_forwarding_loop( number += 1; } - let Ok(iggy_messages) = process_messages( - plugin_id, - &encoder, - &topic_metadata, - messages, - &transforms, - ) else { + let Ok(iggy_messages) = + process_messages(plugin_id, &encoder, &topic_metadata, messages, &transforms) + else { let error_msg = format!( "Failed to process {count} messages by source connector with ID: {plugin_id} before sending them to stream: {}, topic: {}.", producer.stream(), From 20bd79b50c00f8b0d44edec3ca049e930131b855 Mon Sep 17 00:00:00 2001 From: shin Date: Fri, 20 Feb 2026 00:09:17 +0900 Subject: [PATCH 10/22] test(connectors): add unit tests for SinkManager and SourceManager Cover CRUD operations, status transitions, metrics integration, shutdown signal handling, error clearing, and edge cases for the connector restart lifecycle methods. Signed-off-by: shin --- core/connectors/runtime/src/manager/sink.rs | 280 ++++++++++++++++++ core/connectors/runtime/src/manager/source.rs | 274 +++++++++++++++++ 2 files changed, 554 insertions(+) diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs index 8eea202a8b..2455738ccd 100644 --- a/core/connectors/runtime/src/manager/sink.rs +++ b/core/connectors/runtime/src/manager/sink.rs @@ -287,3 +287,283 @@ impl fmt::Debug for SinkDetails { .finish() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::configs::connectors::SinkConfig; + + fn create_test_sink_info(key: &str, id: u32) -> SinkInfo { + SinkInfo { + id, + key: key.to_string(), + name: format!("{key} sink"), + path: format!("/path/to/{key}"), + version: "1.0.0".to_string(), + enabled: true, + status: ConnectorStatus::Running, + last_error: None, + plugin_config_format: None, + } + } + + fn create_test_sink_details(key: &str, id: u32) -> SinkDetails { + SinkDetails { + info: create_test_sink_info(key, id), + config: SinkConfig { + key: key.to_string(), + enabled: true, + version: 1, + name: format!("{key} sink"), + path: format!("/path/to/{key}"), + ..Default::default() + }, + shutdown_tx: None, + task_handles: vec![], + container: None, + } + } + + #[tokio::test] + async fn should_create_manager_with_sinks() { + let manager = SinkManager::new(vec![ + create_test_sink_details("es", 1), + create_test_sink_details("pg", 2), + ]); + + let all = manager.get_all().await; + assert_eq!(all.len(), 2); + } + + #[tokio::test] + async fn should_get_existing_sink() { + let manager = SinkManager::new(vec![create_test_sink_details("es", 1)]); + + let sink = manager.get("es").await; + assert!(sink.is_some()); + let binding = sink.unwrap(); + let details = binding.lock().await; + assert_eq!(details.info.key, "es"); + assert_eq!(details.info.id, 1); + } + + #[tokio::test] + async fn should_return_none_for_unknown_key() { + let manager = SinkManager::new(vec![create_test_sink_details("es", 1)]); + + assert!(manager.get("nonexistent").await.is_none()); + } + + #[tokio::test] + async fn should_get_config() { + let manager = SinkManager::new(vec![create_test_sink_details("es", 1)]); + + let config = manager.get_config("es").await; + assert!(config.is_some()); + assert_eq!(config.unwrap().key, "es"); + } + + #[tokio::test] + async fn should_return_none_config_for_unknown_key() { + let manager = SinkManager::new(vec![]); + + assert!(manager.get_config("nonexistent").await.is_none()); + } + + #[tokio::test] + async fn should_get_all_sinks() { + let manager = SinkManager::new(vec![ + create_test_sink_details("es", 1), + create_test_sink_details("pg", 2), + create_test_sink_details("stdout", 3), + ]); + + let all = manager.get_all().await; + assert_eq!(all.len(), 3); + let keys: Vec = all.iter().map(|s| s.key.clone()).collect(); + assert!(keys.contains(&"es".to_string())); + assert!(keys.contains(&"pg".to_string())); + assert!(keys.contains(&"stdout".to_string())); + } + + #[tokio::test] + async fn should_update_status() { + let manager = SinkManager::new(vec![create_test_sink_details("es", 1)]); + + manager + .update_status("es", ConnectorStatus::Stopped, None) + .await; + + let sink = manager.get("es").await.unwrap(); + let details = sink.lock().await; + assert_eq!(details.info.status, ConnectorStatus::Stopped); + } + + #[tokio::test] + async fn should_increment_metrics_when_transitioning_to_running() { + let metrics = Arc::new(Metrics::init()); + let mut details = create_test_sink_details("es", 1); + details.info.status = ConnectorStatus::Stopped; + let manager = SinkManager::new(vec![details]); + + manager + .update_status("es", ConnectorStatus::Running, Some(&metrics)) + .await; + + assert_eq!(metrics.get_sinks_running(), 1); + } + + #[tokio::test] + async fn should_decrement_metrics_when_leaving_running() { + let metrics = Arc::new(Metrics::init()); + let manager = SinkManager::new(vec![create_test_sink_details("es", 1)]); + metrics.increment_sinks_running(); + + manager + .update_status("es", ConnectorStatus::Stopped, Some(&metrics)) + .await; + + assert_eq!(metrics.get_sinks_running(), 0); + } + + #[tokio::test] + async fn should_clear_error_when_status_becomes_running() { + let manager = SinkManager::new(vec![create_test_sink_details("es", 1)]); + manager.set_error("es", "some error").await; + + manager + .update_status("es", ConnectorStatus::Running, None) + .await; + + let sink = manager.get("es").await.unwrap(); + let details = sink.lock().await; + assert!(details.info.last_error.is_none()); + } + + #[tokio::test] + async fn should_set_error_status_and_message() { + let manager = SinkManager::new(vec![create_test_sink_details("es", 1)]); + + manager.set_error("es", "connection failed").await; + + let sink = manager.get("es").await.unwrap(); + let details = sink.lock().await; + assert_eq!(details.info.status, ConnectorStatus::Error); + assert!(details.info.last_error.is_some()); + } + + #[tokio::test] + async fn stop_should_return_not_found_for_unknown_key() { + let metrics = Arc::new(Metrics::init()); + let manager = SinkManager::new(vec![]); + + let result = manager.stop_connector("nonexistent", &metrics).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(matches!(err, RuntimeError::SinkNotFound(_))); + } + + #[tokio::test] + async fn stop_should_send_shutdown_signal_and_update_status() { + let metrics = Arc::new(Metrics::init()); + metrics.increment_sinks_running(); + let (shutdown_tx, mut shutdown_rx) = watch::channel(()); + let handle = tokio::spawn(async move { + let _ = shutdown_rx.changed().await; + }); + let mut details = create_test_sink_details("es", 1); + details.shutdown_tx = Some(shutdown_tx); + details.task_handles = vec![handle]; + let manager = SinkManager::new(vec![details]); + + let result = manager.stop_connector("es", &metrics).await; + assert!(result.is_ok()); + + let sink = manager.get("es").await.unwrap(); + let details = sink.lock().await; + assert_eq!(details.info.status, ConnectorStatus::Stopped); + assert!(details.shutdown_tx.is_none()); + assert!(details.task_handles.is_empty()); + } + + #[tokio::test] + async fn stop_should_work_without_container() { + let metrics = Arc::new(Metrics::init()); + let mut details = create_test_sink_details("es", 1); + details.container = None; + details.info.status = ConnectorStatus::Stopped; + let manager = SinkManager::new(vec![details]); + + let result = manager.stop_connector("es", &metrics).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn stop_should_decrement_metrics_from_running() { + let metrics = Arc::new(Metrics::init()); + metrics.increment_sinks_running(); + let manager = SinkManager::new(vec![create_test_sink_details("es", 1)]); + + manager.stop_connector("es", &metrics).await.unwrap(); + + assert_eq!(metrics.get_sinks_running(), 0); + } + + #[tokio::test] + async fn should_clear_error_when_status_becomes_stopped() { + let manager = SinkManager::new(vec![create_test_sink_details("es", 1)]); + manager.set_error("es", "some error").await; + + manager + .update_status("es", ConnectorStatus::Stopped, None) + .await; + + let sink = manager.get("es").await.unwrap(); + let details = sink.lock().await; + assert_eq!(details.info.status, ConnectorStatus::Stopped); + assert!(details.info.last_error.is_none()); + } + + #[tokio::test] + async fn stop_should_clear_last_error() { + let metrics = Arc::new(Metrics::init()); + let mut details = create_test_sink_details("es", 1); + details.info.status = ConnectorStatus::Error; + details.info.last_error = Some(ConnectorError::new("previous error")); + let manager = SinkManager::new(vec![details]); + + manager.stop_connector("es", &metrics).await.unwrap(); + + let sink = manager.get("es").await.unwrap(); + let details = sink.lock().await; + assert!(details.info.last_error.is_none()); + } + + #[tokio::test] + async fn stop_should_not_decrement_metrics_from_non_running() { + let metrics = Arc::new(Metrics::init()); + let mut details = create_test_sink_details("es", 1); + details.info.status = ConnectorStatus::Stopped; + let manager = SinkManager::new(vec![details]); + + manager.stop_connector("es", &metrics).await.unwrap(); + + assert_eq!(metrics.get_sinks_running(), 0); + } + + #[tokio::test] + async fn update_status_should_be_noop_for_unknown_key() { + let manager = SinkManager::new(vec![]); + + manager + .update_status("nonexistent", ConnectorStatus::Running, None) + .await; + } + + #[tokio::test] + async fn set_error_should_be_noop_for_unknown_key() { + let manager = SinkManager::new(vec![]); + + manager.set_error("nonexistent", "some error").await; + } +} diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs index 53cd8775fa..ea06eb03e5 100644 --- a/core/connectors/runtime/src/manager/source.rs +++ b/core/connectors/runtime/src/manager/source.rs @@ -288,3 +288,277 @@ impl fmt::Debug for SourceDetails { .finish() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::configs::connectors::SourceConfig; + + fn create_test_source_info(key: &str, id: u32) -> SourceInfo { + SourceInfo { + id, + key: key.to_string(), + name: format!("{key} source"), + path: format!("/path/to/{key}"), + version: "1.0.0".to_string(), + enabled: true, + status: ConnectorStatus::Running, + last_error: None, + plugin_config_format: None, + } + } + + fn create_test_source_details(key: &str, id: u32) -> SourceDetails { + SourceDetails { + info: create_test_source_info(key, id), + config: SourceConfig { + key: key.to_string(), + enabled: true, + version: 1, + name: format!("{key} source"), + path: format!("/path/to/{key}"), + ..Default::default() + }, + handler_tasks: vec![], + container: None, + } + } + + #[tokio::test] + async fn should_create_manager_with_sources() { + let manager = SourceManager::new(vec![ + create_test_source_details("pg", 1), + create_test_source_details("random", 2), + ]); + + let all = manager.get_all().await; + assert_eq!(all.len(), 2); + } + + #[tokio::test] + async fn should_get_existing_source() { + let manager = SourceManager::new(vec![create_test_source_details("pg", 1)]); + + let source = manager.get("pg").await; + assert!(source.is_some()); + let binding = source.unwrap(); + let details = binding.lock().await; + assert_eq!(details.info.key, "pg"); + assert_eq!(details.info.id, 1); + } + + #[tokio::test] + async fn should_return_none_for_unknown_key() { + let manager = SourceManager::new(vec![create_test_source_details("pg", 1)]); + + assert!(manager.get("nonexistent").await.is_none()); + } + + #[tokio::test] + async fn should_get_config() { + let manager = SourceManager::new(vec![create_test_source_details("pg", 1)]); + + let config = manager.get_config("pg").await; + assert!(config.is_some()); + assert_eq!(config.unwrap().key, "pg"); + } + + #[tokio::test] + async fn should_return_none_config_for_unknown_key() { + let manager = SourceManager::new(vec![]); + + assert!(manager.get_config("nonexistent").await.is_none()); + } + + #[tokio::test] + async fn should_get_all_sources() { + let manager = SourceManager::new(vec![ + create_test_source_details("pg", 1), + create_test_source_details("random", 2), + create_test_source_details("es", 3), + ]); + + let all = manager.get_all().await; + assert_eq!(all.len(), 3); + let keys: Vec = all.iter().map(|s| s.key.clone()).collect(); + assert!(keys.contains(&"pg".to_string())); + assert!(keys.contains(&"random".to_string())); + assert!(keys.contains(&"es".to_string())); + } + + #[tokio::test] + async fn should_update_status() { + let manager = SourceManager::new(vec![create_test_source_details("pg", 1)]); + + manager + .update_status("pg", ConnectorStatus::Stopped, None) + .await; + + let source = manager.get("pg").await.unwrap(); + let details = source.lock().await; + assert_eq!(details.info.status, ConnectorStatus::Stopped); + } + + #[tokio::test] + async fn should_increment_metrics_when_transitioning_to_running() { + let metrics = Arc::new(Metrics::init()); + let mut details = create_test_source_details("pg", 1); + details.info.status = ConnectorStatus::Stopped; + let manager = SourceManager::new(vec![details]); + + manager + .update_status("pg", ConnectorStatus::Running, Some(&metrics)) + .await; + + assert_eq!(metrics.get_sources_running(), 1); + } + + #[tokio::test] + async fn should_decrement_metrics_when_leaving_running() { + let metrics = Arc::new(Metrics::init()); + let manager = SourceManager::new(vec![create_test_source_details("pg", 1)]); + metrics.increment_sources_running(); + + manager + .update_status("pg", ConnectorStatus::Stopped, Some(&metrics)) + .await; + + assert_eq!(metrics.get_sources_running(), 0); + } + + #[tokio::test] + async fn should_clear_error_when_status_becomes_running() { + let manager = SourceManager::new(vec![create_test_source_details("pg", 1)]); + manager.set_error("pg", "some error").await; + + manager + .update_status("pg", ConnectorStatus::Running, None) + .await; + + let source = manager.get("pg").await.unwrap(); + let details = source.lock().await; + assert!(details.info.last_error.is_none()); + } + + #[tokio::test] + async fn should_set_error_status_and_message() { + let manager = SourceManager::new(vec![create_test_source_details("pg", 1)]); + + manager.set_error("pg", "connection failed").await; + + let source = manager.get("pg").await.unwrap(); + let details = source.lock().await; + assert_eq!(details.info.status, ConnectorStatus::Error); + assert!(details.info.last_error.is_some()); + } + + #[tokio::test] + async fn stop_should_return_not_found_for_unknown_key() { + let metrics = Arc::new(Metrics::init()); + let manager = SourceManager::new(vec![]); + + let result = manager.stop_connector("nonexistent", &metrics).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(matches!(err, RuntimeError::SourceNotFound(_))); + } + + #[tokio::test] + async fn stop_should_drain_tasks_and_update_status() { + let metrics = Arc::new(Metrics::init()); + metrics.increment_sources_running(); + let handle = tokio::spawn(async {}); + let mut details = create_test_source_details("pg", 1); + details.handler_tasks = vec![handle]; + let manager = SourceManager::new(vec![details]); + + let result = manager.stop_connector("pg", &metrics).await; + assert!(result.is_ok()); + + let source = manager.get("pg").await.unwrap(); + let details = source.lock().await; + assert_eq!(details.info.status, ConnectorStatus::Stopped); + assert!(details.handler_tasks.is_empty()); + } + + #[tokio::test] + async fn stop_should_work_without_container() { + let metrics = Arc::new(Metrics::init()); + let mut details = create_test_source_details("pg", 1); + details.container = None; + details.info.status = ConnectorStatus::Stopped; + let manager = SourceManager::new(vec![details]); + + let result = manager.stop_connector("pg", &metrics).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn stop_should_decrement_metrics_from_running() { + let metrics = Arc::new(Metrics::init()); + metrics.increment_sources_running(); + let manager = SourceManager::new(vec![create_test_source_details("pg", 1)]); + + manager.stop_connector("pg", &metrics).await.unwrap(); + + assert_eq!(metrics.get_sources_running(), 0); + } + + #[tokio::test] + async fn should_clear_error_when_status_becomes_stopped() { + let manager = SourceManager::new(vec![create_test_source_details("pg", 1)]); + manager.set_error("pg", "some error").await; + + manager + .update_status("pg", ConnectorStatus::Stopped, None) + .await; + + let source = manager.get("pg").await.unwrap(); + let details = source.lock().await; + assert_eq!(details.info.status, ConnectorStatus::Stopped); + assert!(details.info.last_error.is_none()); + } + + #[tokio::test] + async fn stop_should_clear_last_error() { + let metrics = Arc::new(Metrics::init()); + let mut details = create_test_source_details("pg", 1); + details.info.status = ConnectorStatus::Error; + details.info.last_error = Some(ConnectorError::new("previous error")); + let manager = SourceManager::new(vec![details]); + + manager.stop_connector("pg", &metrics).await.unwrap(); + + let source = manager.get("pg").await.unwrap(); + let details = source.lock().await; + assert!(details.info.last_error.is_none()); + } + + #[tokio::test] + async fn stop_should_not_decrement_metrics_from_non_running() { + let metrics = Arc::new(Metrics::init()); + let mut details = create_test_source_details("pg", 1); + details.info.status = ConnectorStatus::Stopped; + let manager = SourceManager::new(vec![details]); + + manager.stop_connector("pg", &metrics).await.unwrap(); + + assert_eq!(metrics.get_sources_running(), 0); + } + + #[tokio::test] + async fn update_status_should_be_noop_for_unknown_key() { + let manager = SourceManager::new(vec![]); + + manager + .update_status("nonexistent", ConnectorStatus::Running, None) + .await; + } + + #[tokio::test] + async fn set_error_should_be_noop_for_unknown_key() { + let manager = SourceManager::new(vec![]); + + manager.set_error("nonexistent", "some error").await; + } +} From ee4a14cec7952a3e50897db4a9a522ccdb9321ce Mon Sep 17 00:00:00 2001 From: shin Date: Fri, 20 Feb 2026 01:07:34 +0900 Subject: [PATCH 11/22] fix(connectors): eliminate race condition in initial startup handle storage Replace detached tokio::spawn for storing shutdown handles with synchronous return from consume() and handle(), preventing race where restart API could be called before handles are stored. Also deduplicate ~145 lines in source::handle() by reusing source_forwarding_loop(). Signed-off-by: shin --- core/connectors/runtime/src/main.rs | 19 ++- core/connectors/runtime/src/sink.rs | 18 ++- core/connectors/runtime/src/source.rs | 186 +++++--------------------- 3 files changed, 55 insertions(+), 168 deletions(-) diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index ba85169657..51428c62e4 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -219,8 +219,23 @@ async fn main() -> Result<(), RuntimeError> { let context = Arc::new(context); api::init(&config.http, context.clone()).await; - source::handle(source_wrappers, context.clone()); - sink::consume(sink_wrappers, context.clone()); + let source_handles = source::handle(source_wrappers, context.clone()); + for (key, handler_tasks) in source_handles { + if let Some(details) = context.sources.get(&key).await { + let mut details = details.lock().await; + details.handler_tasks = handler_tasks; + } + } + + let sink_handles = sink::consume(sink_wrappers, context.clone()); + for (key, shutdown_tx, task_handles) in sink_handles { + if let Some(details) = context.sinks.get(&key).await { + let mut details = details.lock().await; + details.shutdown_tx = Some(shutdown_tx); + details.task_handles = task_handles; + } + } + info!("All sources and sinks spawned."); #[cfg(unix)] diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index 2b3dbaed9e..24b171d9d3 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -43,6 +43,7 @@ use std::{ time::Instant, }; use tokio::sync::watch; +use tokio::task::JoinHandle; use tracing::{debug, error, info, warn}; pub async fn init( @@ -196,7 +197,11 @@ pub async fn init( Ok(sink_connectors) } -pub fn consume(sinks: Vec, context: Arc) { +pub fn consume( + sinks: Vec, + context: Arc, +) -> Vec<(String, watch::Sender<()>, Vec>)> { + let mut handles = Vec::new(); for sink in sinks { for plugin in sink.plugins { if let Some(error) = &plugin.error { @@ -258,17 +263,10 @@ pub fn consume(sinks: Vec, context: Arc) { task_handles.push(handle); } - let plugin_key = plugin.key.clone(); - let context_clone = context.clone(); - tokio::spawn(async move { - if let Some(details) = context_clone.sinks.get(&plugin_key).await { - let mut details = details.lock().await; - details.shutdown_tx = Some(shutdown_tx); - details.task_handles = task_handles; - } - }); + handles.push((plugin.key.clone(), shutdown_tx, task_handles)); } } + handles } #[allow(clippy::too_many_arguments)] diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index ad8e7c8023..e290313211 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -47,6 +47,7 @@ use crate::{ transform, }; use iggy_connector_sdk::api::ConnectorStatus; +use tokio::task::JoinHandle; pub static SOURCE_SENDERS: Lazy>> = Lazy::new(DashMap::new); @@ -445,12 +446,14 @@ pub(crate) async fn source_forwarding_loop( .await; } -pub fn handle(sources: Vec, context: Arc) { +pub fn handle( + sources: Vec, + context: Arc, +) -> Vec<(String, Vec>)> { + let mut handles = Vec::new(); for source in sources { for plugin in source.plugins { let plugin_id = plugin.id; - let plugin_key_for_store = plugin.key.clone(); - let context_for_store = context.clone(); let plugin_key = plugin.key.clone(); let context = context.clone(); @@ -462,169 +465,40 @@ pub fn handle(sources: Vec, context: Arc } info!("Starting handler for source connector with ID: {plugin_id}..."); - let handle = source.callback; + let callback = source.callback; tokio::task::spawn_blocking(move || { - handle(plugin_id, handle_produced_messages); + callback(plugin_id, handle_produced_messages); }); info!("Handler for source connector with ID: {plugin_id} started successfully."); - let (sender, receiver): (Sender, Receiver) = - flume::unbounded(); + let (sender, receiver) = flume::unbounded(); SOURCE_SENDERS.insert(plugin_id, sender); - let handler_task = tokio::spawn(async move { - info!("Source connector with ID: {plugin_id} started."); - let Some(producer) = &plugin.producer else { - error!("Producer not initialized for source connector with ID: {plugin_id}"); - context - .sources - .set_error(&plugin_key, "Producer not initialized") - .await; - return; - }; - - context - .sources - .update_status( - &plugin_key, - ConnectorStatus::Running, - Some(&context.metrics), - ) - .await; - let encoder = producer.encoder.clone(); - let producer = &producer.producer; - let mut number = 1u64; - - let topic_metadata = TopicMetadata { - stream: producer.stream().to_string(), - topic: producer.topic().to_string(), - }; - - while let Ok(produced_messages) = receiver.recv_async().await { - let count = produced_messages.messages.len(); - context - .metrics - .increment_messages_produced(&plugin_key, count as u64); - if plugin.verbose { - info!("Source connector with ID: {plugin_id} received {count} messages"); - } else { - debug!("Source connector with ID: {plugin_id} received {count} messages"); - } - let schema = produced_messages.schema; - let mut messages: Vec = Vec::with_capacity(count); - for message in produced_messages.messages { - let Ok(payload) = schema.try_into_payload(message.payload) else { - error!( - "Failed to decode message payload with schema: {} for source connector with ID: {plugin_id}", - produced_messages.schema - ); - continue; - }; - - debug!( - "Source connector with ID: {plugin_id}] received message: {number} | schema: {schema} | payload: {payload}" - ); - messages.push(DecodedMessage { - id: message.id, - offset: None, - headers: message.headers, - checksum: message.checksum, - timestamp: message.timestamp, - origin_timestamp: message.origin_timestamp, - payload, - }); - number += 1; - } - - let Ok(iggy_messages) = process_messages( - plugin_id, - &encoder, - &topic_metadata, - messages, - &plugin.transforms, - ) else { - let error_msg = format!( - "Failed to process {count} messages by source connector with ID: {plugin_id} before sending them to stream: {}, topic: {}.", - producer.stream(), - producer.topic() - ); - error!("{error_msg}"); - context - .metrics - .increment_errors(&plugin_key, ConnectorType::Source); - context.sources.set_error(&plugin_key, &error_msg).await; - continue; - }; - - if let Err(error) = producer.send(iggy_messages).await { - let error_msg = format!( - "Failed to send {count} messages to stream: {}, topic: {} by source connector with ID: {plugin_id}. {error}", - producer.stream(), - producer.topic(), - ); - error!("{error_msg}"); - context - .metrics - .increment_errors(&plugin_key, ConnectorType::Source); - context.sources.set_error(&plugin_key, &error_msg).await; - continue; - } - context - .metrics - .increment_messages_sent(&plugin_key, count as u64); - - if plugin.verbose { - info!( - "Sent {count} messages to stream: {}, topic: {} by source connector with ID: {plugin_id}", - producer.stream(), - producer.topic() - ); - } else { - debug!( - "Sent {count} messages to stream: {}, topic: {} by source connector with ID: {plugin_id}", - producer.stream(), - producer.topic() - ); - } - - let Some(state) = produced_messages.state else { - debug!("No state provided for source connector with ID: {plugin_id}"); - continue; - }; - - match &plugin.state_storage { - StateStorage::File(file) => { - if let Err(error) = file.save(state).await { - let error_msg = format!( - "Failed to save state for source connector with ID: {plugin_id}. {error}" - ); - error!("{error_msg}"); - context.sources.set_error(&plugin_key, &error_msg).await; - continue; - } - debug!("State saved for source connector with ID: {plugin_id}"); - } - } - } + let Some(producer_wrapper) = plugin.producer else { + error!("Producer not initialized for source connector with ID: {plugin_id}"); + continue; + }; - info!("Source connector with ID: {plugin_id} stopped."); - context - .sources - .update_status( - &plugin_key, - ConnectorStatus::Stopped, - Some(&context.metrics), - ) - .await; - }); - tokio::spawn(async move { - if let Some(details) = context_for_store.sources.get(&plugin_key_for_store).await { - let mut details = details.lock().await; - details.handler_tasks = vec![handler_task]; - } + let plugin_key_clone = plugin_key.clone(); + let handler_task = tokio::spawn(async move { + source_forwarding_loop( + plugin_id, + plugin_key_clone, + plugin.verbose, + producer_wrapper.producer, + producer_wrapper.encoder, + plugin.transforms, + plugin.state_storage, + receiver, + context, + ) + .await; }); + + handles.push((plugin_key, vec![handler_task])); } } + handles } fn process_messages( From 02a5ccf6d27b1608f5a2483d4bfb86d55b2f9d02 Mon Sep 17 00:00:00 2001 From: shin Date: Fri, 20 Feb 2026 01:11:40 +0900 Subject: [PATCH 12/22] fix(connectors): prevent resource leak in source handle() when producer is None Move producer check before spawn_blocking and SOURCE_SENDERS insert to match start_connector() pattern. Signed-off-by: shin --- core/connectors/runtime/src/source.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index e290313211..0aaa227be6 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -465,20 +465,20 @@ pub fn handle( } info!("Starting handler for source connector with ID: {plugin_id}..."); + let Some(producer_wrapper) = plugin.producer else { + error!("Producer not initialized for source connector with ID: {plugin_id}"); + continue; + }; + + let (sender, receiver) = flume::unbounded(); + SOURCE_SENDERS.insert(plugin_id, sender); + let callback = source.callback; tokio::task::spawn_blocking(move || { callback(plugin_id, handle_produced_messages); }); info!("Handler for source connector with ID: {plugin_id} started successfully."); - let (sender, receiver) = flume::unbounded(); - SOURCE_SENDERS.insert(plugin_id, sender); - - let Some(producer_wrapper) = plugin.producer else { - error!("Producer not initialized for source connector with ID: {plugin_id}"); - continue; - }; - let plugin_key_clone = plugin_key.clone(); let handler_task = tokio::spawn(async move { source_forwarding_loop( From 6d9fc15725c2c90888e0dd25774b0ed70ec5c35a Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 1 Mar 2026 20:23:10 +0900 Subject: [PATCH 13/22] refactor(connectors): add restart concurrency guard to SinkDetails and SourceDetails Signed-off-by: shin --- core/connectors/runtime/src/context.rs | 3 +++ core/connectors/runtime/src/manager/sink.rs | 13 +++++++++++++ core/connectors/runtime/src/manager/source.rs | 13 +++++++++++++ 3 files changed, 29 insertions(+) diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs index c5a6279299..a87095463b 100644 --- a/core/connectors/runtime/src/context.rs +++ b/core/connectors/runtime/src/context.rs @@ -32,6 +32,7 @@ use iggy_connector_sdk::api::ConnectorError; use iggy_connector_sdk::api::ConnectorStatus; use std::collections::HashMap; use std::sync::Arc; +use tokio::sync::Mutex; use tracing::error; pub struct RuntimeContext { @@ -114,6 +115,7 @@ fn map_sinks( shutdown_tx: None, task_handles: vec![], container: None, + restart_guard: Arc::new(Mutex::new(())), }); } } @@ -158,6 +160,7 @@ fn map_sources( config: source_config.clone(), handler_tasks: vec![], container: None, + restart_guard: Arc::new(Mutex::new(())), }); } } diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs index 2455738ccd..4224e5e08a 100644 --- a/core/connectors/runtime/src/manager/sink.rs +++ b/core/connectors/runtime/src/manager/sink.rs @@ -241,6 +241,17 @@ impl SinkManager { iggy_client: &IggyClient, metrics: &Arc, ) -> Result<(), RuntimeError> { + let guard = { + let details_arc = self + .sinks + .get(key) + .map(|e| e.value().clone()) + .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?; + let details = details_arc.lock().await; + details.restart_guard.clone() + }; + let _lock = guard.lock().await; + info!("Restarting sink connector: {key}"); self.stop_connector(key, metrics).await?; @@ -276,6 +287,7 @@ pub struct SinkDetails { pub shutdown_tx: Option>, pub task_handles: Vec>, pub container: Option>>, + pub restart_guard: Arc>, } impl fmt::Debug for SinkDetails { @@ -321,6 +333,7 @@ mod tests { shutdown_tx: None, task_handles: vec![], container: None, + restart_guard: Arc::new(Mutex::new(())), } } diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs index ea06eb03e5..ed4f6515ef 100644 --- a/core/connectors/runtime/src/manager/source.rs +++ b/core/connectors/runtime/src/manager/source.rs @@ -243,6 +243,17 @@ impl SourceManager { state_path: &str, context: &Arc, ) -> Result<(), RuntimeError> { + let guard = { + let details_arc = self + .sources + .get(key) + .map(|e| e.value().clone()) + .ok_or_else(|| RuntimeError::SourceNotFound(key.to_string()))?; + let details = details_arc.lock().await; + details.restart_guard.clone() + }; + let _lock = guard.lock().await; + info!("Restarting source connector: {key}"); self.stop_connector(key, metrics).await?; @@ -277,6 +288,7 @@ pub struct SourceDetails { pub config: SourceConfig, pub handler_tasks: Vec>, pub container: Option>>, + pub restart_guard: Arc>, } impl fmt::Debug for SourceDetails { @@ -321,6 +333,7 @@ mod tests { }, handler_tasks: vec![], container: None, + restart_guard: Arc::new(Mutex::new(())), } } From 7d95a1fec81a471a25ffb3e6a4281604627c64cd Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 1 Mar 2026 21:15:22 +0900 Subject: [PATCH 14/22] refactor(connectors): extract spawn_consume_tasks and reuse setup_sink_consumers in init Signed-off-by: shin --- core/connectors/runtime/src/main.rs | 5 + core/connectors/runtime/src/manager/sink.rs | 36 +--- core/connectors/runtime/src/sink.rs | 174 ++++++++------------ 3 files changed, 79 insertions(+), 136 deletions(-) diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index 51428c62e4..1dcbcb1376 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -29,6 +29,7 @@ use figlet_rs::FIGfont; use iggy::prelude::{Client, IggyConsumer, IggyProducer}; use iggy_connector_sdk::{ StreamDecoder, StreamEncoder, + api::ConnectorStatus, sink::ConsumeCallback, source::{HandleCallback, SendCallback}, transforms::Transform, @@ -234,6 +235,10 @@ async fn main() -> Result<(), RuntimeError> { details.shutdown_tx = Some(shutdown_tx); details.task_handles = task_handles; } + context + .sinks + .update_status(&key, ConnectorStatus::Running, Some(&context.metrics)) + .await; } info!("All sources and sinks spawned."); diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs index 4224e5e08a..ed5f923e71 100644 --- a/core/connectors/runtime/src/manager/sink.rs +++ b/core/connectors/runtime/src/manager/sink.rs @@ -33,7 +33,7 @@ use std::sync::atomic::Ordering; use std::time::Duration; use tokio::sync::{Mutex, watch}; use tokio::task::JoinHandle; -use tracing::{error, info}; +use tracing::info; #[derive(Debug)] pub struct SinkManager { @@ -100,6 +100,7 @@ impl SinkManager { } } + #[allow(dead_code)] pub async fn set_error(&self, key: &str, error_message: &str) { if let Some(sink) = self.sinks.get(key) { let mut sink = sink.lock().await; @@ -187,38 +188,9 @@ impl SinkManager { let consumers = sink::setup_sink_consumers(key, config, iggy_client).await?; - let (shutdown_tx, shutdown_rx) = watch::channel(()); let callback = container.iggy_sink_consume; - let verbose = config.verbose; - let mut task_handles = Vec::new(); - - for (consumer, decoder, batch_size, transforms) in consumers { - let plugin_key = key.to_string(); - let metrics_clone = metrics.clone(); - let shutdown_rx = shutdown_rx.clone(); - - let handle = tokio::spawn(async move { - if let Err(error) = sink::consume_messages( - plugin_id, - decoder, - batch_size, - callback, - transforms, - consumer, - verbose, - &plugin_key, - &metrics_clone, - shutdown_rx, - ) - .await - { - error!( - "Failed to consume messages for sink connector with ID: {plugin_id}: {error}" - ); - } - }); - task_handles.push(handle); - } + let (shutdown_tx, task_handles) = + sink::spawn_consume_tasks(plugin_id, key, consumers, callback, config.verbose, metrics); { let mut details = details_arc.lock().await; diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index 24b171d9d3..f25449e8f1 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -31,7 +31,6 @@ use iggy::prelude::{ AutoCommit, AutoCommitWhen, IggyClient, IggyConsumer, IggyDuration, IggyMessage, PollingStrategy, }; -use iggy_connector_sdk::api::ConnectorStatus; use iggy_connector_sdk::{ DecodedMessage, MessagesMetadata, RawMessage, RawMessages, ReceivedMessage, StreamDecoder, TopicMetadata, sink::ConsumeCallback, transforms::Transform, @@ -52,7 +51,7 @@ pub async fn init( ) -> Result, RuntimeError> { let mut sink_connectors: HashMap = HashMap::new(); for (key, config) in sink_configs { - let name = config.name; + let name = config.name.clone(); if !config.enabled { warn!("Sink: {name} is disabled ({key})"); continue; @@ -70,7 +69,7 @@ pub async fn init( let version = get_plugin_version(&container.container); init_error = init_sink( &container.container, - &config.plugin_config.unwrap_or_default(), + &config.plugin_config.clone().unwrap_or_default(), plugin_id, ) .err() @@ -98,7 +97,7 @@ pub async fn init( let version = get_plugin_version(&container); init_error = init_sink( &container, - &config.plugin_config.unwrap_or_default(), + &config.plugin_config.clone().unwrap_or_default(), plugin_id, ) .err() @@ -131,21 +130,7 @@ pub async fn init( ); } - let transforms = if let Some(transforms_config) = config.transforms { - let transforms = transform::load(&transforms_config).map_err(|error| { - RuntimeError::InvalidConfiguration(format!("Failed to load transforms: {error}")) - })?; - let types = transforms - .iter() - .map(|t| t.r#type().into()) - .collect::>() - .join(", "); - info!("Enabled transforms for sink: {name} ({key}): {types}",); - transforms - } else { - vec![] - }; - + let consumers = setup_sink_consumers(&key, &config, iggy_client).await?; let connector = sink_connectors.get_mut(&path).ok_or_else(|| { RuntimeError::InvalidConfiguration(format!("Sink connector not found for path: {path}")) })?; @@ -158,39 +143,13 @@ pub async fn init( "Sink plugin not found for ID: {plugin_id}" )) })?; - - for stream in config.streams.iter() { - let poll_interval = IggyDuration::from_str( - stream.poll_interval.as_deref().unwrap_or("5ms"), - ) - .map_err(|error| { - RuntimeError::InvalidConfiguration(format!("Invalid poll interval: {error}")) - })?; - let default_consumer_group = format!("iggy-connect-sink-{key}"); - let consumer_group = stream - .consumer_group - .as_deref() - .unwrap_or(&default_consumer_group); - let batch_length = stream.batch_length.unwrap_or(1000); - for topic in stream.topics.iter() { - let mut consumer = iggy_client - .consumer_group(consumer_group, &stream.stream, topic)? - .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) - .create_consumer_group_if_not_exists() - .auto_join_consumer_group() - .polling_strategy(PollingStrategy::next()) - .poll_interval(poll_interval) - .batch_length(batch_length) - .build(); - - consumer.init().await?; - plugin.consumers.push(SinkConnectorConsumer { - consumer, - decoder: stream.schema.decoder(), - batch_size: batch_length, - transforms: transforms.clone(), - }); - } + for (consumer, decoder, batch_size, transforms) in consumers { + plugin.consumers.push(SinkConnectorConsumer { + consumer, + decoder, + batch_size, + transforms, + }); } } @@ -212,63 +171,70 @@ pub fn consume( continue; } info!("Starting consume for sink with ID: {}...", plugin.id); - let (shutdown_tx, shutdown_rx) = watch::channel(()); - let mut task_handles = Vec::new(); - - for consumer in plugin.consumers { - let plugin_key = plugin.key.clone(); - let context = context.clone(); - let shutdown_rx = shutdown_rx.clone(); - - let handle = tokio::spawn(async move { - context - .sinks - .update_status( - &plugin_key, - ConnectorStatus::Running, - Some(&context.metrics), - ) - .await; - - if let Err(error) = consume_messages( - plugin.id, - consumer.decoder, - consumer.batch_size, - sink.callback, - consumer.transforms, - consumer.consumer, - plugin.verbose, - &plugin_key, - &context.metrics, - shutdown_rx, - ) - .await - { - let error_msg = format!( - "Failed to consume messages for sink connector with ID: {}. {error}", - plugin.id - ); - error!("{error_msg}"); - context - .metrics - .increment_errors(&plugin_key, ConnectorType::Sink); - context.sinks.set_error(&plugin_key, &error_msg).await; - return; - } - info!( - "Consume messages for sink connector with ID: {} completed.", - plugin.id - ); - }); - task_handles.push(handle); - } - - handles.push((plugin.key.clone(), shutdown_tx, task_handles)); + let consumers = plugin + .consumers + .into_iter() + .map(|c| (c.consumer, c.decoder, c.batch_size, c.transforms)) + .collect(); + let (shutdown_tx, task_handles) = spawn_consume_tasks( + plugin.id, + &plugin.key, + consumers, + sink.callback, + plugin.verbose, + &context.metrics, + ); + handles.push((plugin.key, shutdown_tx, task_handles)); } } handles } +#[allow(clippy::type_complexity)] +pub(crate) fn spawn_consume_tasks( + plugin_id: u32, + plugin_key: &str, + consumers: Vec<( + IggyConsumer, + Arc, + u32, + Vec>, + )>, + callback: ConsumeCallback, + verbose: bool, + metrics: &Arc, +) -> (watch::Sender<()>, Vec>) { + let (shutdown_tx, shutdown_rx) = watch::channel(()); + let mut task_handles = Vec::new(); + for (consumer, decoder, batch_size, transforms) in consumers { + let plugin_key = plugin_key.to_string(); + let metrics = metrics.clone(); + let shutdown_rx = shutdown_rx.clone(); + let handle = tokio::spawn(async move { + if let Err(error) = consume_messages( + plugin_id, + decoder, + batch_size, + callback, + transforms, + consumer, + verbose, + &plugin_key, + &metrics, + shutdown_rx, + ) + .await + { + error!( + "Failed to consume messages for sink connector with ID: {plugin_id}: {error}" + ); + } + }); + task_handles.push(handle); + } + (shutdown_tx, task_handles) +} + #[allow(clippy::too_many_arguments)] pub(crate) async fn consume_messages( plugin_id: u32, From 274d03ad81c37f907aea7a60413b3f7cd46f5473 Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 1 Mar 2026 21:41:45 +0900 Subject: [PATCH 15/22] refactor(connectors): extract spawn_source_handler and reuse setup_source_producer in init Signed-off-by: shin --- core/connectors/runtime/src/manager/source.rs | 37 ++--- core/connectors/runtime/src/source.rs | 127 ++++++++---------- 2 files changed, 69 insertions(+), 95 deletions(-) diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs index ed4f6515ef..a3a2269cc8 100644 --- a/core/connectors/runtime/src/manager/source.rs +++ b/core/connectors/runtime/src/manager/source.rs @@ -195,31 +195,18 @@ impl SourceManager { let (producer, encoder, transforms) = source::setup_source_producer(config, iggy_client).await?; - let (sender, receiver) = flume::unbounded(); - source::SOURCE_SENDERS.insert(plugin_id, sender); - let callback = container.iggy_source_handle; - tokio::task::spawn_blocking(move || { - callback(plugin_id, source::handle_produced_messages); - }); - - let plugin_key = key.to_string(); - let verbose = config.verbose; - let context_clone = context.clone(); - let handler_task = tokio::spawn(async move { - source::source_forwarding_loop( - plugin_id, - plugin_key, - verbose, - producer, - encoder, - transforms, - state_storage, - receiver, - context_clone, - ) - .await; - }); + let handler_tasks = source::spawn_source_handler( + plugin_id, + key, + config.verbose, + producer, + encoder, + transforms, + state_storage, + callback, + context.clone(), + ); { let mut details = details_arc.lock().await; @@ -227,7 +214,7 @@ impl SourceManager { details.info.status = ConnectorStatus::Running; details.info.last_error = None; details.config = config.clone(); - details.handler_tasks = vec![handler_task]; + details.handler_tasks = handler_tasks; metrics.increment_sources_running(); } diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index 0aaa227be6..5fe4bd3c82 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -26,7 +26,7 @@ use iggy::prelude::{ }; use iggy_connector_sdk::{ ConnectorState, DecodedMessage, Error, ProducedMessages, StreamEncoder, TopicMetadata, - transforms::Transform, + source::HandleCallback, transforms::Transform, }; use once_cell::sync::Lazy; use std::{ @@ -62,7 +62,7 @@ pub async fn init( ) -> Result, RuntimeError> { let mut source_connectors: HashMap = HashMap::new(); for (key, config) in source_configs { - let name = config.name; + let name = config.name.clone(); if !config.enabled { warn!("Source: {name} is disabled ({key})"); continue; @@ -84,7 +84,7 @@ pub async fn init( let version = get_plugin_version(&container.container); init_error = init_source( &container.container, - &config.plugin_config.unwrap_or_default(), + &config.plugin_config.clone().unwrap_or_default(), plugin_id, state, ) @@ -115,7 +115,7 @@ pub async fn init( let version = get_plugin_version(&container); init_error = init_source( &container, - &config.plugin_config.unwrap_or_default(), + &config.plugin_config.clone().unwrap_or_default(), plugin_id, state, ) @@ -151,20 +151,7 @@ pub async fn init( ); } - let transforms = if let Some(transforms_config) = config.transforms { - let transforms = transform::load(&transforms_config).map_err(|error| { - RuntimeError::InvalidConfiguration(format!("Failed to load transforms: {error}")) - })?; - let types = transforms - .iter() - .map(|t| t.r#type().into()) - .collect::>() - .join(", "); - info!("Enabled transforms for source: {name} ({key}): {types}",); - transforms - } else { - vec![] - }; + let (producer, encoder, transforms) = setup_source_producer(&config, iggy_client).await?; let connector = source_connectors.get_mut(&path).ok_or_else(|| { RuntimeError::InvalidConfiguration(format!( @@ -180,32 +167,8 @@ pub async fn init( "Source plugin not found for ID: {plugin_id}" )) })?; - - for stream in config.streams.iter() { - let linger_time = IggyDuration::from_str( - stream.linger_time.as_deref().unwrap_or("5ms"), - ) - .map_err(|error| { - RuntimeError::InvalidConfiguration(format!("Invalid linger time: {error}")) - })?; - let batch_length = stream.batch_length.unwrap_or(1000); - let producer = iggy_client - .producer(&stream.stream, &stream.topic)? - .direct( - DirectConfig::builder() - .batch_length(batch_length) - .linger_time(linger_time) - .build(), - ) - .build(); - - producer.init().await?; - plugin.producer = Some(SourceConnectorProducer { - producer, - encoder: stream.schema.encoder(), - }); - plugin.transforms = transforms.clone(); - } + plugin.producer = Some(SourceConnectorProducer { producer, encoder }); + plugin.transforms = transforms; } Ok(source_connectors) @@ -446,6 +409,44 @@ pub(crate) async fn source_forwarding_loop( .await; } +#[allow(clippy::too_many_arguments)] +pub(crate) fn spawn_source_handler( + plugin_id: u32, + plugin_key: &str, + verbose: bool, + producer: IggyProducer, + encoder: Arc, + transforms: Vec>, + state_storage: StateStorage, + callback: HandleCallback, + context: Arc, +) -> Vec> { + let (sender, receiver) = flume::unbounded(); + SOURCE_SENDERS.insert(plugin_id, sender); + + tokio::task::spawn_blocking(move || { + callback(plugin_id, handle_produced_messages); + }); + + let plugin_key = plugin_key.to_string(); + let handler_task = tokio::spawn(async move { + source_forwarding_loop( + plugin_id, + plugin_key, + verbose, + producer, + encoder, + transforms, + state_storage, + receiver, + context, + ) + .await; + }); + + vec![handler_task] +} + pub fn handle( sources: Vec, context: Arc, @@ -455,7 +456,6 @@ pub fn handle( for plugin in source.plugins { let plugin_id = plugin.id; let plugin_key = plugin.key.clone(); - let context = context.clone(); if let Some(error) = &plugin.error { error!( @@ -470,32 +470,19 @@ pub fn handle( continue; }; - let (sender, receiver) = flume::unbounded(); - SOURCE_SENDERS.insert(plugin_id, sender); - - let callback = source.callback; - tokio::task::spawn_blocking(move || { - callback(plugin_id, handle_produced_messages); - }); - info!("Handler for source connector with ID: {plugin_id} started successfully."); - - let plugin_key_clone = plugin_key.clone(); - let handler_task = tokio::spawn(async move { - source_forwarding_loop( - plugin_id, - plugin_key_clone, - plugin.verbose, - producer_wrapper.producer, - producer_wrapper.encoder, - plugin.transforms, - plugin.state_storage, - receiver, - context, - ) - .await; - }); + let handler_tasks = spawn_source_handler( + plugin_id, + &plugin_key, + plugin.verbose, + producer_wrapper.producer, + producer_wrapper.encoder, + plugin.transforms, + plugin.state_storage, + source.callback, + context.clone(), + ); - handles.push((plugin_key, vec![handler_task])); + handles.push((plugin_key, handler_tasks)); } } handles From 2ddb9f9dc1779fe84bce2ee5bfce69e3cfa1fa43 Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 1 Mar 2026 21:46:42 +0900 Subject: [PATCH 16/22] test(connectors): add restart integration test for sink connector Signed-off-by: shin --- .../tests/connectors/postgres/mod.rs | 1 + .../tests/connectors/postgres/restart.rs | 163 ++++++++++++++++++ 2 files changed, 164 insertions(+) create mode 100644 core/integration/tests/connectors/postgres/restart.rs diff --git a/core/integration/tests/connectors/postgres/mod.rs b/core/integration/tests/connectors/postgres/mod.rs index cb16dfbfba..c91abf9aba 100644 --- a/core/integration/tests/connectors/postgres/mod.rs +++ b/core/integration/tests/connectors/postgres/mod.rs @@ -19,6 +19,7 @@ mod postgres_sink; mod postgres_source; +mod restart; use crate::connectors::TestMessage; use serde::Deserialize; diff --git a/core/integration/tests/connectors/postgres/restart.rs b/core/integration/tests/connectors/postgres/restart.rs new file mode 100644 index 0000000000..d55b60220b --- /dev/null +++ b/core/integration/tests/connectors/postgres/restart.rs @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::{POLL_ATTEMPTS, POLL_INTERVAL_MS, TEST_MESSAGE_COUNT}; +use crate::connectors::fixtures::{PostgresOps, PostgresSinkFixture}; +use crate::connectors::{TestMessage, create_test_messages}; +use bytes::Bytes; +use iggy::prelude::{IggyMessage, Partitioning}; +use iggy_binary_protocol::MessageClient; +use iggy_common::Identifier; +use iggy_connector_sdk::api::{ConnectorStatus, SinkInfoResponse}; +use integration::harness::seeds; +use integration::iggy_harness; +use reqwest::Client; +use std::time::Duration; +use tokio::time::sleep; + +const API_KEY: &str = "test-api-key"; +const SINK_TABLE: &str = "iggy_messages"; +const SINK_KEY: &str = "postgres"; + +type SinkRow = (i64, String, String, Vec); + +async fn wait_for_sink_status( + http: &Client, + api_url: &str, + expected: ConnectorStatus, +) -> SinkInfoResponse { + for _ in 0..POLL_ATTEMPTS { + if let Ok(resp) = http + .get(format!("{api_url}/sinks/{SINK_KEY}")) + .header("api-key", API_KEY) + .send() + .await + && let Ok(info) = resp.json::().await + && info.status == expected + { + return info; + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + panic!("Sink connector did not reach {expected:?} status in time"); +} + +fn build_messages(messages_data: &[TestMessage], id_offset: usize) -> Vec { + messages_data + .iter() + .enumerate() + .map(|(i, msg)| { + let payload = serde_json::to_vec(msg).expect("Failed to serialize message"); + IggyMessage::builder() + .id((id_offset + i + 1) as u128) + .payload(Bytes::from(payload)) + .build() + .expect("Failed to build message") + }) + .collect() +} + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/postgres/sink.toml")), + seed = seeds::connector_stream +)] +async fn restart_sink_connector_continues_processing( + harness: &TestHarness, + fixture: PostgresSinkFixture, +) { + let client = harness.root_client().await.unwrap(); + let api_url = harness + .connectors_runtime() + .expect("connector runtime should be available") + .http_url(); + let http = Client::new(); + let pool = fixture.create_pool().await.expect("Failed to create pool"); + + fixture.wait_for_table(&pool, SINK_TABLE).await; + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + wait_for_sink_status(&http, &api_url, ConnectorStatus::Running).await; + + let first_batch = create_test_messages(TEST_MESSAGE_COUNT); + let mut messages = build_messages(&first_batch, 0); + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .expect("Failed to send first batch"); + + let query = format!( + "SELECT iggy_offset, iggy_stream, iggy_topic, payload FROM {SINK_TABLE} ORDER BY iggy_offset" + ); + let rows: Vec = fixture + .fetch_rows_as(&pool, &query, TEST_MESSAGE_COUNT) + .await + .expect("Failed to fetch first batch rows"); + + assert_eq!( + rows.len(), + TEST_MESSAGE_COUNT, + "Expected {TEST_MESSAGE_COUNT} rows before restart" + ); + + let resp = http + .post(format!("{api_url}/sinks/{SINK_KEY}/restart")) + .header("api-key", API_KEY) + .send() + .await + .expect("Failed to call restart endpoint"); + + assert_eq!( + resp.status().as_u16(), + 204, + "Restart endpoint should return 204 No Content" + ); + + wait_for_sink_status(&http, &api_url, ConnectorStatus::Running).await; + + let second_batch = create_test_messages(TEST_MESSAGE_COUNT); + let mut messages = build_messages(&second_batch, TEST_MESSAGE_COUNT); + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .expect("Failed to send second batch"); + + let total_expected = TEST_MESSAGE_COUNT * 2; + let rows: Vec = fixture + .fetch_rows_as(&pool, &query, total_expected) + .await + .expect("Failed to fetch rows after restart"); + + assert!( + rows.len() >= total_expected, + "Expected at least {total_expected} rows after restart, got {}", + rows.len() + ); +} From 49e99f0b6896957a886f772593bf1cfc4ae35001 Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 1 Mar 2026 22:06:34 +0900 Subject: [PATCH 17/22] fix(connectors): improve restart robustness with error reporting and shutdown safety - Add RuntimeContext to spawn_consume_tasks for error state reporting - Track spawn_blocking JoinHandle in spawn_source_handler to prevent zombie tasks - Acquire restart_guard in shutdown path to prevent race with concurrent restarts - Restore transform type info logs in setup_sink_consumers and setup_source_producer Signed-off-by: seokjin0414 Signed-off-by: shin --- core/connectors/runtime/src/api/sink.rs | 1 + core/connectors/runtime/src/main.rs | 20 ++++++++++++++++++++ core/connectors/runtime/src/manager/sink.rs | 16 +++++++++++++--- core/connectors/runtime/src/sink.rs | 15 +++++++++++++-- core/connectors/runtime/src/source.rs | 12 ++++++++---- 5 files changed, 55 insertions(+), 9 deletions(-) diff --git a/core/connectors/runtime/src/api/sink.rs b/core/connectors/runtime/src/api/sink.rs index 0b20425ff5..50c9d9b7a4 100644 --- a/core/connectors/runtime/src/api/sink.rs +++ b/core/connectors/runtime/src/api/sink.rs @@ -259,6 +259,7 @@ async fn restart_sink( context.config_provider.as_ref(), &context.iggy_clients.consumer, &context.metrics, + &context, ) .await?; Ok(StatusCode::NO_CONTENT) diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index 1dcbcb1376..b3d3062a62 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -270,6 +270,16 @@ async fn main() -> Result<(), RuntimeError> { .map(|s| s.key) .collect(); for key in &source_keys { + let guard = if let Some(details) = context.sources.get(key).await { + let details = details.lock().await; + Some(details.restart_guard.clone()) + } else { + None + }; + let _lock = match &guard { + Some(g) => Some(g.lock().await), + None => None, + }; if let Err(err) = context.sources.stop_connector(key, &context.metrics).await { error!("Failed to stop source connector: {key}. {err}"); } @@ -283,6 +293,16 @@ async fn main() -> Result<(), RuntimeError> { .map(|s| s.key) .collect(); for key in &sink_keys { + let guard = if let Some(details) = context.sinks.get(key).await { + let details = details.lock().await; + Some(details.restart_guard.clone()) + } else { + None + }; + let _lock = match &guard { + Some(g) => Some(g.lock().await), + None => None, + }; if let Err(err) = context.sinks.stop_connector(key, &context.metrics).await { error!("Failed to stop sink connector: {key}. {err}"); } diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs index ed5f923e71..a245d38a75 100644 --- a/core/connectors/runtime/src/manager/sink.rs +++ b/core/connectors/runtime/src/manager/sink.rs @@ -19,6 +19,7 @@ use crate::PLUGIN_ID; use crate::SinkApi; use crate::configs::connectors::{ConfigFormat, ConnectorsConfigProvider, SinkConfig}; +use crate::context::RuntimeContext; use crate::error::RuntimeError; use crate::metrics::Metrics; use crate::sink; @@ -163,6 +164,7 @@ impl SinkManager { config: &SinkConfig, iggy_client: &IggyClient, metrics: &Arc, + context: &Arc, ) -> Result<(), RuntimeError> { let details_arc = self .sinks @@ -189,8 +191,15 @@ impl SinkManager { let consumers = sink::setup_sink_consumers(key, config, iggy_client).await?; let callback = container.iggy_sink_consume; - let (shutdown_tx, task_handles) = - sink::spawn_consume_tasks(plugin_id, key, consumers, callback, config.verbose, metrics); + let (shutdown_tx, task_handles) = sink::spawn_consume_tasks( + plugin_id, + key, + consumers, + callback, + config.verbose, + metrics, + context.clone(), + ); { let mut details = details_arc.lock().await; @@ -212,6 +221,7 @@ impl SinkManager { config_provider: &dyn ConnectorsConfigProvider, iggy_client: &IggyClient, metrics: &Arc, + context: &Arc, ) -> Result<(), RuntimeError> { let guard = { let details_arc = self @@ -233,7 +243,7 @@ impl SinkManager { .map_err(|e| RuntimeError::InvalidConfiguration(e.to_string()))? .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?; - self.start_connector(key, &config, iggy_client, metrics) + self.start_connector(key, &config, iggy_client, metrics, context) .await?; info!("Sink connector: {key} restarted successfully."); Ok(()) diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index f25449e8f1..2074f845ca 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -183,6 +183,7 @@ pub fn consume( sink.callback, plugin.verbose, &context.metrics, + context.clone(), ); handles.push((plugin.key, shutdown_tx, task_handles)); } @@ -203,6 +204,7 @@ pub(crate) fn spawn_consume_tasks( callback: ConsumeCallback, verbose: bool, metrics: &Arc, + context: Arc, ) -> (watch::Sender<()>, Vec>) { let (shutdown_tx, shutdown_rx) = watch::channel(()); let mut task_handles = Vec::new(); @@ -210,6 +212,7 @@ pub(crate) fn spawn_consume_tasks( let plugin_key = plugin_key.to_string(); let metrics = metrics.clone(); let shutdown_rx = shutdown_rx.clone(); + let context = context.clone(); let handle = tokio::spawn(async move { if let Err(error) = consume_messages( plugin_id, @@ -228,6 +231,10 @@ pub(crate) fn spawn_consume_tasks( error!( "Failed to consume messages for sink connector with ID: {plugin_id}: {error}" ); + context + .sinks + .set_error(&plugin_key, &error.to_string()) + .await; } }); task_handles.push(handle); @@ -384,9 +391,13 @@ pub(crate) async fn setup_sink_consumers( RuntimeError, > { let transforms = if let Some(transforms_config) = &config.transforms { - transform::load(transforms_config).map_err(|error| { + let loaded = transform::load(transforms_config).map_err(|error| { RuntimeError::InvalidConfiguration(format!("Failed to load transforms: {error}")) - })? + })?; + for t in &loaded { + info!("Loaded transform: {:?} for sink: {key}", t.r#type()); + } + loaded } else { vec![] }; diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index 5fe4bd3c82..c9cd5ee7fa 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -228,9 +228,13 @@ pub(crate) async fn setup_source_producer( RuntimeError, > { let transforms = if let Some(transforms_config) = &config.transforms { - transform::load(transforms_config).map_err(|error| { + let loaded = transform::load(transforms_config).map_err(|error| { RuntimeError::InvalidConfiguration(format!("Failed to load transforms: {error}")) - })? + })?; + for t in &loaded { + info!("Loaded transform: {:?} for source", t.r#type()); + } + loaded } else { vec![] }; @@ -424,7 +428,7 @@ pub(crate) fn spawn_source_handler( let (sender, receiver) = flume::unbounded(); SOURCE_SENDERS.insert(plugin_id, sender); - tokio::task::spawn_blocking(move || { + let blocking_handle = tokio::task::spawn_blocking(move || { callback(plugin_id, handle_produced_messages); }); @@ -444,7 +448,7 @@ pub(crate) fn spawn_source_handler( .await; }); - vec![handler_task] + vec![blocking_handle, handler_task] } pub fn handle( From 2be1a09125315f06151c19fde90925795924f432 Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 1 Mar 2026 22:14:51 +0900 Subject: [PATCH 18/22] refactor(connectors): code review fixes for restart robustness - Remove #[allow(dead_code)] from set_error() now that it's used - Add source error state reporting when forwarding loop terminates - Add key parameter to setup_source_producer() for consistent logging - Extract stop_connector_with_guard() to deduplicate shutdown guard pattern Signed-off-by: seokjin0414 Signed-off-by: shin --- core/connectors/runtime/src/main.rs | 32 ++++++------------- core/connectors/runtime/src/manager/sink.rs | 19 ++++++++++- core/connectors/runtime/src/manager/source.rs | 20 +++++++++++- core/connectors/runtime/src/source.rs | 15 +++++++-- 4 files changed, 60 insertions(+), 26 deletions(-) diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index b3d3062a62..6232f90016 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -270,17 +270,11 @@ async fn main() -> Result<(), RuntimeError> { .map(|s| s.key) .collect(); for key in &source_keys { - let guard = if let Some(details) = context.sources.get(key).await { - let details = details.lock().await; - Some(details.restart_guard.clone()) - } else { - None - }; - let _lock = match &guard { - Some(g) => Some(g.lock().await), - None => None, - }; - if let Err(err) = context.sources.stop_connector(key, &context.metrics).await { + if let Err(err) = context + .sources + .stop_connector_with_guard(key, &context.metrics) + .await + { error!("Failed to stop source connector: {key}. {err}"); } } @@ -293,17 +287,11 @@ async fn main() -> Result<(), RuntimeError> { .map(|s| s.key) .collect(); for key in &sink_keys { - let guard = if let Some(details) = context.sinks.get(key).await { - let details = details.lock().await; - Some(details.restart_guard.clone()) - } else { - None - }; - let _lock = match &guard { - Some(g) => Some(g.lock().await), - None => None, - }; - if let Err(err) = context.sinks.stop_connector(key, &context.metrics).await { + if let Err(err) = context + .sinks + .stop_connector_with_guard(key, &context.metrics) + .await + { error!("Failed to stop sink connector: {key}. {err}"); } } diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs index a245d38a75..af29d4cfa5 100644 --- a/core/connectors/runtime/src/manager/sink.rs +++ b/core/connectors/runtime/src/manager/sink.rs @@ -101,7 +101,6 @@ impl SinkManager { } } - #[allow(dead_code)] pub async fn set_error(&self, key: &str, error_message: &str) { if let Some(sink) = self.sinks.get(key) { let mut sink = sink.lock().await; @@ -110,6 +109,24 @@ impl SinkManager { } } + pub async fn stop_connector_with_guard( + &self, + key: &str, + metrics: &Arc, + ) -> Result<(), RuntimeError> { + let guard = { + let details_arc = self + .sinks + .get(key) + .map(|e| e.value().clone()) + .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?; + let details = details_arc.lock().await; + details.restart_guard.clone() + }; + let _lock = guard.lock().await; + self.stop_connector(key, metrics).await + } + pub async fn stop_connector( &self, key: &str, diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs index a3a2269cc8..4bae41beb8 100644 --- a/core/connectors/runtime/src/manager/source.rs +++ b/core/connectors/runtime/src/manager/source.rs @@ -110,6 +110,24 @@ impl SourceManager { } } + pub async fn stop_connector_with_guard( + &self, + key: &str, + metrics: &Arc, + ) -> Result<(), RuntimeError> { + let guard = { + let details_arc = self + .sources + .get(key) + .map(|e| e.value().clone()) + .ok_or_else(|| RuntimeError::SourceNotFound(key.to_string()))?; + let details = details_arc.lock().await; + details.restart_guard.clone() + }; + let _lock = guard.lock().await; + self.stop_connector(key, metrics).await + } + pub async fn stop_connector( &self, key: &str, @@ -193,7 +211,7 @@ impl SourceManager { info!("Source connector with ID: {plugin_id} for plugin: {key} initialized successfully."); let (producer, encoder, transforms) = - source::setup_source_producer(config, iggy_client).await?; + source::setup_source_producer(key, config, iggy_client).await?; let callback = container.iggy_source_handle; let handler_tasks = source::spawn_source_handler( diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index c9cd5ee7fa..b79bace5b4 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -151,7 +151,8 @@ pub async fn init( ); } - let (producer, encoder, transforms) = setup_source_producer(&config, iggy_client).await?; + let (producer, encoder, transforms) = + setup_source_producer(&key, &config, iggy_client).await?; let connector = source_connectors.get_mut(&path).ok_or_else(|| { RuntimeError::InvalidConfiguration(format!( @@ -217,6 +218,7 @@ pub(crate) fn get_state_storage(state_path: &str, key: &str) -> StateStorage { } pub(crate) async fn setup_source_producer( + key: &str, config: &SourceConfig, iggy_client: &IggyClient, ) -> Result< @@ -232,7 +234,7 @@ pub(crate) async fn setup_source_producer( RuntimeError::InvalidConfiguration(format!("Failed to load transforms: {error}")) })?; for t in &loaded { - info!("Loaded transform: {:?} for source", t.r#type()); + info!("Loaded transform: {:?} for source: {key}", t.r#type()); } loaded } else { @@ -433,6 +435,8 @@ pub(crate) fn spawn_source_handler( }); let plugin_key = plugin_key.to_string(); + let context_for_error = context.clone(); + let plugin_key_for_error = plugin_key.clone(); let handler_task = tokio::spawn(async move { source_forwarding_loop( plugin_id, @@ -446,6 +450,13 @@ pub(crate) fn spawn_source_handler( context, ) .await; + context_for_error + .sources + .set_error( + &plugin_key_for_error, + "Source forwarding loop terminated unexpectedly", + ) + .await; }); vec![blocking_handle, handler_task] From 48ae54055535e4e35adfe060290438f1e4f0ea5d Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 1 Mar 2026 22:44:12 +0900 Subject: [PATCH 19/22] fix(connectors): fix source false error on shutdown and startup handle race - Remove unconditional set_error() after source forwarding loop exits, as the loop already handles error/stopped status internally - Move API server init after handle storage to eliminate race window where restart API could arrive before handles are stored Signed-off-by: seokjin0414 Signed-off-by: shin --- core/connectors/runtime/src/main.rs | 2 +- core/connectors/runtime/src/source.rs | 9 --------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index 6232f90016..cca15d47a4 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -218,7 +218,6 @@ async fn main() -> Result<(), RuntimeError> { } let context = Arc::new(context); - api::init(&config.http, context.clone()).await; let source_handles = source::handle(source_wrappers, context.clone()); for (key, handler_tasks) in source_handles { @@ -242,6 +241,7 @@ async fn main() -> Result<(), RuntimeError> { } info!("All sources and sinks spawned."); + api::init(&config.http, context.clone()).await; #[cfg(unix)] let (mut ctrl_c, mut sigterm) = { diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index b79bace5b4..51ec060269 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -435,8 +435,6 @@ pub(crate) fn spawn_source_handler( }); let plugin_key = plugin_key.to_string(); - let context_for_error = context.clone(); - let plugin_key_for_error = plugin_key.clone(); let handler_task = tokio::spawn(async move { source_forwarding_loop( plugin_id, @@ -450,13 +448,6 @@ pub(crate) fn spawn_source_handler( context, ) .await; - context_for_error - .sources - .set_error( - &plugin_key_for_error, - "Source forwarding loop terminated unexpectedly", - ) - .await; }); vec![blocking_handle, handler_task] From 4d3ff4f150e31261b44c3cbb754cd425a08e93c1 Mon Sep 17 00:00:00 2001 From: shin Date: Sat, 7 Mar 2026 00:08:19 +0900 Subject: [PATCH 20/22] refactor(connectors): rename details_arc to details in manager modules Signed-off-by: seokjin0414 Signed-off-by: shin --- core/connectors/runtime/src/manager/sink.rs | 20 +++++++++---------- core/connectors/runtime/src/manager/source.rs | 20 +++++++++---------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs index af29d4cfa5..36eb6e11e9 100644 --- a/core/connectors/runtime/src/manager/sink.rs +++ b/core/connectors/runtime/src/manager/sink.rs @@ -115,12 +115,12 @@ impl SinkManager { metrics: &Arc, ) -> Result<(), RuntimeError> { let guard = { - let details_arc = self + let details = self .sinks .get(key) .map(|e| e.value().clone()) .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?; - let details = details_arc.lock().await; + let details = details.lock().await; details.restart_guard.clone() }; let _lock = guard.lock().await; @@ -132,14 +132,14 @@ impl SinkManager { key: &str, metrics: &Arc, ) -> Result<(), RuntimeError> { - let details_arc = self + let details = self .sinks .get(key) .map(|e| e.value().clone()) .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?; let (shutdown_tx, task_handles, plugin_id, container) = { - let mut details = details_arc.lock().await; + let mut details = details.lock().await; ( details.shutdown_tx.take(), std::mem::take(&mut details.task_handles), @@ -163,7 +163,7 @@ impl SinkManager { } { - let mut details = details_arc.lock().await; + let mut details = details.lock().await; let old_status = details.info.status; details.info.status = ConnectorStatus::Stopped; details.info.last_error = None; @@ -183,14 +183,14 @@ impl SinkManager { metrics: &Arc, context: &Arc, ) -> Result<(), RuntimeError> { - let details_arc = self + let details = self .sinks .get(key) .map(|e| e.value().clone()) .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?; let container = { - let details = details_arc.lock().await; + let details = details.lock().await; details.container.clone().ok_or_else(|| { RuntimeError::InvalidConfiguration(format!("No container loaded for sink: {key}")) })? @@ -219,7 +219,7 @@ impl SinkManager { ); { - let mut details = details_arc.lock().await; + let mut details = details.lock().await; details.info.id = plugin_id; details.info.status = ConnectorStatus::Running; details.info.last_error = None; @@ -241,12 +241,12 @@ impl SinkManager { context: &Arc, ) -> Result<(), RuntimeError> { let guard = { - let details_arc = self + let details = self .sinks .get(key) .map(|e| e.value().clone()) .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?; - let details = details_arc.lock().await; + let details = details.lock().await; details.restart_guard.clone() }; let _lock = guard.lock().await; diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs index 4bae41beb8..4afc74b2ae 100644 --- a/core/connectors/runtime/src/manager/source.rs +++ b/core/connectors/runtime/src/manager/source.rs @@ -116,12 +116,12 @@ impl SourceManager { metrics: &Arc, ) -> Result<(), RuntimeError> { let guard = { - let details_arc = self + let details = self .sources .get(key) .map(|e| e.value().clone()) .ok_or_else(|| RuntimeError::SourceNotFound(key.to_string()))?; - let details = details_arc.lock().await; + let details = details.lock().await; details.restart_guard.clone() }; let _lock = guard.lock().await; @@ -133,14 +133,14 @@ impl SourceManager { key: &str, metrics: &Arc, ) -> Result<(), RuntimeError> { - let details_arc = self + let details = self .sources .get(key) .map(|e| e.value().clone()) .ok_or_else(|| RuntimeError::SourceNotFound(key.to_string()))?; let (task_handles, plugin_id, container) = { - let mut details = details_arc.lock().await; + let mut details = details.lock().await; ( std::mem::take(&mut details.handler_tasks), details.info.id, @@ -161,7 +161,7 @@ impl SourceManager { } { - let mut details = details_arc.lock().await; + let mut details = details.lock().await; let old_status = details.info.status; details.info.status = ConnectorStatus::Stopped; details.info.last_error = None; @@ -182,14 +182,14 @@ impl SourceManager { state_path: &str, context: &Arc, ) -> Result<(), RuntimeError> { - let details_arc = self + let details = self .sources .get(key) .map(|e| e.value().clone()) .ok_or_else(|| RuntimeError::SourceNotFound(key.to_string()))?; let container = { - let details = details_arc.lock().await; + let details = details.lock().await; details.container.clone().ok_or_else(|| { RuntimeError::InvalidConfiguration(format!("No container loaded for source: {key}")) })? @@ -227,7 +227,7 @@ impl SourceManager { ); { - let mut details = details_arc.lock().await; + let mut details = details.lock().await; details.info.id = plugin_id; details.info.status = ConnectorStatus::Running; details.info.last_error = None; @@ -249,12 +249,12 @@ impl SourceManager { context: &Arc, ) -> Result<(), RuntimeError> { let guard = { - let details_arc = self + let details = self .sources .get(key) .map(|e| e.value().clone()) .ok_or_else(|| RuntimeError::SourceNotFound(key.to_string()))?; - let details = details_arc.lock().await; + let details = details.lock().await; details.restart_guard.clone() }; let _lock = guard.lock().await; From e06aeef677140977118db9e1447a7aa4f87cb089 Mon Sep 17 00:00:00 2001 From: shin Date: Sat, 7 Mar 2026 00:41:07 +0900 Subject: [PATCH 21/22] refactor(connectors): use try_lock for restart guard to avoid queuing Signed-off-by: seokjin0414 Signed-off-by: shin --- core/connectors/runtime/src/manager/sink.rs | 5 ++++- core/connectors/runtime/src/manager/source.rs | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/connectors/runtime/src/manager/sink.rs b/core/connectors/runtime/src/manager/sink.rs index 36eb6e11e9..9c6cafa1ef 100644 --- a/core/connectors/runtime/src/manager/sink.rs +++ b/core/connectors/runtime/src/manager/sink.rs @@ -249,7 +249,10 @@ impl SinkManager { let details = details.lock().await; details.restart_guard.clone() }; - let _lock = guard.lock().await; + let Ok(_lock) = guard.try_lock() else { + info!("Restart already in progress for sink connector: {key}, skipping."); + return Ok(()); + }; info!("Restarting sink connector: {key}"); self.stop_connector(key, metrics).await?; diff --git a/core/connectors/runtime/src/manager/source.rs b/core/connectors/runtime/src/manager/source.rs index 4afc74b2ae..14fbb2c0d8 100644 --- a/core/connectors/runtime/src/manager/source.rs +++ b/core/connectors/runtime/src/manager/source.rs @@ -257,7 +257,10 @@ impl SourceManager { let details = details.lock().await; details.restart_guard.clone() }; - let _lock = guard.lock().await; + let Ok(_lock) = guard.try_lock() else { + info!("Restart already in progress for source connector: {key}, skipping."); + return Ok(()); + }; info!("Restarting source connector: {key}"); self.stop_connector(key, metrics).await?; From f7e5ba8533cbf58762054de21c46ea3563170d2b Mon Sep 17 00:00:00 2001 From: shin Date: Sat, 7 Mar 2026 00:44:49 +0900 Subject: [PATCH 22/22] test(connectors): add parallel restart test and move helpers to bottom Signed-off-by: seokjin0414 Signed-off-by: shin --- .../tests/connectors/postgres/restart.rs | 147 +++++++++++++----- 1 file changed, 111 insertions(+), 36 deletions(-) diff --git a/core/integration/tests/connectors/postgres/restart.rs b/core/integration/tests/connectors/postgres/restart.rs index d55b60220b..97d9f2422c 100644 --- a/core/integration/tests/connectors/postgres/restart.rs +++ b/core/integration/tests/connectors/postgres/restart.rs @@ -37,42 +37,6 @@ const SINK_KEY: &str = "postgres"; type SinkRow = (i64, String, String, Vec); -async fn wait_for_sink_status( - http: &Client, - api_url: &str, - expected: ConnectorStatus, -) -> SinkInfoResponse { - for _ in 0..POLL_ATTEMPTS { - if let Ok(resp) = http - .get(format!("{api_url}/sinks/{SINK_KEY}")) - .header("api-key", API_KEY) - .send() - .await - && let Ok(info) = resp.json::().await - && info.status == expected - { - return info; - } - sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; - } - panic!("Sink connector did not reach {expected:?} status in time"); -} - -fn build_messages(messages_data: &[TestMessage], id_offset: usize) -> Vec { - messages_data - .iter() - .enumerate() - .map(|(i, msg)| { - let payload = serde_json::to_vec(msg).expect("Failed to serialize message"); - IggyMessage::builder() - .id((id_offset + i + 1) as u128) - .payload(Bytes::from(payload)) - .build() - .expect("Failed to build message") - }) - .collect() -} - #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/postgres/sink.toml")), seed = seeds::connector_stream @@ -161,3 +125,114 @@ async fn restart_sink_connector_continues_processing( rows.len() ); } + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/postgres/sink.toml")), + seed = seeds::connector_stream +)] +async fn parallel_restart_requests_should_not_break_connector( + harness: &TestHarness, + fixture: PostgresSinkFixture, +) { + let client = harness.root_client().await.unwrap(); + let api_url = harness + .connectors_runtime() + .expect("connector runtime should be available") + .http_url(); + let http = Client::new(); + let pool = fixture.create_pool().await.expect("Failed to create pool"); + + fixture.wait_for_table(&pool, SINK_TABLE).await; + + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + + wait_for_sink_status(&http, &api_url, ConnectorStatus::Running).await; + + let mut tasks = Vec::new(); + for _ in 0..5 { + let http = http.clone(); + let url = format!("{api_url}/sinks/{SINK_KEY}/restart"); + tasks.push(tokio::spawn(async move { + http.post(&url) + .header("api-key", API_KEY) + .send() + .await + .expect("Failed to call restart endpoint") + })); + } + + let responses = futures::future::join_all(tasks).await; + for resp in responses { + let resp = resp.expect("Task panicked"); + assert_eq!( + resp.status().as_u16(), + 204, + "All restart requests should return 204" + ); + } + + wait_for_sink_status(&http, &api_url, ConnectorStatus::Running).await; + + let batch = create_test_messages(TEST_MESSAGE_COUNT); + let mut messages = build_messages(&batch, 0); + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .expect("Failed to send messages after parallel restarts"); + + let query = format!( + "SELECT iggy_offset, iggy_stream, iggy_topic, payload FROM {SINK_TABLE} ORDER BY iggy_offset" + ); + let rows: Vec = fixture + .fetch_rows_as(&pool, &query, TEST_MESSAGE_COUNT) + .await + .expect("Failed to fetch rows after parallel restarts"); + + assert!( + rows.len() >= TEST_MESSAGE_COUNT, + "Expected at least {TEST_MESSAGE_COUNT} rows after parallel restarts, got {}", + rows.len() + ); +} + +async fn wait_for_sink_status( + http: &Client, + api_url: &str, + expected: ConnectorStatus, +) -> SinkInfoResponse { + for _ in 0..POLL_ATTEMPTS { + if let Ok(resp) = http + .get(format!("{api_url}/sinks/{SINK_KEY}")) + .header("api-key", API_KEY) + .send() + .await + && let Ok(info) = resp.json::().await + && info.status == expected + { + return info; + } + sleep(Duration::from_millis(POLL_INTERVAL_MS)).await; + } + panic!("Sink connector did not reach {expected:?} status in time"); +} + +fn build_messages(messages_data: &[TestMessage], id_offset: usize) -> Vec { + messages_data + .iter() + .enumerate() + .map(|(i, msg)| { + let payload = serde_json::to_vec(msg).expect("Failed to serialize message"); + IggyMessage::builder() + .id((id_offset + i + 1) as u128) + .payload(Bytes::from(payload)) + .build() + .expect("Failed to build message") + }) + .collect() +}