diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index 2d4d0f5a6501..d7b8b63bedfe 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -522,7 +522,7 @@ void addSegmentToServer(DruidServer server, DataSegment segment) @Override public TimelineLookup getTimeline(DataSource dataSource) { - final String table = Iterables.getOnlyElement(dataSource.getNames()); + final String table = Iterables.getOnlyElement(dataSource.getTableNames()); return timelines.get(table); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java index fab5a4f13919..ecb5d9ab03da 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java @@ -328,7 +328,7 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable QueryRunner getQueryRunnerImpl(Query query) { QueryRunner queryRunner = null; - final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames()); + final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getTableNames()); if (runningItem != null) { final Task task = runningItem.getTask(); diff --git a/processing/src/main/java/org/apache/druid/query/BaseQuery.java b/processing/src/main/java/org/apache/druid/query/BaseQuery.java index e967d54c6017..cc2cd6df6b4c 100644 --- a/processing/src/main/java/org/apache/druid/query/BaseQuery.java +++ b/processing/src/main/java/org/apache/druid/query/BaseQuery.java @@ -266,18 +266,20 @@ public boolean equals(Object o) return false; } BaseQuery baseQuery = (BaseQuery) o; + + // Must use getDuration() instead of "duration" because duration is lazily computed. return descending == baseQuery.descending && Objects.equals(dataSource, baseQuery.dataSource) && Objects.equals(context, baseQuery.context) && Objects.equals(querySegmentSpec, baseQuery.querySegmentSpec) && - Objects.equals(duration, baseQuery.duration) && + Objects.equals(getDuration(), baseQuery.getDuration()) && Objects.equals(granularity, baseQuery.granularity); } @Override public int hashCode() { - - return Objects.hash(dataSource, descending, context, querySegmentSpec, duration, granularity); + // Must use getDuration() instead of "duration" because duration is lazily computed. + return Objects.hash(dataSource, descending, context, querySegmentSpec, getDuration(), granularity); } } diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java index 13a3cf5cc4bf..549b06b0b3eb 100644 --- a/processing/src/main/java/org/apache/druid/query/DataSource.java +++ b/processing/src/main/java/org/apache/druid/query/DataSource.java @@ -23,17 +23,62 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import java.util.List; +import java.util.Set; -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, - include = JsonTypeInfo.As.PROPERTY, - property = "type", - defaultImpl = LegacyDataSource.class) +/** + * Represents a source... of data... for a query. Analogous to the "FROM" clause in SQL. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LegacyDataSource.class) @JsonSubTypes({ - @JsonSubTypes.Type(value = TableDataSource.class, name = "table"), - @JsonSubTypes.Type(value = QueryDataSource.class, name = "query"), - @JsonSubTypes.Type(value = UnionDataSource.class, name = "union") - }) + @JsonSubTypes.Type(value = TableDataSource.class, name = "table"), + @JsonSubTypes.Type(value = QueryDataSource.class, name = "query"), + @JsonSubTypes.Type(value = UnionDataSource.class, name = "union"), + @JsonSubTypes.Type(value = JoinDataSource.class, name = "join"), + @JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"), + @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline") +}) public interface DataSource { - List getNames(); + /** + * Returns the names of all table datasources involved in this query. Does not include names for non-tables, like + * lookups or inline datasources. + */ + Set getTableNames(); + + /** + * Returns datasources that this datasource depends on. Will be empty for leaf datasources like 'table'. + */ + List getChildren(); + + /** + * Return a new DataSource, identical to this one, with different children. The number of children must be equal + * to the number of children that this datasource already has. + */ + DataSource withChildren(List children); + + /** + * Returns true if queries on this dataSource are cacheable at both the result level and per-segment level. + * Currently, dataSources that modify the behavior of per-segment processing are not cacheable (like 'join'). + * Nor are dataSources that do not actually reference segments (like 'inline'), since cache keys are always based + * on segment identifiers. + * + * Note: Ideally, queries on 'join' datasources _would_ be cacheable, but we cannot currently do this due to lacking + * the code necessary to compute cache keys properly. + */ + boolean isCacheable(); + + /** + * Returns true if all servers have a full copy of this datasource. True for things like inline, lookup, etc, or + * for queries of those. + */ + boolean isGlobal(); + + /** + * Returns true if this datasource represents concrete data that can be scanned via a + * {@link org.apache.druid.segment.Segment} adapter of some kind. True for e.g. 'table' but not for 'query' or 'join'. + * + * @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteBased() which uses this + * @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteTableBased() which uses this + */ + boolean isConcrete(); } diff --git a/processing/src/main/java/org/apache/druid/query/DataSourceUtil.java b/processing/src/main/java/org/apache/druid/query/DataSourceUtil.java deleted file mode 100644 index fdc7747f2d93..000000000000 --- a/processing/src/main/java/org/apache/druid/query/DataSourceUtil.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query; - -import java.util.List; - -public class DataSourceUtil -{ - public static String getMetricName(DataSource dataSource) - { - final List names = dataSource.getNames(); - return names.size() == 1 ? names.get(0) : names.toString(); - } -} diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java index 145f8dac3527..fcdebd22bc8c 100644 --- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java @@ -20,6 +20,7 @@ package org.apache.druid.query; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import org.apache.druid.collections.bitmap.BitmapFactory; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -30,7 +31,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the @@ -42,9 +45,22 @@ public class DefaultQueryMetrics> implements QueryMet protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); protected final Map metrics = new HashMap<>(); - /** Non final to give subclasses ability to reassign it. */ + /** + * Non final to give subclasses ability to reassign it. + */ protected Thread ownerThread = Thread.currentThread(); + private static String getTableNamesAsString(DataSource dataSource) + { + final Set names = dataSource.getTableNames(); + + if (names.size() == 1) { + return Iterables.getOnlyElement(names); + } else { + return names.stream().sorted().collect(Collectors.toList()).toString(); + } + } + protected void checkModifiedFromOwnerThread() { if (Thread.currentThread() != ownerThread) { @@ -77,7 +93,7 @@ public void query(QueryType query) @Override public void dataSource(QueryType query) { - setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource())); + setDimension(DruidMetrics.DATASOURCE, getTableNamesAsString(query.getDataSource())); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java new file mode 100644 index 000000000000..890fbc00ba41 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.segment.RowAdapter; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ValueType; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.function.ToLongFunction; + +/** + * Represents an inline datasource, where the rows are embedded within the DataSource object itself. + * + * The rows are backed by an Iterable, which can be lazy or not. Lazy datasources will only be iterated if someone calls + * {@link #getRows()} and iterates the result, or until someone calls {@link #getRowsAsList()}. + */ +public class InlineDataSource implements DataSource +{ + private final List columnNames; + private final List columnTypes; + private final Iterable rows; + + private InlineDataSource( + final List columnNames, + final List columnTypes, + final Iterable rows + ) + { + this.columnNames = Preconditions.checkNotNull(columnNames, "'columnNames' must be nonnull"); + this.columnTypes = Preconditions.checkNotNull(columnTypes, "'columnTypes' must be nonnull"); + this.rows = Preconditions.checkNotNull(rows, "'rows' must be nonnull"); + + if (columnNames.size() != columnTypes.size()) { + throw new IAE("columnNames and columnTypes must be the same length"); + } + } + + /** + * Factory method for Jackson. Used for inline datasources that were originally encoded as JSON. Private because + * non-Jackson callers should use {@link #fromIterable}. + */ + @JsonCreator + private static InlineDataSource fromJson( + @JsonProperty("columnNames") List columnNames, + @JsonProperty("columnTypes") List columnTypes, + @JsonProperty("rows") List rows + ) + { + return new InlineDataSource(columnNames, columnTypes, rows); + } + + /** + * Creates an inline datasource from an Iterable. The Iterable will not be iterated until someone calls + * {@link #getRows()} and iterates the result, or until someone calls {@link #getRowsAsList()}. + * + * @param columnNames names of each column in the rows + * @param columnTypes types of each column in the rows + * @param rows rows, each of the same length as columnNames and columnTypes + */ + public static InlineDataSource fromIterable( + final List columnNames, + final List columnTypes, + final Iterable rows + ) + { + return new InlineDataSource(columnNames, columnTypes, rows); + } + + @Override + public Set getTableNames() + { + return Collections.emptySet(); + } + + @JsonProperty + public List getColumnNames() + { + return columnNames; + } + + @JsonProperty + public List getColumnTypes() + { + return columnTypes; + } + + /** + * Returns rows as a list. If the original Iterable behind this datasource was a List, this method will return it + * as-is, without copying it. Otherwise, this method will walk the iterable and copy it into a List before returning. + */ + @JsonProperty("rows") + public List getRowsAsList() + { + return rows instanceof List ? ((List) rows) : Lists.newArrayList(rows); + } + + /** + * Returns rows as an Iterable. + */ + @JsonIgnore + public Iterable getRows() + { + return rows; + } + + @Override + public List getChildren() + { + return Collections.emptyList(); + } + + @Override + public DataSource withChildren(List children) + { + if (!children.isEmpty()) { + throw new IAE("Cannot accept children"); + } + + return this; + } + + @Override + public boolean isCacheable() + { + return false; + } + + @Override + public boolean isGlobal() + { + return true; + } + + @Override + public boolean isConcrete() + { + return false; + } + + public Map getRowSignature() + { + final ImmutableMap.Builder retVal = ImmutableMap.builder(); + + for (int i = 0; i < columnNames.size(); i++) { + retVal.put(columnNames.get(i), columnTypes.get(i)); + } + + return retVal.build(); + } + + public RowAdapter rowAdapter() + { + return new RowAdapter() + { + @Override + public ToLongFunction timestampFunction() + { + final int columnNumber = columnNames.indexOf(ColumnHolder.TIME_COLUMN_NAME); + + if (columnNumber >= 0) { + return row -> (long) row[columnNumber]; + } else { + return row -> 0L; + } + } + + @Override + public Function columnFunction(String columnName) + { + final int columnNumber = columnNames.indexOf(columnName); + + if (columnNumber >= 0) { + return row -> row[columnNumber]; + } else { + return row -> null; + } + } + }; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + InlineDataSource that = (InlineDataSource) o; + return Objects.equals(columnNames, that.columnNames) && + Objects.equals(columnTypes, that.columnTypes) && + Objects.equals(rows, that.rows); + } + + @Override + public int hashCode() + { + return Objects.hash(columnNames, columnTypes, rows); + } + + @Override + public String toString() + { + // Don't include 'rows' in stringification, because it might be long and/or lazy. + return "InlineDataSource{" + + "columnNames=" + columnNames + + ", columnTypes=" + columnTypes + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java new file mode 100644 index 000000000000..087a666c871e --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.segment.join.JoinConditionAnalysis; +import org.apache.druid.segment.join.JoinType; +import org.apache.druid.segment.join.Joinables; + +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Represents a join of two datasources. + * + * Logically, this datasource contains the result of: + * + * (1) prefixing all right-side columns with "rightPrefix" + * (2) then, joining the left and (prefixed) right sides using the provided type and condition + * + * Any columns from the left-hand side that start with "rightPrefix", and are at least one character longer than + * the prefix, will be shadowed. It is up to the caller to ensure that no important columns are shadowed by the + * chosen prefix. + * + * When analyzed by {@link org.apache.druid.query.planning.DataSourceAnalysis}, the right-hand side of this datasource + * will become a {@link org.apache.druid.query.planning.PreJoinableClause} object. + */ +public class JoinDataSource implements DataSource +{ + private final DataSource left; + private final DataSource right; + private final String rightPrefix; + private final JoinConditionAnalysis conditionAnalysis; + private final JoinType joinType; + + private JoinDataSource( + DataSource left, + DataSource right, + String rightPrefix, + JoinConditionAnalysis conditionAnalysis, + JoinType joinType + ) + { + this.left = Preconditions.checkNotNull(left, "left"); + this.right = Preconditions.checkNotNull(right, "right"); + this.rightPrefix = Joinables.validatePrefix(rightPrefix); + this.conditionAnalysis = Preconditions.checkNotNull(conditionAnalysis, "conditionAnalysis"); + this.joinType = Preconditions.checkNotNull(joinType, "joinType"); + } + + @JsonCreator + public static JoinDataSource create( + @JsonProperty("left") DataSource left, + @JsonProperty("right") DataSource right, + @JsonProperty("rightPrefix") String rightPrefix, + @JsonProperty("condition") String condition, + @JsonProperty("joinType") JoinType joinType, + @JacksonInject ExprMacroTable macroTable + ) + { + return new JoinDataSource( + left, + right, + StringUtils.nullToEmptyNonDruidDataString(rightPrefix), + JoinConditionAnalysis.forExpression( + Preconditions.checkNotNull(condition, "condition"), + StringUtils.nullToEmptyNonDruidDataString(rightPrefix), + macroTable + ), + joinType + ); + } + + @Override + public Set getTableNames() + { + final Set names = new HashSet<>(); + names.addAll(left.getTableNames()); + names.addAll(right.getTableNames()); + return names; + } + + @JsonProperty + public DataSource getLeft() + { + return left; + } + + @JsonProperty + public DataSource getRight() + { + return right; + } + + @JsonProperty + public String getRightPrefix() + { + return rightPrefix; + } + + @JsonProperty + public String getCondition() + { + return conditionAnalysis.getOriginalExpression(); + } + + public JoinConditionAnalysis getConditionAnalysis() + { + return conditionAnalysis; + } + + @JsonProperty + public JoinType getJoinType() + { + return joinType; + } + + @Override + public List getChildren() + { + return ImmutableList.of(left, right); + } + + @Override + public DataSource withChildren(List children) + { + if (children.size() != 2) { + throw new IAE("Expected [2] children, got [%d]", children.size()); + } + + return new JoinDataSource(children.get(0), children.get(1), rightPrefix, conditionAnalysis, joinType); + } + + @Override + public boolean isCacheable() + { + return false; + } + + @Override + public boolean isGlobal() + { + return left.isGlobal() && right.isGlobal(); + } + + @Override + public boolean isConcrete() + { + return false; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JoinDataSource that = (JoinDataSource) o; + return Objects.equals(left, that.left) && + Objects.equals(right, that.right) && + Objects.equals(rightPrefix, that.rightPrefix) && + Objects.equals(conditionAnalysis, that.conditionAnalysis) && + joinType == that.joinType; + } + + @Override + public int hashCode() + { + return Objects.hash(left, right, rightPrefix, conditionAnalysis, joinType); + } + + @Override + public String toString() + { + return "JoinDataSource{" + + "left=" + left + + ", right=" + right + + ", rightPrefix='" + rightPrefix + '\'' + + ", condition=" + conditionAnalysis + + ", joinType=" + joinType + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java new file mode 100644 index 000000000000..a2c99f7d1fd3 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.IAE; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Represents a lookup. + * + * Currently, this datasource is not actually queryable, and attempts to do so will lead to errors. It is here as a + * placeholder for a future time in which it will become queryable. + * + * The "lookupName" referred to here should be provided by a + * {@link org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider}. + */ +public class LookupDataSource implements DataSource +{ + private final String lookupName; + + @JsonCreator + public LookupDataSource( + @JsonProperty("lookup") String lookupName + ) + { + this.lookupName = Preconditions.checkNotNull(lookupName, "lookup"); + } + + @Override + public Set getTableNames() + { + return Collections.emptySet(); + } + + @JsonProperty("lookup") + public String getLookupName() + { + return lookupName; + } + + @Override + public List getChildren() + { + return Collections.emptyList(); + } + + @Override + public DataSource withChildren(List children) + { + if (!children.isEmpty()) { + throw new IAE("Cannot accept children"); + } + + return this; + } + + @Override + public boolean isCacheable() + { + return false; + } + + @Override + public boolean isGlobal() + { + return true; + } + + @Override + public boolean isConcrete() + { + return false; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LookupDataSource that = (LookupDataSource) o; + return Objects.equals(lookupName, that.lookupName); + } + + @Override + public int hashCode() + { + return Objects.hash(lookupName); + } + + @Override + public String toString() + { + return "LookupDataSource{" + + "lookupName='" + lookupName + '\'' + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/Queries.java b/processing/src/main/java/org/apache/druid/query/Queries.java index 1fbe33587d00..37408a4aea3f 100644 --- a/processing/src/main/java/org/apache/druid/query/Queries.java +++ b/processing/src/main/java/org/apache/druid/query/Queries.java @@ -34,6 +34,7 @@ import java.util.Set; /** + * */ @PublicApi public class Queries diff --git a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java index 5e5201711d84..94d47d511f48 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java @@ -22,8 +22,13 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import org.apache.druid.java.util.common.IAE; +import java.util.Collections; import java.util.List; +import java.util.Set; @JsonTypeName("query") public class QueryDataSource implements DataSource @@ -34,13 +39,13 @@ public class QueryDataSource implements DataSource @JsonCreator public QueryDataSource(@JsonProperty("query") Query query) { - this.query = query; + this.query = Preconditions.checkNotNull(query, "'query' must be nonnull"); } @Override - public List getNames() + public Set getTableNames() { - return query.getDataSource().getNames(); + return query.getDataSource().getTableNames(); } @JsonProperty @@ -49,6 +54,40 @@ public Query getQuery() return query; } + @Override + public List getChildren() + { + return Collections.singletonList(query.getDataSource()); + } + + @Override + public DataSource withChildren(List children) + { + if (children.size() != 1) { + throw new IAE("Must have exactly one child"); + } + + return new QueryDataSource(query.withDataSource(Iterables.getOnlyElement(children))); + } + + @Override + public boolean isCacheable() + { + return false; + } + + @Override + public boolean isGlobal() + { + return query.getDataSource().isGlobal(); + } + + @Override + public boolean isConcrete() + { + return false; + } + @Override public String toString() { diff --git a/processing/src/main/java/org/apache/druid/query/TableDataSource.java b/processing/src/main/java/org/apache/druid/query/TableDataSource.java index f26aa53b93a5..4c371cf84510 100644 --- a/processing/src/main/java/org/apache/druid/query/TableDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/TableDataSource.java @@ -22,20 +22,22 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.IAE; import java.util.Collections; import java.util.List; +import java.util.Set; @JsonTypeName("table") public class TableDataSource implements DataSource { - @JsonProperty private final String name; @JsonCreator public TableDataSource(@JsonProperty("name") String name) { - this.name = (name == null ? null : name); + this.name = Preconditions.checkNotNull(name, "'name' must be nonnull"); } @JsonProperty @@ -45,9 +47,43 @@ public String getName() } @Override - public List getNames() + public Set getTableNames() { - return Collections.singletonList(name); + return Collections.singleton(name); + } + + @Override + public List getChildren() + { + return Collections.emptyList(); + } + + @Override + public DataSource withChildren(List children) + { + if (!children.isEmpty()) { + throw new IAE("Cannot accept children"); + } + + return this; + } + + @Override + public boolean isCacheable() + { + return true; + } + + @Override + public boolean isGlobal() + { + return false; + } + + @Override + public boolean isConcrete() + { + return true; } @Override @@ -57,7 +93,7 @@ public String toString() } @Override - public boolean equals(Object o) + public final boolean equals(Object o) { if (this == o) { return true; @@ -76,7 +112,7 @@ public boolean equals(Object o) } @Override - public int hashCode() + public final int hashCode() { return name.hashCode(); } diff --git a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java index 290ac538b0af..3bd25b017f1d 100644 --- a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java @@ -23,9 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import org.apache.druid.java.util.common.IAE; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; public class UnionDataSource implements DataSource @@ -41,9 +44,11 @@ public UnionDataSource(@JsonProperty("dataSources") List dataSo } @Override - public List getNames() + public Set getTableNames() { - return dataSources.stream().map(input -> Iterables.getOnlyElement(input.getNames())).collect(Collectors.toList()); + return dataSources.stream() + .map(input -> Iterables.getOnlyElement(input.getTableNames())) + .collect(Collectors.toSet()); } @JsonProperty @@ -52,6 +57,51 @@ public List getDataSources() return dataSources; } + @Override + public List getChildren() + { + return ImmutableList.copyOf(dataSources); + } + + @Override + public DataSource withChildren(List children) + { + if (children.size() != dataSources.size()) { + throw new IAE("Expected [%d] children, got [%d]", dataSources.size(), children.size()); + } + + if (!children.stream().allMatch(dataSource -> dataSource instanceof TableDataSource)) { + throw new IAE("All children must be tables"); + } + + return new UnionDataSource( + children.stream().map(dataSource -> (TableDataSource) dataSource).collect(Collectors.toList()) + ); + } + + @Override + public boolean isCacheable() + { + // Disables result-level caching for 'union' datasources, which doesn't work currently. + // See https://github.com/apache/druid/issues/8713 for reference. + // + // Note that per-segment caching is still effective, since at the time the per-segment cache evaluates a query + // for cacheability, it would have already been rewritten to a query on a single table. + return false; + } + + @Override + public boolean isGlobal() + { + return dataSources.stream().allMatch(DataSource::isGlobal); + } + + @Override + public boolean isConcrete() + { + return dataSources.stream().allMatch(DataSource::isConcrete); + } + @Override public boolean equals(Object o) { diff --git a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java new file mode 100644 index 000000000000..4237e50dc473 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.planning; + +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.JoinDataSource; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnionDataSource; +import org.apache.druid.query.spec.QuerySegmentSpec; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * Analysis of a datasource for purposes of deciding how to execute a particular query. + * + * The analysis breaks a datasource down in the following way: + * + *
+ *
+ *                             Q  <-- Possible outer query datasource(s) [may be multiple stacked]
+ *                             |
+ *                             J  <-- Possible join tree, expected to be left-leaning
+ *                            / \
+ *                           J  Dj <--  Other leaf datasources
+ *   Base datasource        / \         which will be joined
+ *  (bottom-leftmost) -->  Db Dj  <---- into the base datasource
+ *
+ * 
+ * + * The base datasource (Db) is returned by {@link #getBaseDataSource()}. The other leaf datasources are returned by + * {@link #getPreJoinableClauses()}. The outer query datasources are available as part of {@link #getDataSource()}, + * which just returns the original datasource that was provided for analysis. + * + * The base datasource (Db) will never be a join, but it can be any other type of datasource (table, query, etc). + * Note that join trees are only flattened if they occur at the top of the overall tree (or underneath an outer query), + * and that join trees are only flattened to the degree that they are left-leaning. Due to these facts, it is possible + * for the base or leaf datasources to include additional joins. + * + * The base datasource is the one that will be considered by the core Druid query stack for scanning via + * {@link org.apache.druid.segment.Segment} and {@link org.apache.druid.segment.StorageAdapter}. The other leaf + * datasources must be joinable onto the base data. + * + * The idea here is to keep things simple and dumb. So we focus only on identifying left-leaning join trees, which map + * neatly onto a series of hash table lookups at query time. The user/system generating the queries, e.g. the druid-sql + * layer (or the end user in the case of native queries), is responsible for containing the smarts to structure the + * tree in a way that will lead to optimal execution. + */ +public class DataSourceAnalysis +{ + private final DataSource dataSource; + private final DataSource baseDataSource; + @Nullable + private final QuerySegmentSpec baseQuerySegmentSpec; + private final List preJoinableClauses; + + private DataSourceAnalysis( + DataSource dataSource, + DataSource baseDataSource, + @Nullable QuerySegmentSpec baseQuerySegmentSpec, + List preJoinableClauses + ) + { + if (baseDataSource instanceof JoinDataSource) { + // The base cannot be a join (this is a class invariant). + // If it happens, it's a bug in the datasource analyzer. + throw new IAE("Base dataSource cannot be a join! Original dataSource was: %s", dataSource); + } + + this.dataSource = dataSource; + this.baseDataSource = baseDataSource; + this.baseQuerySegmentSpec = baseQuerySegmentSpec; + this.preJoinableClauses = preJoinableClauses; + } + + public static DataSourceAnalysis forDataSource(final DataSource dataSource) + { + // Strip outer queries, retaining querySegmentSpecs as we go down (lowest will become the 'baseQuerySegmentSpec'). + QuerySegmentSpec baseQuerySegmentSpec = null; + DataSource current = dataSource; + + while (current instanceof QueryDataSource) { + final Query subQuery = ((QueryDataSource) current).getQuery(); + + if (!(subQuery instanceof BaseQuery)) { + // All builtin query types are BaseQuery, so we only expect this with funky extension queries. + throw new IAE("Cannot analyze subquery of class[%s]", subQuery.getClass().getName()); + } + + baseQuerySegmentSpec = ((BaseQuery) subQuery).getQuerySegmentSpec(); + current = subQuery.getDataSource(); + } + + if (current instanceof JoinDataSource) { + final Pair> flattened = flattenJoin((JoinDataSource) current); + return new DataSourceAnalysis(dataSource, flattened.lhs, baseQuerySegmentSpec, flattened.rhs); + } else { + return new DataSourceAnalysis(dataSource, current, baseQuerySegmentSpec, Collections.emptyList()); + } + } + + /** + * Flatten a datasource into two parts: the left-hand side datasource (the 'base' datasource), and a list of join + * clauses, if any. + * + * @throws IllegalArgumentException if dataSource cannot be fully flattened. + */ + private static Pair> flattenJoin(final JoinDataSource dataSource) + { + DataSource current = dataSource; + final List preJoinableClauses = new ArrayList<>(); + + while (current instanceof JoinDataSource) { + final JoinDataSource joinDataSource = (JoinDataSource) current; + current = joinDataSource.getLeft(); + preJoinableClauses.add( + new PreJoinableClause( + joinDataSource.getRightPrefix(), + joinDataSource.getRight(), + joinDataSource.getJoinType(), + joinDataSource.getConditionAnalysis() + ) + ); + } + + // Join clauses were added in the order we saw them while traversing down, but we need to apply them in the + // going-up order. So reverse them. + Collections.reverse(preJoinableClauses); + + return Pair.of(current, preJoinableClauses); + } + + /** + * Returns the topmost datasource: the original one passed to {@link #forDataSource(DataSource)}. + */ + public DataSource getDataSource() + { + return dataSource; + } + + /** + * Returns the baseĀ (bottom-leftmost) datasource. + */ + public DataSource getBaseDataSource() + { + return baseDataSource; + } + + /** + * Returns the same datasource as {@link #getBaseDataSource()}, but only if it is a table. Useful on data servers, + * since they generally can only handle queries where the base datasource is a table. + */ + public Optional getBaseTableDataSource() + { + if (baseDataSource instanceof TableDataSource) { + return Optional.of((TableDataSource) baseDataSource); + } else { + return Optional.empty(); + } + } + + /** + * Returns the {@link QuerySegmentSpec} that is associated with the base datasource, if any. This only happens + * when there is an outer query datasource. In this case, the base querySegmentSpec is the one associated with the + * innermost subquery. + */ + public Optional getBaseQuerySegmentSpec() + { + return Optional.ofNullable(baseQuerySegmentSpec); + } + + /** + * Returns join clauses corresponding to joinable leaf datasources (every leaf except the bottom-leftmost). + */ + public List getPreJoinableClauses() + { + return preJoinableClauses; + } + + /** + * Returns true if all servers have the ability to compute this datasource. These datasources depend only on + * globally broadcast data, like lookups or inline data. + */ + public boolean isGlobal() + { + return dataSource.isGlobal(); + } + + /** + * Returns true if this datasource can be computed by the core Druid query stack via a scan of a concrete base + * datasource. All other datasources involved, if any, must be global. + */ + public boolean isConcreteBased() + { + return baseDataSource.isConcrete() && preJoinableClauses.stream() + .allMatch(clause -> clause.getDataSource().isGlobal()); + } + + /** + * Returns true if this datasource is concrete-based (see {@link #isConcreteBased()}, and the base datasource is a + * 'table' or union of them. This is an important property because it corresponds to datasources that can be handled + * by Druid data servers, like Historicals. + */ + public boolean isConcreteTableBased() + { + // At the time of writing this comment, UnionDataSource children are required to be tables, so the instanceof + // check is redundant. But in the future, we will likely want to support unions of things other than tables, + // so check anyway for future-proofing. + return isConcreteBased() && (baseDataSource instanceof TableDataSource + || (baseDataSource instanceof UnionDataSource && + baseDataSource.getChildren() + .stream() + .allMatch(ds -> ds instanceof TableDataSource))); + } + + /** + * Returns true if this datasource represents a subquery. + */ + public boolean isQuery() + { + return dataSource instanceof QueryDataSource; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DataSourceAnalysis that = (DataSourceAnalysis) o; + return Objects.equals(dataSource, that.dataSource) && + Objects.equals(baseDataSource, that.baseDataSource) && + Objects.equals(baseQuerySegmentSpec, that.baseQuerySegmentSpec) && + Objects.equals(preJoinableClauses, that.preJoinableClauses); + } + + @Override + public int hashCode() + { + return Objects.hash(dataSource, baseDataSource, baseQuerySegmentSpec, preJoinableClauses); + } + + @Override + public String toString() + { + return "DataSourceAnalysis{" + + "dataSource=" + dataSource + + ", baseDataSource=" + baseDataSource + + ", baseQuerySegmentSpec=" + baseQuerySegmentSpec + + ", joinClauses=" + preJoinableClauses + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/planning/PreJoinableClause.java b/processing/src/main/java/org/apache/druid/query/planning/PreJoinableClause.java new file mode 100644 index 000000000000..5ed7f71d561e --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/planning/PreJoinableClause.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.planning; + +import com.google.common.base.Preconditions; +import org.apache.druid.query.DataSource; +import org.apache.druid.segment.join.JoinConditionAnalysis; +import org.apache.druid.segment.join.JoinType; +import org.apache.druid.segment.join.Joinables; + +import java.util.Objects; + +/** + * Like {@link org.apache.druid.segment.join.JoinableClause}, but contains a {@link DataSource} instead of a + * {@link org.apache.druid.segment.join.Joinable}. This is useful because when analyzing joins, we don't want to + * actually create Joinables, since that can be an expensive operation. + */ +public class PreJoinableClause +{ + private final String prefix; + private final DataSource dataSource; + private final JoinType joinType; + private final JoinConditionAnalysis condition; + + PreJoinableClause( + final String prefix, + final DataSource dataSource, + final JoinType joinType, + final JoinConditionAnalysis condition + ) + { + this.prefix = Joinables.validatePrefix(prefix); + this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); + this.joinType = Preconditions.checkNotNull(joinType, "joinType"); + this.condition = Preconditions.checkNotNull(condition, "condition"); + } + + public String getPrefix() + { + return prefix; + } + + public DataSource getDataSource() + { + return dataSource; + } + + public JoinType getJoinType() + { + return joinType; + } + + public JoinConditionAnalysis getCondition() + { + return condition; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PreJoinableClause that = (PreJoinableClause) o; + return Objects.equals(prefix, that.prefix) && + Objects.equals(dataSource, that.dataSource) && + joinType == that.joinType && + Objects.equals(condition, that.condition); + } + + @Override + public int hashCode() + { + return Objects.hash(prefix, dataSource, joinType, condition); + } + + @Override + public String toString() + { + return "JoinClause{" + + "prefix='" + prefix + '\'' + + ", dataSource=" + dataSource + + ", joinType=" + joinType + + ", condition=" + condition + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java index 4f985dbf3f50..5e9bfc39bdd6 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.IAE; -import javax.annotation.Nullable; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -39,9 +38,9 @@ public class JoinableClause private final JoinType joinType; private final JoinConditionAnalysis condition; - public JoinableClause(@Nullable String prefix, Joinable joinable, JoinType joinType, JoinConditionAnalysis condition) + public JoinableClause(String prefix, Joinable joinable, JoinType joinType, JoinConditionAnalysis condition) { - this.prefix = prefix != null ? prefix : ""; + this.prefix = Joinables.validatePrefix(prefix); this.joinable = Preconditions.checkNotNull(joinable, "joinable"); this.joinType = Preconditions.checkNotNull(joinType, "joinType"); this.condition = Preconditions.checkNotNull(condition, "condition"); diff --git a/processing/src/main/java/org/apache/druid/segment/join/Joinables.java b/processing/src/main/java/org/apache/druid/segment/join/Joinables.java new file mode 100644 index 000000000000..6bb95a1a502a --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/join/Joinables.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join; + +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.segment.column.ColumnHolder; + +import javax.annotation.Nullable; + +/** + * Utility methods for working with {@link Joinable} related classes. + */ +public class Joinables +{ + /** + * Checks that "prefix" is a valid prefix for a join clause (see {@link JoinableClause#getPrefix()}) and, if so, + * returns it. Otherwise, throws an exception. + */ + public static String validatePrefix(@Nullable final String prefix) + { + if (prefix == null || prefix.isEmpty()) { + throw new IAE("Join clause cannot have null or empty prefix"); + } else if (isPrefixedBy(ColumnHolder.TIME_COLUMN_NAME, prefix) || ColumnHolder.TIME_COLUMN_NAME.equals(prefix)) { + throw new IAE( + "Join clause cannot have prefix[%s], since it would shadow %s", + prefix, + ColumnHolder.TIME_COLUMN_NAME + ); + } else { + return prefix; + } + } + + public static boolean isPrefixedBy(final String columnName, final String prefix) + { + return columnName.startsWith(prefix) && columnName.length() > prefix.length(); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/DataSourceTest.java b/processing/src/test/java/org/apache/druid/query/DataSourceTest.java index 537650881b1c..090570db7acb 100644 --- a/processing/src/test/java/org/apache/druid/query/DataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/DataSourceTest.java @@ -20,6 +20,7 @@ package org.apache.druid.query; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; @@ -53,7 +54,10 @@ public void testLegacyDataSource() throws IOException @Test public void testTableDataSource() throws IOException { - DataSource dataSource = JSON_MAPPER.readValue("{\"type\":\"table\", \"name\":\"somedatasource\"}", DataSource.class); + DataSource dataSource = JSON_MAPPER.readValue( + "{\"type\":\"table\", \"name\":\"somedatasource\"}", + DataSource.class + ); Assert.assertEquals(new TableDataSource("somedatasource"), dataSource); } @@ -88,8 +92,8 @@ public void testUnionDataSource() throws Exception Lists.newArrayList(((UnionDataSource) dataSource).getDataSources()) ); Assert.assertEquals( - Lists.newArrayList("ds1", "ds2"), - Lists.newArrayList(dataSource.getNames()) + ImmutableSet.of("ds1", "ds2"), + dataSource.getTableNames() ); final DataSource serde = JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(dataSource), DataSource.class); diff --git a/processing/src/test/java/org/apache/druid/query/InlineDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/InlineDataSourceTest.java new file mode 100644 index 000000000000..c533ec3f9e12 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/InlineDataSourceTest.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.segment.RowAdapter; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ValueType; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +public class InlineDataSourceTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final AtomicLong iterationCounter = new AtomicLong(); + + private final List rows = ImmutableList.of( + new Object[]{DateTimes.of("2000").getMillis(), "foo", 0d, ImmutableMap.of("n", "0")}, + new Object[]{DateTimes.of("2000").getMillis(), "bar", 1d, ImmutableMap.of("n", "1")}, + new Object[]{DateTimes.of("2000").getMillis(), "baz", 2d, ImmutableMap.of("n", "2")} + ); + + private final Iterable rowsIterable = () -> { + iterationCounter.incrementAndGet(); + return rows.iterator(); + }; + + private final List expectedColumnNames = ImmutableList.of( + ColumnHolder.TIME_COLUMN_NAME, + "str", + "double", + "complex" + ); + + private final List expectedColumnTypes = ImmutableList.of( + ValueType.LONG, + ValueType.STRING, + ValueType.DOUBLE, + ValueType.COMPLEX + ); + + private final InlineDataSource listDataSource = InlineDataSource.fromIterable( + expectedColumnNames, + expectedColumnTypes, + rows + ); + + private final InlineDataSource iterableDataSource = InlineDataSource.fromIterable( + expectedColumnNames, + expectedColumnTypes, + rowsIterable + ); + + @Test + public void test_getTableNames() + { + Assert.assertEquals(Collections.emptySet(), listDataSource.getTableNames()); + Assert.assertEquals(Collections.emptySet(), iterableDataSource.getTableNames()); + } + + @Test + public void test_getColumnNames() + { + Assert.assertEquals(expectedColumnNames, listDataSource.getColumnNames()); + Assert.assertEquals(expectedColumnNames, iterableDataSource.getColumnNames()); + } + + @Test + public void test_getColumnTypes() + { + Assert.assertEquals(expectedColumnTypes, listDataSource.getColumnTypes()); + Assert.assertEquals(expectedColumnTypes, iterableDataSource.getColumnTypes()); + } + + @Test + public void test_getChildren() + { + Assert.assertEquals(Collections.emptyList(), listDataSource.getChildren()); + Assert.assertEquals(Collections.emptyList(), iterableDataSource.getChildren()); + } + + @Test + public void test_getRowSignature() + { + Assert.assertEquals( + ImmutableMap.of( + ColumnHolder.TIME_COLUMN_NAME, ValueType.LONG, + "str", ValueType.STRING, + "double", ValueType.DOUBLE, + "complex", ValueType.COMPLEX + ), + listDataSource.getRowSignature() + ); + } + + @Test + public void test_isCacheable() + { + Assert.assertFalse(listDataSource.isCacheable()); + } + + @Test + public void test_isGlobal() + { + Assert.assertTrue(listDataSource.isGlobal()); + } + + @Test + public void test_isConcrete() + { + Assert.assertFalse(listDataSource.isConcrete()); + } + + @Test + public void test_rowAdapter() + { + final RowAdapter adapter = listDataSource.rowAdapter(); + final Object[] row = rows.get(1); + + Assert.assertEquals(DateTimes.of("2000").getMillis(), adapter.timestampFunction().applyAsLong(row)); + Assert.assertEquals("bar", adapter.columnFunction("str").apply(row)); + Assert.assertEquals(1d, adapter.columnFunction("double").apply(row)); + Assert.assertEquals(ImmutableMap.of("n", "1"), adapter.columnFunction("complex").apply(row)); + } + + @Test + public void test_getRows_list() + { + Assert.assertSame(this.rows, listDataSource.getRowsAsList()); + } + + @Test + public void test_getRows_iterable() + { + final Iterable iterable = iterableDataSource.getRows(); + Assert.assertNotSame(this.rows, iterable); + + // No iteration yet. + Assert.assertEquals(0, iterationCounter.get()); + + assertRowsEqual(this.rows, ImmutableList.copyOf(iterable)); + + // OK, now we've iterated. + Assert.assertEquals(1, iterationCounter.get()); + + // Read again, we should iterate again. + //noinspection MismatchedQueryAndUpdateOfCollection + final List ignored = Lists.newArrayList(iterable); + Assert.assertEquals(2, iterationCounter.get()); + } + + @Test + public void test_getRowsAsList_list() + { + Assert.assertSame(this.rows, listDataSource.getRowsAsList()); + } + + @Test + public void test_getRowsAsList_iterable() + { + final List list = iterableDataSource.getRowsAsList(); + + Assert.assertEquals(1, iterationCounter.get()); + assertRowsEqual(this.rows, list); + + // Read again, we should *not* iterate again (in contrast to "test_getRows_iterable"). + //noinspection MismatchedQueryAndUpdateOfCollection + final List ignored = Lists.newArrayList(list); + Assert.assertEquals(1, iterationCounter.get()); + } + + @Test + public void test_withChildren_empty() + { + Assert.assertSame(listDataSource, listDataSource.withChildren(Collections.emptyList())); + } + + @Test + public void test_withChildren_nonEmpty() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Cannot accept children"); + + // Workaround so "withChildren" isn't flagged as unused in the DataSource interface. + ((DataSource) listDataSource).withChildren(ImmutableList.of(new TableDataSource("foo"))); + } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(InlineDataSource.class) + .usingGetClass() + .withNonnullFields("columnNames", "columnTypes", "rows") + .verify(); + } + + @Test + public void test_toString_iterable() + { + // Verify that toString does not iterate the rows. + final String ignored = iterableDataSource.toString(); + Assert.assertEquals(0, iterationCounter.get()); + } + + @Test + public void test_serde_list() throws Exception + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final InlineDataSource deserialized = (InlineDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(listDataSource), + DataSource.class + ); + + Assert.assertEquals(listDataSource.getColumnNames(), deserialized.getColumnNames()); + Assert.assertEquals(listDataSource.getColumnTypes(), deserialized.getColumnTypes()); + assertRowsEqual(listDataSource.getRows(), deserialized.getRows()); + } + + @Test + public void test_serde_iterable() throws Exception + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final InlineDataSource deserialized = (InlineDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(iterableDataSource), + DataSource.class + ); + + // Lazy iterables turn into Lists upon serialization. + Assert.assertEquals(listDataSource.getColumnNames(), deserialized.getColumnNames()); + Assert.assertEquals(listDataSource.getColumnTypes(), deserialized.getColumnTypes()); + assertRowsEqual(listDataSource.getRows(), deserialized.getRows()); + + // Should have iterated once. + Assert.assertEquals(1, iterationCounter.get()); + } + + /** + * This method exists because "equals" on two equivalent Object[] won't return true, so we need to check + * for equality deeply. + */ + private static void assertRowsEqual(final Iterable expectedRows, final Iterable actualRows) + { + if (expectedRows instanceof List && actualRows instanceof List) { + // Only check equality deeply when both rows1 and rows2 are Lists, i.e., non-lazy. + final List expectedRowsList = (List) expectedRows; + final List actualRowsList = (List) actualRows; + + final int sz = expectedRowsList.size(); + Assert.assertEquals("number of rows", sz, actualRowsList.size()); + + // Super slow for LinkedLists, but we don't expect those to be used here. + // (They're generally forbidden in Druid except for special cases.) + for (int i = 0; i < sz; i++) { + Assert.assertArrayEquals("row #" + i, expectedRowsList.get(i), actualRowsList.get(i)); + } + } else { + // If they're not both Lists, we don't want to iterate them during equality checks, so do a non-deep check. + // This might still return true if whatever class they are has another way of checking equality. But, usually we + // expect this to return false. + Assert.assertEquals("rows", expectedRows, actualRows); + } + } + +} diff --git a/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java new file mode 100644 index 000000000000..e0a990e753ae --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.join.JoinType; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; + +public class JoinDataSourceTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final TableDataSource fooTable = new TableDataSource("foo"); + private final TableDataSource barTable = new TableDataSource("bar"); + private final LookupDataSource lookylooLookup = new LookupDataSource("lookyloo"); + + private final JoinDataSource joinTableToLookup = JoinDataSource.create( + fooTable, + lookylooLookup, + "j.", + "x == \"j.x\"", + JoinType.LEFT, + ExprMacroTable.nil() + ); + + private final JoinDataSource joinTableToTable = JoinDataSource.create( + fooTable, + barTable, + "j.", + "x == \"j.x\"", + JoinType.LEFT, + ExprMacroTable.nil() + ); + + @Test + public void test_getTableNames_tableToTable() + { + Assert.assertEquals(ImmutableSet.of("foo", "bar"), joinTableToTable.getTableNames()); + } + + @Test + public void test_getTableNames_tableToLookup() + { + Assert.assertEquals(Collections.singleton("foo"), joinTableToLookup.getTableNames()); + } + + @Test + public void test_getChildren_tableToTable() + { + Assert.assertEquals(ImmutableList.of(fooTable, barTable), joinTableToTable.getChildren()); + } + + @Test + public void test_getChildren_tableToLookup() + { + Assert.assertEquals(ImmutableList.of(fooTable, lookylooLookup), joinTableToLookup.getChildren()); + } + + @Test + public void test_isCacheable_tableToTable() + { + Assert.assertFalse(joinTableToTable.isCacheable()); + } + + @Test + public void test_isCacheable_lookup() + { + Assert.assertFalse(joinTableToLookup.isCacheable()); + } + + @Test + public void test_isConcrete_tableToTable() + { + Assert.assertFalse(joinTableToTable.isConcrete()); + } + + @Test + public void test_isConcrete_tableToLookup() + { + Assert.assertFalse(joinTableToLookup.isConcrete()); + } + + @Test + public void test_isGlobal_tableToTable() + { + Assert.assertFalse(joinTableToTable.isGlobal()); + } + + @Test + public void test_isGlobal_tableToLookup() + { + Assert.assertFalse(joinTableToLookup.isGlobal()); + } + + @Test + public void test_withChildren_empty() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Expected [2] children, got [0]"); + + final DataSource ignored = joinTableToTable.withChildren(Collections.emptyList()); + } + + @Test + public void test_withChildren_two() + { + final DataSource transformed = joinTableToTable.withChildren(ImmutableList.of(fooTable, lookylooLookup)); + + Assert.assertEquals(joinTableToLookup, transformed); + } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(JoinDataSource.class) + .usingGetClass() + .withNonnullFields("left", "right", "rightPrefix", "conditionAnalysis", "joinType") + .verify(); + } + + @Test + public void test_serde() throws Exception + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final JoinDataSource deserialized = (JoinDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(joinTableToLookup), + DataSource.class + ); + + Assert.assertEquals(joinTableToLookup, deserialized); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/LookupDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/LookupDataSourceTest.java new file mode 100644 index 000000000000..c68579ff60a0 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/LookupDataSourceTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; + +public class LookupDataSourceTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final LookupDataSource lookylooDataSource = new LookupDataSource("lookyloo"); + + @Test + public void test_getTableNames() + { + Assert.assertEquals(Collections.emptySet(), lookylooDataSource.getTableNames()); + } + + @Test + public void test_getChildren() + { + Assert.assertEquals(Collections.emptyList(), lookylooDataSource.getChildren()); + } + + @Test + public void test_isCacheable() + { + Assert.assertFalse(lookylooDataSource.isCacheable()); + } + + @Test + public void test_isGlobal() + { + Assert.assertTrue(lookylooDataSource.isGlobal()); + } + + @Test + public void test_isConcrete() + { + Assert.assertFalse(lookylooDataSource.isConcrete()); + } + + @Test + public void test_withChildren_empty() + { + Assert.assertSame(lookylooDataSource, lookylooDataSource.withChildren(Collections.emptyList())); + } + + @Test + public void test_withChildren_nonEmpty() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Cannot accept children"); + + lookylooDataSource.withChildren(ImmutableList.of(new LookupDataSource("bar"))); + } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(LookupDataSource.class).usingGetClass().withNonnullFields("lookupName").verify(); + } + + @Test + public void test_serde() throws Exception + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final LookupDataSource deserialized = (LookupDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(lookylooDataSource), + DataSource.class + ); + + Assert.assertEquals(lookylooDataSource, deserialized); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java new file mode 100644 index 000000000000..df8c1f6b5647 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/QueryDataSourceTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; + +public class QueryDataSourceTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final TimeseriesQuery queryOnTable = + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(); + + private final TimeseriesQuery queryOnLookup = + Druids.newTimeseriesQueryBuilder() + .dataSource(new LookupDataSource("lookyloo")) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(); + + private final QueryDataSource queryOnTableDataSource = new QueryDataSource(queryOnTable); + private final QueryDataSource queryOnLookupDataSource = new QueryDataSource(queryOnLookup); + + @Test + public void test_getTableNames_table() + { + Assert.assertEquals(Collections.singleton("foo"), queryOnTableDataSource.getTableNames()); + } + + @Test + public void test_getTableNames_lookup() + { + Assert.assertEquals(Collections.emptySet(), queryOnLookupDataSource.getTableNames()); + } + + @Test + public void test_getChildren_table() + { + Assert.assertEquals(Collections.singletonList(new TableDataSource("foo")), queryOnTableDataSource.getChildren()); + } + + @Test + public void test_getChildren_lookup() + { + Assert.assertEquals( + Collections.singletonList(new LookupDataSource("lookyloo")), + queryOnLookupDataSource.getChildren() + ); + } + + @Test + public void test_isCacheable_table() + { + Assert.assertFalse(queryOnTableDataSource.isCacheable()); + } + + @Test + public void test_isCacheable_lookup() + { + Assert.assertFalse(queryOnLookupDataSource.isCacheable()); + } + + @Test + public void test_isConcrete_table() + { + Assert.assertFalse(queryOnTableDataSource.isConcrete()); + } + + @Test + public void test_isConcrete_lookup() + { + Assert.assertFalse(queryOnLookupDataSource.isConcrete()); + } + + @Test + public void test_isGlobal_table() + { + Assert.assertFalse(queryOnTableDataSource.isGlobal()); + } + + @Test + public void test_isGlobal_lookup() + { + Assert.assertTrue(queryOnLookupDataSource.isGlobal()); + } + + @Test + public void test_withChildren_empty() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Must have exactly one child"); + + final DataSource ignored = queryOnLookupDataSource.withChildren(Collections.emptyList()); + } + + @Test + public void test_withChildren_single() + { + final TableDataSource barTable = new TableDataSource("bar"); + + final QueryDataSource transformed = + (QueryDataSource) queryOnLookupDataSource.withChildren(Collections.singletonList(barTable)); + + Assert.assertEquals(barTable, transformed.getQuery().getDataSource()); + } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(QueryDataSource.class).usingGetClass().withNonnullFields("query").verify(); + } + + @Test + public void test_serde() throws Exception + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final QueryDataSource deserialized = (QueryDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(queryOnTableDataSource), + DataSource.class + ); + + Assert.assertEquals(queryOnTableDataSource, deserialized); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/TableDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/TableDataSourceTest.java new file mode 100644 index 000000000000..ef50f3e45d9d --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/TableDataSourceTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; + +public class TableDataSourceTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final TableDataSource fooDataSource = new TableDataSource("foo"); + + @Test + public void test_getTableNames() + { + Assert.assertEquals(Collections.singleton("foo"), fooDataSource.getTableNames()); + } + + @Test + public void test_getChildren() + { + Assert.assertEquals(Collections.emptyList(), fooDataSource.getChildren()); + } + + @Test + public void test_isCacheable() + { + Assert.assertTrue(fooDataSource.isCacheable()); + } + + @Test + public void test_isGlobal() + { + Assert.assertFalse(fooDataSource.isGlobal()); + } + + @Test + public void test_isConcrete() + { + Assert.assertTrue(fooDataSource.isConcrete()); + } + + @Test + public void test_withChildren_empty() + { + Assert.assertSame(fooDataSource, fooDataSource.withChildren(Collections.emptyList())); + } + + @Test + public void test_withChildren_nonEmpty() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Cannot accept children"); + + fooDataSource.withChildren(ImmutableList.of(new TableDataSource("bar"))); + } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(TableDataSource.class).withNonnullFields("name").verify(); + } + + @Test + public void test_equals_legacy() + { + final LegacyDataSource legacyFoo = new LegacyDataSource("foo"); + final LegacyDataSource legacyBar = new LegacyDataSource("bar"); + + Assert.assertEquals(legacyFoo, fooDataSource); + Assert.assertEquals(fooDataSource, legacyFoo); + + Assert.assertNotEquals(legacyBar, fooDataSource); + Assert.assertNotEquals(fooDataSource, legacyBar); + } + + @Test + public void test_serde() throws Exception + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final TableDataSource deserialized = (TableDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(fooDataSource), + DataSource.class + ); + + Assert.assertEquals(fooDataSource, deserialized); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java new file mode 100644 index 000000000000..117225d890b9 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; +import java.util.List; + +public class UnionDataSourceTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final UnionDataSource unionDataSource = new UnionDataSource( + ImmutableList.of( + new TableDataSource("foo"), + new TableDataSource("bar") + ) + ); + + private final UnionDataSource unionDataSourceWithDuplicates = new UnionDataSource( + ImmutableList.of( + new TableDataSource("bar"), + new TableDataSource("foo"), + new TableDataSource("bar") + ) + ); + + @Test + public void test_getTableNames() + { + Assert.assertEquals(ImmutableSet.of("foo", "bar"), unionDataSource.getTableNames()); + } + + @Test + public void test_getTableNames_withDuplicates() + { + Assert.assertEquals(ImmutableSet.of("foo", "bar"), unionDataSourceWithDuplicates.getTableNames()); + } + + @Test + public void test_getChildren() + { + Assert.assertEquals( + ImmutableList.of(new TableDataSource("foo"), new TableDataSource("bar")), + unionDataSource.getChildren() + ); + } + + @Test + public void test_getChildren_withDuplicates() + { + Assert.assertEquals( + ImmutableList.of(new TableDataSource("bar"), new TableDataSource("foo"), new TableDataSource("bar")), + unionDataSourceWithDuplicates.getChildren() + ); + } + + @Test + public void test_isCacheable() + { + Assert.assertFalse(unionDataSource.isCacheable()); + } + + @Test + public void test_isGlobal() + { + Assert.assertFalse(unionDataSource.isGlobal()); + } + + @Test + public void test_isConcrete() + { + Assert.assertTrue(unionDataSource.isConcrete()); + } + + @Test + public void test_withChildren_empty() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Expected [2] children, got [0]"); + + unionDataSource.withChildren(Collections.emptyList()); + } + + @Test + public void test_withChildren_sameNumber() + { + final List newDataSources = ImmutableList.of( + new TableDataSource("baz"), + new TableDataSource("qux") + ); + + //noinspection unchecked + Assert.assertEquals( + new UnionDataSource(newDataSources), + unionDataSource.withChildren((List) newDataSources) + ); + } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(UnionDataSource.class).usingGetClass().withNonnullFields("dataSources").verify(); + } + + @Test + public void test_serde() throws Exception + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final UnionDataSource deserialized = (UnionDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(unionDataSource), + DataSource.class + ); + + Assert.assertEquals(unionDataSource, deserialized); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/UnionQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/UnionQueryRunnerTest.java index 3b9e5e8971fe..a9ce7a9471b2 100644 --- a/processing/src/test/java/org/apache/druid/query/UnionQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/UnionQueryRunnerTest.java @@ -44,7 +44,7 @@ public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // verify that table datasource is passed to baseQueryRunner Assert.assertTrue(queryPlus.getQuery().getDataSource() instanceof TableDataSource); - String dsName = Iterables.getOnlyElement(queryPlus.getQuery().getDataSource().getNames()); + String dsName = Iterables.getOnlyElement(queryPlus.getQuery().getDataSource().getTableNames()); if ("ds1".equals(dsName)) { ds1.compareAndSet(false, true); return Sequences.simple(Arrays.asList(1, 2, 3)); diff --git a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java index 5b254ea18eb6..5f044ec339f6 100644 --- a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java @@ -917,7 +917,7 @@ public void testSerde() throws Exception Query query = MAPPER.readValue(queryStr, Query.class); Assert.assertTrue(query instanceof SegmentMetadataQuery); - Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getNames())); + Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getTableNames())); Assert.assertEquals( Intervals.of("2013-12-04T00:00:00.000Z/2013-12-05T00:00:00.000Z"), query.getIntervals().get(0) @@ -937,7 +937,7 @@ public void testSerdeWithDefaultInterval() throws Exception + "}"; Query query = MAPPER.readValue(queryStr, Query.class); Assert.assertTrue(query instanceof SegmentMetadataQuery); - Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getNames())); + Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getTableNames())); Assert.assertEquals(Intervals.ETERNITY, query.getIntervals().get(0)); Assert.assertTrue(((SegmentMetadataQuery) query).isUsingDefaultInterval()); diff --git a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java new file mode 100644 index 000000000000..1b24c5e38983 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.planning; + +import com.google.common.collect.ImmutableList; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.JoinDataSource; +import org.apache.druid.query.LookupDataSource; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnionDataSource; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.join.JoinConditionAnalysis; +import org.apache.druid.segment.join.JoinType; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +public class DataSourceAnalysisTest +{ + private static final List MILLENIUM_INTERVALS = ImmutableList.of(Intervals.of("2000/3000")); + private static final TableDataSource TABLE_FOO = new TableDataSource("foo"); + private static final TableDataSource TABLE_BAR = new TableDataSource("bar"); + private static final LookupDataSource LOOKUP_LOOKYLOO = new LookupDataSource("lookyloo"); + private static final InlineDataSource INLINE = InlineDataSource.fromIterable( + ImmutableList.of("column"), + ImmutableList.of(ValueType.STRING), + ImmutableList.of(new Object[0]) + ); + + @Test + public void testTable() + { + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(TABLE_FOO); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(TABLE_FOO, analysis.getDataSource()); + Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); + } + + @Test + public void testUnion() + { + final UnionDataSource unionDataSource = new UnionDataSource(ImmutableList.of(TABLE_FOO, TABLE_BAR)); + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(unionDataSource); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(unionDataSource, analysis.getDataSource()); + Assert.assertEquals(unionDataSource, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); + } + + @Test + public void testQueryOnTable() + { + final QueryDataSource queryDataSource = subquery(TABLE_FOO); + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertTrue(analysis.isQuery()); + Assert.assertEquals(queryDataSource, analysis.getDataSource()); + Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals( + Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)), + analysis.getBaseQuerySegmentSpec() + ); + Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); + } + + @Test + public void testQueryOnUnion() + { + final UnionDataSource unionDataSource = new UnionDataSource(ImmutableList.of(TABLE_FOO, TABLE_BAR)); + final QueryDataSource queryDataSource = subquery(unionDataSource); + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertTrue(analysis.isQuery()); + Assert.assertEquals(queryDataSource, analysis.getDataSource()); + Assert.assertEquals(unionDataSource, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals( + Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)), + analysis.getBaseQuerySegmentSpec() + ); + Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); + } + + @Test + public void testLookup() + { + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(LOOKUP_LOOKYLOO); + + Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertFalse(analysis.isConcreteTableBased()); + Assert.assertTrue(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getDataSource()); + Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); + } + + @Test + public void testQueryOnLookup() + { + final QueryDataSource queryDataSource = subquery(LOOKUP_LOOKYLOO); + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource); + + Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertFalse(analysis.isConcreteTableBased()); + Assert.assertTrue(analysis.isGlobal()); + Assert.assertTrue(analysis.isQuery()); + Assert.assertEquals(queryDataSource, analysis.getDataSource()); + Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals( + Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)), + analysis.getBaseQuerySegmentSpec() + ); + Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); + } + + @Test + public void testInline() + { + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(INLINE); + + Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertFalse(analysis.isConcreteTableBased()); + Assert.assertTrue(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(INLINE, analysis.getDataSource()); + Assert.assertEquals(INLINE, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals(Collections.emptyList(), analysis.getPreJoinableClauses()); + } + + @Test + public void testJoinSimpleLeftLeaning() + { + // Join of a table onto a variety of simple joinable objects (lookup, inline, subquery) with a left-leaning + // structure (no right children are joins themselves). + + final JoinDataSource joinDataSource = + join( + join( + join( + TABLE_FOO, + LOOKUP_LOOKYLOO, + "1.", + JoinType.INNER + ), + INLINE, + "2.", + JoinType.LEFT + ), + subquery(LOOKUP_LOOKYLOO), + "3.", + JoinType.FULL + ); + + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(joinDataSource, analysis.getDataSource()); + Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals( + ImmutableList.of( + new PreJoinableClause("1.", LOOKUP_LOOKYLOO, JoinType.INNER, joinClause("1.")), + new PreJoinableClause("2.", INLINE, JoinType.LEFT, joinClause("2.")), + new PreJoinableClause("3.", subquery(LOOKUP_LOOKYLOO), JoinType.FULL, joinClause("3.")) + ), + analysis.getPreJoinableClauses() + ); + } + + @Test + public void testJoinSimpleRightLeaning() + { + // Join of a table onto a variety of simple joinable objects (lookup, inline, subquery) with a right-leaning + // structure (no left children are joins themselves). + // + // Note that unlike the left-leaning stack, which is fully flattened, this one will not get flattened at all. + + final JoinDataSource rightLeaningJoinStack = + join( + LOOKUP_LOOKYLOO, + join( + INLINE, + subquery(LOOKUP_LOOKYLOO), + "1.", + JoinType.LEFT + ), + "2.", + JoinType.FULL + ); + + final JoinDataSource joinDataSource = + join( + TABLE_FOO, + rightLeaningJoinStack, + "3.", + JoinType.RIGHT + ); + + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(joinDataSource, analysis.getDataSource()); + Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals( + ImmutableList.of( + new PreJoinableClause("3.", rightLeaningJoinStack, JoinType.RIGHT, joinClause("3.")) + ), + analysis.getPreJoinableClauses() + ); + } + + @Test + public void testJoinOverTableSubquery() + { + final JoinDataSource joinDataSource = join( + TABLE_FOO, + subquery(TABLE_FOO), + "1.", + JoinType.INNER + ); + + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource); + + Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertFalse(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(joinDataSource, analysis.getDataSource()); + Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals( + ImmutableList.of( + new PreJoinableClause("1.", subquery(TABLE_FOO), JoinType.INNER, joinClause("1.")) + ), + analysis.getPreJoinableClauses() + ); + } + + @Test + public void testJoinTableUnionToLookup() + { + final UnionDataSource unionDataSource = new UnionDataSource(ImmutableList.of(TABLE_FOO, TABLE_BAR)); + final JoinDataSource joinDataSource = join( + unionDataSource, + LOOKUP_LOOKYLOO, + "1.", + JoinType.INNER + ); + + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(joinDataSource, analysis.getDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(unionDataSource, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals( + ImmutableList.of( + new PreJoinableClause("1.", LOOKUP_LOOKYLOO, JoinType.INNER, joinClause("1.")) + ), + analysis.getPreJoinableClauses() + ); + } + + @Test + public void testJoinUnderTopLevelSubqueries() + { + final QueryDataSource queryDataSource = + subquery( + subquery( + join( + TABLE_FOO, + LOOKUP_LOOKYLOO, + "1.", + JoinType.INNER + ) + ) + ); + + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource); + + Assert.assertTrue(analysis.isConcreteBased()); + Assert.assertTrue(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertTrue(analysis.isQuery()); + Assert.assertEquals(queryDataSource, analysis.getDataSource()); + Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.of(TABLE_FOO), analysis.getBaseTableDataSource()); + Assert.assertEquals( + Optional.of(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)), + analysis.getBaseQuerySegmentSpec() + ); + Assert.assertEquals( + ImmutableList.of( + new PreJoinableClause("1.", LOOKUP_LOOKYLOO, JoinType.INNER, joinClause("1.")) + ), + analysis.getPreJoinableClauses() + ); + } + + @Test + public void testJoinLookupToLookup() + { + final JoinDataSource joinDataSource = join( + LOOKUP_LOOKYLOO, + LOOKUP_LOOKYLOO, + "1.", + JoinType.INNER + ); + + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource); + + Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertFalse(analysis.isConcreteTableBased()); + Assert.assertTrue(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(joinDataSource, analysis.getDataSource()); + Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals( + ImmutableList.of( + new PreJoinableClause("1.", LOOKUP_LOOKYLOO, JoinType.INNER, joinClause("1.")) + ), + analysis.getPreJoinableClauses() + ); + } + + @Test + public void testJoinLookupToTable() + { + final JoinDataSource joinDataSource = join( + LOOKUP_LOOKYLOO, + TABLE_FOO, + "1.", + JoinType.INNER + ); + + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource); + + Assert.assertFalse(analysis.isConcreteBased()); + Assert.assertFalse(analysis.isConcreteTableBased()); + Assert.assertFalse(analysis.isGlobal()); + Assert.assertFalse(analysis.isQuery()); + Assert.assertEquals(joinDataSource, analysis.getDataSource()); + Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource()); + Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec()); + Assert.assertEquals( + ImmutableList.of( + new PreJoinableClause("1.", TABLE_FOO, JoinType.INNER, joinClause("1.")) + ), + analysis.getPreJoinableClauses() + ); + } + + @Test + public void testEquals() + { + EqualsVerifier.forClass(DataSourceAnalysis.class) + .usingGetClass() + .withNonnullFields("dataSource", "baseDataSource", "preJoinableClauses") + .verify(); + } + + /** + * Generate a datasource that joins on a column named "x" on both sides. + */ + private static JoinDataSource join( + final DataSource left, + final DataSource right, + final String rightPrefix, + final JoinType joinType + ) + { + return JoinDataSource.create( + left, + right, + rightPrefix, + joinClause(rightPrefix).getOriginalExpression(), + joinType, + ExprMacroTable.nil() + ); + } + + /** + * Generate a join clause that joins on a column named "x" on both sides. + */ + private static JoinConditionAnalysis joinClause( + final String rightPrefix + ) + { + return JoinConditionAnalysis.forExpression( + StringUtils.format("x == \"%sx\"", rightPrefix), + rightPrefix, + ExprMacroTable.nil() + ); + } + + /** + * Generate a datasource that does a subquery on another datasource. The specific kind of query doesn't matter + * much for the purpose of this test class, so it's always the same. + */ + private static QueryDataSource subquery(final DataSource dataSource) + { + return new QueryDataSource( + GroupByQuery.builder() + .setDataSource(dataSource) + .setInterval(new MultipleIntervalSegmentSpec(MILLENIUM_INTERVALS)) + .setGranularity(Granularities.ALL) + .build() + ); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/planning/PreJoinableClauseTest.java b/processing/src/test/java/org/apache/druid/query/planning/PreJoinableClauseTest.java new file mode 100644 index 000000000000..80762a758c83 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/planning/PreJoinableClauseTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.planning; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.segment.join.JoinConditionAnalysis; +import org.apache.druid.segment.join.JoinType; +import org.junit.Assert; +import org.junit.Test; + +public class PreJoinableClauseTest +{ + private final PreJoinableClause clause = new PreJoinableClause( + "j.", + new TableDataSource("foo"), + JoinType.LEFT, + JoinConditionAnalysis.forExpression("x == \"j.x\"", "j.", ExprMacroTable.nil()) + ); + + @Test + public void test_getPrefix() + { + Assert.assertEquals("j.", clause.getPrefix()); + } + + @Test + public void test_getJoinType() + { + Assert.assertEquals(JoinType.LEFT, clause.getJoinType()); + } + + @Test + public void test_getCondition() + { + Assert.assertEquals("x == \"j.x\"", clause.getCondition().getOriginalExpression()); + } + + @Test + public void test_getDataSource() + { + Assert.assertEquals(new TableDataSource("foo"), clause.getDataSource()); + } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(PreJoinableClause.class) + .usingGetClass() + .withNonnullFields("prefix", "dataSource", "joinType", "condition") + .verify(); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java new file mode 100644 index 000000000000..ae3f845529ea --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join; + +import org.apache.druid.segment.column.ColumnHolder; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class JoinablesTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void test_validatePrefix_null() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Join clause cannot have null or empty prefix"); + + Joinables.validatePrefix(null); + } + + @Test + public void test_validatePrefix_empty() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Join clause cannot have null or empty prefix"); + + Joinables.validatePrefix(""); + } + + @Test + public void test_validatePrefix_underscore() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Join clause cannot have prefix[_]"); + + Joinables.validatePrefix("_"); + } + + @Test + public void test_validatePrefix_timeColumn() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Join clause cannot have prefix[__time]"); + + Joinables.validatePrefix(ColumnHolder.TIME_COLUMN_NAME); + } + + @Test + public void test_isPrefixedBy() + { + Assert.assertTrue(Joinables.isPrefixedBy("foo", "")); + Assert.assertTrue(Joinables.isPrefixedBy("foo", "f")); + Assert.assertTrue(Joinables.isPrefixedBy("foo", "fo")); + Assert.assertFalse(Joinables.isPrefixedBy("foo", "foo")); + } +} diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index d3f0a6672308..4b365b7ce89e 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -59,6 +59,7 @@ import java.util.stream.Collectors; /** + * */ @ManageLifecycle public class BrokerServerView implements TimelineServerView @@ -293,7 +294,7 @@ private void serverRemovedSegment(DruidServerMetadata server, DataSegment segmen @Override public VersionedIntervalTimeline getTimeline(DataSource dataSource) { - String table = Iterables.getOnlyElement(dataSource.getNames()); + String table = Iterables.getOnlyElement(dataSource.getTableNames()); synchronized (lock) { return timelines.get(table); } diff --git a/server/src/main/java/org/apache/druid/client/CacheUtil.java b/server/src/main/java/org/apache/druid/client/CacheUtil.java index d3d9183e1c16..242bece98c13 100644 --- a/server/src/main/java/org/apache/druid/client/CacheUtil.java +++ b/server/src/main/java/org/apache/druid/client/CacheUtil.java @@ -25,13 +25,52 @@ import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.SegmentDescriptor; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.nio.ByteBuffer; public class CacheUtil { + public enum ServerType + { + BROKER { + @Override + boolean willMergeRunners() + { + return false; + } + }, + DATA { + @Override + boolean willMergeRunners() + { + return true; + } + }; + + /** + * Same meaning as the "willMergeRunners" parameter to {@link CacheStrategy#isCacheable}. + */ + abstract boolean willMergeRunners(); + } + + public static Cache.NamedKey computeResultLevelCacheKey(String resultLevelCacheIdentifier) + { + return new Cache.NamedKey(resultLevelCacheIdentifier, StringUtils.toUtf8(resultLevelCacheIdentifier)); + } + + public static void populateResultCache( + Cache cache, + Cache.NamedKey key, + byte[] resultBytes + ) + { + cache.put(key, resultBytes); + } + public static Cache.NamedKey computeSegmentCacheKey( String segmentId, SegmentDescriptor descriptor, @@ -54,64 +93,105 @@ public static Cache.NamedKey computeSegmentCacheKey( ); } - public static boolean useCacheOnBrokers( + /** + * Returns whether the segment-level cache should be checked for a particular query. + * + * @param query the query to check + * @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query + * @param cacheConfig current active cache config + * @param serverType BROKER or DATA + */ + public static boolean isUseSegmentCache( Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig + @Nullable CacheStrategy> cacheStrategy, + CacheConfig cacheConfig, + ServerType serverType ) { - return useCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); + return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType) + && QueryContexts.isUseCache(query) + && cacheConfig.isUseCache(); } - public static boolean populateCacheOnBrokers( + /** + * Returns whether the result-level cache should be populated for a particular query. + * + * @param query the query to check + * @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query + * @param cacheConfig current active cache config + * @param serverType BROKER or DATA + */ + public static boolean isPopulateSegmentCache( Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig + @Nullable CacheStrategy> cacheStrategy, + CacheConfig cacheConfig, + ServerType serverType ) { - return populateCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); + return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType) + && QueryContexts.isPopulateCache(query) + && cacheConfig.isPopulateCache(); } - public static boolean useCacheOnDataNodes( + /** + * Returns whether the result-level cache should be checked for a particular query. + * + * @param query the query to check + * @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query + * @param cacheConfig current active cache config + * @param serverType BROKER or DATA + */ + public static boolean isUseResultCache( Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig + @Nullable CacheStrategy> cacheStrategy, + CacheConfig cacheConfig, + ServerType serverType ) { - return useCache(query, strategy, cacheConfig) && strategy.isCacheable(query, true); + return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType) + && QueryContexts.isUseResultLevelCache(query) + && cacheConfig.isUseResultLevelCache(); } - public static boolean populateCacheOnDataNodes( + /** + * Returns whether the result-level cache should be populated for a particular query. + * + * @param query the query to check + * @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query + * @param cacheConfig current active cache config + * @param serverType BROKER or DATA + */ + public static boolean isPopulateResultCache( Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig + @Nullable CacheStrategy> cacheStrategy, + CacheConfig cacheConfig, + ServerType serverType ) { - return populateCache(query, strategy, cacheConfig) && strategy.isCacheable(query, true); + return isQueryCacheable(query, cacheStrategy, cacheConfig, serverType) + && QueryContexts.isPopulateResultLevelCache(query) + && cacheConfig.isPopulateResultLevelCache(); } - private static boolean useCache( - Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig + /** + * Returns whether a particular query is cacheable. Does not check whether we are actually configured to use or + * populate the cache; that should be done separately. + * + * @param query the query to check + * @param cacheStrategy result of {@link QueryToolChest#getCacheStrategy} on this query + * @param cacheConfig current active cache config + * @param serverType BROKER or DATA + */ + static boolean isQueryCacheable( + final Query query, + @Nullable final CacheStrategy> cacheStrategy, + final CacheConfig cacheConfig, + final ServerType serverType ) { - return QueryContexts.isUseCache(query) - && strategy != null - && cacheConfig.isUseCache() - && cacheConfig.isQueryCacheable(query); + return cacheStrategy != null + && cacheStrategy.isCacheable(query, serverType.willMergeRunners()) + && cacheConfig.isQueryCacheable(query) + && query.getDataSource().isCacheable(); } - - private static boolean populateCache( - Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig - ) - { - return QueryContexts.isPopulateCache(query) - && strategy != null - && cacheConfig.isPopulateCache() - && cacheConfig.isQueryCacheable(query); - } - } diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 414c0541b648..9fbc354109cb 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -241,8 +241,8 @@ private class SpecificQueryRunnable this.toolChest = warehouse.getToolChest(query); this.strategy = toolChest.getCacheStrategy(query); - this.useCache = CacheUtil.useCacheOnBrokers(query, strategy, cacheConfig); - this.populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig); + this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER); + this.populateCache = CacheUtil.isPopulateSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER); this.isBySegment = QueryContexts.isBySegment(query); // Note that enabling this leads to putting uncovered intervals information in the response headers // and might blow up in some cases https://github.com/apache/druid/issues/2108 diff --git a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java index 130113963677..f69c413fdf0e 100644 --- a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java @@ -77,8 +77,13 @@ public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { Query query = queryPlus.getQuery(); final CacheStrategy strategy = toolChest.getCacheStrategy(query); - final boolean populateCache = CacheUtil.populateCacheOnDataNodes(query, strategy, cacheConfig); - final boolean useCache = CacheUtil.useCacheOnDataNodes(query, strategy, cacheConfig); + final boolean populateCache = CacheUtil.isPopulateSegmentCache( + query, + strategy, + cacheConfig, + CacheUtil.ServerType.DATA + ); + final boolean useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.DATA); final Cache.NamedKey key; if (strategy != null && (useCache || populateCache)) { diff --git a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java index 1f1d801d9e36..2517a8f0e9be 100644 --- a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java +++ b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java @@ -194,7 +194,7 @@ private void serverRemovedSegment(DruidServerMetadata server, DataSegment segmen public VersionedIntervalTimeline getTimeline(DataSource dataSource) { - String table = Iterables.getOnlyElement(dataSource.getNames()); + String table = Iterables.getOnlyElement(dataSource.getTableNames()); synchronized (lock) { return timelines.get(table); } diff --git a/server/src/main/java/org/apache/druid/client/ResultLevelCacheUtil.java b/server/src/main/java/org/apache/druid/client/ResultLevelCacheUtil.java deleted file mode 100644 index 7ca9dc19917f..000000000000 --- a/server/src/main/java/org/apache/druid/client/ResultLevelCacheUtil.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.client; - -import org.apache.druid.client.cache.Cache; -import org.apache.druid.client.cache.CacheConfig; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.query.CacheStrategy; -import org.apache.druid.query.Query; -import org.apache.druid.query.QueryContexts; - -public class ResultLevelCacheUtil -{ - private static final Logger log = new Logger(ResultLevelCacheUtil.class); - - public static Cache.NamedKey computeResultLevelCacheKey(String resultLevelCacheIdentifier) - { - return new Cache.NamedKey(resultLevelCacheIdentifier, StringUtils.toUtf8(resultLevelCacheIdentifier)); - } - - public static void populate( - Cache cache, - Cache.NamedKey key, - byte[] resultBytes - ) - { - log.debug("Populating results into cache"); - cache.put(key, resultBytes); - } - - public static boolean useResultLevelCacheOnBrokers( - Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig - ) - { - return useResultLevelCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); - } - - public static boolean populateResultLevelCacheOnBrokers( - Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig - ) - { - return populateResultLevelCache(query, strategy, cacheConfig) && strategy.isCacheable(query, false); - } - - private static boolean useResultLevelCache( - Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig - ) - { - return QueryContexts.isUseResultLevelCache(query) - && strategy != null - && cacheConfig.isUseResultLevelCache() - && cacheConfig.isQueryCacheable(query); - } - - private static boolean populateResultLevelCache( - Query query, - CacheStrategy> strategy, - CacheConfig cacheConfig - ) - { - return QueryContexts.isPopulateResultLevelCache(query) - && strategy != null - && cacheConfig.isPopulateResultLevelCache() - && cacheConfig.isQueryCacheable(query); - } -} diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java index 0a7407fa52f8..93a7d13f41a7 100644 --- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java @@ -25,7 +25,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; -import org.apache.druid.client.ResultLevelCacheUtil; +import org.apache.druid.client.CacheUtil; import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.java.util.common.RE; @@ -71,8 +71,13 @@ public ResultLevelCachingQueryRunner( this.cacheConfig = cacheConfig; this.query = query; this.strategy = queryToolChest.getCacheStrategy(query); - this.populateResultCache = ResultLevelCacheUtil.populateResultLevelCacheOnBrokers(query, strategy, cacheConfig); - this.useResultCache = ResultLevelCacheUtil.useResultLevelCacheOnBrokers(query, strategy, cacheConfig); + this.populateResultCache = CacheUtil.isPopulateResultCache( + query, + strategy, + cacheConfig, + CacheUtil.ServerType.BROKER + ); + this.useResultCache = CacheUtil.isUseResultCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER); } @Override @@ -162,7 +167,7 @@ private byte[] fetchResultsFromResultLevelCache( ) { if (useResultCache && queryCacheKey != null) { - return cache.get(ResultLevelCacheUtil.computeResultLevelCacheKey(queryCacheKey)); + return cache.get(CacheUtil.computeResultLevelCacheKey(queryCacheKey)); } return null; } @@ -216,7 +221,7 @@ private ResultLevelCachePopulator createResultLevelCachePopulator( ResultLevelCachePopulator resultLevelCachePopulator = new ResultLevelCachePopulator( cache, objectMapper, - ResultLevelCacheUtil.computeResultLevelCacheKey(cacheKeyStr), + CacheUtil.computeResultLevelCacheKey(cacheKeyStr), cacheConfig, true ); @@ -292,7 +297,7 @@ private void cacheResultEntry( public void populateResults() { - ResultLevelCacheUtil.populate( + CacheUtil.populateResultCache( cache, key, Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream").toByteArray() diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 6c5287920182..451e079b26b4 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -53,6 +53,7 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.SinkQueryRunners; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.Segment; @@ -168,6 +169,12 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final return new NoopQueryRunner<>(); } + // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); + if (!analysis.getPreJoinableClauses().isEmpty()) { + throw new ISE("Cannot handle join dataSource"); + } + final QueryRunnerFactory> factory = conglomerate.findFactory(query); if (factory == null) { throw new ISE("Unknown query type[%s].", query.getClass()); diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 29a173ccf985..ddc15aedef07 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -25,6 +25,7 @@ import org.apache.druid.client.CachingClusteredClient; import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.FluentQueryRunnerBuilder; import org.apache.druid.query.PostProcessingOperator; @@ -37,10 +38,12 @@ import org.apache.druid.query.RetryQueryRunner; import org.apache.druid.query.RetryQueryRunnerConfig; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.initialization.ServerConfig; import org.joda.time.Interval; /** + * */ public class ClientQuerySegmentWalker implements QuerySegmentWalker { @@ -79,12 +82,24 @@ public ClientQuerySegmentWalker( @Override public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) { + // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); + if (!analysis.getPreJoinableClauses().isEmpty()) { + throw new ISE("Cannot handle join dataSource"); + } + return makeRunner(query, baseClient.getQueryRunnerForIntervals(query, intervals)); } @Override public QueryRunner getQueryRunnerForSegments(Query query, Iterable specs) { + // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); + if (!analysis.getPreJoinableClauses().isEmpty()) { + throw new ISE("Cannot handle join dataSource"); + } + return makeRunner(query, baseClient.getQueryRunnerForSegments(query, specs)); } @@ -93,12 +108,14 @@ private QueryRunner makeRunner(Query query, QueryRunner baseClientR QueryToolChest> toolChest = warehouse.getToolChest(query); // This does not adhere to the fluent workflow. See https://github.com/apache/druid/issues/5517 - return new ResultLevelCachingQueryRunner<>(makeRunner(query, baseClientRunner, toolChest), - toolChest, - query, - objectMapper, - cache, - cacheConfig); + return new ResultLevelCachingQueryRunner<>( + makeRunner(query, baseClientRunner, toolChest), + toolChest, + query, + objectMapper, + cache, + cacheConfig + ); } private QueryRunner makeRunner( diff --git a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java index 87508107d194..6e876c4d0922 100644 --- a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java +++ b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java @@ -189,7 +189,7 @@ public Access authorize(final AuthenticationResult authenticationResult) AuthorizationUtils.authorizeAllResourceActions( authenticationResult, Iterables.transform( - baseQuery.getDataSource().getNames(), + baseQuery.getDataSource().getTableNames(), AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR ), authorizerMapper @@ -213,7 +213,7 @@ public Access authorize(HttpServletRequest req) AuthorizationUtils.authorizeAllResourceActions( req, Iterables.transform( - baseQuery.getDataSource().getNames(), + baseQuery.getDataSource().getTableNames(), AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR ), authorizerMapper diff --git a/server/src/main/java/org/apache/druid/server/QueryManager.java b/server/src/main/java/org/apache/druid/server/QueryManager.java index a90bf4b077d5..0fd1807d86e3 100644 --- a/server/src/main/java/org/apache/druid/server/QueryManager.java +++ b/server/src/main/java/org/apache/druid/server/QueryManager.java @@ -27,7 +27,6 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryWatcher; -import java.util.List; import java.util.Set; public class QueryManager implements QueryWatcher @@ -61,7 +60,7 @@ public boolean cancelQuery(String id) public void registerQuery(Query query, final ListenableFuture future) { final String id = query.getId(); - final List datasources = query.getDataSource().getNames(); + final Set datasources = query.getDataSource().getTableNames(); queries.put(id, future); queryDatasources.putAll(id, datasources); future.addListener( diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index 78100aa153a4..e2fb263def4c 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -190,7 +190,7 @@ public Response doPost( "%s[%s_%s_%s]", currThreadName, query.getType(), - query.getDataSource().getNames(), + query.getDataSource().getTableNames(), queryId ); diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java index 98e6f3e814a3..41c71160e5ec 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java @@ -53,6 +53,7 @@ import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.ReferenceCountingSegment; @@ -71,8 +72,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; -/** - */ public class ServerManager implements QuerySegmentWalker { private static final EmittingLogger log = new EmittingLogger(ServerManager.class); @@ -128,6 +127,12 @@ public QueryRunner getQueryRunnerForIntervals(Query query, Iterable> toolChest = factory.getToolchest(); final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); @@ -162,8 +167,7 @@ public Iterable> apply(In { @Override public Iterable> apply( - @Nullable - final TimelineObjectHolder holder + @Nullable final TimelineObjectHolder holder ) { if (holder == null) { @@ -210,7 +214,7 @@ public QueryRunner apply(PartitionChunk input) private String getDataSourceName(DataSource dataSource) { - return Iterables.getOnlyElement(dataSource.getNames()); + return Iterables.getOnlyElement(dataSource.getTableNames()); } @Override @@ -224,6 +228,12 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable(); } + // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); + if (!analysis.getPreJoinableClauses().isEmpty()) { + throw new ISE("Cannot handle join dataSource"); + } + final QueryToolChest> toolChest = factory.getToolchest(); String dataSourceName = getDataSourceName(query.getDataSource()); diff --git a/server/src/main/java/org/apache/druid/server/log/LoggingRequestLogger.java b/server/src/main/java/org/apache/druid/server/log/LoggingRequestLogger.java index a0af4cb2fa1b..bcb628a195b8 100644 --- a/server/src/main/java/org/apache/druid/server/log/LoggingRequestLogger.java +++ b/server/src/main/java/org/apache/druid/server/log/LoggingRequestLogger.java @@ -20,21 +20,16 @@ package org.apache.druid.server.log; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.query.DataSource; import org.apache.druid.query.Query; -import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; -import org.apache.druid.query.UnionDataSource; import org.apache.druid.server.RequestLogLine; import org.slf4j.MDC; import java.io.IOException; import java.util.Map; -import java.util.stream.Collectors; public class LoggingRequestLogger implements RequestLogger { @@ -66,7 +61,7 @@ public void logNativeQuery(RequestLogLine requestLogLine) throws IOException final Query query = requestLogLine.getQuery(); MDC.put("queryId", query.getId()); MDC.put("sqlQueryId", StringUtils.nullToEmptyNonDruidDataString(query.getSqlQueryId())); - MDC.put("dataSource", findInnerDatasource(query).toString()); + MDC.put("dataSource", String.join(",", query.getDataSource().getTableNames())); MDC.put("queryType", query.getType()); MDC.put("isNested", String.valueOf(!(query.getDataSource() instanceof TableDataSource))); MDC.put("hasFilters", Boolean.toString(query.hasFilters())); @@ -119,30 +114,6 @@ public boolean isSetContextMDC() return setContextMDC; } - private Object findInnerDatasource(Query query) - { - DataSource _ds = query.getDataSource(); - if (_ds instanceof TableDataSource) { - return ((TableDataSource) _ds).getName(); - } - if (_ds instanceof QueryDataSource) { - return findInnerDatasource(((QueryDataSource) _ds).getQuery()); - } - if (_ds instanceof UnionDataSource) { - return Joiner.on(",") - .join( - ((UnionDataSource) _ds) - .getDataSources() - .stream() - .map(TableDataSource::getName) - .collect(Collectors.toList()) - ); - } else { - // should not come here - return query.getDataSource(); - } - } - @Override public String toString() { diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java index 96473e1614e2..08ce78ff0fbc 100644 --- a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java +++ b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java @@ -201,7 +201,7 @@ public Pair select(final Query query) if (brokerServiceName == null) { // For Union Queries tier will be selected on the rules for first dataSource. - List rules = ruleManager.getRulesWithDefault(Iterables.getFirst(query.getDataSource().getNames(), null)); + List rules = ruleManager.getRulesWithDefault(Iterables.getFirst(query.getDataSource().getTableNames(), null)); // find the rule that can apply to the entire set of intervals DateTime now = DateTimes.nowUtc(); diff --git a/server/src/test/java/org/apache/druid/client/CacheUtilTest.java b/server/src/test/java/org/apache/druid/client/CacheUtilTest.java new file mode 100644 index 000000000000..e7a512046fa9 --- /dev/null +++ b/server/src/test/java/org/apache/druid/client/CacheUtilTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.CacheStrategy; +import org.apache.druid.query.Druids; +import org.apache.druid.query.LookupDataSource; +import org.apache.druid.query.Query; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class CacheUtilTest +{ + private final TimeseriesQuery timeseriesQuery = + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(); + + @Test + public void test_isQueryCacheable_cacheableOnBroker() + { + Assert.assertTrue( + CacheUtil.isQueryCacheable( + timeseriesQuery, + new DummyCacheStrategy<>(true, true), + makeCacheConfig(ImmutableMap.of()), + CacheUtil.ServerType.BROKER + ) + ); + } + + @Test + public void test_isQueryCacheable_cacheableOnDataServer() + { + Assert.assertTrue( + CacheUtil.isQueryCacheable( + timeseriesQuery, + new DummyCacheStrategy<>(true, true), + makeCacheConfig(ImmutableMap.of()), + CacheUtil.ServerType.DATA + ) + ); + } + + @Test + public void test_isQueryCacheable_unCacheableOnBroker() + { + Assert.assertFalse( + CacheUtil.isQueryCacheable( + timeseriesQuery, + new DummyCacheStrategy<>(false, true), + makeCacheConfig(ImmutableMap.of()), + CacheUtil.ServerType.BROKER + ) + ); + } + + @Test + public void test_isQueryCacheable_unCacheableOnDataServer() + { + Assert.assertFalse( + CacheUtil.isQueryCacheable( + timeseriesQuery, + new DummyCacheStrategy<>(true, false), + makeCacheConfig(ImmutableMap.of()), + CacheUtil.ServerType.DATA + ) + ); + } + + @Test + public void test_isQueryCacheable_unCacheableType() + { + Assert.assertFalse( + CacheUtil.isQueryCacheable( + timeseriesQuery, + new DummyCacheStrategy<>(true, false), + makeCacheConfig(ImmutableMap.of("unCacheable", ImmutableList.of("timeseries"))), + CacheUtil.ServerType.BROKER + ) + ); + } + + @Test + public void test_isQueryCacheable_unCacheableDataSource() + { + Assert.assertFalse( + CacheUtil.isQueryCacheable( + timeseriesQuery.withDataSource(new LookupDataSource("lookyloo")), + new DummyCacheStrategy<>(true, true), + makeCacheConfig(ImmutableMap.of()), + CacheUtil.ServerType.BROKER + ) + ); + } + + @Test + public void test_isQueryCacheable_nullCacheStrategy() + { + Assert.assertFalse( + CacheUtil.isQueryCacheable( + timeseriesQuery, + null, + makeCacheConfig(ImmutableMap.of()), + CacheUtil.ServerType.BROKER + ) + ); + } + + private static CacheConfig makeCacheConfig(final Map properties) + { + return TestHelper.makeJsonMapper().convertValue(properties, CacheConfig.class); + } + + private static class DummyCacheStrategy> + implements CacheStrategy + { + private final boolean cacheableOnBrokers; + private final boolean cacheableOnDataServers; + + public DummyCacheStrategy(boolean cacheableOnBrokers, boolean cacheableOnDataServers) + { + this.cacheableOnBrokers = cacheableOnBrokers; + this.cacheableOnDataServers = cacheableOnDataServers; + } + + @Override + public boolean isCacheable(QueryType query, boolean willMergeRunners) + { + return willMergeRunners ? cacheableOnDataServers : cacheableOnBrokers; + } + + @Override + public byte[] computeCacheKey(QueryType query) + { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] computeResultLevelCacheKey(QueryType query) + { + throw new UnsupportedOperationException(); + } + + @Override + public TypeReference getCacheObjectClazz() + { + throw new UnsupportedOperationException(); + } + + @Override + public Function prepareForCache(boolean isResultLevelCache) + { + throw new UnsupportedOperationException(); + } + + @Override + public Function pullFromCache(boolean isResultLevelCache) + { + throw new UnsupportedOperationException(); + } + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java index a1b1910a2ef2..934361a8b40d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java @@ -41,6 +41,7 @@ import javax.annotation.Nullable; import java.util.List; +import java.util.Set; /** * DruidRel that uses a "query" dataSource. @@ -199,9 +200,9 @@ public RelNode copy(final RelTraitSet traitSet, final List inputs) } @Override - public List getDataSourceNames() + public Set getDataSourceNames() { - return ((DruidRel) sourceRel).getDataSourceNames(); + return ((DruidRel) sourceRel).getDataSourceNames(); } @Override diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java index 3deb8e415539..fed6d886a6f7 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java @@ -35,7 +35,7 @@ import org.apache.druid.sql.calcite.table.DruidTable; import javax.annotation.Nonnull; -import java.util.List; +import java.util.Set; /** * DruidRel that uses a "table" dataSource. @@ -123,9 +123,9 @@ public DruidQueryRel asDruidConvention() } @Override - public List getDataSourceNames() + public Set getDataSourceNames() { - return druidTable.getDataSource().getNames(); + return druidTable.getDataSource().getTableNames(); } @Override diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java index c5549ddb1a11..9f56e2f58142 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java @@ -26,7 +26,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nullable; -import java.util.List; +import java.util.Set; public abstract class DruidRel extends AbstractRelNode { @@ -110,7 +110,7 @@ public PlannerContext getPlannerContext() public abstract T asDruidConvention(); /** - * Get a list of names of datasources read by this DruidRel + * Get the set of names of table datasources read by this DruidRel */ - public abstract List getDataSourceNames(); + public abstract Set getDataSourceNames(); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidSemiJoin.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidSemiJoin.java index 6248a360dfdb..e31437b69622 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidSemiJoin.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidSemiJoin.java @@ -171,13 +171,13 @@ public DruidSemiJoin asDruidConvention() } @Override - public List getDataSourceNames() + public Set getDataSourceNames() { final DruidRel druidRight = (DruidRel) this.right; - Set datasourceNames = new LinkedHashSet<>(); - datasourceNames.addAll(left.getDataSourceNames()); - datasourceNames.addAll(druidRight.getDataSourceNames()); - return new ArrayList<>(datasourceNames); + Set dataSourceNames = new LinkedHashSet<>(); + dataSourceNames.addAll(left.getDataSourceNames()); + dataSourceNames.addAll(druidRight.getDataSourceNames()); + return dataSourceNames; } @Override diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java index 41f08098b76b..2fe3d2ca0271 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java @@ -37,6 +37,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; public class DruidUnionRel extends DruidRel @@ -166,12 +167,11 @@ public RelNode copy(final RelTraitSet traitSet, final List inputs) } @Override - public List getDataSourceNames() + public Set getDataSourceNames() { return rels.stream() .flatMap(rel -> ((DruidRel) rel).getDataSourceNames().stream()) - .distinct() - .collect(Collectors.toList()); + .collect(Collectors.toSet()); } @Override