Skip to content

Commit 36f746c

Browse files
authored
feat: add manifest evaluator (#403)
1 parent 42aa1d0 commit 36f746c

File tree

11 files changed

+940
-1
lines changed

11 files changed

+940
-1
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ set(ICEBERG_SOURCES
2727
expression/expressions.cc
2828
expression/inclusive_metrics_evaluator.cc
2929
expression/literal.cc
30+
expression/manifest_evaluator.cc
3031
expression/predicate.cc
3132
expression/residual_evaluator.cc
3233
expression/rewrite_not.cc
Lines changed: 384 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,384 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/expression/manifest_evaluator.h"
21+
22+
#include "iceberg/expression/binder.h"
23+
#include "iceberg/expression/expression_visitor.h"
24+
#include "iceberg/expression/rewrite_not.h"
25+
#include "iceberg/manifest/manifest_list.h"
26+
#include "iceberg/row/struct_like.h"
27+
#include "iceberg/schema.h"
28+
#include "iceberg/util/macros.h"
29+
30+
namespace iceberg {
31+
32+
namespace {
33+
constexpr bool kRowsMightMatch = true;
34+
constexpr bool kRowCannotMatch = false;
35+
constexpr int32_t kInPredicateLimit = 200;
36+
} // namespace
37+
38+
class ManifestEvalVisitor : public BoundVisitor<bool> {
39+
public:
40+
explicit ManifestEvalVisitor(const ManifestFile& manifest)
41+
: stats_(manifest.partitions) {}
42+
43+
Result<bool> AlwaysTrue() override { return kRowsMightMatch; }
44+
45+
Result<bool> AlwaysFalse() override { return kRowCannotMatch; }
46+
47+
Result<bool> Not(bool child_result) override { return !child_result; }
48+
49+
Result<bool> And(bool left_result, bool right_result) override {
50+
return left_result && right_result;
51+
}
52+
53+
Result<bool> Or(bool left_result, bool right_result) override {
54+
return left_result || right_result;
55+
}
56+
57+
Result<bool> IsNull(const std::shared_ptr<Bound>& expr) override {
58+
// no need to check whether the field is required because binding evaluates that case
59+
// if the column has no null values, the expression cannot match
60+
const auto& ref = expr->reference();
61+
ICEBERG_ASSIGN_OR_RAISE(auto pos, GetPosition(*ref));
62+
if (!stats_.at(pos).contains_null) {
63+
return kRowCannotMatch;
64+
}
65+
66+
return kRowsMightMatch;
67+
}
68+
69+
Result<bool> NotNull(const std::shared_ptr<Bound>& expr) override {
70+
const auto& ref = expr->reference();
71+
ICEBERG_ASSIGN_OR_RAISE(auto pos, GetPosition(*ref));
72+
if (AllValuesAreNull(stats_.at(pos), ref->type()->type_id())) {
73+
return kRowCannotMatch;
74+
}
75+
76+
return kRowsMightMatch;
77+
}
78+
79+
Result<bool> IsNaN(const std::shared_ptr<Bound>& expr) override {
80+
const auto& ref = expr->reference();
81+
ICEBERG_ASSIGN_OR_RAISE(auto pos, GetPosition(*ref));
82+
if (stats_.at(pos).contains_nan.has_value() && !stats_.at(pos).contains_nan.value()) {
83+
return kRowCannotMatch;
84+
}
85+
if (AllValuesAreNull(stats_.at(pos), ref->type()->type_id())) {
86+
return kRowCannotMatch;
87+
}
88+
89+
return kRowsMightMatch;
90+
}
91+
92+
Result<bool> NotNaN(const std::shared_ptr<Bound>& expr) override {
93+
const auto& ref = expr->reference();
94+
ICEBERG_ASSIGN_OR_RAISE(auto pos, GetPosition(*ref));
95+
const auto& summary = stats_.at(pos);
96+
// if containsNaN is true, containsNull is false and lowerBound is null, all values
97+
// are NaN
98+
if (summary.contains_nan.has_value() && summary.contains_nan.value() &&
99+
!summary.contains_null && !summary.lower_bound.has_value()) {
100+
return kRowCannotMatch;
101+
}
102+
103+
return kRowsMightMatch;
104+
}
105+
106+
Result<bool> Lt(const std::shared_ptr<Bound>& expr, const Literal& lit) override {
107+
const auto& ref = expr->reference();
108+
ICEBERG_ASSIGN_OR_RAISE(auto pos, GetPosition(*ref));
109+
const auto& summary = stats_.at(pos);
110+
if (!summary.lower_bound.has_value()) {
111+
return kRowCannotMatch; // values are all null
112+
}
113+
ICEBERG_ASSIGN_OR_RAISE(
114+
auto lower, DeserializeBoundLiteral(summary.lower_bound.value(), ref->type()));
115+
if (lower >= lit) {
116+
return kRowCannotMatch;
117+
}
118+
return kRowsMightMatch;
119+
}
120+
121+
Result<bool> LtEq(const std::shared_ptr<Bound>& expr, const Literal& lit) override {
122+
const auto& ref = expr->reference();
123+
ICEBERG_ASSIGN_OR_RAISE(auto pos, GetPosition(*ref));
124+
const auto& summary = stats_.at(pos);
125+
if (!summary.lower_bound.has_value()) {
126+
return kRowCannotMatch; // values are all null
127+
}
128+
ICEBERG_ASSIGN_OR_RAISE(
129+
auto lower, DeserializeBoundLiteral(summary.lower_bound.value(), ref->type()));
130+
if (lower > lit) {
131+
return kRowCannotMatch;
132+
}
133+
return kRowsMightMatch;
134+
}
135+
136+
Result<bool> Gt(const std::shared_ptr<Bound>& expr, const Literal& lit) override {
137+
const auto& ref = expr->reference();
138+
ICEBERG_ASSIGN_OR_RAISE(auto pos, GetPosition(*ref));
139+
const auto& summary = stats_.at(pos);
140+
if (!summary.upper_bound.has_value()) {
141+
return kRowCannotMatch; // values are all null
142+
}
143+
ICEBERG_ASSIGN_OR_RAISE(
144+
auto upper, DeserializeBoundLiteral(summary.upper_bound.value(), ref->type()));
145+
if (upper <= lit) {
146+
return kRowCannotMatch;
147+
}
148+
return kRowsMightMatch;
149+
}
150+
151+
Result<bool> GtEq(const std::shared_ptr<Bound>& expr, const Literal& lit) override {
152+
const auto& ref = expr->reference();
153+
ICEBERG_ASSIGN_OR_RAISE(auto pos, GetPosition(*ref));
154+
const auto& summary = stats_.at(pos);
155+
if (!summary.upper_bound.has_value()) {
156+
return kRowCannotMatch; // values are all null
157+
}
158+
ICEBERG_ASSIGN_OR_RAISE(
159+
auto upper,
160+
DeserializeBoundLiteral(summary.upper_bound.value(), expr->reference()->type()));
161+
if (upper < lit) {
162+
return kRowCannotMatch;
163+
}
164+
return kRowsMightMatch;
165+
}
166+
167+
Result<bool> Eq(const std::shared_ptr<Bound>& expr, const Literal& lit) override {
168+
const auto& ref = expr->reference();
169+
ICEBERG_ASSIGN_OR_RAISE(auto pos, GetPosition(*ref));
170+
const auto& summary = stats_.at(pos);
171+
if (!summary.lower_bound.has_value() || !summary.upper_bound.has_value()) {
172+
return kRowCannotMatch; // values are all null and literal cannot contain null
173+
}
174+
ICEBERG_ASSIGN_OR_RAISE(
175+
auto lower, DeserializeBoundLiteral(summary.lower_bound.value(), ref->type()));
176+
if (lower > lit) {
177+
return kRowCannotMatch;
178+
}
179+
180+
ICEBERG_ASSIGN_OR_RAISE(
181+
auto upper, DeserializeBoundLiteral(summary.upper_bound.value(), ref->type()));
182+
if (upper < lit) {
183+
return kRowCannotMatch;
184+
}
185+
186+
return kRowsMightMatch;
187+
}
188+
189+
Result<bool> NotEq(const std::shared_ptr<Bound>& expr, const Literal& lit) override {
190+
// because the bounds are not necessarily a min or max value, this cannot be answered
191+
// using them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value in col.
192+
return kRowsMightMatch;
193+
}
194+
195+
Result<bool> In(const std::shared_ptr<Bound>& expr,
196+
const BoundSetPredicate::LiteralSet& literal_set) override {
197+
const auto& ref = expr->reference();
198+
ICEBERG_ASSIGN_OR_RAISE(auto pos, GetPosition(*ref));
199+
const auto& summary = stats_.at(pos);
200+
if (!summary.lower_bound.has_value() || !summary.upper_bound.has_value()) {
201+
// values are all null and literalSet cannot contain null.
202+
return kRowCannotMatch;
203+
}
204+
if (literal_set.size() > kInPredicateLimit) {
205+
// skip evaluating the predicate if the number of values is too big
206+
return kRowsMightMatch;
207+
}
208+
209+
ICEBERG_ASSIGN_OR_RAISE(
210+
auto lower, DeserializeBoundLiteral(summary.lower_bound.value(), ref->type()));
211+
ICEBERG_ASSIGN_OR_RAISE(
212+
auto upper, DeserializeBoundLiteral(summary.upper_bound.value(), ref->type()));
213+
214+
if (std::ranges::all_of(literal_set, [&](const Literal& lit) {
215+
return lit < lower || lit > upper;
216+
})) {
217+
// if all values are less than lower bound or greater than upper bound,
218+
// rows cannot match.
219+
return kRowCannotMatch;
220+
}
221+
return kRowsMightMatch;
222+
}
223+
224+
Result<bool> NotIn(const std::shared_ptr<Bound>& expr,
225+
const BoundSetPredicate::LiteralSet& literal_set) override {
226+
// because the bounds are not necessarily a min or max value, this cannot be answered
227+
// using them. notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a value in
228+
// col.
229+
return kRowsMightMatch;
230+
}
231+
232+
Result<bool> StartsWith(const std::shared_ptr<Bound>& expr,
233+
const Literal& lit) override {
234+
const auto& ref = expr->reference();
235+
ICEBERG_ASSIGN_OR_RAISE(auto pos, GetPosition(*ref));
236+
const auto& summary = stats_.at(pos);
237+
if (!summary.lower_bound.has_value() || !summary.upper_bound.has_value()) {
238+
return kRowCannotMatch;
239+
}
240+
if (lit.type()->type_id() != TypeId::kString) {
241+
return InvalidExpression("Invalid literal: not a string, cannot use StartsWith");
242+
}
243+
const auto& prefix = std::get<std::string>(lit.value());
244+
ICEBERG_ASSIGN_OR_RAISE(
245+
auto lower, DeserializeBoundLiteral(summary.lower_bound.value(), ref->type()));
246+
ICEBERG_ASSIGN_OR_RAISE(
247+
auto upper, DeserializeBoundLiteral(summary.upper_bound.value(), ref->type()));
248+
const auto& lower_bound = std::get<std::string>(lower.value());
249+
const auto& upper_bound = std::get<std::string>(upper.value());
250+
// truncate lower bound so that its length in bytes is not greater than the length of
251+
// prefix
252+
size_t length = std::min(prefix.size(), lower_bound.size());
253+
if (lower_bound.substr(0, length) > prefix) {
254+
return kRowCannotMatch;
255+
}
256+
length = std::min(prefix.size(), upper_bound.size());
257+
if (upper_bound.substr(0, length) < prefix) {
258+
return kRowCannotMatch;
259+
}
260+
return kRowsMightMatch;
261+
}
262+
263+
Result<bool> NotStartsWith(const std::shared_ptr<Bound>& expr,
264+
const Literal& lit) override {
265+
const auto& ref = expr->reference();
266+
ICEBERG_ASSIGN_OR_RAISE(auto pos, GetPosition(*ref));
267+
const auto& summary = stats_.at(pos);
268+
if (summary.contains_null || !summary.lower_bound.has_value() ||
269+
!summary.upper_bound.has_value()) {
270+
return kRowsMightMatch;
271+
}
272+
if (lit.type()->type_id() != TypeId::kString) {
273+
return InvalidExpression("Invalid literal: not a string, cannot use notStartsWith");
274+
}
275+
// notStartsWith will match unless all values must start with the prefix. This happens
276+
// when the lower and upper bounds both start with the prefix.
277+
const auto& prefix = std::get<std::string>(lit.value());
278+
ICEBERG_ASSIGN_OR_RAISE(
279+
auto lower, DeserializeBoundLiteral(summary.lower_bound.value(), ref->type()));
280+
ICEBERG_ASSIGN_OR_RAISE(
281+
auto upper, DeserializeBoundLiteral(summary.upper_bound.value(), ref->type()));
282+
const auto& lower_bound = std::get<std::string>(lower.value());
283+
const auto& upper_bound = std::get<std::string>(upper.value());
284+
285+
// if lower is shorter than the prefix, it can't start with the prefix
286+
if (lower_bound.size() < prefix.size()) {
287+
return kRowsMightMatch;
288+
}
289+
if (lower_bound.starts_with(prefix)) {
290+
// the lower bound starts with the prefix; check the upper bound
291+
// if upper is shorter than the prefix, it can't start with the prefix
292+
if (upper_bound.size() < prefix.size()) {
293+
return kRowsMightMatch;
294+
}
295+
// truncate upper bound so that its length in bytes is not greater than the length
296+
// of prefix
297+
if (upper_bound.starts_with(prefix)) {
298+
return kRowCannotMatch;
299+
}
300+
}
301+
return kRowsMightMatch;
302+
}
303+
304+
private:
305+
Result<size_t> GetPosition(const BoundReference& ref) const {
306+
const auto& accessor = ref.accessor();
307+
const auto& position_path = accessor.position_path();
308+
if (position_path.empty()) {
309+
return InvalidArgument("Invalid accessor: empty position path.");
310+
}
311+
// nested accessors are not supported for partition fields
312+
if (position_path.size() > 1) {
313+
return InvalidArgument("Cannot convert nested accessor to position");
314+
}
315+
auto pos = position_path.at(0);
316+
if (pos >= stats_.size()) {
317+
return InvalidArgument("Position {} is out of partition field range {}", pos,
318+
stats_.size());
319+
}
320+
return pos;
321+
}
322+
323+
bool AllValuesAreNull(const PartitionFieldSummary& summary, TypeId typeId) {
324+
// containsNull encodes whether at least one partition value is null,
325+
// lowerBound is null if all partition values are null
326+
bool allNull = summary.contains_null && !summary.lower_bound.has_value();
327+
328+
if (allNull && (typeId == TypeId::kDouble || typeId == TypeId::kFloat)) {
329+
// floating point types may include NaN values, which we check separately.
330+
// In case bounds don't include NaN value, containsNaN needs to be checked against.
331+
allNull = summary.contains_nan.has_value() && !summary.contains_nan.value();
332+
}
333+
return allNull;
334+
}
335+
336+
Result<Literal> DeserializeBoundLiteral(const std::vector<uint8_t>& bound,
337+
const std::shared_ptr<Type>& type) const {
338+
if (!type->is_primitive()) {
339+
return NotSupported("Bounds of non-primitive partition fields are not supported.");
340+
}
341+
return Literal::Deserialize(
342+
bound, std::move(internal::checked_pointer_cast<PrimitiveType>(type)));
343+
}
344+
345+
private:
346+
const std::vector<PartitionFieldSummary>& stats_;
347+
};
348+
349+
ManifestEvaluator::ManifestEvaluator(std::shared_ptr<Expression> expr)
350+
: expr_(std::move(expr)) {}
351+
352+
ManifestEvaluator::~ManifestEvaluator() = default;
353+
354+
Result<std::unique_ptr<ManifestEvaluator>> ManifestEvaluator::MakeRowFilter(
355+
[[maybe_unused]] std::shared_ptr<Expression> expr,
356+
[[maybe_unused]] const std::shared_ptr<PartitionSpec>& spec,
357+
[[maybe_unused]] const Schema& schema, [[maybe_unused]] bool case_sensitive) {
358+
// TODO(xiao.dong) we need a projection util to project row filter to the partition col
359+
return NotImplemented("ManifestEvaluator::MakeRowFilter");
360+
}
361+
362+
Result<std::unique_ptr<ManifestEvaluator>> ManifestEvaluator::MakePartitionFilter(
363+
std::shared_ptr<Expression> expr, const std::shared_ptr<PartitionSpec>& spec,
364+
const Schema& schema, bool case_sensitive) {
365+
ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec->PartitionType(schema));
366+
auto field_span = partition_type->fields();
367+
std::vector<SchemaField> fields(field_span.begin(), field_span.end());
368+
auto partition_schema = std::make_shared<Schema>(fields);
369+
ICEBERG_ASSIGN_OR_RAISE(auto rewrite_expr, RewriteNot::Visit(std::move(expr)));
370+
ICEBERG_ASSIGN_OR_RAISE(auto partition_expr,
371+
Binder::Bind(*partition_schema, rewrite_expr, case_sensitive));
372+
return std::unique_ptr<ManifestEvaluator>(
373+
new ManifestEvaluator(std::move(partition_expr)));
374+
}
375+
376+
Result<bool> ManifestEvaluator::Evaluate(const ManifestFile& manifest) const {
377+
if (manifest.partitions.empty()) {
378+
return kRowsMightMatch;
379+
}
380+
ManifestEvalVisitor visitor(manifest);
381+
return Visit<bool, ManifestEvalVisitor>(expr_, visitor);
382+
}
383+
384+
} // namespace iceberg

0 commit comments

Comments
 (0)