Skip to content

Commit 1a58799

Browse files
committed
refactor(integration): migrate connectors tests to iggy_harness proc macro
Eliminates boilerplate in connector tests by leveraging the same proc macro infrastructure used for MCP tests. Connector-specific fixtures (postgres, wiremock, random) moved to tests/ since they're not part of core harness. MCP and connectors runtime are now owned by ServerHandle rather than TestHarness, enabling per-node dependent binaries in cluster setups. Also switches test output capture from env_logger to tracing-subscriber for consistency with server logging.
1 parent c51e45f commit 1a58799

File tree

35 files changed

+1509
-1746
lines changed

35 files changed

+1509
-1746
lines changed

Cargo.lock

Lines changed: 4 additions & 31 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

DEPENDENCIES.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,6 @@ embedded-io: 0.6.1, "Apache-2.0 OR MIT",
255255
encode_unicode: 1.0.0, "Apache-2.0 OR MIT",
256256
encoding_rs: 0.8.35, "(Apache-2.0 OR MIT) AND BSD-3-Clause",
257257
enum_dispatch: 0.3.13, "Apache-2.0 OR MIT",
258-
env_filter: 0.1.4, "Apache-2.0 OR MIT",
259-
env_logger: 0.11.8, "Apache-2.0 OR MIT",
260258
equivalent: 1.0.2, "Apache-2.0 OR MIT",
261259
err_trail: 0.11.0, "Apache-2.0",
262260
errno: 0.3.14, "Apache-2.0 OR MIT",

core/harness_derive/src/codegen.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ fn generate_harness_setup(
476476
#config_resolution
477477
let mut __harness = ::integration::harness::TestHarness::builder()
478478
.server(#server_config)
479-
.primary_client(#client_config)
479+
.client(#client_config)
480480
#mcp_builder_call
481481
#connectors_runtime_builder_call
482482
#cluster_builder_call
@@ -526,6 +526,12 @@ fn generate_start_and_seed(attrs: &IggyTestAttrs, fixture_seed: TokenStream) ->
526526
}
527527
}
528528

529+
fn fixture_var_ident(name: &syn::Ident) -> syn::Ident {
530+
let name_str = name.to_string();
531+
let clean_name = name_str.trim_start_matches('_');
532+
format_ident!("__fixture_{}", clean_name)
533+
}
534+
529535
/// Generate fixture setup calls (before harness setup).
530536
fn generate_fixture_setup(params: &[DetectedParam]) -> TokenStream {
531537
let fixtures = fixture_params(params);
@@ -537,7 +543,7 @@ fn generate_fixture_setup(params: &[DetectedParam]) -> TokenStream {
537543
.iter()
538544
.filter_map(|p| {
539545
if let DetectedParam::Fixture { name, ty } = p {
540-
let var_name = format_ident!("__fixture_{}", name);
546+
let var_name = fixture_var_ident(name);
541547
Some(quote! {
542548
let #var_name = <#ty as ::integration::harness::TestFixture>::setup()
543549
.await
@@ -563,7 +569,7 @@ fn generate_fixture_envs_collection(params: &[DetectedParam]) -> TokenStream {
563569
.iter()
564570
.filter_map(|p| {
565571
if let DetectedParam::Fixture { name, .. } = p {
566-
let var_name = format_ident!("__fixture_{}", name);
572+
let var_name = fixture_var_ident(name);
567573
Some(quote! {
568574
__fixture_envs.extend(
569575
::integration::harness::TestFixture::connectors_runtime_envs(&#var_name)
@@ -622,7 +628,7 @@ fn generate_param_bindings(params: &[DetectedParam]) -> TokenStream {
622628
});
623629
}
624630
DetectedParam::Fixture { name, .. } => {
625-
let fixture_var = format_ident!("__fixture_{}", name);
631+
let fixture_var = fixture_var_ident(name);
626632
bindings.push(quote! {
627633
let #name = #fixture_var;
628634
});

core/integration/Cargo.toml

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,15 @@ ci-qemu = []
2828

2929
[dependencies]
3030
ahash = { workspace = true }
31-
anyhow = { workspace = true }
3231
assert_cmd = { workspace = true }
3332
async-trait = { workspace = true }
3433
bon = { workspace = true }
3534
bytes = { workspace = true }
36-
chrono = { workspace = true }
3735
compio = { workspace = true }
3836
configs = { workspace = true }
3937
configs_derive = { workspace = true }
4038
ctor = { workspace = true }
4139
derive_more = { workspace = true }
42-
env_logger = { workspace = true }
4340
figment = { workspace = true }
4441
futures = { workspace = true }
4542
harness_derive = { workspace = true }
@@ -50,7 +47,6 @@ iggy_common = { workspace = true }
5047
keyring = { workspace = true }
5148
lazy_static = { workspace = true }
5249
libc = { workspace = true }
53-
log = { workspace = true }
5450
once_cell = { workspace = true }
5551
predicates = { workspace = true }
5652
rand = { workspace = true }
@@ -71,12 +67,12 @@ server = { workspace = true }
7167
socket2 = { workspace = true }
7268
sqlx = { workspace = true }
7369
strum = { workspace = true }
74-
strum_macros = { workspace = true }
7570
tempfile = { workspace = true }
7671
test-case = { workspace = true }
7772
testcontainers-modules = { workspace = true }
7873
tokio = { workspace = true }
7974
tracing = { workspace = true }
75+
tracing-subscriber = { workspace = true }
8076
twox-hash = { workspace = true }
8177
uuid = { workspace = true }
8278
zip = { workspace = true }

core/integration/src/harness/context.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -115,24 +115,27 @@ impl TestContext {
115115
&self.base_dir
116116
}
117117

118-
pub fn mcp_stdout_path(&self) -> PathBuf {
119-
self.base_dir.join("mcp_stdout.log")
118+
pub fn mcp_stdout_path(&self, server_id: u32) -> PathBuf {
119+
self.base_dir.join(format!("mcp_{server_id}_stdout.log"))
120120
}
121121

122-
pub fn mcp_stderr_path(&self) -> PathBuf {
123-
self.base_dir.join("mcp_stderr.log")
122+
pub fn mcp_stderr_path(&self, server_id: u32) -> PathBuf {
123+
self.base_dir.join(format!("mcp_{server_id}_stderr.log"))
124124
}
125125

126-
pub fn connectors_runtime_state_path(&self) -> PathBuf {
127-
self.base_dir.join("connectors_state")
126+
pub fn connectors_runtime_state_path(&self, server_id: u32) -> PathBuf {
127+
self.base_dir
128+
.join(format!("connectors_runtime_{server_id}_state"))
128129
}
129130

130-
pub fn connectors_runtime_stdout_path(&self) -> PathBuf {
131-
self.base_dir.join("connectors_stdout.log")
131+
pub fn connectors_runtime_stdout_path(&self, server_id: u32) -> PathBuf {
132+
self.base_dir
133+
.join(format!("connectors_runtime_{server_id}_stdout.log"))
132134
}
133135

134-
pub fn connectors_runtime_stderr_path(&self) -> PathBuf {
135-
self.base_dir.join("connectors_stderr.log")
136+
pub fn connectors_runtime_stderr_path(&self, server_id: u32) -> PathBuf {
137+
self.base_dir
138+
.join(format!("connectors_runtime_{server_id}_stderr.log"))
136139
}
137140

138141
pub fn test_stdout_path(&self) -> PathBuf {
@@ -185,10 +188,11 @@ mod tests {
185188
#[test]
186189
fn test_context_paths() {
187190
let ctx = TestContext::new(Some("test_context_paths".to_string()), true).unwrap();
188-
assert!(ctx.mcp_stdout_path().ends_with("mcp_stdout.log"));
191+
assert!(ctx.mcp_stdout_path(0).ends_with("mcp_0_stdout.log"));
189192
assert!(
190-
ctx.connectors_runtime_state_path()
191-
.ends_with("connectors_state")
193+
ctx.connectors_runtime_state_path(0)
194+
.ends_with("connectors_runtime_0_state")
192195
);
196+
assert!(ctx.mcp_stdout_path(1).ends_with("mcp_1_stdout.log"));
193197
}
194198
}

core/integration/src/harness/fixtures/mod.rs renamed to core/integration/src/harness/fixture.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,6 @@
1717
* under the License.
1818
*/
1919

20-
mod postgres;
21-
mod random;
22-
mod wiremock;
23-
24-
pub use postgres::{
25-
PostgresContainer, PostgresSinkByteaFixture, PostgresSinkFixture, PostgresSinkJsonFixture,
26-
PostgresSourceByteaFixture, PostgresSourceDeleteFixture, PostgresSourceFixture,
27-
PostgresSourceJsonFixture, PostgresSourceJsonbFixture, PostgresSourceMarkFixture,
28-
SinkPayloadFormat, SinkSchema,
29-
};
30-
pub use random::RandomSourceFixture;
31-
pub use wiremock::{WireMockDirectFixture, WireMockWrappedFixture};
32-
3320
use crate::harness::error::TestBinaryError;
3421
use crate::harness::seeds::SeedError;
3522
use async_trait::async_trait;

core/integration/src/harness/handle/connectors_runtime.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use tokio::time::sleep;
3737
const TEST_VERBOSITY_ENV_VAR: &str = "IGGY_TEST_VERBOSE";
3838

3939
pub struct ConnectorsRuntimeHandle {
40+
server_id: u32,
4041
config: ConnectorsRuntimeConfig,
4142
context: Arc<TestContext>,
4243
envs: HashMap<String, String>,
@@ -72,15 +73,15 @@ impl ConnectorsRuntimeHandle {
7273
}
7374

7475
pub fn state_path(&self) -> PathBuf {
75-
self.context.connectors_runtime_state_path()
76+
self.context.connectors_runtime_state_path(self.server_id)
7677
}
7778

7879
pub fn collect_logs(&self) -> (String, String) {
7980
common::collect_logs(&self.stdout_path, &self.stderr_path)
8081
}
8182

8283
fn build_envs(&mut self) {
83-
let state_path = self.context.connectors_runtime_state_path();
84+
let state_path = self.context.connectors_runtime_state_path(self.server_id);
8485
self.envs.insert(
8586
"IGGY_CONNECTORS_STATE_PATH".to_string(),
8687
state_path.display().to_string(),
@@ -108,15 +109,18 @@ impl ConnectorsRuntimeHandle {
108109
}
109110
}
110111

111-
impl TestBinary for ConnectorsRuntimeHandle {
112-
type Config = ConnectorsRuntimeConfig;
113-
114-
fn with_config(config: Self::Config, context: Arc<TestContext>) -> Self {
112+
impl ConnectorsRuntimeHandle {
113+
pub fn with_server_id(
114+
config: ConnectorsRuntimeConfig,
115+
context: Arc<TestContext>,
116+
server_id: u32,
117+
) -> Self {
115118
let reserver =
116119
SinglePortReserver::new().expect("Failed to reserve port for connectors runtime");
117120
let server_address = reserver.address();
118121

119122
Self {
123+
server_id,
120124
config,
121125
context,
122126
envs: HashMap::new(),
@@ -128,6 +132,14 @@ impl TestBinary for ConnectorsRuntimeHandle {
128132
port_reserver: Some(reserver),
129133
}
130134
}
135+
}
136+
137+
impl TestBinary for ConnectorsRuntimeHandle {
138+
type Config = ConnectorsRuntimeConfig;
139+
140+
fn with_config(config: Self::Config, context: Arc<TestContext>) -> Self {
141+
Self::with_server_id(config, context, 0)
142+
}
131143

132144
fn start(&mut self) -> Result<(), TestBinaryError> {
133145
self.build_envs();
@@ -151,8 +163,8 @@ impl TestBinary for ConnectorsRuntimeHandle {
151163
command.stdout(Stdio::inherit());
152164
command.stderr(Stdio::inherit());
153165
} else {
154-
let stdout_path = self.context.connectors_runtime_stdout_path();
155-
let stderr_path = self.context.connectors_runtime_stderr_path();
166+
let stdout_path = self.context.connectors_runtime_stdout_path(self.server_id);
167+
let stderr_path = self.context.connectors_runtime_stderr_path(self.server_id);
156168

157169
let stdout_file =
158170
File::create(&stdout_path).map_err(|e| TestBinaryError::FileSystemError {

core/integration/src/harness/handle/mcp.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ const MCP_HEALTH_CHECK_RETRIES: u32 = common::DEFAULT_HEALTH_CHECK_RETRIES * 3;
4545
pub type McpClient = RunningService<RoleClient, InitializeRequestParams>;
4646

4747
pub struct McpHandle {
48+
server_id: u32,
4849
config: McpConfig,
4950
context: Arc<TestContext>,
5051
envs: HashMap<String, String>,
@@ -135,14 +136,13 @@ impl McpHandle {
135136
}
136137
}
137138

138-
impl TestBinary for McpHandle {
139-
type Config = McpConfig;
140-
141-
fn with_config(config: Self::Config, context: Arc<TestContext>) -> Self {
139+
impl McpHandle {
140+
pub fn with_server_id(config: McpConfig, context: Arc<TestContext>, server_id: u32) -> Self {
142141
let reserver = SinglePortReserver::new().expect("Failed to reserve port for MCP server");
143142
let server_address = reserver.address();
144143

145144
Self {
145+
server_id,
146146
config,
147147
context,
148148
envs: HashMap::new(),
@@ -154,6 +154,14 @@ impl TestBinary for McpHandle {
154154
port_reserver: Some(reserver),
155155
}
156156
}
157+
}
158+
159+
impl TestBinary for McpHandle {
160+
type Config = McpConfig;
161+
162+
fn with_config(config: Self::Config, context: Arc<TestContext>) -> Self {
163+
Self::with_server_id(config, context, 0)
164+
}
157165

158166
#[allow(deprecated)]
159167
fn start(&mut self) -> Result<(), TestBinaryError> {
@@ -177,8 +185,8 @@ impl TestBinary for McpHandle {
177185
command.stdout(Stdio::inherit());
178186
command.stderr(Stdio::inherit());
179187
} else {
180-
let stdout_path = self.context.mcp_stdout_path();
181-
let stderr_path = self.context.mcp_stderr_path();
188+
let stdout_path = self.context.mcp_stdout_path(self.server_id);
189+
let stderr_path = self.context.mcp_stderr_path(self.server_id);
182190

183191
let stdout_file =
184192
File::create(&stdout_path).map_err(|e| TestBinaryError::FileSystemError {

0 commit comments

Comments
 (0)