Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
}
if (!runtimeFilters.isEmpty()) {
output.append(prefix).append("runtime filters: ");
output.append(getRuntimeFilterExplainString(false));
output.append(getRuntimeFilterExplainString());
}

output.append(prefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.nereids;

import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.analyzer.Scope;
import org.apache.doris.nereids.hint.Hint;
Expand All @@ -36,6 +37,7 @@
import org.apache.doris.nereids.memo.Memo;
import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
import org.apache.doris.nereids.processor.post.TopnFilterContext;
import org.apache.doris.nereids.processor.post.runtimefilterv2.RuntimeFilterContextV2;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.RuleFactory;
import org.apache.doris.nereids.rules.RuleSet;
Expand All @@ -47,6 +49,7 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.ColumnStatistic;
Expand Down Expand Up @@ -94,6 +97,7 @@ public class CascadesContext implements ScheduleContext {
// subqueryExprIsAnalyzed: whether the subquery has been analyzed.
private final Map<SubqueryExpr, Boolean> subqueryExprIsAnalyzed;
private final RuntimeFilterContext runtimeFilterContext;
private final RuntimeFilterContextV2 runtimeFilterV2Context;
private final TopnFilterContext topnFilterContext = new TopnFilterContext();
private Optional<Scope> outerScope = Optional.empty();

Expand Down Expand Up @@ -142,7 +146,10 @@ private CascadesContext(Optional<CascadesContext> parent, Optional<CTEId> curren
this.jobScheduler = new SimpleJobScheduler();
this.currentJobContext = new JobContext(this, requireProperties, Double.MAX_VALUE);
this.subqueryExprIsAnalyzed = new HashMap<>();
this.runtimeFilterContext = new RuntimeFilterContext(getConnectContext().getSessionVariable());
IdGenerator<RuntimeFilterId> runtimeFilterIdGen = RuntimeFilterId.createGenerator();
this.runtimeFilterContext = new RuntimeFilterContext(getConnectContext().getSessionVariable(),
runtimeFilterIdGen);
this.runtimeFilterV2Context = new RuntimeFilterContextV2(runtimeFilterIdGen);
this.materializationContexts = new HashSet<>();
if (statementContext.getConnectContext() != null) {
ConnectContext connectContext = statementContext.getConnectContext();
Expand Down Expand Up @@ -534,4 +541,8 @@ public int getDistinctAggLevel() {
public boolean isEnableExprTrace() {
return isEnableExprTrace;
}

public RuntimeFilterContextV2 getRuntimeFilterV2Context() {
return runtimeFilterV2Context;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,10 @@ public void appendTupleInfo(StringBuilder str) {

@Override
public List<RuntimeFilter> getRuntimeFilters() {
return cascadesContext.getRuntimeFilterContext().getLegacyFilters();
ArrayList<RuntimeFilter> runtimeFilters = new ArrayList<>();
runtimeFilters.addAll(cascadesContext.getRuntimeFilterContext().getLegacyFilters());
runtimeFilters.addAll(cascadesContext.getRuntimeFilterV2Context().getLegacyFilters());
return runtimeFilters;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.doris.fs.FileSystemDirectoryLister;
import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.processor.post.runtimefilterv2.RuntimeFilterV2;
import org.apache.doris.nereids.properties.DistributionSpec;
import org.apache.doris.nereids.properties.DistributionSpecAllSingleton;
import org.apache.doris.nereids.properties.DistributionSpecAny;
Expand Down Expand Up @@ -650,6 +651,14 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla
fileScan.getTableSnapshot().ifPresent(fileQueryScanNode::setQueryTableSnapshot);
fileScan.getScanParams().ifPresent(fileQueryScanNode::setScanParams);
}
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(fileScan);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(scanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor);
}

Expand Down Expand Up @@ -690,6 +699,14 @@ public PlanFragment visitPhysicalEsScan(PhysicalEsScan esScan, PlanTranslatorCon
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, esScanNode, context)
)
);
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(esScan);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(esScanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
context.getTopnFilterContext().translateTarget(esScan, esScanNode, context);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), esScanNode, dataPartition);
Expand Down Expand Up @@ -749,6 +766,14 @@ private PlanFragment getPlanFragmentForPhysicalFileScan(PhysicalFileScan fileSca
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, context)
)
);
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(fileScan);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(scanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
context.getTopnFilterContext().translateTarget(fileScan, scanNode, context);
// Create PlanFragment
DataPartition dataPartition = DataPartition.RANDOM;
Expand All @@ -774,6 +799,15 @@ public PlanFragment visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, PlanTransla
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, jdbcScanNode, context)
)
);
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(jdbcScan);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(jdbcScanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}

context.getTopnFilterContext().translateTarget(jdbcScan, jdbcScanNode, context);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), jdbcScanNode, dataPartition);
Expand All @@ -798,6 +832,14 @@ public PlanFragment visitPhysicalOdbcScan(PhysicalOdbcScan odbcScan, PlanTransla
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, odbcScanNode, context)
)
);
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(odbcScan);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(odbcScanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode, context);
context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode, context);
DataPartition dataPartition = DataPartition.RANDOM;
Expand Down Expand Up @@ -882,6 +924,14 @@ private PlanFragment computePhysicalOlapScan(PhysicalOlapScan olapScan,
expr, olapScanNode, context)
)
);
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(olapScan);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(olapScanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
context.getTopnFilterContext().translateTarget(olapScan, olapScanNode, context);
}
olapScanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(olapScan.getRelationId()));
Expand Down Expand Up @@ -980,6 +1030,14 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT
.translateRuntimeFilterTarget(expr, finalScanNode, context)
)
);
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(schemaScan);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(scanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
context.addScanNode(scanNode, schemaScan);
PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan);
context.addPlanFragment(planFragment);
Expand Down Expand Up @@ -1300,6 +1358,14 @@ public PlanFragment visitPhysicalCTEConsumer(PhysicalCTEConsumer cteConsumer,
context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator ->
runtimeFilterTranslator.getContext().getTargetListByScan(cteConsumer).forEach(
expr -> runtimeFilterTranslator.translateRuntimeFilterTarget(expr, cteScanNode, context)));
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(cteConsumer);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(cteScanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
context.getCteScanNodeMap().put(multiCastFragment.getFragmentId(), cteScanNode);

return multiCastFragment;
Expand Down Expand Up @@ -2249,6 +2315,28 @@ && findOlapScanNodesByPassExchangeAndJoinNode(setOperationFragment.getPlanRoot()
return setOperationFragment;
}

@Override
public PlanFragment visitPhysicalIntersect(PhysicalIntersect intersect, PlanTranslatorContext context) {
PlanFragment fragment = visitPhysicalSetOperation(intersect, context);
RunTimeFilterTranslatorV2.INSTANCE.createLegacyRuntimeFilters(
fragment.getPlanRoot(),
intersect.getRuntimeFiltersV2(),
context);

return fragment;
}

@Override
public PlanFragment visitPhysicalExcept(PhysicalExcept except, PlanTranslatorContext context) {
PlanFragment fragment = visitPhysicalSetOperation(except, context);
RunTimeFilterTranslatorV2.INSTANCE.createLegacyRuntimeFilters(
fragment.getPlanRoot(),
except.getRuntimeFiltersV2(),
context);

return fragment;
}

/*-
* Physical sort:
* 1. Build sortInfo
Expand Down Expand Up @@ -2615,6 +2703,14 @@ public PlanFragment visitPhysicalLazyMaterializeOlapScan(PhysicalLazyMaterialize
expr, olapScanNode, context)
)
);
// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(lazyScan);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(olapScanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
context.getTopnFilterContext().translateTarget(lazyScan, olapScanNode, context);

return planFragment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.common.IdGenerator;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.processor.post.TopnFilterContext;
import org.apache.doris.nereids.processor.post.runtimefilterv2.RuntimeFilterContextV2;
import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.SlotReference;
Expand All @@ -44,6 +45,7 @@
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
Expand Down Expand Up @@ -72,6 +74,7 @@ public class PlanTranslatorContext {
private final DescriptorTable descTable = new DescriptorTable();

private final RuntimeFilterTranslator translator;

private final TopnFilterContext topnFilterContext;
/**
* index from Nereids' slot to legacy slot.
Expand Down Expand Up @@ -115,20 +118,24 @@ public class PlanTranslatorContext {
private final Map<RelationId, TPushAggOp> tablePushAggOp = Maps.newHashMap();

private final Map<ScanNode, Set<SlotId>> statsUnknownColumnsMap = Maps.newHashMap();
private final RuntimeFilterContextV2 runtimeFilterV2Context;

private boolean isTopMaterializeNode = true;

public PlanTranslatorContext(CascadesContext ctx) {
this.connectContext = ctx.getConnectContext();
this.translator = new RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
this.topnFilterContext = ctx.getTopnFilterContext();
this.runtimeFilterV2Context = ctx.getRuntimeFilterV2Context();
}

@VisibleForTesting
public PlanTranslatorContext() {
this.connectContext = null;
this.translator = null;
this.topnFilterContext = new TopnFilterContext();
IdGenerator<RuntimeFilterId> runtimeFilterIdGen = RuntimeFilterId.createGenerator();
this.runtimeFilterV2Context = new RuntimeFilterContextV2(runtimeFilterIdGen);
}

/**
Expand Down Expand Up @@ -354,4 +361,7 @@ public void setTopMaterializeNode(boolean topMaterializeNode) {
isTopMaterializeNode = topMaterializeNode;
}

public RuntimeFilterContextV2 getRuntimeFilterV2Context() {
return runtimeFilterV2Context;
}
}
Loading
Loading