Skip to content
203 changes: 174 additions & 29 deletions src/iceberg/arrow/s3/arrow_s3_file_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
*/

#include <cstdlib>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>

#include <arrow/filesystem/filesystem.h>
#if ICEBERG_S3_ENABLED
Expand All @@ -36,9 +40,10 @@

namespace iceberg::arrow {

#if ICEBERG_S3_ENABLED

namespace {

#if ICEBERG_S3_ENABLED
const std::string* FindProperty(
const std::unordered_map<std::string, std::string>& properties,
std::string_view key) {
Expand Down Expand Up @@ -74,6 +79,24 @@ Status EnsureS3Initialized() {
return {};
}

/// \brief Separate an optional URI scheme from an endpoint string.
///
/// When `endpoint` contains "://", the text before it is stored in
/// `options.scheme` and the text after it is returned. Otherwise `options` is left
/// untouched and `endpoint` is returned unchanged.
///
/// \param endpoint Endpoint as configured, with or without a leading scheme.
/// \param options S3 options whose `scheme` is overwritten when a scheme is present.
/// \return The endpoint without its scheme, for use as Arrow's `endpoint_override`.
std::string SplitEndpointScheme(std::string_view endpoint,
                                ::arrow::fs::S3Options& options) {
  constexpr std::string_view kSchemeSeparator = "://";
  const auto separator_at = endpoint.find(kSchemeSeparator);
  if (separator_at == std::string_view::npos) {
    // No scheme present: nothing to record, hand the endpoint back as-is.
    return std::string(endpoint);
  }
  options.scheme = std::string(endpoint.substr(0, separator_at));
  return std::string(endpoint.substr(separator_at + kSchemeSeparator.size()));
}

/// \brief Tell whether a storage-credential prefix is one this FileIO accepts.
///
/// Accepted prefixes are the bare string "s3" and anything beginning with
/// "s3://", "s3a://" or "s3n://". The comparison is case-sensitive.
bool IsS3FileIOCredentialPrefix(std::string_view prefix) {
  if (prefix == "s3") {
    return true;
  }
  for (const std::string_view scheme : {"s3://", "s3a://", "s3n://"}) {
    // substr clamps to the available length, so this is a plain "begins with" test.
    if (prefix.substr(0, scheme.size()) == scheme) {
      return true;
    }
  }
  return false;
}

} // namespace

/// \brief Configure S3Options from a properties map.
///
/// \param properties The configuration properties map.
Expand All @@ -100,26 +123,21 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(
}

// Configure region
if (const auto* region = FindProperty(properties, S3Properties::kRegion);
if (const auto* region = FindProperty(properties, S3Properties::kClientRegion);
region != nullptr) {
options.region = *region;
}

// Configure endpoint (for MinIO, LocalStack, etc.)
if (const auto* endpoint = FindProperty(properties, S3Properties::kEndpoint);
endpoint != nullptr) {
options.endpoint_override = *endpoint;
} else {
// Fall back to AWS standard environment variables for endpoint override
const char* s3_endpoint_env = std::getenv("AWS_ENDPOINT_URL_S3");
if (s3_endpoint_env != nullptr) {
options.endpoint_override = s3_endpoint_env;
} else {
const char* endpoint_env = std::getenv("AWS_ENDPOINT_URL");
if (endpoint_env != nullptr) {
options.endpoint_override = endpoint_env;
}
}
options.endpoint_override = SplitEndpointScheme(*endpoint, options);
} else if (const char* s3_endpoint_env = std::getenv("AWS_ENDPOINT_URL_S3");
s3_endpoint_env != nullptr) {
options.endpoint_override = SplitEndpointScheme(s3_endpoint_env, options);
} else if (const char* endpoint_env = std::getenv("AWS_ENDPOINT_URL");
endpoint_env != nullptr) {
options.endpoint_override = SplitEndpointScheme(endpoint_env, options);
}

ICEBERG_ASSIGN_OR_RAISE(const auto path_style_access,
Expand All @@ -128,11 +146,11 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(
options.force_virtual_addressing = !*path_style_access;
}

// Configure SSL
// Explicit `s3.ssl.enabled` overrides any endpoint-derived scheme.
ICEBERG_ASSIGN_OR_RAISE(const auto ssl_enabled,
ParseOptionalBool(properties, S3Properties::kSslEnabled));
if (ssl_enabled.has_value() && !*ssl_enabled) {
options.scheme = "http";
if (ssl_enabled.has_value()) {
options.scheme = *ssl_enabled ? "https" : "http";
}

// Configure timeouts
Expand All @@ -152,33 +170,160 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(

return options;
}
#endif

} // namespace
namespace {

/// \brief Build an Arrow S3 filesystem from a property map.
///
/// Runs EnsureS3Initialized(), converts `properties` into S3 options with
/// ConfigureS3Options(), and creates the filesystem with
/// ::arrow::fs::S3FileSystem::Make(). An error from any of the three steps is
/// returned to the caller.
///
/// \param properties Configuration properties passed to ConfigureS3Options().
/// \return The new filesystem, held through the generic FileSystem interface.
Result<std::shared_ptr<::arrow::fs::FileSystem>> BuildArrowS3FileSystem(
    const std::unordered_map<std::string, std::string>& properties) {
  ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized());

  ICEBERG_ASSIGN_OR_RAISE(auto options, ConfigureS3Options(properties));
  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fs, ::arrow::fs::S3FileSystem::Make(options));
  // Hand the concrete S3 filesystem back through the base-class pointer type.
  return std::shared_ptr<::arrow::fs::FileSystem>(std::move(fs));
}
/// \brief Rewrite an S3-compatible scheme alias to the canonical "s3://" form.
///
/// A leading "s3a://", "s3n://" or "oss://" is replaced by "s3://"; any other input
/// is returned as an unchanged copy.
std::string CanonicalizeS3Scheme(std::string_view location) {
  constexpr std::string_view kCanonicalScheme = "s3://";
  for (const std::string_view alias : {"s3a://", "s3n://", "oss://"}) {
    // substr clamps to the available length, so this is a plain "begins with" test.
    if (location.substr(0, alias.size()) != alias) {
      continue;
    }
    std::string rewritten(kCanonicalScheme);
    rewritten += location.substr(alias.size());
    return rewritten;
  }
  return std::string(location);
}

class ArrowS3FileIO final : public FileIO, public SupportsStorageCredentials {
public:
ArrowS3FileIO(std::shared_ptr<::arrow::fs::FileSystem> arrow_fs,
std::unordered_map<std::string, std::string> default_properties)
: default_file_io_(std::move(arrow_fs)),
default_properties_(std::move(default_properties)) {}

Result<std::unique_ptr<InputFile>> NewInputFile(std::string file_location) override;

Result<std::unique_ptr<InputFile>> NewInputFile(std::string file_location,
size_t length) override;

Result<std::unique_ptr<OutputFile>> NewOutputFile(std::string file_location) override;

Status DeleteFile(const std::string& file_location) override;

Status DeleteFiles(const std::vector<std::string>& file_locations) override;

Status SetStorageCredentials(
const std::vector<StorageCredential>& storage_credentials) override;

const std::vector<StorageCredential>& credentials() const override {
return storage_credentials_;
}

SupportsStorageCredentials* AsSupportsStorageCredentials() override { return this; }

private:
ArrowFileSystemFileIO& FileIOForPath(std::string_view location);

ArrowFileSystemFileIO default_file_io_;
std::unordered_map<std::string, std::string> default_properties_;
std::vector<StorageCredential> storage_credentials_;
std::vector<std::pair<std::string, std::unique_ptr<ArrowFileSystemFileIO>>>
file_io_by_prefix_;
};

/// \brief Install storage credentials and build one delegate FileIO per credential.
///
/// Each credential is validated, its prefix must be accepted by
/// IsS3FileIOCredentialPrefix(), and its config is layered over the default
/// properties before a dedicated filesystem is built. The new routing table is
/// assembled in a local and only swapped in after every credential succeeded, so
/// an error return leaves the previously installed state unchanged.
///
/// \param storage_credentials Credentials to install; an empty list clears routing.
/// \return An error from validation, an unsupported prefix, or filesystem creation.
Status ArrowS3FileIO::SetStorageCredentials(
    const std::vector<StorageCredential>& storage_credentials) {
  std::vector<std::pair<std::string, std::unique_ptr<ArrowFileSystemFileIO>>> routes;
  routes.reserve(storage_credentials.size());

  // TODO(gangwu): Refresh vended credentials via credentials.uri before tokens expire.
  for (const auto& credential : storage_credentials) {
    ICEBERG_RETURN_UNEXPECTED(credential.Validate());
    if (!IsS3FileIOCredentialPrefix(credential.prefix)) {
      return NotSupported(
          "Storage credential prefix '{}' is unsupported by Arrow S3 FileIO",
          credential.prefix);
    }

    // Start from the defaults; credential-specific entries win on key collisions.
    std::unordered_map<std::string, std::string> merged = default_properties_;
    for (const auto& [key, value] : credential.config) {
      merged.insert_or_assign(key, value);
    }

    ICEBERG_ASSIGN_OR_RAISE(auto scoped_fs, BuildArrowS3FileSystem(merged));
    auto scoped_io = std::make_unique<ArrowFileSystemFileIO>(std::move(scoped_fs));
    // Store the prefix in canonical form so lookups can compare like with like.
    routes.emplace_back(CanonicalizeS3Scheme(credential.prefix), std::move(scoped_io));
  }

  file_io_by_prefix_ = std::move(routes);
  storage_credentials_ = storage_credentials;
  return {};
}

/// \brief Pick the delegate FileIO for `location`.
///
/// The location is canonicalized and compared against every installed prefix; the
/// longest matching prefix wins, with the earliest entry winning among prefixes of
/// equal length. The default delegate is returned when nothing matches or no
/// credentials are installed.
ArrowFileSystemFileIO& ArrowS3FileIO::FileIOForPath(std::string_view location) {
  ArrowFileSystemFileIO* selected = &default_file_io_;
  if (file_io_by_prefix_.empty()) {
    // Nothing to route on: skip canonicalizing the location.
    return *selected;
  }

  const std::string normalized = CanonicalizeS3Scheme(location);
  size_t longest_match = 0;
  for (const auto& route : file_io_by_prefix_) {
    const std::string& prefix = route.first;
    // Only a strictly longer prefix can replace the current selection.
    if (prefix.size() <= longest_match) {
      continue;
    }
    if (!normalized.starts_with(prefix)) {
      continue;
    }
    selected = route.second.get();
    longest_match = prefix.size();
  }
  return *selected;
}

/// \brief Open `file_location` for reading through the delegate selected for it.
Result<std::unique_ptr<InputFile>> ArrowS3FileIO::NewInputFile(
    std::string file_location) {
  // Select the delegate first; the location is moved into the call afterwards.
  ArrowFileSystemFileIO& target = FileIOForPath(file_location);
  return target.NewInputFile(std::move(file_location));
}

/// \brief Open `file_location` for reading through the delegate selected for it,
/// forwarding `length` unchanged.
Result<std::unique_ptr<InputFile>> ArrowS3FileIO::NewInputFile(std::string file_location,
                                                               size_t length) {
  // Select the delegate first; the location is moved into the call afterwards.
  ArrowFileSystemFileIO& target = FileIOForPath(file_location);
  return target.NewInputFile(std::move(file_location), length);
}

/// \brief Open `file_location` for writing through the delegate selected for it.
Result<std::unique_ptr<OutputFile>> ArrowS3FileIO::NewOutputFile(
    std::string file_location) {
  // Select the delegate first; the location is moved into the call afterwards.
  ArrowFileSystemFileIO& target = FileIOForPath(file_location);
  return target.NewOutputFile(std::move(file_location));
}

/// \brief Delete `file_location` through the delegate selected for it.
Status ArrowS3FileIO::DeleteFile(const std::string& file_location) {
  ArrowFileSystemFileIO& target = FileIOForPath(file_location);
  return target.DeleteFile(file_location);
}

/// \brief Delete several files, batching them per responsible delegate.
///
/// Locations are first grouped by the delegate FileIOForPath() selects, then each
/// group is passed to that delegate's DeleteFiles(). The first failing group's
/// error is returned and groups not yet processed are not attempted.
Status ArrowS3FileIO::DeleteFiles(const std::vector<std::string>& file_locations) {
  std::unordered_map<ArrowFileSystemFileIO*, std::vector<std::string>> batches;
  for (const auto& location : file_locations) {
    ArrowFileSystemFileIO& target = FileIOForPath(location);
    batches[&target].push_back(location);
  }

  for (auto& [target, batch] : batches) {
    ICEBERG_RETURN_UNEXPECTED(target->DeleteFiles(batch));
  }
  return {};
}

} // namespace

/// \brief Create an S3 FileIO from a property map.
///
/// Builds the default Arrow S3 filesystem from `properties` and wraps it in an
/// ArrowS3FileIO, which also keeps a copy of `properties`.
///
/// \param properties Configuration properties; may be empty.
/// \return The new FileIO, or the error from building the filesystem.
Result<std::unique_ptr<FileIO>> MakeS3FileIO(
    const std::unordered_map<std::string, std::string>& properties) {
  // Uses default credentials if properties are empty.
  ICEBERG_ASSIGN_OR_RAISE(auto default_fs, BuildArrowS3FileSystem(properties));
  return std::make_unique<ArrowS3FileIO>(std::move(default_fs), properties);
}

Status FinalizeS3() {
#if ICEBERG_S3_ENABLED
auto status = ::arrow::fs::FinalizeS3();
ICEBERG_ARROW_RETURN_NOT_OK(status);
return {};
}

#else

Result<std::unique_ptr<FileIO>> MakeS3FileIO(
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
return NotSupported("Arrow S3 support is not enabled");
#endif
}

Status FinalizeS3() { return NotSupported("Arrow S3 support is not enabled"); }

#endif

} // namespace iceberg::arrow
4 changes: 2 additions & 2 deletions src/iceberg/arrow/s3/s3_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ struct S3Properties {
static constexpr std::string_view kSecretAccessKey = "s3.secret-access-key";
/// AWS session token (for temporary credentials)
static constexpr std::string_view kSessionToken = "s3.session-token";
/// AWS region
static constexpr std::string_view kRegion = "s3.region";
/// AWS region, standard Iceberg client property.
static constexpr std::string_view kClientRegion = "client.region";
/// Custom endpoint override (for MinIO, LocalStack, etc.)
static constexpr std::string_view kEndpoint = "s3.endpoint";
/// Whether to use path-style access (needed for MinIO)
Expand Down
36 changes: 36 additions & 0 deletions src/iceberg/catalog/rest/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ constexpr std::string_view kSource = "source";
constexpr std::string_view kDestination = "destination";
constexpr std::string_view kMetadata = "metadata";
constexpr std::string_view kConfig = "config";
constexpr std::string_view kStorageCredentials = "storage-credentials";
constexpr std::string_view kPrefix = "prefix";
constexpr std::string_view kIdentifiers = "identifiers";
constexpr std::string_view kOverrides = "overrides";
constexpr std::string_view kDefaults = "defaults";
Expand Down Expand Up @@ -133,6 +135,23 @@ constexpr std::string_view kResidualFilter = "residual-filter";
constexpr std::string_view kMapKeys = "keys";
constexpr std::string_view kMapValues = "values";

/// \brief Serialize one storage credential to a JSON object.
///
/// The credential is validated first; a validation error is returned instead of
/// any JSON. On success the object carries the credential's prefix and config
/// under the `kPrefix` and `kConfig` keys.
Result<nlohmann::json> StorageCredentialToJson(const StorageCredential& credential) {
  ICEBERG_RETURN_UNEXPECTED(credential.Validate());

  nlohmann::json serialized;
  serialized[kPrefix] = credential.prefix;
  serialized[kConfig] = credential.config;
  return serialized;
}

/// \brief Parse one storage credential from a JSON object.
///
/// Reads the `kPrefix` and `kConfig` fields in that order, returning the first
/// read error encountered, then validates the assembled credential before
/// returning it.
Result<StorageCredential> StorageCredentialFromJson(const nlohmann::json& json) {
  StorageCredential parsed;
  ICEBERG_ASSIGN_OR_RAISE(auto prefix, GetJsonValue<std::string>(json, kPrefix));
  ICEBERG_ASSIGN_OR_RAISE(auto config,
                          GetJsonValue<decltype(parsed.config)>(json, kConfig));
  parsed.prefix = std::move(prefix);
  parsed.config = std::move(config);

  ICEBERG_RETURN_UNEXPECTED(parsed.Validate());
  return parsed;
}

template <typename Value>
Result<std::map<int32_t, Value>> KeyValueMapFromJson(const nlohmann::json& json,
std::string_view key) {
Expand Down Expand Up @@ -695,6 +714,14 @@ Result<nlohmann::json> ToJson(const LoadTableResult& result) {
SetOptionalStringField(json, kMetadataLocation, result.metadata_location);
ICEBERG_ASSIGN_OR_RAISE(json[kMetadata], ToJson(*result.metadata));
SetContainerField(json, kConfig, result.config);
if (!result.storage_credentials.empty()) {
nlohmann::json creds = nlohmann::json::array();
for (const auto& cred : result.storage_credentials) {
ICEBERG_ASSIGN_OR_RAISE(auto entry, StorageCredentialToJson(cred));
creds.push_back(std::move(entry));
}
json[kStorageCredentials] = std::move(creds);
}
return json;
}

Expand All @@ -707,6 +734,15 @@ Result<LoadTableResult> LoadTableResultFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(result.metadata, TableMetadataFromJson(metadata_json));
ICEBERG_ASSIGN_OR_RAISE(result.config,
GetJsonValueOrDefault<decltype(result.config)>(json, kConfig));
if (auto it = json.find(kStorageCredentials); it != json.end() && !it->is_null()) {
if (!it->is_array()) {
return JsonParseError("Cannot parse storage credentials from non-array");
}
for (const auto& entry : *it) {
ICEBERG_ASSIGN_OR_RAISE(auto cred, StorageCredentialFromJson(entry));
result.storage_credentials.push_back(std::move(cred));
}
}
ICEBERG_RETURN_UNEXPECTED(result.Validate());
return result;
}
Expand Down
Loading
Loading