Skip to content

Commit 1ae3328

Browse files
committed
fix(connectors): support default credential provider chain for iceberg sink
1 parent 411a697 commit 1ae3328

File tree

9 files changed

+185
-29
lines changed

9 files changed

+185
-29
lines changed

core/connectors/sinks/iceberg_sink/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ pub struct IcebergSinkConfig {
6060
pub dynamic_routing: bool,
6161
pub dynamic_route_field: String,
6262
pub store_url: String,
63-
pub store_access_key_id: String,
64-
pub store_secret_access_key: String,
63+
pub store_access_key_id: Option<String>,
64+
pub store_secret_access_key: Option<String>,
6565
pub store_region: String,
6666
pub store_class: IcebergSinkStoreClass,
6767
}

core/connectors/sinks/iceberg_sink/src/props.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,15 @@ pub fn init_props(config: &IcebergSinkConfig) -> Result<HashMap<String, String>,
3030
fn get_props_s3(config: &IcebergSinkConfig) -> Result<HashMap<String, String>, Error> {
3131
let mut props: HashMap<String, String> = HashMap::new();
3232
props.insert("s3.region".to_string(), config.store_region.clone());
33-
props.insert(
34-
"s3.access-key-id".to_string(),
35-
config.store_access_key_id.clone(),
36-
);
37-
props.insert(
38-
"s3.secret-access-key".to_string(),
39-
config.store_secret_access_key.clone(),
40-
);
33+
if let Some(access_key_id) = &config.store_access_key_id {
34+
props.insert("s3.access-key-id".to_string(), access_key_id.clone());
35+
}
36+
if let Some(secret_access_key) = &config.store_secret_access_key {
37+
props.insert(
38+
"s3.secret-access-key".to_string(),
39+
secret_access_key.clone(),
40+
);
41+
}
4142
props.insert("s3.endpoint".to_string(), config.store_url.clone());
4243
Ok(props)
4344
}

core/connectors/sinks/iceberg_sink/src/sink.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,22 @@ use tracing::{debug, error, info};
3030
#[async_trait]
3131
impl Sink for IcebergSink {
3232
async fn open(&mut self) -> Result<(), Error> {
33-
let redacted_store_key = self
34-
.config
35-
.store_access_key_id
36-
.chars()
37-
.take(3)
38-
.collect::<String>();
39-
let redacted_store_secret = self
40-
.config
41-
.store_secret_access_key
42-
.chars()
43-
.take(3)
44-
.collect::<String>();
45-
info!(
46-
"Opened Iceberg sink connector with ID: {} for URL: {}, store access key ID: {redacted_store_key}*** store secret: {redacted_store_secret}***",
47-
self.id, self.config.uri
48-
);
33+
if let (Some(store_access_key_id), Some(store_secret_access_key)) = (
34+
&self.config.store_access_key_id,
35+
&self.config.store_secret_access_key,
36+
) {
37+
let redacted_store_key = store_access_key_id.chars().take(3).collect::<String>();
38+
let redacted_store_secret = store_secret_access_key.chars().take(3).collect::<String>();
39+
info!(
40+
"Opened Iceberg sink connector with ID: {} for URL: {}, store access key ID: {redacted_store_key}*** store secret: {redacted_store_secret}***",
41+
self.id, self.config.uri
42+
);
43+
} else {
44+
info!(
45+
"Opened Iceberg sink connector with ID: {} for URL: {}. No explicit credentials provided, falling back to default credential provider chain",
46+
self.id, self.config.uri
47+
);
48+
}
4949

5050
info!(
5151
"Configuring Iceberg catalog with the following config:\n-region: {}\n-url: {}\n-store class: {}\n-catalog type: {}\n",

core/integration/tests/connectors/fixtures/iceberg/container.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ pub const ENV_SINK_STORE_SECRET: &str =
5151
pub const ENV_SINK_STORE_REGION: &str = "IGGY_CONNECTORS_SINK_ICEBERG_PLUGIN_CONFIG_STORE_REGION";
5252
pub const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_ICEBERG_PATH";
5353

54+
pub const ENV_AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID";
55+
pub const ENV_AWS_SECRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY";
56+
5457
pub struct MinioContainer {
5558
#[allow(dead_code)]
5659
container: ContainerAsync<GenericImage>,
@@ -527,3 +530,39 @@ impl TestFixture for IcebergPreCreatedFixture {
527530
self.inner.connectors_runtime_envs()
528531
}
529532
}
533+
534+
pub struct IcebergEnvAuthFixture {
535+
inner: IcebergPreCreatedFixture,
536+
}
537+
538+
impl IcebergOps for IcebergEnvAuthFixture {
539+
fn catalog_url(&self) -> &str {
540+
self.inner.catalog_url()
541+
}
542+
543+
fn http_client(&self) -> &HttpClient {
544+
self.inner.http_client()
545+
}
546+
}
547+
548+
#[async_trait]
549+
impl TestFixture for IcebergEnvAuthFixture {
550+
async fn setup() -> Result<Self, TestBinaryError> {
551+
let inner = IcebergPreCreatedFixture::setup().await?;
552+
// Set credentials before test server initialization.
553+
unsafe {
554+
std::env::set_var(ENV_AWS_ACCESS_KEY_ID, MINIO_ACCESS_KEY);
555+
std::env::set_var(ENV_AWS_SECRET_ACCESS_KEY, MINIO_SECRET_KEY);
556+
}
557+
Ok(Self { inner })
558+
}
559+
560+
fn connectors_runtime_envs(&self) -> HashMap<String, String> {
561+
let mut envs = self.inner.connectors_runtime_envs();
562+
// Remove the explicit credentials injected by the underlying fixture.
563+
// This forces the Iceberg Sink to use the default credential provider chain instead of explicit config.
564+
envs.remove(ENV_SINK_STORE_ACCESS_KEY);
565+
envs.remove(ENV_SINK_STORE_SECRET);
566+
envs
567+
}
568+
}

core/integration/tests/connectors/fixtures/iceberg/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,6 @@
1919

2020
mod container;
2121

22-
pub use container::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture};
22+
pub use container::{
23+
DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture,
24+
};

core/integration/tests/connectors/fixtures/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ mod quickwit;
2525
mod wiremock;
2626

2727
pub use elasticsearch::{ElasticsearchSinkFixture, ElasticsearchSourcePreCreatedFixture};
28-
pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture};
28+
pub use iceberg::{
29+
DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture,
30+
};
2931
pub use mongodb::{
3032
MongoDbOps, MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture, MongoDbSinkFailpointFixture,
3133
MongoDbSinkFixture, MongoDbSinkJsonFixture, MongoDbSinkWriteConcernFixture,
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
type = "sink"
19+
key = "iceberg"
20+
enabled = true
21+
version = 0
22+
name = "Iceberg sink"
23+
path = "../../target/release/libiggy_connector_iceberg_sink"
24+
verbose = true
25+
26+
[[streams]]
27+
stream = "test_stream"
28+
topics = ["test_topic"]
29+
schema = "json"
30+
batch_length = 100
31+
poll_interval = "5ms"
32+
consumer_group = "iceberg_sink_connector"
33+
34+
# Notice: This configuration deliberately omits 'store_access_key_id' and 'store_secret_access_key'.
35+
# It is used exclusively by integration tests to test the default credential provider chain fallback behavior.
36+
[plugin_config]
37+
tables = ["test.messages"]
38+
catalog_type = "rest"
39+
warehouse = "warehouse"
40+
uri = "http://localhost:8181"
41+
dynamic_routing = false
42+
dynamic_route_field = ""
43+
store_url = "http://localhost:9000"
44+
store_region = "us-east-1"
45+
store_class = "s3"

core/integration/tests/connectors/iceberg/iceberg_sink.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
use crate::connectors::create_test_messages;
2121
use crate::connectors::fixtures::{
22-
DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture,
22+
DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture,
2323
};
2424
use bytes::Bytes;
2525
use iggy::prelude::{IggyMessage, Partitioning};
@@ -207,3 +207,50 @@ async fn iceberg_sink_handles_bulk_messages(
207207
assert_eq!(sinks.len(), 1);
208208
assert!(sinks[0].last_error.is_none());
209209
}
210+
211+
#[iggy_harness(
212+
server(connectors_runtime(config_path = "tests/connectors/iceberg/sink_default_credentials.toml")),
213+
seed = seeds::connector_stream
214+
)]
215+
async fn iceberg_sink_uses_default_credential_chain(
216+
harness: &TestHarness,
217+
fixture: IcebergEnvAuthFixture,
218+
) {
219+
let client = harness.root_client().await.unwrap();
220+
let stream_id: iggy_common::Identifier = seeds::names::STREAM.try_into().unwrap();
221+
let topic_id: iggy_common::Identifier = seeds::names::TOPIC.try_into().unwrap();
222+
let test_messages = crate::connectors::create_test_messages(5);
223+
let mut messages: Vec<IggyMessage> = test_messages
224+
.iter()
225+
.enumerate()
226+
.map(|(i, msg)| {
227+
let payload = serde_json::to_vec(msg).unwrap();
228+
IggyMessage::builder()
229+
.id((i + 1) as u128)
230+
.payload(bytes::Bytes::from(payload))
231+
.build()
232+
.unwrap()
233+
})
234+
.collect();
235+
client
236+
.send_messages(
237+
&stream_id,
238+
&topic_id,
239+
&Partitioning::partition_id(0),
240+
&mut messages,
241+
)
242+
.await
243+
.expect("Failed to send fake messages");
244+
let snapshot_count = fixture
245+
.wait_for_snapshots(
246+
DEFAULT_NAMESPACE,
247+
DEFAULT_TABLE,
248+
1,
249+
SNAPSHOT_POLL_ATTEMPTS,
250+
SNAPSHOT_POLL_INTERVAL_MS,
251+
)
252+
.await
253+
.expect("Data should be written to Iceberg table");
254+
assert!(snapshot_count >= 1);
255+
drop(fixture);
256+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[connectors]
19+
config_type = "local"
20+
config_dir = "tests/connectors/iceberg/default_credentials_config"

0 commit comments

Comments
 (0)