Skip to content
Open
32 changes: 11 additions & 21 deletions src/paimon/format/parquet/column_index_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ Result<RowRanges> ColumnIndexFilter::VisitLeafPredicate(
const auto& literals = leaf_predicate->Literals();
FieldType field_type = leaf_predicate->GetFieldType();

if (function_type != Function::Type::IS_NULL && function_type != Function::Type::IS_NOT_NULL &&
literals.empty()) {
return Status::Invalid(
fmt::format("predicate on column '{}' requires at least one literal", field_name));
}
std::vector<int32_t> matching_pages;

switch (function_type) {
Expand All @@ -106,37 +111,22 @@ Result<RowRanges> ColumnIndexFilter::VisitLeafPredicate(
matching_pages = FilterPagesByIsNotNull(column_index_ptr);
break;
case Function::Type::EQUAL:
if (!literals.empty()) {
matching_pages = FilterPagesByEqual(column_index_ptr, literals[0], field_type);
}
matching_pages = FilterPagesByEqual(column_index_ptr, literals[0], field_type);
break;
case Function::Type::NOT_EQUAL:
if (!literals.empty()) {
matching_pages = FilterPagesByNotEqual(column_index_ptr, literals[0], field_type);
}
matching_pages = FilterPagesByNotEqual(column_index_ptr, literals[0], field_type);
break;
case Function::Type::LESS_THAN:
if (!literals.empty()) {
matching_pages = FilterPagesByLessThan(column_index_ptr, literals[0], field_type);
}
matching_pages = FilterPagesByLessThan(column_index_ptr, literals[0], field_type);
break;
case Function::Type::LESS_OR_EQUAL:
if (!literals.empty()) {
matching_pages =
FilterPagesByLessOrEqual(column_index_ptr, literals[0], field_type);
}
matching_pages = FilterPagesByLessOrEqual(column_index_ptr, literals[0], field_type);
break;
case Function::Type::GREATER_THAN:
if (!literals.empty()) {
matching_pages =
FilterPagesByGreaterThan(column_index_ptr, literals[0], field_type);
}
matching_pages = FilterPagesByGreaterThan(column_index_ptr, literals[0], field_type);
break;
case Function::Type::GREATER_OR_EQUAL:
if (!literals.empty()) {
matching_pages =
FilterPagesByGreaterOrEqual(column_index_ptr, literals[0], field_type);
}
matching_pages = FilterPagesByGreaterOrEqual(column_index_ptr, literals[0], field_type);
break;
case Function::Type::IN:
matching_pages = FilterPagesByIn(column_index_ptr, literals, field_type);
Expand Down
22 changes: 22 additions & 0 deletions src/paimon/format/parquet/column_index_filter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
#include "arrow/c/abi.h"
#include "arrow/c/bridge.h"
#include "gtest/gtest.h"
#include "paimon/common/predicate/equal.h"
#include "paimon/common/predicate/in.h"
#include "paimon/common/predicate/leaf_predicate_impl.h"
#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
#include "paimon/common/utils/arrow/mem_utils.h"
#include "paimon/defs.h"
Expand Down Expand Up @@ -480,4 +483,23 @@ TEST_F(ColumnIndexFilterTest, NullPredicateReturnsAllRows) {
EXPECT_EQ(row_group_row_count_, ranges.RowCount());
}

/// Predicates other than IsNull/IsNotNull are not allowed without a literal.
/// PredicateBuilder (public API) does not support constructing them without
/// a literal, so the filter should return an error for this invalid input.
TEST_F(ColumnIndexFilterTest, EmptyLiteralsReturnsError) {
auto pred = std::make_shared<paimon::LeafPredicateImpl>(paimon::Equal::Instance(), 0, "val",
FieldType::INT, std::vector<Literal>());
Comment thread
zhf999 marked this conversation as resolved.
auto result = Filter(pred);
EXPECT_FALSE(result.ok());
}

/// Empty literals for IN predicate — same rule applies: non-IS_NULL/IS_NOT_NULL
/// predicates without literals are invalid and should return an error.
TEST_F(ColumnIndexFilterTest, EmptyLiteralsInReturnsError) {
auto pred = std::make_shared<paimon::LeafPredicateImpl>(paimon::In::Instance(), 0, "val",
FieldType::INT, std::vector<Literal>());
auto result = Filter(pred);
EXPECT_FALSE(result.ok());
}

} // namespace paimon::parquet::test
12 changes: 1 addition & 11 deletions src/paimon/format/parquet/file_reader_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,13 @@
#include "fmt/format.h"
#include "paimon/format/parquet/column_index_filter.h"
#include "paimon/format/parquet/page_filtered_row_group_reader.h"
#include "paimon/format/parquet/parquet_format_defs.h"
#include "paimon/macros.h"
#include "parquet/arrow/reader.h"
#include "parquet/file_reader.h"
#include "parquet/metadata.h"
#include "parquet/page_index.h"

// Convert any std::exception thrown by underlying Parquet/Arrow APIs into a
// Status. Used as the trailing catch clauses of a try block in every public
// method that calls into the parquet C++ API, so the read layer never throws.
#define PAIMON_PARQUET_CATCH_AND_RETURN_STATUS(context) \
catch (const std::exception& e) { \
return Status::Invalid(fmt::format("{}: {}", (context), e.what())); \
} \
catch (...) { \
return Status::UnknownError((context), ": unknown error"); \
}

namespace paimon::parquet {

namespace {
Expand Down
2 changes: 2 additions & 0 deletions src/paimon/format/parquet/page_filtered_row_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ Result<std::unique_ptr<arrow::RecordBatchReader>> PageFilteredRowGroupReader::Re
// Pre-buffering failed, fall back to row-group level PreBuffer
::arrow::io::IOContext io_ctx(pool);
parquet_reader->PreBuffer(rg_vec, col_vec, io_ctx, cache_options);
PAIMON_RETURN_NOT_OK_FROM_ARROW(
parquet_reader->WhenBuffered(rg_vec, col_vec).status());
}
} else {
PAIMON_RETURN_NOT_OK_FROM_ARROW(parquet_reader->WhenBuffered(rg_vec, col_vec).status());
Expand Down
35 changes: 12 additions & 23 deletions src/paimon/format/parquet/parquet_file_batch_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,6 @@
#include "parquet/arrow/reader.h"
#include "parquet/properties.h"

// Convert any std::exception thrown by underlying Parquet/Arrow APIs into a
// Status. Used as the trailing catch clauses of a try block in every public
// method that calls into the parquet C++ API, so the read layer never throws.
#define PAIMON_PARQUET_CATCH_AND_RETURN_STATUS(context) \
catch (const std::exception& e) { \
return Status::Invalid(fmt::format("{}: {}", (context), e.what())); \
} \
catch (...) { \
return Status::UnknownError((context), ": unknown error"); \
}

namespace arrow {
class MemoryPool;
} // namespace arrow
Expand Down Expand Up @@ -159,18 +148,6 @@ Status ParquetFileBatchReader::SetReadSchema(
}
}

// Build column name to index map for page-level filtering.
// For leaf columns, indices[0] is the correct leaf column index in Parquet.
// For nested types (struct/list/map), FlattenSchema produces multiple leaf indices,
// but predicate pushdown only targets leaf columns with simple types, so indices[0]
// is always the correct single leaf index for predicate evaluation.
std::map<std::string, int32_t> column_name_to_index;
for (const auto& [name, indices] : field_index_map) {
if (!indices.empty()) {
column_name_to_index[name] = indices[0];
}
}

std::vector<int32_t> row_groups = arrow::internal::Iota(reader_->GetNumberOfRowGroups());
if (predicate) {
PAIMON_ASSIGN_OR_RAISE(row_groups,
Expand All @@ -188,6 +165,18 @@ Status ParquetFileBatchReader::SetReadSchema(
OptionsUtils::GetValueFromMap<bool>(options_, PARQUET_READ_ENABLE_PAGE_INDEX_FILTER,
DEFAULT_PARQUET_READ_ENABLE_PAGE_INDEX_FILTER));
if (enable_page_index_filter) {
// Build column name to index map for page-level filtering.
// For leaf columns, indices[0] is the correct leaf column index in Parquet.
// For nested types (struct/list/map), FlattenSchema produces multiple leaf indices,
// but predicate pushdown only targets leaf columns with simple types, so indices[0]
// is always the correct single leaf index for predicate evaluation.
std::map<std::string, int32_t> column_name_to_index;
for (const auto& [name, indices] : field_index_map) {
if (!indices.empty()) {
column_name_to_index[name] = indices[0];
}
}

PAIMON_ASSIGN_OR_RAISE(
auto page_filter_result,
FilterRowGroupsByPageIndex(predicate, column_name_to_index, row_groups));
Expand Down
14 changes: 14 additions & 0 deletions src/paimon/format/parquet/parquet_format_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,22 @@
#include <cstdint>
#include <limits>

#include "fmt/format.h"
#include "paimon/status.h"

namespace paimon::parquet {

// Convert any std::exception thrown by underlying Parquet/Arrow APIs into a
// Status. Used as the trailing catch clauses of a try block in every public
// method that calls into the parquet C++ API, so the read layer never throws.
#define PAIMON_PARQUET_CATCH_AND_RETURN_STATUS(context) \
catch (const std::exception& e) { \
return Status::Invalid(fmt::format("{}: {}", (context), e.what())); \
} \
catch (...) { \
return Status::UnknownError(fmt::format("{}: unknown error", (context))); \
}

// write
static inline const char PARQUET_BLOCK_SIZE[] = "parquet.block.size";
static inline const char PARQUET_PAGE_SIZE[] = "parquet.page.size";
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/format/parquet/row_ranges.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class RowRanges {
/// Creates a RowRanges from a list of ranges.
explicit RowRanges(const std::vector<Range>& ranges) : ranges_(ranges) {}

/// Creates a RowRanges from a list of ranges, taking ownership of the vector.
explicit RowRanges(std::vector<Range>&& ranges) : ranges_(std::move(ranges)) {}

/// Creates a RowRanges with a single range [0, row_count - 1].
static RowRanges CreateSingle(int64_t row_count) {
if (row_count <= 0) {
Expand Down
Loading