Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ void addSegmentToServer(DruidServer server, DataSegment segment)
@Override
public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
{
final String table = Iterables.getOnlyElement(dataSource.getNames());
final String table = Iterables.getOnlyElement(dataSource.getTableNames());
return timelines.get(table);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
{
QueryRunner<T> queryRunner = null;
final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames());
final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getTableNames());

if (runningItem != null) {
final Task task = runningItem.getTask();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,18 +266,20 @@ public boolean equals(Object o)
return false;
}
BaseQuery<?> baseQuery = (BaseQuery<?>) o;

// Must use getDuration() instead of "duration" because duration is lazily computed.
return descending == baseQuery.descending &&
Objects.equals(dataSource, baseQuery.dataSource) &&
Objects.equals(context, baseQuery.context) &&
Objects.equals(querySegmentSpec, baseQuery.querySegmentSpec) &&
Objects.equals(duration, baseQuery.duration) &&
Objects.equals(getDuration(), baseQuery.getDuration()) &&
Objects.equals(granularity, baseQuery.granularity);
}

@Override
public int hashCode()
{

return Objects.hash(dataSource, descending, context, querySegmentSpec, duration, granularity);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious about why this change. I know this isn't really part of your change, but I've also noticed equals and hashcode for a lot of classes that extend BaseQuery have their own implementation of equals and hashCode and do not check the base class. Sounds like we should add some equalsVerifier tests for them, maybe in another patch.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's because duration is lazily computed, so it might be null. Calling getDuration() forces it to be computed. I'll add a comment.

Btw, a lot of the subclasses of BaseQuery do call super.equals at some point (but they do other stuff too).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Weird - I see the calls to super.equals() now, maybe I just shouldn't read code in the night 😝

EqualsVerifier is still complaining about some other stuff. But cleaning that up can be another issue.

// Must use getDuration() instead of "duration" because duration is lazily computed.
return Objects.hash(dataSource, descending, context, querySegmentSpec, getDuration(), granularity);
}
}
63 changes: 54 additions & 9 deletions processing/src/main/java/org/apache/druid/query/DataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,62 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;

import java.util.List;
import java.util.Set;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type",
defaultImpl = LegacyDataSource.class)
/**
* Represents a source... of data... for a query. Analogous to the "FROM" clause in SQL.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LegacyDataSource.class)
@JsonSubTypes({
@JsonSubTypes.Type(value = TableDataSource.class, name = "table"),
@JsonSubTypes.Type(value = QueryDataSource.class, name = "query"),
@JsonSubTypes.Type(value = UnionDataSource.class, name = "union")
})
@JsonSubTypes.Type(value = TableDataSource.class, name = "table"),
@JsonSubTypes.Type(value = QueryDataSource.class, name = "query"),
@JsonSubTypes.Type(value = UnionDataSource.class, name = "union"),
@JsonSubTypes.Type(value = JoinDataSource.class, name = "join"),
@JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"),
@JsonSubTypes.Type(value = InlineDataSource.class, name = "inline")
})
public interface DataSource
{
List<String> getNames();
/**
* Returns the names of all table datasources involved in this query. Does not include names for non-tables, like
* lookups or inline datasources.
*/
Set<String> getTableNames();

/**
* Returns datasources that this datasource depends on. Will be empty for leaf datasources like 'table'.
*/
List<DataSource> getChildren();

/**
* Return a new DataSource, identical to this one, with different children. The number of children must be equal
* to the number of children that this datasource already has.
*/
DataSource withChildren(List<DataSource> children);

/**
* Returns true if queries on this dataSource are cacheable at both the result level and per-segment level.
* Currently, dataSources that modify the behavior of per-segment processing are not cacheable (like 'join').
* Nor are dataSources that do not actually reference segments (like 'inline'), since cache keys are always based
* on segment identifiers.
*
* Note: Ideally, queries on 'join' datasources _would_ be cacheable, but we cannot currently do this due to lacking
* the code necessary to compute cache keys properly.
*/
boolean isCacheable();

/**
* Returns true if all servers have a full copy of this datasource. True for things like inline, lookup, etc, or
* for queries of those.
*/
boolean isGlobal();

/**
* Returns true if this datasource represents concrete data that can be scanned via a
* {@link org.apache.druid.segment.Segment} adapter of some kind. True for e.g. 'table' but not for 'query' or 'join'.
*
* @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteBased() which uses this
* @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteTableBased() which uses this
*/
boolean isConcrete();
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.query;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
Expand All @@ -30,7 +31,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the
Expand All @@ -42,9 +45,22 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
protected final Map<String, Number> metrics = new HashMap<>();

/** Non final to give subclasses ability to reassign it. */
/**
* Non final to give subclasses ability to reassign it.
*/
protected Thread ownerThread = Thread.currentThread();

private static String getTableNamesAsString(DataSource dataSource)
{
final Set<String> names = dataSource.getTableNames();

if (names.size() == 1) {
return Iterables.getOnlyElement(names);
} else {
return names.stream().sorted().collect(Collectors.toList()).toString();
}
}

protected void checkModifiedFromOwnerThread()
{
if (Thread.currentThread() != ownerThread) {
Expand Down Expand Up @@ -77,7 +93,7 @@ public void query(QueryType query)
@Override
public void dataSource(QueryType query)
{
setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource()));
setDimension(DruidMetrics.DATASOURCE, getTableNamesAsString(query.getDataSource()));
}

@Override
Expand Down
Loading