From 649290bd1c244af1f2c140f03bc31220a6d3c877 Mon Sep 17 00:00:00 2001 From: Jim Hughes Date: Mon, 22 Jun 2026 14:42:33 -0400 Subject: [PATCH] [FLINK-40002][table] Fix PushFilterInCalcIntoTableSourceScanRule for metadata filter push-down - Mixed physical+metadata predicates now stay as runtime Calc filters instead of being routed through FilterPushDownSpec, which crashed on compiled-plan restore when ProjectPushDownSpec narrowed the row type. - Fix PushFilterInCalcIntoTableSourceScanRule to separate metadata-only predicates from physical predicates, matching PushFilterIntoTableSourceScanRule. - Extract the duplicated classify-push logic into a shared classifyAndPushFilters() method in PushFilterIntoSourceScanRuleBase. - Document the identity contract on applyMetadataFilters Javadoc. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../abilities/SupportsReadingMetadata.java | 19 ++- ...shFilterInCalcIntoTableSourceScanRule.java | 35 +++-- .../PushFilterIntoSourceScanRuleBase.java | 116 ++++++++++++++- .../PushFilterIntoTableSourceScanRule.java | 56 +------- .../MetadataFilterResultShapesITCase.java | 133 ++++++++++++++++++ 5 files changed, 280 insertions(+), 79 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java index 0d1c65f0f2f96..10e993bd458c5 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java @@ -174,14 +174,29 @@ default boolean supportsMetadataFilterPushDown() { /** * Provides a list of metadata filters in conjunctive form. A source can pick filters and return - * the accepted and remaining filters. Same contract as {@link - * SupportsFilterPushDown#applyFilters(List)}, but for metadata columns. + * the accepted and remaining filters. + * + *

Identity contract: The source MUST return the exact same {@link ResolvedExpression} + * instances it received — no copies, no wrappers, no rebuilds. The planner uses instance + * identity (not {@code equals}) to correlate returned expressions back to their original + * positions. Typical implementations partition the input list: + * + *

{@code
+     * return MetadataFilterResult.of(
+     *     metadataFilters.subList(0, accepted),
+     *     metadataFilters.subList(accepted, metadataFilters.size()));
+     * }
* *

The provided filters reference metadata key names (from {@link #listReadableMetadata()}), * not SQL column aliases. For example, a column declared as {@code msg_offset BIGINT METADATA * FROM 'offset'} will have its predicate expressed as {@code offset >= 1000}, not {@code * msg_offset >= 1000}. The planner handles the alias-to-key translation before calling this * method. + * + *

Acceptance must be decided per predicate, independent of which other predicates are in the + * list. On compiled-plan restore only the previously-accepted subset is re-presented, so a + * source that accepts one predicate only when another is also present would fail to re-accept + * it. */ default MetadataFilterResult applyMetadataFilters(List metadataFilters) { return MetadataFilterResult.of(Collections.emptyList(), metadataFilters); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java index 4dc970af83183..56f9848f7decd 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java @@ -18,10 +18,8 @@ package org.apache.flink.table.planner.plan.rules.logical; -import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; -import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase; import org.apache.flink.table.planner.plan.schema.TableSourceTable; import org.apache.flink.table.planner.plan.utils.FlinkRexUtil; @@ -32,6 +30,10 @@ import org.apache.calcite.rex.RexProgramBuilder; import org.apache.calcite.tools.RelBuilder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + import scala.Tuple2; /** @@ -63,8 +65,7 @@ public boolean matches(RelOptRuleCall call) { FlinkLogicalTableSourceScan scan = call.rel(1); TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class); - // we can not push filter twice - return canPushdownFilter(tableSourceTable); + return canPushdownFilter(tableSourceTable) || canPushdownMetadataFilter(tableSourceTable); } @Override @@ -79,7 +80,7 @@ private void pushFilterIntoScan( RelOptRuleCall call, Calc calc, FlinkLogicalTableSourceScan scan, - FlinkPreparingTableBase relOptTable) { + TableSourceTable tableSourceTable) { RexProgram originProgram = calc.getProgram(); @@ -94,33 +95,29 @@ private void pushFilterIntoScan( RexNode[] convertiblePredicates = extractedPredicates._1; RexNode[] unconvertedPredicates = extractedPredicates._2; if (convertiblePredicates.length == 0) { - // no condition can be translated to expression return; } - Tuple2 pushdownResultWithScan = - resolveFiltersAndCreateTableSourceTable( - convertiblePredicates, - relOptTable.unwrap(TableSourceTable.class), - scan, - relBuilder); + FilterClassificationResult result = + classifyAndPushFilters(convertiblePredicates, tableSourceTable, scan, relBuilder); + if (result == null) { + return; + } - SupportsFilterPushDown.Result result = pushdownResultWithScan._1; - TableSourceTable tableSourceTable = pushdownResultWithScan._2; + List allRemainingRexNodes = new ArrayList<>(result.remainingPredicates); + allRemainingRexNodes.addAll(Arrays.asList(unconvertedPredicates)); FlinkLogicalTableSourceScan newScan = FlinkLogicalTableSourceScan.create( - scan.getCluster(), scan.getHints(), tableSourceTable); + scan.getCluster(), scan.getHints(), result.updatedTable); // build new calc program RexProgramBuilder programBuilder = RexProgramBuilder.forProgram(originProgram, call.builder().getRexBuilder(), true); programBuilder.clearCondition(); - if (!result.getRemainingFilters().isEmpty() || unconvertedPredicates.length != 0) { - RexNode remainingCondition = - createRemainingCondition( - relBuilder, result.getRemainingFilters(), unconvertedPredicates); + if (!allRemainingRexNodes.isEmpty()) { + RexNode remainingCondition = relBuilder.and(allRemainingRexNodes); RexNode simplifiedRemainingCondition = FlinkRexUtil.simplify( relBuilder.getRexBuilder(), diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java index ef8bc092f9835..223c967dc1c71 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java @@ -49,6 +49,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -113,6 +114,82 @@ protected RexNode createRemainingCondition( return new Tuple2<>(result, newTableSourceTable); } + /** + * Classifies convertible predicates into physical and metadata, pushes each through the + * appropriate path, and returns the updated table plus any remaining predicates. + */ + protected FilterClassificationResult classifyAndPushFilters( + RexNode[] convertiblePredicates, + TableSourceTable tableSourceTable, + TableScan scan, + RelBuilder relBuilder) { + + boolean supportsPhysicalFilter = canPushdownFilter(tableSourceTable); + boolean supportsMetadataFilter = canPushdownMetadataFilter(tableSourceTable); + Set metadataColumnIndices = metadataColumnIndices(tableSourceTable, scan); + + List allRemainingRexNodes = new ArrayList<>(); + TableSourceTable currentTable = tableSourceTable; + + List physicalPredicates = new ArrayList<>(); + List metadataPredicates = new ArrayList<>(); + for (RexNode predicate : convertiblePredicates) { + if (referencesOnlyMetadataColumns(predicate, metadataColumnIndices)) { + if (supportsMetadataFilter) { + metadataPredicates.add(predicate); + } else { + allRemainingRexNodes.add(predicate); + } + } else if (referencesAnyMetadataColumns(predicate, metadataColumnIndices)) { + allRemainingRexNodes.add(predicate); + } else { + physicalPredicates.add(predicate); + } + } + + if ((physicalPredicates.isEmpty() || !supportsPhysicalFilter) + && metadataPredicates.isEmpty()) { + return null; + } + + if (!physicalPredicates.isEmpty() && supportsPhysicalFilter) { + Tuple2 physicalResult = + resolveFiltersAndCreateTableSourceTable( + physicalPredicates.toArray(new RexNode[0]), + currentTable, + scan, + relBuilder); + currentTable = physicalResult._2; + List physicalRemaining = + convertExpressionToRexNode(physicalResult._1.getRemainingFilters(), relBuilder); + allRemainingRexNodes.addAll(physicalRemaining); + } else { + allRemainingRexNodes.addAll(physicalPredicates); + } + + if (!metadataPredicates.isEmpty()) { + MetadataPushDownOutcome metadataResult = + resolveMetadataFiltersAndCreateTableSourceTable( + metadataPredicates.toArray(new RexNode[0]), currentTable, scan); + currentTable = metadataResult.newTableSourceTable; + allRemainingRexNodes.addAll(metadataResult.remainingInputRexNodes); + } + + return new FilterClassificationResult(currentTable, allRemainingRexNodes); + } + + /** Result of classifying and pushing filters through physical and metadata paths. */ + protected static final class FilterClassificationResult { + final TableSourceTable updatedTable; + final List remainingPredicates; + + FilterClassificationResult( + TableSourceTable updatedTable, List remainingPredicates) { + this.updatedTable = updatedTable; + this.remainingPredicates = remainingPredicates; + } + } + /** Whether filter push-down is possible and not already assigned. */ protected boolean canPushdownFilter(TableSourceTable tableSourceTable) { return tableSourceTable != null @@ -143,13 +220,27 @@ protected boolean canPushdownMetadataFilter(TableSourceTable tableSourceTable) { *

A predicate like {@code OR(physical_pred, metadata_pred)} returns false because it * references both physical and metadata columns. Mixed predicates remain as runtime filters. */ - protected boolean referencesOnlyMetadataColumns(RexNode predicate, int physicalColumnCount) { + protected boolean referencesOnlyMetadataColumns( + RexNode predicate, Set metadataColumnIndices) { + boolean[] saw = classifyColumnReferences(predicate, metadataColumnIndices); + return saw[1] && !saw[0]; + } + + /** True if predicate references at least one metadata column (may also reference physical). */ + protected boolean referencesAnyMetadataColumns( + RexNode predicate, Set metadataColumnIndices) { + boolean[] saw = classifyColumnReferences(predicate, metadataColumnIndices); + return saw[1]; + } + + private boolean[] classifyColumnReferences( + RexNode predicate, Set metadataColumnIndices) { boolean[] saw = new boolean[2]; // [0] = sawPhysical, [1] = sawMetadata predicate.accept( new RexVisitorImpl(true) { @Override public Void visitInputRef(RexInputRef inputRef) { - if (inputRef.getIndex() >= physicalColumnCount) { + if (metadataColumnIndices.contains(inputRef.getIndex())) { saw[1] = true; } else { saw[0] = true; @@ -157,13 +248,24 @@ public Void visitInputRef(RexInputRef inputRef) { return null; } }); - return saw[1] && !saw[0]; + return saw; } - /** Number of physical columns in the scan's schema. */ - protected int getPhysicalColumnCount(TableSourceTable tableSourceTable) { - ResolvedSchema schema = tableSourceTable.contextResolvedTable().getResolvedSchema(); - return (int) schema.getColumns().stream().filter(Column::isPhysical).count(); + /** + * Indices of metadata columns within the scan's current (possibly projection-narrowed) row + * type. Predicate {@link RexInputRef}s are positioned against this same row type, so the + * physical/metadata distinction must be derived from it rather than from the full table schema. + */ + private Set metadataColumnIndices(TableSourceTable tableSourceTable, TableScan scan) { + Map columnToMetadataKey = buildColumnToMetadataKeyMap(tableSourceTable); + List fieldNames = scan.getRowType().getFieldNames(); + Set indices = new HashSet<>(); + for (int i = 0; i < fieldNames.size(); i++) { + if (columnToMetadataKey.containsKey(fieldNames.get(i))) { + indices.add(i); + } + } + return indices; } /** Maps SQL column names to metadata keys for metadata columns. */ diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java index 4e896a30701ca..ba502728d2d70 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java @@ -95,63 +95,17 @@ private void pushFilterIntoScan( return; } - boolean supportsPhysicalFilter = canPushdownFilter(tableSourceTable); - boolean supportsMetadataFilter = canPushdownMetadataFilter(tableSourceTable); - int physicalColumnCount = getPhysicalColumnCount(tableSourceTable); - - List allRemainingRexNodes = new ArrayList<>(); - TableSourceTable currentTable = tableSourceTable; - - // Unpushable metadata predicates stay as a runtime Calc, not the physical path — - // physical routing produces a FilterPushDownSpec that crashes compiled-plan restore - // once ProjectPushDownSpec narrows the scan row type. - List physicalPredicates = new ArrayList<>(); - List metadataPredicates = new ArrayList<>(); - for (RexNode predicate : convertiblePredicates) { - if (referencesOnlyMetadataColumns(predicate, physicalColumnCount)) { - if (supportsMetadataFilter) { - metadataPredicates.add(predicate); - } else { - allRemainingRexNodes.add(predicate); - } - } else { - physicalPredicates.add(predicate); - } - } - - // Avoid re-firing on shapes we can't transform — saves wasted Hep iterations. - boolean nothingToPushPhysically = physicalPredicates.isEmpty() || !supportsPhysicalFilter; - if (nothingToPushPhysically && metadataPredicates.isEmpty()) { + FilterClassificationResult result = + classifyAndPushFilters(convertiblePredicates, tableSourceTable, scan, relBuilder); + if (result == null) { return; } - if (!physicalPredicates.isEmpty() && supportsPhysicalFilter) { - Tuple2 physicalResult = - resolveFiltersAndCreateTableSourceTable( - physicalPredicates.toArray(new RexNode[0]), - currentTable, - scan, - relBuilder); - currentTable = physicalResult._2; - List physicalRemaining = - convertExpressionToRexNode(physicalResult._1.getRemainingFilters(), relBuilder); - allRemainingRexNodes.addAll(physicalRemaining); - } else { - allRemainingRexNodes.addAll(physicalPredicates); - } - - if (!metadataPredicates.isEmpty()) { - MetadataPushDownOutcome metadataResult = - resolveMetadataFiltersAndCreateTableSourceTable( - metadataPredicates.toArray(new RexNode[0]), currentTable, scan); - currentTable = metadataResult.newTableSourceTable; - allRemainingRexNodes.addAll(metadataResult.remainingInputRexNodes); - } - + List allRemainingRexNodes = new ArrayList<>(result.remainingPredicates); allRemainingRexNodes.addAll(Arrays.asList(unconvertedPredicates)); LogicalTableScan newScan = - LogicalTableScan.create(scan.getCluster(), currentTable, scan.getHints()); + LogicalTableScan.create(scan.getCluster(), result.updatedTable, scan.getHints()); if (allRemainingRexNodes.isEmpty()) { call.transformTo(newScan); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterResultShapesITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterResultShapesITCase.java index 47a987d485c4f..5f079ab20c179 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterResultShapesITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterResultShapesITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.table.api.TableResult; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.SourceProvider; +import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult; import org.apache.flink.table.data.RowData; @@ -44,6 +45,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; @@ -190,6 +192,77 @@ void testBestEffortOverlap() throws Exception { assertThat(explain).contains("where=[AND(>(m0, 0), >(m1, 0))]"); } + /** + * A mixed OR predicate like {@code id > 0 OR m0 > 0} references both physical and metadata + * columns in a single predicate that AND-decomposition cannot split. Such predicates must NOT + * leak into the physical {@code filter=[...]} path because {@code FilterPushDownSpec} stores + * RexInputRef indices without a row type — metadata column indices break during compiled-plan + * restore when {@code ProjectPushDownSpec} narrows the scan type. + * + *

The mixed predicate must stay as a runtime Calc ({@code where=[...]}). + */ + @Test + void testMixedOrPredicateStaysAsRuntimeFilter() throws Exception { + Schema schema = + Schema.newBuilder() + .column("id", INT()) + .columnByMetadata("m0", INT()) + .columnByMetadata("m1", INT()) + .build(); + + MixedFilterTrackingSource source = new MixedFilterTrackingSource(); + TableDescriptor descriptor = + TableFactoryHarness.newBuilder().schema(schema).source(source).build(); + tableEnv.createTable("T_MIXED_OR", descriptor); + + // id < 3: rows 1,2. m0 > 0: rows 1,2,5. Union: rows 1,2,5. + String sql = "SELECT id FROM T_MIXED_OR WHERE id < 3 OR m0 > 0 ORDER BY id"; + + String explain = tableEnv.explainSql(sql); + assertThat(explain) + .as("Mixed OR must not leak into physical filter path") + .doesNotContain("filter=[OR("); + assertThat(explain) + .as("Mixed OR must not leak into metadata filter path") + .doesNotContain("metadataFilter=[OR("); + + // Row data: (1,5,5), (2,5,-1), (3,-1,5), (4,-1,-1), (5,7,9), (6,0,0) + assertThat(collectIds(sql)).containsExactly(1, 2, 5); + } + + /** + * When AND connects a pure physical predicate and a mixed OR, the physical predicate should be + * pushed while the mixed OR stays as a runtime Calc. + */ + @Test + void testMixedOrWithSeparatePhysicalPredicate() throws Exception { + Schema schema = + Schema.newBuilder() + .column("id", INT()) + .columnByMetadata("m0", INT()) + .columnByMetadata("m1", INT()) + .build(); + + MixedFilterTrackingSource source = new MixedFilterTrackingSource(); + TableDescriptor descriptor = + TableFactoryHarness.newBuilder().schema(schema).source(source).build(); + tableEnv.createTable("T_MIXED_OR2", descriptor); + + // id > 2 (physical, pushable) AND (id < 5 OR m0 > 0) (mixed, must stay as runtime) + String sql = "SELECT id FROM T_MIXED_OR2 WHERE id > 2 AND (id < 5 OR m0 > 0) ORDER BY id"; + + String explain = tableEnv.explainSql(sql); + // The mixed OR must NOT appear in filter= or metadataFilter= + assertThat(explain) + .as("Mixed OR must not leak into physical filter path") + .doesNotContain("filter=[OR("); + + // Row data: (1,5,5), (2,5,-1), (3,-1,5), (4,-1,-1), (5,7,9), (6,0,0) + // id>2: rows 3,4,5,6. AND (id<5 OR m0>0): row3(yes,no→yes), row4(yes,no→yes), + // row5(no,yes→yes), row6(no,no→no). Result: 3,4,5 + assertThat(collectIds(sql)).containsExactly(3, 4, 5); + } + // ----------------------------------------------------------------------------------------- // Helpers // ----------------------------------------------------------------------------------------- @@ -302,4 +375,64 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon return SourceProvider.of(source); } } + + /** + * Source that supports both physical and metadata filter push-down. Accepts all filters it + * receives, emits all rows (runtime Calc is the load-bearing filter for remaining predicates). + */ + static class MixedFilterTrackingSource extends TableFactoryHarness.ScanSourceBase + implements SupportsReadingMetadata, SupportsFilterPushDown { + + private DataType producedDataType; + private List acceptedPhysicalFilters = new ArrayList<>(); + + MixedFilterTrackingSource() { + super(true); + } + + @Override + public Map listReadableMetadata() { + Map metadata = new LinkedHashMap<>(); + metadata.put("m0", INT()); + metadata.put("m1", INT()); + return metadata; + } + + @Override + public void applyReadableMetadata(List metadataKeys, DataType producedDataType) { + this.producedDataType = producedDataType; + } + + @Override + public boolean supportsMetadataFilterPushDown() { + return true; + } + + @Override + public MetadataFilterResult applyMetadataFilters(List metadataFilters) { + return MetadataFilterResult.of(metadataFilters, Collections.emptyList()); + } + + @Override + public Result applyFilters(List filters) { + acceptedPhysicalFilters.addAll(filters); + return Result.of(Collections.emptyList(), filters); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + DataType emitted = + producedDataType != null + ? producedDataType + : getFactoryContext().getPhysicalRowDataType(); + DynamicTableSource.DataStructureConverter converter = + runtimeProviderContext.createDataStructureConverter(emitted); + GeneratorFunction generator = + index -> (RowData) converter.toInternal(ROWS.get(index.intValue())); + DataGeneratorSource source = + new DataGeneratorSource<>( + generator, ROWS.size(), TypeInformation.of(RowData.class)); + return SourceProvider.of(source); + } + } }