Skip to content

Commit f7e5ba8

Browse files
committed
test(connectors): add parallel restart test and move helpers to bottom
Signed-off-by: seokjin0414 <sars21@hanmail.net> Signed-off-by: shin <sars21@hanmail.net>
1 parent e06aeef commit f7e5ba8

File tree

1 file changed

+111
-36
lines changed
  • core/integration/tests/connectors/postgres

1 file changed

+111
-36
lines changed

core/integration/tests/connectors/postgres/restart.rs

Lines changed: 111 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -37,42 +37,6 @@ const SINK_KEY: &str = "postgres";
3737

3838
type SinkRow = (i64, String, String, Vec<u8>);
3939

40-
async fn wait_for_sink_status(
41-
http: &Client,
42-
api_url: &str,
43-
expected: ConnectorStatus,
44-
) -> SinkInfoResponse {
45-
for _ in 0..POLL_ATTEMPTS {
46-
if let Ok(resp) = http
47-
.get(format!("{api_url}/sinks/{SINK_KEY}"))
48-
.header("api-key", API_KEY)
49-
.send()
50-
.await
51-
&& let Ok(info) = resp.json::<SinkInfoResponse>().await
52-
&& info.status == expected
53-
{
54-
return info;
55-
}
56-
sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
57-
}
58-
panic!("Sink connector did not reach {expected:?} status in time");
59-
}
60-
61-
fn build_messages(messages_data: &[TestMessage], id_offset: usize) -> Vec<IggyMessage> {
62-
messages_data
63-
.iter()
64-
.enumerate()
65-
.map(|(i, msg)| {
66-
let payload = serde_json::to_vec(msg).expect("Failed to serialize message");
67-
IggyMessage::builder()
68-
.id((id_offset + i + 1) as u128)
69-
.payload(Bytes::from(payload))
70-
.build()
71-
.expect("Failed to build message")
72-
})
73-
.collect()
74-
}
75-
7640
#[iggy_harness(
7741
server(connectors_runtime(config_path = "tests/connectors/postgres/sink.toml")),
7842
seed = seeds::connector_stream
@@ -161,3 +125,114 @@ async fn restart_sink_connector_continues_processing(
161125
rows.len()
162126
);
163127
}
128+
129+
#[iggy_harness(
130+
server(connectors_runtime(config_path = "tests/connectors/postgres/sink.toml")),
131+
seed = seeds::connector_stream
132+
)]
133+
async fn parallel_restart_requests_should_not_break_connector(
134+
harness: &TestHarness,
135+
fixture: PostgresSinkFixture,
136+
) {
137+
let client = harness.root_client().await.unwrap();
138+
let api_url = harness
139+
.connectors_runtime()
140+
.expect("connector runtime should be available")
141+
.http_url();
142+
let http = Client::new();
143+
let pool = fixture.create_pool().await.expect("Failed to create pool");
144+
145+
fixture.wait_for_table(&pool, SINK_TABLE).await;
146+
147+
let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
148+
let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
149+
150+
wait_for_sink_status(&http, &api_url, ConnectorStatus::Running).await;
151+
152+
let mut tasks = Vec::new();
153+
for _ in 0..5 {
154+
let http = http.clone();
155+
let url = format!("{api_url}/sinks/{SINK_KEY}/restart");
156+
tasks.push(tokio::spawn(async move {
157+
http.post(&url)
158+
.header("api-key", API_KEY)
159+
.send()
160+
.await
161+
.expect("Failed to call restart endpoint")
162+
}));
163+
}
164+
165+
let responses = futures::future::join_all(tasks).await;
166+
for resp in responses {
167+
let resp = resp.expect("Task panicked");
168+
assert_eq!(
169+
resp.status().as_u16(),
170+
204,
171+
"All restart requests should return 204"
172+
);
173+
}
174+
175+
wait_for_sink_status(&http, &api_url, ConnectorStatus::Running).await;
176+
177+
let batch = create_test_messages(TEST_MESSAGE_COUNT);
178+
let mut messages = build_messages(&batch, 0);
179+
client
180+
.send_messages(
181+
&stream_id,
182+
&topic_id,
183+
&Partitioning::partition_id(0),
184+
&mut messages,
185+
)
186+
.await
187+
.expect("Failed to send messages after parallel restarts");
188+
189+
let query = format!(
190+
"SELECT iggy_offset, iggy_stream, iggy_topic, payload FROM {SINK_TABLE} ORDER BY iggy_offset"
191+
);
192+
let rows: Vec<SinkRow> = fixture
193+
.fetch_rows_as(&pool, &query, TEST_MESSAGE_COUNT)
194+
.await
195+
.expect("Failed to fetch rows after parallel restarts");
196+
197+
assert!(
198+
rows.len() >= TEST_MESSAGE_COUNT,
199+
"Expected at least {TEST_MESSAGE_COUNT} rows after parallel restarts, got {}",
200+
rows.len()
201+
);
202+
}
203+
204+
async fn wait_for_sink_status(
205+
http: &Client,
206+
api_url: &str,
207+
expected: ConnectorStatus,
208+
) -> SinkInfoResponse {
209+
for _ in 0..POLL_ATTEMPTS {
210+
if let Ok(resp) = http
211+
.get(format!("{api_url}/sinks/{SINK_KEY}"))
212+
.header("api-key", API_KEY)
213+
.send()
214+
.await
215+
&& let Ok(info) = resp.json::<SinkInfoResponse>().await
216+
&& info.status == expected
217+
{
218+
return info;
219+
}
220+
sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
221+
}
222+
panic!("Sink connector did not reach {expected:?} status in time");
223+
}
224+
225+
fn build_messages(messages_data: &[TestMessage], id_offset: usize) -> Vec<IggyMessage> {
226+
messages_data
227+
.iter()
228+
.enumerate()
229+
.map(|(i, msg)| {
230+
let payload = serde_json::to_vec(msg).expect("Failed to serialize message");
231+
IggyMessage::builder()
232+
.id((id_offset + i + 1) as u128)
233+
.payload(Bytes::from(payload))
234+
.build()
235+
.expect("Failed to build message")
236+
})
237+
.collect()
238+
}

0 commit comments

Comments
 (0)