Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a5c4b2a
refactor(connectors): add iggy_clients and state_path to RuntimeContext
seokjin0414 Feb 19, 2026
a9f2009
refactor(connectors): add lifecycle fields to SinkDetails and SourceD…
seokjin0414 Feb 19, 2026
524a62b
feat(connectors): add shutdown channel to sink consume loop
seokjin0414 Feb 19, 2026
4437792
refactor(connectors): store Arc<Container> in Manager for restart sup…
seokjin0414 Feb 19, 2026
c020704
feat(connectors): add stop/start/restart methods to SinkManager
seokjin0414 Feb 19, 2026
dc4425d
feat(connectors): add stop/start/restart methods to SourceManager
seokjin0414 Feb 19, 2026
d17e202
feat(connectors): add POST /restart API endpoints for sink and source…
seokjin0414 Feb 19, 2026
d46727e
refactor(connectors): replace direct shutdown with Manager-based stops
seokjin0414 Feb 19, 2026
3d681cd
refactor(connectors): fmt and clippy fixes for restart support
seokjin0414 Feb 19, 2026
20bd79b
test(connectors): add unit tests for SinkManager and SourceManager
seokjin0414 Feb 19, 2026
ee4a14c
fix(connectors): eliminate race condition in initial startup handle s…
seokjin0414 Feb 19, 2026
02a5ccf
fix(connectors): prevent resource leak in source handle() when produc…
seokjin0414 Feb 19, 2026
6d9fc15
refactor(connectors): add restart concurrency guard to SinkDetails an…
seokjin0414 Mar 1, 2026
7d95a1f
refactor(connectors): extract spawn_consume_tasks and reuse setup_sin…
seokjin0414 Mar 1, 2026
274d03a
refactor(connectors): extract spawn_source_handler and reuse setup_so…
seokjin0414 Mar 1, 2026
2ddb9f9
test(connectors): add restart integration test for sink connector
seokjin0414 Mar 1, 2026
49e99f0
fix(connectors): improve restart robustness with error reporting and …
seokjin0414 Mar 1, 2026
2be1a09
refactor(connectors): code review fixes for restart robustness
seokjin0414 Mar 1, 2026
48ae540
fix(connectors): fix source false error on shutdown and startup handl…
seokjin0414 Mar 1, 2026
53aa370
Merge branch 'master' into 2417-connector-restart-with-new-config
spetz Mar 6, 2026
4d3ff4f
refactor(connectors): rename details_arc to details in manager modules
seokjin0414 Mar 6, 2026
e06aeef
refactor(connectors): use try_lock for restart guard to avoid queuing
seokjin0414 Mar 6, 2026
f7e5ba8
test(connectors): add parallel restart test and move helpers to bottom
seokjin0414 Mar 6, 2026
7a7c730
Merge branch 'master' into 2417-connector-restart-with-new-config
mmodzelewski Mar 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion core/connectors/runtime/src/api/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use axum::{
extract::{Path, Query, State},
http::{HeaderMap, StatusCode, header},
response::IntoResponse,
routing::get,
routing::{get, post},
};
use serde::Deserialize;
use std::sync::Arc;
Expand All @@ -52,6 +52,7 @@ pub fn router(state: Arc<RuntimeContext>) -> Router {
"/sinks/{key}/configs/active",
get(get_sink_active_config).put(update_sink_active_config),
)
.route("/sinks/{key}/restart", post(restart_sink))
.with_state(state)
}

Expand Down Expand Up @@ -246,3 +247,20 @@ async fn delete_sink_config(
.await?;
Ok(StatusCode::NO_CONTENT)
}

async fn restart_sink(
State(context): State<Arc<RuntimeContext>>,
Path(key): Path<String>,
) -> Result<StatusCode, ApiError> {
context
.sinks
.restart_connector(
&key,
context.config_provider.as_ref(),
&context.iggy_clients.consumer,
&context.metrics,
&context,
)
.await?;
Ok(StatusCode::NO_CONTENT)
}
21 changes: 20 additions & 1 deletion core/connectors/runtime/src/api/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use axum::{
extract::{Path, Query, State},
http::{HeaderMap, StatusCode, header},
response::IntoResponse,
routing::get,
routing::{get, post},
};
use serde::Deserialize;
use std::sync::Arc;
Expand All @@ -55,6 +55,7 @@ pub fn router(state: Arc<RuntimeContext>) -> Router {
"/sources/{key}/configs/active",
get(get_source_active_config).put(update_source_active_config),
)
.route("/sources/{key}/restart", post(restart_source))
.with_state(state)
}

Expand Down Expand Up @@ -249,3 +250,21 @@ async fn delete_source_config(
.await?;
Ok(StatusCode::NO_CONTENT)
}

async fn restart_source(
State(context): State<Arc<RuntimeContext>>,
Path(key): Path<String>,
) -> Result<StatusCode, ApiError> {
context
.sources
.restart_connector(
&key,
context.config_provider.as_ref(),
&context.iggy_clients.producer,
&context.metrics,
&context.state_path,
&context,
)
.await?;
Ok(StatusCode::NO_CONTENT)
}
16 changes: 16 additions & 0 deletions core/connectors/runtime/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use crate::configs::connectors::{ConnectorsConfigProvider, SinkConfig, SourceConfig};
use crate::configs::runtime::ConnectorsRuntimeConfig;
use crate::metrics::Metrics;
use crate::stream::IggyClients;
use crate::{
SinkConnectorWrapper, SourceConnectorWrapper,
manager::{
Expand All @@ -31,6 +32,7 @@ use iggy_connector_sdk::api::ConnectorError;
use iggy_connector_sdk::api::ConnectorStatus;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::error;

pub struct RuntimeContext {
Expand All @@ -40,15 +42,20 @@ pub struct RuntimeContext {
pub config_provider: Arc<dyn ConnectorsConfigProvider>,
pub metrics: Arc<Metrics>,
pub start_time: IggyTimestamp,
pub iggy_clients: Arc<IggyClients>,
pub state_path: String,
}

#[allow(clippy::too_many_arguments)]
pub fn init(
config: &ConnectorsRuntimeConfig,
sinks_config: &HashMap<String, SinkConfig>,
sources_config: &HashMap<String, SourceConfig>,
sink_wrappers: &[SinkConnectorWrapper],
source_wrappers: &[SourceConnectorWrapper],
config_provider: Box<dyn ConnectorsConfigProvider>,
iggy_clients: Arc<IggyClients>,
state_path: String,
) -> RuntimeContext {
let metrics = Arc::new(Metrics::init());
let sinks = SinkManager::new(map_sinks(sinks_config, sink_wrappers));
Expand All @@ -64,6 +71,8 @@ pub fn init(
config_provider: Arc::from(config_provider),
metrics,
start_time: IggyTimestamp::now(),
iggy_clients,
state_path,
}
}

Expand Down Expand Up @@ -103,6 +112,10 @@ fn map_sinks(
plugin_config_format: sink_plugin.config_format,
},
config: sink_config.clone(),
shutdown_tx: None,
task_handles: vec![],
container: None,
restart_guard: Arc::new(Mutex::new(())),
});
}
}
Expand Down Expand Up @@ -145,6 +158,9 @@ fn map_sources(
plugin_config_format: source_plugin.config_format,
},
config: source_config.clone(),
handler_tasks: vec![],
container: None,
restart_guard: Arc::new(Mutex::new(())),
});
}
}
Expand Down
156 changes: 88 additions & 68 deletions core/connectors/runtime/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use figlet_rs::FIGfont;
use iggy::prelude::{Client, IggyConsumer, IggyProducer};
use iggy_connector_sdk::{
StreamDecoder, StreamEncoder,
api::ConnectorStatus,
sink::ConsumeCallback,
source::{HandleCallback, SendCallback},
transforms::Transform,
Expand All @@ -40,7 +41,7 @@ use std::{
env,
sync::{Arc, atomic::AtomicU32},
};
use tracing::info;
use tracing::{error, info};

mod api;
pub(crate) mod configs;
Expand Down Expand Up @@ -69,8 +70,8 @@ static PLUGIN_ID: AtomicU32 = AtomicU32::new(1);
const ALLOWED_PLUGIN_EXTENSIONS: [&str; 3] = ["so", "dylib", "dll"];
const DEFAULT_CONFIG_PATH: &str = "core/connectors/runtime/config.toml";

#[derive(WrapperApi)]
struct SourceApi {
#[derive(WrapperApi, Debug)]
pub(crate) struct SourceApi {
iggy_source_open: extern "C" fn(
id: u32,
config_ptr: *const u8,
Expand All @@ -84,8 +85,8 @@ struct SourceApi {
iggy_source_version: extern "C" fn() -> *const std::ffi::c_char,
}

#[derive(WrapperApi)]
struct SinkApi {
#[derive(WrapperApi, Debug)]
pub(crate) struct SinkApi {
iggy_sink_open: extern "C" fn(
id: u32,
config_ptr: *const u8,
Expand Down Expand Up @@ -143,7 +144,7 @@ async fn main() -> Result<(), RuntimeError> {

info!("State will be stored in: {}", config.state.path);

let iggy_clients = stream::init(config.iggy.clone()).await?;
let iggy_clients = Arc::new(stream::init(config.iggy.clone()).await?);

let connectors_config_provider: Box<dyn ConnectorsConfigProvider> =
create_connectors_config_provider(&config.connectors).await?;
Expand All @@ -166,47 +167,31 @@ async fn main() -> Result<(), RuntimeError> {
let sinks = sink::init(sinks_config.clone(), &iggy_clients.consumer).await?;

let mut sink_wrappers = vec![];
let mut sink_with_plugins = HashMap::new();
for (key, sink) in sinks {
let plugin_ids = sink
.plugins
.iter()
.filter(|plugin| plugin.error.is_none())
.map(|plugin| plugin.id)
.collect();
let mut sink_containers_by_key: HashMap<String, Arc<Container<SinkApi>>> = HashMap::new();
for (_path, sink) in sinks {
let container = Arc::new(sink.container);
let callback = container.iggy_sink_consume;
for plugin in &sink.plugins {
sink_containers_by_key.insert(plugin.key.clone(), container.clone());
}
sink_wrappers.push(SinkConnectorWrapper {
callback: sink.container.iggy_sink_consume,
callback,
plugins: sink.plugins,
});
sink_with_plugins.insert(
key,
SinkWithPlugins {
container: sink.container,
plugin_ids,
},
);
}

let mut source_wrappers = vec![];
let mut source_with_plugins = HashMap::new();
for (key, source) in sources {
let plugin_ids = source
.plugins
.iter()
.filter(|plugin| plugin.error.is_none())
.map(|plugin| plugin.id)
.collect();
let mut source_containers_by_key: HashMap<String, Arc<Container<SourceApi>>> = HashMap::new();
for (_path, source) in sources {
let container = Arc::new(source.container);
let callback = container.iggy_source_handle;
for plugin in &source.plugins {
source_containers_by_key.insert(plugin.key.clone(), container.clone());
}
source_wrappers.push(SourceConnectorWrapper {
callback: source.container.iggy_source_handle,
callback,
plugins: source.plugins,
});
source_with_plugins.insert(
key,
SourceWithPlugins {
container: source.container,
plugin_ids,
},
);
}

let context = context::init(
Expand All @@ -216,13 +201,47 @@ async fn main() -> Result<(), RuntimeError> {
&sink_wrappers,
&source_wrappers,
connectors_config_provider,
iggy_clients.clone(),
config.state.path.clone(),
);
for (key, container) in sink_containers_by_key {
if let Some(details) = context.sinks.get(&key).await {
let mut details = details.lock().await;
details.container = Some(container);
}
}
for (key, container) in source_containers_by_key {
if let Some(details) = context.sources.get(&key).await {
let mut details = details.lock().await;
details.container = Some(container);
}
}

let context = Arc::new(context);
api::init(&config.http, context.clone()).await;

let source_handler_tasks = source::handle(source_wrappers, context.clone());
sink::consume(sink_wrappers, context.clone());
let source_handles = source::handle(source_wrappers, context.clone());
for (key, handler_tasks) in source_handles {
if let Some(details) = context.sources.get(&key).await {
let mut details = details.lock().await;
details.handler_tasks = handler_tasks;
}
}

let sink_handles = sink::consume(sink_wrappers, context.clone());
for (key, shutdown_tx, task_handles) in sink_handles {
if let Some(details) = context.sinks.get(&key).await {
let mut details = details.lock().await;
details.shutdown_tx = Some(shutdown_tx);
details.task_handles = task_handles;
}
context
.sinks
.update_status(&key, ConnectorStatus::Running, Some(&context.metrics))
.await;
}

info!("All sources and sinks spawned.");
api::init(&config.http, context.clone()).await;

#[cfg(unix)]
let (mut ctrl_c, mut sigterm) = {
Expand All @@ -243,26 +262,37 @@ async fn main() -> Result<(), RuntimeError> {
}
}

for (key, source) in source_with_plugins {
for id in source.plugin_ids {
info!("Closing source connector with ID: {id} for plugin: {key}");
source.container.iggy_source_close(id);
source::cleanup_sender(id);
info!("Closed source connector with ID: {id} for plugin: {key}");
let source_keys: Vec<String> = context
.sources
.get_all()
.await
.into_iter()
.map(|s| s.key)
.collect();
for key in &source_keys {
if let Err(err) = context
.sources
.stop_connector_with_guard(key, &context.metrics)
.await
{
error!("Failed to stop source connector: {key}. {err}");
}
}

// Wait for source handler tasks to drain remaining messages and persist state
// before shutting down the Iggy clients they depend on.
for handle in source_handler_tasks {
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
}

for (key, sink) in sink_with_plugins {
for id in sink.plugin_ids {
info!("Closing sink connector with ID: {id} for plugin: {key}");
sink.container.iggy_sink_close(id);
info!("Closed sink connector with ID: {id} for plugin: {key}");
let sink_keys: Vec<String> = context
.sinks
.get_all()
.await
.into_iter()
.map(|s| s.key)
.collect();
for key in &sink_keys {
if let Err(err) = context
.sinks
.stop_connector_with_guard(key, &context.metrics)
.await
{
error!("Failed to stop sink connector: {key}. {err}");
}
}

Expand Down Expand Up @@ -400,11 +430,6 @@ struct SinkConnectorWrapper {
plugins: Vec<SinkConnectorPlugin>,
}

struct SinkWithPlugins {
container: Container<SinkApi>,
plugin_ids: Vec<u32>,
}

struct SourceConnector {
container: Container<SourceApi>,
plugins: Vec<SourceConnectorPlugin>,
Expand All @@ -429,11 +454,6 @@ struct SourceConnectorProducer {
producer: IggyProducer,
}

struct SourceWithPlugins {
container: Container<SourceApi>,
plugin_ids: Vec<u32>,
}

struct SourceConnectorWrapper {
callback: HandleCallback,
plugins: Vec<SourceConnectorPlugin>,
Expand Down
Loading
Loading