diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java index b7cf087a3f4d..c2257d160ed3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java @@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Injector; -import org.apache.druid.client.BrokerServerView; +import org.apache.druid.client.TimelineServerView; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -84,7 +84,7 @@ public class DartControllerContext implements ControllerContext private final ObjectMapper jsonMapper; private final DruidNode selfNode; private final DartWorkerClient workerClient; - private final BrokerServerView serverView; + private final TimelineServerView serverView; private final MemoryIntrospector memoryIntrospector; private final ServiceMetricEvent.Builder metricBuilder; private final ServiceEmitter emitter; @@ -95,7 +95,7 @@ public DartControllerContext( final DruidNode selfNode, final DartWorkerClient workerClient, final MemoryIntrospector memoryIntrospector, - final BrokerServerView serverView, + final TimelineServerView serverView, final ServiceEmitter emitter ) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java index 8cefb6af7ece..113714aa9b92 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java @@ -22,13 +22,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.google.inject.Injector; -import org.apache.druid.client.BrokerServerView; +import org.apache.druid.client.TimelineServerView; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.msq.dart.worker.DartWorkerClient; +import org.apache.druid.msq.dart.worker.DartWorkerClientImpl; import org.apache.druid.msq.exec.ControllerContext; import org.apache.druid.msq.exec.MemoryIntrospector; import org.apache.druid.rpc.ServiceClientFactory; @@ -36,14 +36,14 @@ public class DartControllerContextFactoryImpl implements DartControllerContextFactory { - private final Injector injector; - private final ObjectMapper jsonMapper; - private final ObjectMapper smileMapper; - private final DruidNode selfNode; - private final ServiceClientFactory serviceClientFactory; - private final BrokerServerView serverView; - private final MemoryIntrospector memoryIntrospector; - private final ServiceEmitter emitter; + protected final Injector injector; + protected final ObjectMapper jsonMapper; + protected final ObjectMapper smileMapper; + protected final DruidNode selfNode; + protected final ServiceClientFactory serviceClientFactory; + protected final TimelineServerView serverView; + protected final MemoryIntrospector memoryIntrospector; + protected final ServiceEmitter emitter; @Inject public DartControllerContextFactoryImpl( @@ -53,7 +53,7 @@ public DartControllerContextFactoryImpl( @Self final DruidNode selfNode, @EscalatedGlobal final ServiceClientFactory serviceClientFactory, final MemoryIntrospector memoryIntrospector, - final BrokerServerView serverView, + final TimelineServerView serverView, final ServiceEmitter emitter ) { @@ -74,7 +74,7 @@ public ControllerContext newContext(final String queryId) injector, jsonMapper, selfNode, - new DartWorkerClient(queryId, serviceClientFactory, smileMapper, selfNode.getHostAndPortToUse()), + new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper, selfNode.getHostAndPortToUse()), memoryIntrospector, serverView, emitter diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClient.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClient.java index 932300de217f..e048f9fd6d67 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClient.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClient.java @@ -19,192 +19,21 @@ package org.apache.druid.msq.dart.worker; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.common.util.concurrent.ListenableFuture; -import com.google.errorprone.annotations.concurrent.GuardedBy; -import it.unimi.dsi.fastutil.Pair; -import org.apache.druid.error.DruidException; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.http.client.response.HttpResponseHandler; import org.apache.druid.msq.dart.controller.DartWorkerManager; -import org.apache.druid.msq.dart.controller.sql.DartSqlEngine; -import org.apache.druid.msq.dart.worker.http.DartWorkerResource; import org.apache.druid.msq.exec.WorkerClient; -import org.apache.druid.msq.rpc.BaseWorkerClientImpl; -import org.apache.druid.rpc.FixedServiceLocator; -import org.apache.druid.rpc.IgnoreHttpResponseHandler; -import org.apache.druid.rpc.RequestBuilder; -import org.apache.druid.rpc.ServiceClient; -import org.apache.druid.rpc.ServiceClientFactory; -import org.apache.druid.rpc.ServiceLocation; -import org.apache.druid.rpc.ServiceRetryPolicy; -import org.apache.druid.utils.CloseableUtils; -import org.jboss.netty.handler.codec.http.HttpMethod; -import javax.annotation.Nullable; -import java.io.Closeable; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; - -/** - * Dart implementation of {@link WorkerClient}. Uses the same {@link BaseWorkerClientImpl} as the task-based engine. - * Each instance of this class is scoped to a single query. - */ -public class DartWorkerClient extends BaseWorkerClientImpl +public interface DartWorkerClient extends WorkerClient { - private static final Logger log = new Logger(DartWorkerClient.class); - - private final String queryId; - private final ServiceClientFactory clientFactory; - private final ServiceRetryPolicy retryPolicy; - - @Nullable - private final String controllerHost; - - @GuardedBy("clientMap") - private final Map> clientMap = new HashMap<>(); - - /** - * Create a worker client. - * - * @param queryId dart query ID. see {@link DartSqlEngine#CTX_DART_QUERY_ID} - * @param clientFactory service client factor - * @param smileMapper Smile object mapper - * @param controllerHost Controller host (see {@link DartWorkerResource#HEADER_CONTROLLER_HOST}) if this is a - * controller-to-worker client. Null if this is a worker-to-worker client. - */ - public DartWorkerClient( - final String queryId, - final ServiceClientFactory clientFactory, - final ObjectMapper smileMapper, - @Nullable final String controllerHost - ) - { - super(smileMapper, SmileMediaTypes.APPLICATION_JACKSON_SMILE); - this.queryId = queryId; - this.clientFactory = clientFactory; - this.controllerHost = controllerHost; - - if (controllerHost == null) { - // worker -> worker client. Retry HTTP 503 in case worker A starts up before worker B, and needs to - // contact it immediately. - this.retryPolicy = new DartWorkerRetryPolicy(true); - } else { - // controller -> worker client. Do not retry any HTTP error codes. If we retry HTTP 503 for controller -> worker, - // we can get stuck trying to contact workers that have exited. - this.retryPolicy = new DartWorkerRetryPolicy(false); - } - } - - @Override - protected ServiceClient getClient(final String workerIdString) - { - final WorkerId workerId = WorkerId.fromString(workerIdString); - if (!queryId.equals(workerId.getQueryId())) { - throw DruidException.defensive("Unexpected queryId[%s]. Expected queryId[%s]", workerId.getQueryId(), queryId); - } - - synchronized (clientMap) { - return clientMap.computeIfAbsent(workerId.getHostAndPort(), ignored -> makeNewClient(workerId)).left(); - } - } - /** * Close a single worker's clients. Used when that worker fails, so we stop trying to contact it. * * @param workerHost worker host:port */ - public void closeClient(final String workerHost) - { - synchronized (clientMap) { - final Pair clientPair = clientMap.remove(workerHost); - if (clientPair != null) { - CloseableUtils.closeAndWrapExceptions(clientPair.right()); - } - } - } - - /** - * Close all outstanding clients. - */ - @Override - public void close() - { - synchronized (clientMap) { - for (Map.Entry> entry : clientMap.entrySet()) { - CloseableUtils.closeAndSuppressExceptions( - entry.getValue().right(), - e -> log.warn(e, "Failed to close client[%s]", entry.getKey()) - ); - } - - clientMap.clear(); - } - } + void closeClient(String hostAndPort); /** * Stops a worker. Dart-only API, used by the {@link DartWorkerManager}. */ - public ListenableFuture stopWorker(String workerId) - { - return getClient(workerId).asyncRequest( - new RequestBuilder(HttpMethod.POST, "/stop"), - IgnoreHttpResponseHandler.INSTANCE - ); - } - - /** - * Create a new client. Called by {@link #getClient(String)} if a new one is needed. - */ - private Pair makeNewClient(final WorkerId workerId) - { - final URI uri = workerId.toUri(); - final FixedServiceLocator locator = new FixedServiceLocator(ServiceLocation.fromUri(uri)); - final ServiceClient baseClient = - clientFactory.makeClient(workerId.toString(), locator, retryPolicy); - final ServiceClient client; - - if (controllerHost != null) { - client = new ControllerDecoratedClient(baseClient, controllerHost); - } else { - client = baseClient; - } - - return Pair.of(client, locator); - } - - /** - * Service client that adds the {@link DartWorkerResource#HEADER_CONTROLLER_HOST} header. - */ - private static class ControllerDecoratedClient implements ServiceClient - { - private final ServiceClient delegate; - private final String controllerHost; - - ControllerDecoratedClient(final ServiceClient delegate, final String controllerHost) - { - this.delegate = delegate; - this.controllerHost = controllerHost; - } - - @Override - public ListenableFuture asyncRequest( - final RequestBuilder requestBuilder, - final HttpResponseHandler handler - ) - { - return delegate.asyncRequest( - requestBuilder.header(DartWorkerResource.HEADER_CONTROLLER_HOST, controllerHost), - handler - ); - } - - @Override - public ServiceClient withRetryPolicy(final ServiceRetryPolicy retryPolicy) - { - return new ControllerDecoratedClient(delegate.withRetryPolicy(retryPolicy), controllerHost); - } - } + ListenableFuture stopWorker(String workerId); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClientImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClientImpl.java new file mode 100644 index 000000000000..adfebee111c9 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClientImpl.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.dart.worker; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import it.unimi.dsi.fastutil.Pair; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.http.client.response.HttpResponseHandler; +import org.apache.druid.msq.dart.controller.sql.DartSqlEngine; +import org.apache.druid.msq.dart.worker.http.DartWorkerResource; +import org.apache.druid.msq.exec.WorkerClient; +import org.apache.druid.msq.rpc.BaseWorkerClientImpl; +import org.apache.druid.rpc.FixedServiceLocator; +import org.apache.druid.rpc.IgnoreHttpResponseHandler; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.rpc.ServiceClient; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocation; +import org.apache.druid.rpc.ServiceRetryPolicy; +import org.apache.druid.utils.CloseableUtils; +import org.jboss.netty.handler.codec.http.HttpMethod; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +/** + * Dart implementation of {@link WorkerClient}. Uses the same {@link BaseWorkerClientImpl} as the task-based engine. + * Each instance of this class is scoped to a single query. + */ +public class DartWorkerClientImpl extends BaseWorkerClientImpl implements DartWorkerClient +{ + private static final Logger log = new Logger(DartWorkerClientImpl.class); + + private final String queryId; + private final ServiceClientFactory clientFactory; + private final ServiceRetryPolicy retryPolicy; + + @Nullable + private final String controllerHost; + + @GuardedBy("clientMap") + private final Map> clientMap = new HashMap<>(); + + /** + * Create a worker client. + * + * @param queryId dart query ID. see {@link DartSqlEngine#CTX_DART_QUERY_ID} + * @param clientFactory service client factor + * @param smileMapper Smile object mapper + * @param controllerHost Controller host (see {@link DartWorkerResource#HEADER_CONTROLLER_HOST}) if this is a + * controller-to-worker client. Null if this is a worker-to-worker client. + */ + public DartWorkerClientImpl( + final String queryId, + final ServiceClientFactory clientFactory, + final ObjectMapper smileMapper, + @Nullable final String controllerHost + ) + { + super(smileMapper, SmileMediaTypes.APPLICATION_JACKSON_SMILE); + this.queryId = queryId; + this.clientFactory = clientFactory; + this.controllerHost = controllerHost; + + if (controllerHost == null) { + // worker -> worker client. Retry HTTP 503 in case worker A starts up before worker B, and needs to + // contact it immediately. + this.retryPolicy = new DartWorkerRetryPolicy(true); + } else { + // controller -> worker client. Do not retry any HTTP error codes. If we retry HTTP 503 for controller -> worker, + // we can get stuck trying to contact workers that have exited. + this.retryPolicy = new DartWorkerRetryPolicy(false); + } + } + + @Override + protected ServiceClient getClient(final String workerIdString) + { + final WorkerId workerId = WorkerId.fromString(workerIdString); + if (!queryId.equals(workerId.getQueryId())) { + throw DruidException.defensive("Unexpected queryId[%s]. Expected queryId[%s]", workerId.getQueryId(), queryId); + } + + synchronized (clientMap) { + return clientMap.computeIfAbsent(workerId.getHostAndPort(), ignored -> makeNewClient(workerId)).left(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void closeClient(final String workerHost) + { + synchronized (clientMap) { + final Pair clientPair = clientMap.remove(workerHost); + if (clientPair != null) { + CloseableUtils.closeAndWrapExceptions(clientPair.right()); + } + } + } + + /** + * Close all outstanding clients. + */ + @Override + public void close() + { + synchronized (clientMap) { + for (Map.Entry> entry : clientMap.entrySet()) { + CloseableUtils.closeAndSuppressExceptions( + entry.getValue().right(), + e -> log.warn(e, "Failed to close client[%s]", entry.getKey()) + ); + } + + clientMap.clear(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public ListenableFuture stopWorker(String workerId) + { + return getClient(workerId).asyncRequest( + new RequestBuilder(HttpMethod.POST, "/stop"), + IgnoreHttpResponseHandler.INSTANCE + ); + } + + /** + * Create a new client. Called by {@link #getClient(String)} if a new one is needed. + */ + protected Pair makeNewClient(final WorkerId workerId) + { + final URI uri = workerId.toUri(); + final FixedServiceLocator locator = new FixedServiceLocator(ServiceLocation.fromUri(uri)); + final ServiceClient baseClient = + clientFactory.makeClient(workerId.toString(), locator, retryPolicy); + final ServiceClient client; + + if (controllerHost != null) { + client = new ControllerDecoratedClient(baseClient, controllerHost); + } else { + client = baseClient; + } + + return Pair.of(client, locator); + } + + /** + * Service client that adds the {@link DartWorkerResource#HEADER_CONTROLLER_HOST} header. + */ + private static class ControllerDecoratedClient implements ServiceClient + { + private final ServiceClient delegate; + private final String controllerHost; + + ControllerDecoratedClient(final ServiceClient delegate, final String controllerHost) + { + this.delegate = delegate; + this.controllerHost = controllerHost; + } + + @Override + public ListenableFuture asyncRequest( + final RequestBuilder requestBuilder, + final HttpResponseHandler handler + ) + { + return delegate.asyncRequest( + requestBuilder.header(DartWorkerResource.HEADER_CONTROLLER_HOST, controllerHost), + handler + ); + } + + @Override + public ServiceClient withRetryPolicy(final ServiceRetryPolicy retryPolicy) + { + return new ControllerDecoratedClient(delegate.withRetryPolicy(retryPolicy), controllerHost); + } + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerFactoryImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerFactoryImpl.java index 34da929060d8..1960924b4b67 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerFactoryImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerFactoryImpl.java @@ -101,7 +101,7 @@ public Worker build(String queryId, String controllerHost, File tempDir, QueryCo selfNode, jsonMapper, injector, - new DartWorkerClient(queryId, serviceClientFactory, smileMapper, null), + new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper, null), processingConfig, segmentWrangler, groupingEngine, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRetryPolicy.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRetryPolicy.java index 5dbfe98ef0c5..9e4d2f668039 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRetryPolicy.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRetryPolicy.java @@ -25,7 +25,7 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; /** - * Retry policy for {@link DartWorkerClient}. This is a {@link StandardRetryPolicy#unlimited()} with + * Retry policy for {@link DartWorkerClientImpl}. This is a {@link StandardRetryPolicy#unlimited()} with * {@link #retryHttpResponse(HttpResponse)} customized to retry fewer HTTP error codes. */ public class DartWorkerRetryPolicy implements ServiceRetryPolicy diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index b370ab586433..cc73ea42c1a4 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -350,6 +350,15 @@ private MSQTaskReportPayload runInternal(final QueryListener queryListener, fina context.registerController(this, closer); queryDef = initializeQueryDefAndState(closer); + this.netClient = closer.register(new ExceptionWrappingWorkerClient(context.newWorkerClient())); + this.workerSketchFetcher = new WorkerSketchFetcher( + netClient, + workerManager, + queryKernelConfig.isFaultTolerant(), + MultiStageQueryContext.getSketchEncoding(querySpec.getContext()) + ); + closer.register(workerSketchFetcher::close); + // Execution-related: run the multi-stage QueryDefinition. final InputSpecSlicerFactory inputSpecSlicerFactory = makeInputSpecSlicerFactory(context.newTableInputSpecSlicer(workerManager)); @@ -589,7 +598,6 @@ public void addToKernelManipulationQueue(Consumer kernelC private QueryDefinition initializeQueryDefAndState(final Closer closer) { this.selfDruidNode = context.selfNode(); - this.netClient = closer.register(new ExceptionWrappingWorkerClient(context.newWorkerClient())); this.queryKernelConfig = context.queryKernelConfig(queryId, querySpec); final QueryContext queryContext = querySpec.getContext(); @@ -653,13 +661,7 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer) ) ) ); - this.workerSketchFetcher = new WorkerSketchFetcher( - netClient, - workerManager, - queryKernelConfig.isFaultTolerant(), - MultiStageQueryContext.getSketchEncoding(queryContext) - ); - closer.register(workerSketchFetcher::close); + return queryDef; } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java index f4441c984e70..ea781e2e9920 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java @@ -28,7 +28,7 @@ import org.apache.druid.error.DruidException; import org.apache.druid.indexer.TaskState; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.msq.dart.worker.DartWorkerClient; +import org.apache.druid.msq.dart.worker.DartWorkerClientImpl; import org.apache.druid.msq.dart.worker.WorkerId; import org.apache.druid.msq.exec.WorkerManager; import org.apache.druid.msq.exec.WorkerStats; @@ -56,7 +56,7 @@ public class DartWorkerManagerTest private AutoCloseable mockCloser; @Mock - private DartWorkerClient workerClient; + private DartWorkerClientImpl workerClient; @BeforeEach public void setUp() diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java new file mode 100644 index 000000000000..f37162eb7aa7 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.msq.dart.controller.sql.DartSqlEngine; +import org.apache.druid.sql.calcite.BaseCalciteQueryTest; +import org.apache.druid.sql.calcite.QueryTestBuilder; +import org.apache.druid.sql.calcite.SqlTestFrameworkConfig; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.util.UUID; + +@SqlTestFrameworkConfig.ComponentSupplier(DartComponentSupplier.class) +public class CalciteDartTest extends BaseCalciteQueryTest +{ + @Override + protected QueryTestBuilder testBuilder() + { + return new QueryTestBuilder(new CalciteTestConfig(true)) + .queryContext( + ImmutableMap.builder() + .put(DartSqlEngine.CTX_DART_QUERY_ID, UUID.randomUUID().toString()) + .build() + ) + .skipVectorize(true) + .verifyNativeQueries(new VerifyMSQSupportedNativeQueriesPredicate()); + } + + @Test + public void testSelect1() + { + testBuilder() + .sql("SELECT 1") + .expectedResults(ImmutableList.of(new Object[] {1})) + .run(); + } + + @Test + public void testSelectFromFoo() + { + testBuilder() + .sql("SELECT 2 from foo order by dim1") + .expectedResults( + ImmutableList.of( + new Object[] {2}, + new Object[] {2}, + new Object[] {2}, + new Object[] {2}, + new Object[] {2}, + new Object[] {2} + ) + ) + .run(); + } + + @Test + @Disabled("this case currently stalls") + public void testSelectFromFooLimit2() + { + testBuilder() + .sql("SELECT 2 from foo limit 2") + .expectedResults( + ImmutableList.of( + new Object[] {2}, + new Object[] {2} + ) + ) + .run(); + } + + + @Test + public void testSelectDim1FromFoo11() + { + testBuilder() + .sql("SELECT dim1 from foo") + .expectedResults( + ImmutableList.of( + new Object[] {""}, + new Object[] {"10.1"}, + new Object[] {"2"}, + new Object[] {"1"}, + new Object[] {"def"}, + new Object[] {"abc"} + ) + ) + .run(); + } + + @Test + public void testGby() + { + testBuilder() + .sql("SELECT 3 from foo group by dim2") + .expectedResults( + ImmutableList.of( + new Object[] {3}, + new Object[] {3}, + new Object[] {3}, + new Object[] {3} + ) + ) + .run(); + } + + @Test + public void testComplexFromFoo() + { + String sql = "SELECT dim1, COUNT(*) FROM druid.foo " + + "WHERE dim1 NOT IN ('ghi', 'abc', 'def') AND dim1 IS NOT NULL " + + "GROUP BY dim1"; + testBuilder() + .sql(sql) + .expectedResults( + ImmutableList.of( + new Object[] {"", 1L}, + new Object[] {"1", 1L}, + new Object[] {"10.1", 1L}, + new Object[] {"2", 1L} + ) + ) + .run(); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java new file mode 100644 index 000000000000..7cfac91a4ce5 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Provides; +import org.apache.druid.collections.NonBlockingPool; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Merging; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.msq.dart.Dart; +import org.apache.druid.msq.dart.controller.DartControllerContextFactory; +import org.apache.druid.msq.dart.controller.sql.DartSqlEngine; +import org.apache.druid.msq.dart.guice.DartControllerModule; +import org.apache.druid.msq.dart.guice.DartModules; +import org.apache.druid.msq.dart.guice.DartWorkerMemoryManagementModule; +import org.apache.druid.msq.dart.guice.DartWorkerModule; +import org.apache.druid.msq.exec.Worker; +import org.apache.druid.query.TestBufferPool; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.guice.ServiceClientModule; +import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.sql.avatica.DartDruidMeta; +import org.apache.druid.sql.avatica.DruidMeta; +import org.apache.druid.sql.calcite.TempDirProducer; +import org.apache.druid.sql.calcite.run.SqlEngine; +import org.apache.druid.sql.calcite.util.DruidModuleCollection; +import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +public class DartComponentSupplier extends AbstractMSQComponentSupplierDelegate +{ + public DartComponentSupplier(TempDirProducer tempFolderProducer) + { + super(new StandardComponentSupplier(tempFolderProducer)); + } + + @Override + public void gatherProperties(Properties properties) + { + super.gatherProperties(properties); + properties.put(DartModules.DART_ENABLED_PROPERTY, "true"); + } + + @Override + public DruidModule getCoreModule() + { + return DruidModuleCollection.of( + super.getCoreModule(), + new DartControllerModule(), + new DartWorkerModule(), + new DartWorkerMemoryManagementModule(), + new DartTestCoreModule() + ); + } + + @Override + public DruidModule getOverrideModule() + { + return DruidModuleCollection.of( + super.getOverrideModule(), + new DartTestOverrideModule() + ); + } + + @Override + public SqlEngine createEngine( + QueryLifecycleFactory qlf, + ObjectMapper queryJsonMapper, + Injector injector) + { + return injector.getInstance(DartSqlEngine.class); + } + + static class DartTestCoreModule implements DruidModule + { + @Provides + @EscalatedGlobal + final ServiceClientFactory getServiceClientFactory(HttpClient ht) + { + return ServiceClientModule.makeServiceClientFactory(ht); + + } + + @Provides + final DruidNodeDiscoveryProvider getDiscoveryProvider() + { + return null; + } + + @Override + public void configure(Binder binder) + { + } + } + + static class DartTestOverrideModule implements DruidModule + { + + @Provides + @LazySingleton + public DruidMeta createMeta(DartDruidMeta druidMeta) + { + return druidMeta; + } + + @Override + public void configure(Binder binder) + { + binder.bind(DartControllerContextFactory.class) + .to(TestDartControllerContextFactoryImpl.class) + .in(LazySingleton.class); + } + + @Provides + @Merging + NonBlockingPool makeMergingBuffer(TestBufferPool bufferPool) + { + return bufferPool; + } + + @Provides + @LazySingleton + @Dart + Map workerMap() + { + return new HashMap(); + } + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index 6e632a4869ca..e1755757d04f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -44,6 +44,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.dart.controller.DartControllerContextFactory; import org.apache.druid.msq.exec.Controller; import org.apache.druid.msq.exec.ControllerContext; import org.apache.druid.msq.exec.ControllerMemoryParameters; @@ -84,7 +85,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; -public class MSQTestControllerContext implements ControllerContext +public class MSQTestControllerContext implements ControllerContext, DartControllerContextFactory { private static final Logger log = new Logger(MSQTestControllerContext.class); private static final int NUM_WORKERS = 4; @@ -387,4 +388,10 @@ public WorkerClient newWorkerClient() { return new MSQTestWorkerClient(inMemoryWorkers); } + + @Override + public ControllerContext newContext(String queryId) + { + return this; + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java index 4c7ccd72efd0..9f482912f315 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.frame.channel.ReadableByteChunksFrameChannel; import org.apache.druid.frame.key.ClusterByPartitions; import org.apache.druid.java.util.common.ISE; @@ -32,14 +33,13 @@ import org.apache.druid.msq.rpc.SketchEncoding; import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot; -import java.io.InputStream; import java.util.Arrays; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; public class MSQTestWorkerClient implements WorkerClient { - private final Map inMemoryWorkers; + protected final Map inMemoryWorkers; private final AtomicBoolean closed = new AtomicBoolean(); public MSQTestWorkerClient(Map inMemoryWorkers) @@ -50,10 +50,20 @@ public MSQTestWorkerClient(Map inMemoryWorkers) @Override public ListenableFuture postWorkOrder(String workerTaskId, WorkOrder workOrder) { - inMemoryWorkers.get(workerTaskId).postWorkOrder(workOrder); + getWorkerFor(workerTaskId).postWorkOrder(workOrder); return Futures.immediateFuture(null); } + protected Worker getWorkerFor(String workerTaskId) + { + return inMemoryWorkers.computeIfAbsent(workerTaskId, this::newWorker); + } + + protected Worker newWorker(String workerId) + { + throw new RuntimeException("Not implemented!"); + } + @Override public ListenableFuture fetchClusterByStatisticsSnapshot( String workerTaskId, @@ -61,7 +71,7 @@ public ListenableFuture fetchClusterByStatisticsSna SketchEncoding sketchEncoding ) { - return Futures.immediateFuture(inMemoryWorkers.get(workerTaskId).fetchStatisticsSnapshot(stageId)); + return Futures.immediateFuture(getWorkerFor(workerTaskId).fetchStatisticsSnapshot(stageId)); } @Override @@ -73,7 +83,7 @@ public ListenableFuture fetchClusterByStatisticsSna ) { return Futures.immediateFuture( - inMemoryWorkers.get(workerTaskId).fetchStatisticsSnapshotForTimeChunk(stageId, timeChunk) + getWorkerFor(workerTaskId).fetchStatisticsSnapshotForTimeChunk(stageId, timeChunk) ); } @@ -85,7 +95,7 @@ public ListenableFuture postResultPartitionBoundaries( ) { try { - inMemoryWorkers.get(workerTaskId).postResultPartitionBoundaries(stageId, partitionBoundaries); + getWorkerFor(workerTaskId).postResultPartitionBoundaries(stageId, partitionBoundaries); return Futures.immediateFuture(null); } catch (Exception e) { @@ -96,21 +106,21 @@ public ListenableFuture postResultPartitionBoundaries( @Override public ListenableFuture postCleanupStage(String workerTaskId, StageId stageId) { - inMemoryWorkers.get(workerTaskId).postCleanupStage(stageId); + getWorkerFor(workerTaskId).postCleanupStage(stageId); return Futures.immediateFuture(null); } @Override public ListenableFuture postFinish(String taskId) { - inMemoryWorkers.get(taskId).postFinish(); + getWorkerFor(taskId).postFinish(); return Futures.immediateFuture(null); } @Override public ListenableFuture getCounters(String taskId) { - return Futures.immediateFuture(inMemoryWorkers.get(taskId).getCounters()); + return Futures.immediateFuture(getWorkerFor(taskId).getCounters()); } @Override @@ -122,22 +132,25 @@ public ListenableFuture fetchChannelData( final ReadableByteChunksFrameChannel channel ) { - try (InputStream inputStream = - inMemoryWorkers.get(workerTaskId).readStageOutput(stageId, partitionNumber, offset).get()) { - byte[] buffer = new byte[8 * 1024]; - boolean didRead = false; - int bytesRead; - while ((bytesRead = inputStream.read(buffer)) != -1) { - channel.addChunk(Arrays.copyOf(buffer, bytesRead)); - didRead = true; - } - inputStream.close(); - - return Futures.immediateFuture(!didRead); - } - catch (Exception e) { - throw new ISE(e, "Error reading frame file channel"); - } + return FutureUtils.transform( + getWorkerFor(workerTaskId).readStageOutput(stageId, partitionNumber, offset), + inputStream -> { + try { + byte[] buffer = new byte[8 * 1024]; + boolean didRead = false; + int bytesRead; + while ((bytesRead = inputStream.read(buffer)) != -1) { + channel.addChunk(Arrays.copyOf(buffer, bytesRead)); + didRead = true; + } + inputStream.close(); + return !didRead; + } + catch (Exception e) { + throw new ISE(e, "Error reading frame file channel"); + } + } + ); } @Override @@ -145,6 +158,7 @@ public void close() { if (closed.compareAndSet(false, true)) { inMemoryWorkers.forEach((k, v) -> v.stop()); + inMemoryWorkers.clear(); } } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java new file mode 100644 index 000000000000..89209f012529 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.inject.Inject; +import com.google.inject.Injector; +import org.apache.druid.client.TimelineServerView; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.msq.dart.Dart; +import org.apache.druid.msq.dart.controller.DartControllerContext; +import org.apache.druid.msq.dart.controller.DartControllerContextFactoryImpl; +import org.apache.druid.msq.dart.worker.DartWorkerClient; +import org.apache.druid.msq.exec.Controller; +import org.apache.druid.msq.exec.ControllerContext; +import org.apache.druid.msq.exec.MemoryIntrospector; +import org.apache.druid.msq.exec.Worker; +import org.apache.druid.msq.exec.WorkerImpl; +import org.apache.druid.msq.exec.WorkerStorageParameters; +import org.apache.druid.msq.kernel.StageId; +import org.apache.druid.msq.kernel.WorkOrder; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.server.DruidNode; + +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class TestDartControllerContextFactoryImpl extends DartControllerContextFactoryImpl +{ + private Map workerMap; + public Controller controller; + + @Inject + public TestDartControllerContextFactoryImpl( + final Injector injector, + @Json final ObjectMapper jsonMapper, + @Smile final ObjectMapper smileMapper, + @Self final DruidNode selfNode, + @EscalatedGlobal final ServiceClientFactory serviceClientFactory, + final MemoryIntrospector memoryIntrospector, + final TimelineServerView serverView, + final ServiceEmitter emitter, + @Dart Map workerMap) + { + super(injector, jsonMapper, smileMapper, selfNode, serviceClientFactory, memoryIntrospector, serverView, emitter); + this.workerMap = workerMap; + } + + @Override + public ControllerContext newContext(String queryId) + { + return new DartControllerContext( + injector, + jsonMapper, + selfNode, + new DartTestWorkerClient(), + memoryIntrospector, + serverView, + emitter + ) + { + @Override + public void registerController(Controller currentController, Closer closer) + { + super.registerController(currentController, closer); + controller = currentController; + } + }; + } + + public class DartTestWorkerClient extends MSQTestWorkerClient implements DartWorkerClient + { + private final ExecutorService EXECUTOR = Executors.newCachedThreadPool(); + + public DartTestWorkerClient() + { + super(workerMap); + } + + @Override + protected Worker newWorker(String workerId) + { + String queryId = workerId; + Worker worker = new WorkerImpl( + null, + new MSQTestWorkerContext( + queryId, + inMemoryWorkers, + controller, + jsonMapper, + injector, + MSQTestBase.makeTestWorkerMemoryParameters(), + WorkerStorageParameters.createInstanceForTests(Long.MAX_VALUE) + ) + ); + + EXECUTOR.submit(() -> { + try { + worker.run(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + }); + + return worker; + } + + @Override + public ListenableFuture postWorkOrder(String workerTaskId, WorkOrder workOrder) + { + return super.postWorkOrder(workerTaskId, workOrder); + } + + @Override + public ListenableFuture postCleanupStage(String workerTaskId, StageId stageId) + { + return super.postCleanupStage(workerTaskId, stageId); + + } + + @Override + public void closeClient(String hostAndPort) + { + } + + @Override + public ListenableFuture stopWorker(String workerId) + { + return null; + } + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/sql/avatica/DartDruidMeta.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/sql/avatica/DartDruidMeta.java new file mode 100644 index 000000000000..48d2f831c2d8 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/sql/avatica/DartDruidMeta.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.avatica; + +import com.google.inject.Inject; +import org.apache.druid.msq.dart.Dart; +import org.apache.druid.server.security.AuthenticatorMapper; +import org.apache.druid.sql.SqlStatementFactory; + +public class DartDruidMeta extends DruidMeta +{ + @Inject + public DartDruidMeta( + final @Dart SqlStatementFactory sqlStatementFactory, + final AvaticaServerConfig config, + final ErrorHandler errorHandler, + final AuthenticatorMapper authMapper) + { + super(sqlStatementFactory, config, errorHandler, authMapper); + } +} diff --git a/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/dart.iq b/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/dart.iq new file mode 100644 index 000000000000..d232a52ac20f --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.quidem.MSQQuidemTest/dart.iq @@ -0,0 +1,46 @@ +!set dartQueryId 00000000-0000-0000-0000-000000000000 +!use druidtest://?componentSupplier=DartComponentSupplier +!set outputformat mysql + +select dim1 from foo ; ++------+ +| dim1 | ++------+ +| | +| 1 | +| 10.1 | +| 2 | +| abc | +| def | ++------+ +(6 rows) + +!ok + +select dim1 from foo ; ++------+ +| dim1 | ++------+ +| | +| 1 | +| 10.1 | +| 2 | +| abc | +| def | ++------+ +(6 rows) + +!ok + +select dim2,count(dim3) from foo group by dim2; ++------+--------+ +| dim2 | EXPR$1 | ++------+--------+ +| | 1 | +| a | 2 | +| abc | 0 | +| | 1 | ++------+--------+ +(4 rows) + +!ok diff --git a/sql/src/test/java/org/apache/druid/quidem/DruidAvaticaTestDriver.java b/sql/src/test/java/org/apache/druid/quidem/DruidAvaticaTestDriver.java index ce575b52356f..21e0cc7bd71a 100644 --- a/sql/src/test/java/org/apache/druid/quidem/DruidAvaticaTestDriver.java +++ b/sql/src/test/java/org/apache/druid/quidem/DruidAvaticaTestDriver.java @@ -24,7 +24,6 @@ import com.google.inject.Injector; import com.google.inject.Provides; import com.google.inject.name.Named; -import com.google.inject.name.Names; import org.apache.calcite.avatica.server.AbstractAvaticaHandler; import org.apache.druid.guice.LazySingleton; import org.apache.druid.initialization.DruidModule; @@ -199,19 +198,6 @@ public AvaticaBasedTestConnectionSupplier(QueryComponentSupplier delegate) this.connectionModule = new AvaticaBasedConnectionModule(); } - @Override - public DruidModule getCoreModule() - { - return DruidModuleCollection.of( - super.getCoreModule(), - binder -> { - binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test"); - binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); - binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); - } - ); - } - @Override public DruidModule getOverrideModule() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestBuilder.java b/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestBuilder.java index f53b2f37b1f5..c35003e0e767 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestBuilder.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestBuilder.java @@ -163,9 +163,7 @@ public QueryTestBuilder expectedQueries(List> expectedQueries) return this; } - public QueryTestBuilder expectedResults( - final List expectedResults - ) + public QueryTestBuilder expectedResults(final List expectedResults) { return expectedResults(ResultMatchMode.EQUALS, expectedResults); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index abf32c9bf321..cb3ceaf442d7 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -32,6 +32,7 @@ import org.apache.druid.client.FilteredServerInventoryView; import org.apache.druid.client.ServerInventoryView; import org.apache.druid.client.ServerView; +import org.apache.druid.client.TimelineServerView; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscovery; @@ -397,7 +398,7 @@ NodeRole.COORDINATOR, new FakeDruidNodeDiscovery(ImmutableMap.of(NodeRole.COORDI public static SystemSchema createMockSystemSchema( final DruidSchema druidSchema, - final SpecificSegmentsQuerySegmentWalker walker, + final TimelineServerView timelineServerView, final AuthorizerMapper authorizerMapper ) { @@ -474,7 +475,7 @@ private TaskStatusPlus createTaskStatus(String id, String datasource, Long durat new BrokerSegmentWatcherConfig(), BrokerSegmentMetadataCacheConfig.create() ), - new TestTimelineServerView(walker.getSegments()), + timelineServerView, new FakeServerInventoryView(), authorizerMapper, druidLeaderClient, diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java index f28461e1711c..f61f52343987 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/QueryFrameworkUtils.java @@ -26,6 +26,7 @@ import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.schema.SchemaPlus; import org.apache.druid.client.InternalQueryConfig; +import org.apache.druid.client.TimelineServerView; import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; @@ -125,7 +126,32 @@ public static DruidSchemaCatalog createMockRootSchema( @Nullable final ViewManager viewManager, final DruidSchemaManager druidSchemaManager, final AuthorizerMapper authorizerMapper, - final CatalogResolver catalogResolver + final CatalogResolver catalogResolver) + { + TimelineServerView timelineServerView = new TestTimelineServerView(walker.getSegments()); + return createMockRootSchema( + injector, + conglomerate, + walker, + plannerConfig, + viewManager, + druidSchemaManager, + authorizerMapper, + catalogResolver, + timelineServerView + ); + } + + public static DruidSchemaCatalog createMockRootSchema( + final Injector injector, + final QueryRunnerFactoryConglomerate conglomerate, + final SpecificSegmentsQuerySegmentWalker walker, + final PlannerConfig plannerConfig, + @Nullable final ViewManager viewManager, + final DruidSchemaManager druidSchemaManager, + final AuthorizerMapper authorizerMapper, + final CatalogResolver catalogResolver, + final TimelineServerView timelineServerView ) { DruidSchema druidSchema = createMockSchema( @@ -133,10 +159,11 @@ public static DruidSchemaCatalog createMockRootSchema( conglomerate, walker, druidSchemaManager, - catalogResolver + catalogResolver, + timelineServerView ); SystemSchema systemSchema = - CalciteTests.createMockSystemSchema(druidSchema, walker, authorizerMapper); + CalciteTests.createMockSystemSchema(druidSchema, timelineServerView, authorizerMapper); LookupSchema lookupSchema = createMockLookupSchema(injector); DruidOperatorTable createOperatorTable = createOperatorTable(injector); @@ -221,12 +248,13 @@ static DruidSchema createMockSchema( final QueryRunnerFactoryConglomerate conglomerate, final SpecificSegmentsQuerySegmentWalker walker, final DruidSchemaManager druidSchemaManager, - final CatalogResolver catalog + final CatalogResolver catalog, + final TimelineServerView timelineServerView ) { final BrokerSegmentMetadataCache cache = new BrokerSegmentMetadataCache( createMockQueryLifecycleFactory(walker, conglomerate), - new TestTimelineServerView(walker.getSegments()), + timelineServerView, BrokerSegmentMetadataCacheConfig.create(), CalciteTests.TEST_AUTHENTICATOR_ESCALATOR, new InternalQueryConfig(), diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java index 90cc53d1b9a0..d88af75df644 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java @@ -29,8 +29,11 @@ import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.TypeLiteral; +import org.apache.druid.client.TestHttpClient; +import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.collections.BlockingPool; import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.guice.BuiltInTypesModule; import org.apache.druid.guice.DruidInjectorBuilder; @@ -44,12 +47,14 @@ import org.apache.druid.guice.StartupInjectorBuilder; import org.apache.druid.guice.annotations.Global; import org.apache.druid.guice.annotations.Merging; +import org.apache.druid.guice.annotations.Self; import org.apache.druid.initialization.CoreInjectorBuilder; import org.apache.druid.initialization.DruidModule; import org.apache.druid.initialization.ServiceInjectorBuilder; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.DruidProcessingConfig; @@ -78,6 +83,7 @@ import org.apache.druid.segment.realtime.ChatHandlerProvider; import org.apache.druid.segment.realtime.NoopChatHandlerProvider; import org.apache.druid.server.ClientQuerySegmentWalker; +import org.apache.druid.server.DruidNode; import org.apache.druid.server.LocalQuerySegmentWalker; import org.apache.druid.server.QueryLifecycle; import org.apache.druid.server.QueryLifecycleFactory; @@ -672,7 +678,8 @@ public PlannerFixture( viewManager, componentSupplier.createSchemaManager(), framework.authorizerMapper, - framework.builder.catalogResolver + framework.builder.catalogResolver, + framework.injector.getInstance(TimelineServerView.class) ); this.plannerFactory = new PlannerFactory( @@ -771,10 +778,22 @@ public void configure(Binder binder) .annotatedWith(Global.class) .to(TestBufferPool.class); + binder.bind(new TypeLiteral>(){}) + .annotatedWith(Merging.class) + .to(TestBufferPool.class); + TestRequestLogger testRequestLogger = new TestRequestLogger(); binder.bind(RequestLogger.class).toInstance(testRequestLogger); } + @Provides + @Self + @LazySingleton + public DruidNode makeSelfDruidNode() + { + return new DruidNode("druid/broker", "local-test-host", false, 12345, 443, true, false); + } + @Provides AuthorizerMapper getAuthorizerMapper() { @@ -923,26 +942,36 @@ private DruidSchema makeDruidSchema( final Injector injector, QueryRunnerFactoryConglomerate conglomerate, QuerySegmentWalker walker, - Builder builder) + Builder builder, + TimelineServerView timelineServerView) { return QueryFrameworkUtils.createMockSchema( injector, conglomerate, (SpecificSegmentsQuerySegmentWalker) walker, builder.componentSupplier.getPlannerComponentSupplier().createSchemaManager(), - builder.catalogResolver + builder.catalogResolver, + timelineServerView ); } @Provides @LazySingleton - private SystemSchema makeSystemSchema(QuerySegmentWalker walker, AuthorizerMapper authorizerMapper, - DruidSchema druidSchema) + private SystemSchema makeSystemSchema(AuthorizerMapper authorizerMapper, DruidSchema druidSchema, + TimelineServerView timelineServerView) + { + return CalciteTests.createMockSystemSchema(druidSchema, timelineServerView, authorizerMapper); + } + + + @Provides + @LazySingleton + private TimelineServerView makeTimelineServerView(SpecificSegmentsQuerySegmentWalker walker) { - return CalciteTests - .createMockSystemSchema(druidSchema, (SpecificSegmentsQuerySegmentWalker) walker, authorizerMapper); + return new TestTimelineServerView(walker.getSegments()); } + @Provides @LazySingleton private ColumnConfig getColumnConfig() @@ -1039,6 +1068,13 @@ public TestSegmentsBroker makeTimelines() return new TestSegmentsBroker(); } + @Provides + @LazySingleton + private HttpClient makeHttpClient(ObjectMapper objectMapper) + { + return new TestHttpClient(objectMapper); + } + @Provides @Named("empty") @LazySingleton diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java index 42aa4e491cb4..88df3b80a4e5 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java @@ -25,7 +25,10 @@ import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.TimelineServerView; +import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; +import org.apache.druid.client.selector.RandomServerSelectorStrategy; import org.apache.druid.client.selector.ServerSelector; +import org.apache.druid.client.selector.TierSelectorStrategy; import org.apache.druid.java.util.common.Pair; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.TableDataSource; @@ -33,9 +36,12 @@ import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineLookup; - +import org.apache.druid.timeline.VersionedIntervalTimeline; +import org.apache.druid.timeline.partition.PartitionChunk; +import org.apache.druid.timeline.partition.SingleElementPartitionChunk; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.concurrent.Executor; @@ -47,7 +53,7 @@ public class TestTimelineServerView implements TimelineServerView { private static final DruidServerMetadata DUMMY_SERVER = new DruidServerMetadata( "dummy", - "dummy", + "dummy:15723", null, 0, ServerType.HISTORICAL, @@ -56,7 +62,7 @@ public class TestTimelineServerView implements TimelineServerView ); private static final DruidServerMetadata DUMMY_SERVER_REALTIME = new DruidServerMetadata( "dummy2", - "dummy2", + "dummy2:15723", null, 0, ServerType.REALTIME, @@ -65,7 +71,7 @@ public class TestTimelineServerView implements TimelineServerView ); private static final DruidServerMetadata DUMMY_BROKER = new DruidServerMetadata( "dummy3", - "dummy3", + "dummy3:15723", null, 0, ServerType.BROKER, @@ -81,7 +87,7 @@ public class TestTimelineServerView implements TimelineServerView public TestTimelineServerView(List segments) { - this.segments.addAll(segments); + this(segments, Collections.emptyList()); } public TestTimelineServerView(List segments, List realtimeSegments) @@ -93,7 +99,22 @@ public TestTimelineServerView(List segments, List real @Override public Optional> getTimeline(TableDataSource table) { - throw new UnsupportedOperationException(); + for (DataSegment segment : segments) { + if (!segment.getDataSource().equals(table.getName())) { + continue; + } + + VersionedIntervalTimeline timelineLookup = new VersionedIntervalTimeline( + Comparator.naturalOrder() + ); + TierSelectorStrategy st = new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()); + ServerSelector sss = new ServerSelector(segment, st); + + PartitionChunk partitionChunk = new SingleElementPartitionChunk(sss); + timelineLookup.add(segment.getInterval(), segment.getVersion(), partitionChunk); + return Optional.of(timelineLookup); + } + return Optional.empty(); } @Override