Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/paimon/core/operation/file_store_commit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "paimon/core/operation/file_store_commit_impl.h"
#include "paimon/core/schema/schema_manager.h"
#include "paimon/core/schema/table_schema.h"
#include "paimon/core/table/bucket_mode.h"
#include "paimon/core/utils/field_mapping.h"
#include "paimon/core/utils/file_store_path_factory.h"
#include "paimon/core/utils/snapshot_manager.h"
Expand Down Expand Up @@ -68,7 +69,12 @@ Result<std::unique_ptr<FileStoreCommit>> FileStoreCommit::Create(
const auto& schema = table_schema.value();
if (!schema->PrimaryKeys().empty() &&
ctx->GetOptions().find("enable-pk-commit-in-inte-test") == ctx->GetOptions().end()) {
return Status::NotImplemented("not support pk table commit yet");
// Postpone bucket mode (bucket=-2) writes all data files to the bucket-postpone/ directory.
// A compaction job will later redistribute files into real buckets. The commit logic
// (manifest and snapshot generation) is the same as append tables, so we allow it.
if (schema->NumBuckets() != BucketModeDefine::POSTPONE_BUCKET) {
return Status::NotImplemented("not support pk table commit yet");
}
}
auto opts = schema->Options();
for (const auto& [key, value] : ctx->GetOptions()) {
Expand Down
58 changes: 58 additions & 0 deletions src/paimon/core/operation/file_store_commit_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1688,4 +1688,62 @@ TEST_F(FileStoreCommitImplTest, TestObjectStoreAllowedWithRESTCatalogCommit) {
ASSERT_FALSE(json.empty());
}

// Verify that FileStoreCommit::Create succeeds for PK tables with postpone bucket mode (bucket=-2)
// without requiring the enable-pk-commit-in-inte-test workaround flag.
TEST_F(FileStoreCommitImplTest, TestPostponeBucketPKTableCommitAllowed) {
auto pk_dir = UniqueTestDirectory::Create();
ASSERT_TRUE(pk_dir);
std::string pk_root = pk_dir->Str();
ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(pk_root, {}));
ASSERT_OK(catalog->CreateDatabase("db", {}, false));

arrow::Schema pk_schema(
{arrow::field("pk", arrow::int32()), arrow::field("val", arrow::utf8())});
::ArrowSchema arrow_schema;
ASSERT_TRUE(arrow::ExportSchema(pk_schema, &arrow_schema).ok());
std::map<std::string, std::string> table_options = {{Options::BUCKET, "-2"}};
ASSERT_OK(catalog->CreateTable(Identifier("db", "pk_tbl"), &arrow_schema,
/*partition_keys=*/{}, /*primary_keys=*/{"pk"}, table_options,
/*ignore_if_exists=*/false));

std::string pk_table_path = PathUtil::JoinPath(pk_root, "db.db/pk_tbl");

// Create FileStoreCommit WITHOUT the workaround flag — should succeed for postpone bucket
CommitContextBuilder builder(pk_table_path, "test_user");
builder.AddOption(Options::FILE_SYSTEM, "local").UseRESTCatalogCommit(true);
ASSERT_OK_AND_ASSIGN(auto commit_context, builder.Finish());
ASSERT_OK_AND_ASSIGN(auto committer, FileStoreCommit::Create(std::move(commit_context)));
ASSERT_TRUE(committer != nullptr);
}

// Verify that FileStoreCommit::Create still rejects PK tables with fixed bucket (bucket > 0)
// when the workaround flag is not set.
TEST_F(FileStoreCommitImplTest, TestFixedBucketPKTableCommitRejected) {
auto pk_dir = UniqueTestDirectory::Create();
ASSERT_TRUE(pk_dir);
std::string pk_root = pk_dir->Str();
ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(pk_root, {}));
ASSERT_OK(catalog->CreateDatabase("db", {}, false));

arrow::Schema pk_schema(
{arrow::field("pk", arrow::int32()), arrow::field("val", arrow::utf8())});
::ArrowSchema arrow_schema;
ASSERT_TRUE(arrow::ExportSchema(pk_schema, &arrow_schema).ok());
std::map<std::string, std::string> table_options = {{Options::BUCKET, "4"}};
ASSERT_OK(catalog->CreateTable(Identifier("db", "pk_tbl_fixed"), &arrow_schema,
/*partition_keys=*/{}, /*primary_keys=*/{"pk"}, table_options,
/*ignore_if_exists=*/false));

std::string pk_table_path = PathUtil::JoinPath(pk_root, "db.db/pk_tbl_fixed");

CommitContextBuilder builder(pk_table_path, "test_user");
builder.AddOption(Options::FILE_SYSTEM, "local").UseRESTCatalogCommit(true);
ASSERT_OK_AND_ASSIGN(auto commit_context, builder.Finish());
auto result = FileStoreCommit::Create(std::move(commit_context));
ASSERT_FALSE(result.ok());
ASSERT_TRUE(result.status().IsNotImplemented());
ASSERT_TRUE(result.status().ToString().find("not support pk table commit") !=
std::string::npos);
}
Comment thread
liujiayi771 marked this conversation as resolved.

} // namespace paimon::test
Loading