Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
handle initial sequence number
  • Loading branch information
zhjwpku committed Apr 15, 2025
commit 296d04f21226aa3883ed2cf1dc7068b019f247d0
16 changes: 11 additions & 5 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "iceberg/json_internal.h"

#include <cstdint>
#include <format>
#include <regex>
#include <unordered_set>
Expand Down Expand Up @@ -82,6 +83,8 @@ constexpr std::string_view kMinSnapshotsToKeep = "min-snapshots-to-keep";
constexpr std::string_view kMaxSnapshotAgeMs = "max-snapshot-age-ms";
constexpr std::string_view kMaxRefAgeMs = "max-ref-age-ms";

constexpr int64_t kInitialSequenceNumber = 0;

const std::unordered_set<std::string_view> kValidSnapshotSummaryFields = {
SnapshotSummaryFields::kOperation,
SnapshotSummaryFields::kAddedDataFiles,
Expand Down Expand Up @@ -324,7 +327,9 @@ nlohmann::json ToJson(const Snapshot& snapshot) {
nlohmann::json json;
json[kSnapshotId] = snapshot.snapshot_id;
SetOptionalField(json, kParentSnapshotId, snapshot.parent_snapshot_id);
json[kSequenceNumber] = snapshot.sequence_number;
if (snapshot.sequence_number > kInitialSequenceNumber) {
json[kSequenceNumber] = snapshot.sequence_number;
}
json[kTimestampMs] = snapshot.timestamp_ms;
json[kManifestList] = snapshot.manifest_list;
// If there is an operation, write the summary map
Expand Down Expand Up @@ -552,7 +557,7 @@ Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& j
Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to be consistent with the Java impl: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/SnapshotParser.java. Specifically, we need to deal with cases where sequence number or summary is missing.

@Fokko Will it actually happen that a snapshot does not have summary (and thus operation is also missing)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the spec, summary is required for v2 and v3 but optional for v1. So I believe the spec answers my question. We have to handle this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're correct!

  • For V1 the summary is optional.
  • For V2/V3 the summary is required, and also the operation. Some writers produced some malformed metadata in the past. Instead of throwing an exception, we would it is an overwrite operation, since that's the most generic one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds like we need to set operation to overwrite when summary is available but operation is missing. @zhjwpku

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, please take a look.

ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
ICEBERG_ASSIGN_OR_RAISE(auto sequence_number,
GetJsonValue<int64_t>(json, kSequenceNumber));
GetJsonValueOptional<int64_t>(json, kSequenceNumber));
ICEBERG_ASSIGN_OR_RAISE(auto timestamp_ms, GetJsonValue<int64_t>(json, kTimestampMs));
ICEBERG_ASSIGN_OR_RAISE(auto manifest_list,
GetJsonValue<std::string>(json, kManifestList));
Expand Down Expand Up @@ -591,9 +596,10 @@ Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {

ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));

return std::make_unique<Snapshot>(snapshot_id, parent_snapshot_id, sequence_number,
timestamp_ms, manifest_list, std::move(summary),
schema_id);
return std::make_unique<Snapshot>(
snapshot_id, parent_snapshot_id,
sequence_number.has_value() ? *sequence_number : kInitialSequenceNumber,
timestamp_ms, manifest_list, std::move(summary), schema_id);
}

} // namespace iceberg
Loading