Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/paimon/format/parquet/page_filtered_row_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,18 +316,20 @@ std::vector<::arrow::io::ReadRange> PageFilteredRowGroupReader::ComputePageRange
for (int32_t col_idx : column_indices) {
auto col_chunk = rg_metadata->ColumnChunk(col_idx);
int64_t data_page_offset = col_chunk->data_page_offset();
int64_t total_compressed_size = col_chunk->total_compressed_size();
int64_t chunk_end = data_page_offset + total_compressed_size;

int64_t data_page_compressed_size = col_chunk->total_compressed_size();
// Dictionary page: always include if present
if (col_chunk->has_dictionary_page()) {
int64_t dict_offset = col_chunk->dictionary_page_offset();
int64_t dict_size = data_page_offset - dict_offset;
if (dict_size > 0) {
// if dictionary exists, the data page size should be reduced by the dictionary
data_page_compressed_size -= dict_size;
ranges.push_back({dict_offset, dict_size});
}
}

int64_t chunk_end = data_page_offset + data_page_compressed_size;
Comment thread
zhf999 marked this conversation as resolved.

// Try to get OffsetIndex for page-level ranges
std::shared_ptr<::parquet::OffsetIndex> offset_index;
if (rg_page_index_reader) {
Expand All @@ -336,7 +338,7 @@ std::vector<::arrow::io::ReadRange> PageFilteredRowGroupReader::ComputePageRange

if (!offset_index) {
// No OffsetIndex: fall back to entire column chunk
ranges.push_back({data_page_offset, total_compressed_size});
ranges.push_back({data_page_offset, data_page_compressed_size});
continue;
}

Expand Down
170 changes: 170 additions & 0 deletions src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -719,4 +719,174 @@ TEST_F(PageFilteredRowGroupReaderTest, EndToEndPageLevelPreBuffer) {
ASSERT_EQ(10, offset);
}

/// Test: ComputePageRanges with dictionary encoding produces correct chunk_end.
///
/// When dictionary encoding is enabled, the column chunk layout is:
/// [Dictionary Page] [Data Page 0] [Data Page 1] ... [Data Page N]
/// And total_compressed_size covers the entire chunk starting from dictionary_page_offset.
///
/// The bug: chunk_end = data_page_offset + total_compressed_size is wrong because
/// total_compressed_size already includes the dictionary page size. The correct
/// chunk_end should be dictionary_page_offset + total_compressed_size.
///
/// This test verifies that:
/// 1. No range exceeds the true chunk boundary (overshoot regression).
/// 2. At least one non-dictionary data-page range is present (not truncated).
/// 3. The maximum range_end equals true_chunk_end when requesting all rows.
/// 4. End-to-end reads with page-level filtering return correct query results.
TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesWithDictionaryEncoding) {
std::string file_name = dir_->Str() + "/compute_ranges_dict.parquet";

// Use low-cardinality data to ensure dictionary encoding is actually used.
// 100 rows with values cycling through 0..9 → dictionary will have 10 entries.
arrow::Int32Builder val_builder;
ASSERT_TRUE(val_builder.Reserve(100).ok());
for (int32_t i = 0; i < 100; ++i) {
val_builder.UnsafeAppend(i % 10);
}
auto val_array = val_builder.Finish().ValueOrDie();
auto field = arrow::field("val", arrow::int32());
auto struct_array = arrow::StructArray::Make({val_array}, {field}).ValueOrDie();
Comment thread
lxy-9602 marked this conversation as resolved.

// Write with dictionary encoding enabled (the key difference from other tests).
auto data_type = struct_array->struct_type();
auto data_schema = arrow::schema(data_type->fields());
auto data_arrow_array = std::make_unique<ArrowArray>();
ASSERT_TRUE(arrow::ExportArray(*struct_array, data_arrow_array.get()).ok());
ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
fs_->Create(file_name, /*overwrite=*/false));
Comment thread
zhf999 marked this conversation as resolved.
::parquet::WriterProperties::Builder builder;
builder.write_batch_size(10);
builder.max_row_group_length(100);
builder.enable_dictionary(); // Enable dictionary → triggers the bug
builder.enable_write_page_index();
builder.data_pagesize(1); // Force small pages
auto writer_properties = builder.build();
ASSERT_OK_AND_ASSIGN(
auto format_writer,
ParquetFormatWriter::Create(out, data_schema, writer_properties,
DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, arrow_pool_));
ASSERT_OK(format_writer->AddBatch(data_arrow_array.get()));
ASSERT_OK(format_writer->Finish());
ASSERT_OK(out->Close());

// Open the file and verify metadata confirms dictionary page presence
ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, fs_->Open(file_name));
ASSERT_OK_AND_ASSIGN(uint64_t length, in->Length());
auto in_stream = std::make_shared<ArrowInputStreamAdapter>(in, arrow_pool_, length);
auto parquet_reader = ::parquet::ParquetFileReader::Open(in_stream);
ASSERT_TRUE(parquet_reader);

auto file_metadata = parquet_reader->metadata();
auto rg_metadata = file_metadata->RowGroup(0);
auto col_chunk = rg_metadata->ColumnChunk(0);

// Precondition: dictionary page must exist for this test to be meaningful
ASSERT_TRUE(col_chunk->has_dictionary_page())
<< "Dictionary page not present - test setup error";

int64_t dict_offset = col_chunk->dictionary_page_offset();
int64_t data_page_offset = col_chunk->data_page_offset();
int64_t total_compressed_size = col_chunk->total_compressed_size();

// The true chunk end is dict_offset + total_compressed_size
int64_t true_chunk_end = dict_offset + total_compressed_size;
// The buggy chunk end would be data_page_offset + total_compressed_size
int64_t buggy_chunk_end = data_page_offset + total_compressed_size;

// Sanity: dict page is before data pages, so buggy end > true end
ASSERT_LT(dict_offset, data_page_offset)
<< "Dictionary offset should be before data page offset";
ASSERT_GT(buggy_chunk_end, true_chunk_end)
<< "Buggy chunk_end should exceed true chunk_end when dictionary is present";

// Now call ComputePageRanges with all rows matching
RowRanges row_ranges;
row_ranges.Add(RowRanges::Range(0, 99));

auto ranges = PageFilteredRowGroupReader::ComputePageRanges(
parquet_reader.get(), /*row_group_index=*/0, row_ranges, /*column_indices=*/{0});

ASSERT_FALSE(ranges.empty());

// --- Check 1: No range should extend beyond the true chunk end ---
// With the bug, the last data page's range would use chunk_end = data_page_offset +
// total_compressed_size, which overshoots by the dictionary page size.
for (size_t i = 0; i < ranges.size(); ++i) {
int64_t range_end = ranges[i].offset + ranges[i].length;
EXPECT_LE(range_end, true_chunk_end)
<< "Range " << i << " [offset=" << ranges[i].offset << ", length=" << ranges[i].length
<< "] exceeds true chunk end (" << true_chunk_end << "). "
<< "This indicates chunk_end is computed as data_page_offset + "
"total_compressed_size instead of dictionary_page_offset + "
"total_compressed_size.";
}
Comment thread
zhf999 marked this conversation as resolved.

// --- Check 2: At least one non-dictionary data-page range is present ---
// Guards against truncation: if only the dictionary range is returned, the test
// would still pass the overshoot check but miss that data pages are lost.
int data_page_range_count = 0;
for (const auto& range : ranges) {
if (range.offset >= data_page_offset) {
++data_page_range_count;
}
}
ASSERT_GE(data_page_range_count, 1)
<< "Expected at least one data-page range (offset >= " << data_page_offset
<< "), but only dictionary range(s) were returned.";

// --- Check 3: Maximum range_end equals true_chunk_end when requesting all rows ---
int64_t max_range_end = 0;
for (const auto& range : ranges) {
int64_t range_end = range.offset + range.length;
max_range_end = std::max(max_range_end, range_end);
}
ASSERT_EQ(max_range_end, true_chunk_end)
<< "When requesting all rows, the maximum range_end should exactly equal "
<< "true_chunk_end (" << true_chunk_end << "), but got " << max_range_end
<< ". The last data page range may be truncated or missing.";

// --- Check 4: No range exceeds file size ---
for (const auto& range : ranges) {
EXPECT_LE(range.offset + range.length, static_cast<int64_t>(length))
<< "Range exceeds file size";
}

// --- End-to-end check 1: read all rows (no predicate filtering) ---
// Verifies that reading a dictionary-encoded file with page index enabled
// returns all 100 rows with correct values.
auto read_schema = arrow::schema({field});
auto predicate_all = PredicateBuilder::GreaterOrEqual(
/*field_index=*/0, /*field_name=*/"val", FieldType::INT, Literal(0));
std::shared_ptr<arrow::ChunkedArray> result_all;
ReadWithPredicateImpl(file_name, read_schema, predicate_all, &result_all);
ASSERT_TRUE(result_all);
ASSERT_EQ(100, result_all->length())
<< "End-to-end read with dictionary encoding should return all 100 rows";

// --- End-to-end check 2: manual row-level filtering ---
// Page-level filter does not do row-level filtering. Verify data content by
// scanning all returned rows and checking val == 5 appears exactly 10 times
// (val = i % 10, so rows 5,15,25,...,95 have val == 5).
int32_t count_val5 = 0;
int64_t total_rows_checked = 0;
for (int i = 0; i < result_all->num_chunks(); ++i) {
auto struct_arr = std::dynamic_pointer_cast<arrow::StructArray>(result_all->chunk(i));
ASSERT_TRUE(struct_arr);
auto val_arr = std::dynamic_pointer_cast<arrow::Int32Array>(struct_arr->field(0));
ASSERT_TRUE(val_arr);
for (int64_t j = 0; j < val_arr->length(); ++j) {
auto expected = static_cast<int32_t>(total_rows_checked % 10);
ASSERT_EQ(expected, val_arr->Value(j))
<< "Value mismatch at row " << total_rows_checked;
if (val_arr->Value(j) == 5) {
++count_val5;
}
++total_rows_checked;
}
}
ASSERT_EQ(100, total_rows_checked);
ASSERT_EQ(10, count_val5) << "val == 5 should appear exactly 10 times in 100 rows";
}

} // namespace paimon::parquet::test
Loading