Skip to content

Commit 2ddb9f9

Browse files
committed
test(connectors): add restart integration test for sink connector
Signed-off-by: shin <sars21@hanmail.net>
1 parent 274d03a commit 2ddb9f9

File tree

2 files changed

+164
-0
lines changed

2 files changed

+164
-0
lines changed

core/integration/tests/connectors/postgres/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
mod postgres_sink;
2121
mod postgres_source;
22+
mod restart;
2223

2324
use crate::connectors::TestMessage;
2425
use serde::Deserialize;
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
use super::{POLL_ATTEMPTS, POLL_INTERVAL_MS, TEST_MESSAGE_COUNT};
21+
use crate::connectors::fixtures::{PostgresOps, PostgresSinkFixture};
22+
use crate::connectors::{TestMessage, create_test_messages};
23+
use bytes::Bytes;
24+
use iggy::prelude::{IggyMessage, Partitioning};
25+
use iggy_binary_protocol::MessageClient;
26+
use iggy_common::Identifier;
27+
use iggy_connector_sdk::api::{ConnectorStatus, SinkInfoResponse};
28+
use integration::harness::seeds;
29+
use integration::iggy_harness;
30+
use reqwest::Client;
31+
use std::time::Duration;
32+
use tokio::time::sleep;
33+
34+
const API_KEY: &str = "test-api-key";
35+
const SINK_TABLE: &str = "iggy_messages";
36+
const SINK_KEY: &str = "postgres";
37+
38+
type SinkRow = (i64, String, String, Vec<u8>);
39+
40+
async fn wait_for_sink_status(
41+
http: &Client,
42+
api_url: &str,
43+
expected: ConnectorStatus,
44+
) -> SinkInfoResponse {
45+
for _ in 0..POLL_ATTEMPTS {
46+
if let Ok(resp) = http
47+
.get(format!("{api_url}/sinks/{SINK_KEY}"))
48+
.header("api-key", API_KEY)
49+
.send()
50+
.await
51+
&& let Ok(info) = resp.json::<SinkInfoResponse>().await
52+
&& info.status == expected
53+
{
54+
return info;
55+
}
56+
sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
57+
}
58+
panic!("Sink connector did not reach {expected:?} status in time");
59+
}
60+
61+
fn build_messages(messages_data: &[TestMessage], id_offset: usize) -> Vec<IggyMessage> {
62+
messages_data
63+
.iter()
64+
.enumerate()
65+
.map(|(i, msg)| {
66+
let payload = serde_json::to_vec(msg).expect("Failed to serialize message");
67+
IggyMessage::builder()
68+
.id((id_offset + i + 1) as u128)
69+
.payload(Bytes::from(payload))
70+
.build()
71+
.expect("Failed to build message")
72+
})
73+
.collect()
74+
}
75+
76+
#[iggy_harness(
77+
server(connectors_runtime(config_path = "tests/connectors/postgres/sink.toml")),
78+
seed = seeds::connector_stream
79+
)]
80+
async fn restart_sink_connector_continues_processing(
81+
harness: &TestHarness,
82+
fixture: PostgresSinkFixture,
83+
) {
84+
let client = harness.root_client().await.unwrap();
85+
let api_url = harness
86+
.connectors_runtime()
87+
.expect("connector runtime should be available")
88+
.http_url();
89+
let http = Client::new();
90+
let pool = fixture.create_pool().await.expect("Failed to create pool");
91+
92+
fixture.wait_for_table(&pool, SINK_TABLE).await;
93+
94+
let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
95+
let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
96+
97+
wait_for_sink_status(&http, &api_url, ConnectorStatus::Running).await;
98+
99+
let first_batch = create_test_messages(TEST_MESSAGE_COUNT);
100+
let mut messages = build_messages(&first_batch, 0);
101+
client
102+
.send_messages(
103+
&stream_id,
104+
&topic_id,
105+
&Partitioning::partition_id(0),
106+
&mut messages,
107+
)
108+
.await
109+
.expect("Failed to send first batch");
110+
111+
let query = format!(
112+
"SELECT iggy_offset, iggy_stream, iggy_topic, payload FROM {SINK_TABLE} ORDER BY iggy_offset"
113+
);
114+
let rows: Vec<SinkRow> = fixture
115+
.fetch_rows_as(&pool, &query, TEST_MESSAGE_COUNT)
116+
.await
117+
.expect("Failed to fetch first batch rows");
118+
119+
assert_eq!(
120+
rows.len(),
121+
TEST_MESSAGE_COUNT,
122+
"Expected {TEST_MESSAGE_COUNT} rows before restart"
123+
);
124+
125+
let resp = http
126+
.post(format!("{api_url}/sinks/{SINK_KEY}/restart"))
127+
.header("api-key", API_KEY)
128+
.send()
129+
.await
130+
.expect("Failed to call restart endpoint");
131+
132+
assert_eq!(
133+
resp.status().as_u16(),
134+
204,
135+
"Restart endpoint should return 204 No Content"
136+
);
137+
138+
wait_for_sink_status(&http, &api_url, ConnectorStatus::Running).await;
139+
140+
let second_batch = create_test_messages(TEST_MESSAGE_COUNT);
141+
let mut messages = build_messages(&second_batch, TEST_MESSAGE_COUNT);
142+
client
143+
.send_messages(
144+
&stream_id,
145+
&topic_id,
146+
&Partitioning::partition_id(0),
147+
&mut messages,
148+
)
149+
.await
150+
.expect("Failed to send second batch");
151+
152+
let total_expected = TEST_MESSAGE_COUNT * 2;
153+
let rows: Vec<SinkRow> = fixture
154+
.fetch_rows_as(&pool, &query, total_expected)
155+
.await
156+
.expect("Failed to fetch rows after restart");
157+
158+
assert!(
159+
rows.len() >= total_expected,
160+
"Expected at least {total_expected} rows after restart, got {}",
161+
rows.len()
162+
);
163+
}

0 commit comments

Comments
 (0)