Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
747034b
bootstrap
kgyrtkirk Feb 28, 2025
abe6916
use interface
kgyrtkirk Feb 28, 2025
71e14d9
some-leftover
kgyrtkirk Mar 3, 2025
064e362
fixes
kgyrtkirk Mar 3, 2025
5aa1836
a
kgyrtkirk Mar 4, 2025
c899e7a
move stuff
kgyrtkirk Mar 4, 2025
0543df6
my-wood
kgyrtkirk Mar 4, 2025
99ccba0
rename/etc
kgyrtkirk Mar 4, 2025
5de20dc
s
kgyrtkirk Mar 4, 2025
c0ed1b4
use TimelineServerView instead BrokerServerView
kgyrtkirk Mar 4, 2025
fe641fd
make TestTimelineServerView
kgyrtkirk Mar 5, 2025
d60fab3
stuff
kgyrtkirk Mar 5, 2025
9b85514
aa
kgyrtkirk Mar 5, 2025
4e2fd3c
m
kgyrtkirk Mar 5, 2025
061a741
up
kgyrtkirk Mar 5, 2025
fb4ec06
sx
kgyrtkirk Mar 5, 2025
1d806d6
crap
kgyrtkirk Mar 6, 2025
b574226
refact
kgyrtkirk Mar 6, 2025
823d01c
x
kgyrtkirk Mar 6, 2025
0a1059e
messedup?
kgyrtkirk Mar 6, 2025
d788f9f
unlucky-for-some
kgyrtkirk Mar 6, 2025
8f053d1
some changes which have broke it
kgyrtkirk Mar 6, 2025
7cec2d2
t
kgyrtkirk Mar 6, 2025
35d2294
update expecttions
kgyrtkirk Mar 6, 2025
810d306
Merge branch 'msq-small-test-enhancements' into make-msq-tests-really…
kgyrtkirk Mar 6, 2025
008a155
u
kgyrtkirk Mar 6, 2025
637ae37
same
kgyrtkirk Mar 6, 2025
e1c4693
cleanup/etc
kgyrtkirk Mar 6, 2025
96b2a9c
run dart w/o old msqtestmodule
kgyrtkirk Mar 6, 2025
cff97bf
add-trial
kgyrtkirk Mar 6, 2025
11b212a
retain running dart with msqtestsqlmodule for now
kgyrtkirk Mar 6, 2025
83e36a3
middle
kgyrtkirk Mar 7, 2025
9a9eef4
crap
kgyrtkirk Mar 7, 2025
5e51e1a
Revert "crap"
kgyrtkirk Mar 7, 2025
5115dd7
fixme
kgyrtkirk Mar 7, 2025
a569e95
cleanup
kgyrtkirk Mar 7, 2025
73872ba
halfway
kgyrtkirk Mar 7, 2025
808a4c8
cleanup/etc
kgyrtkirk Mar 7, 2025
ae737be
make quidem run
kgyrtkirk Mar 7, 2025
0d8089d
moretest
kgyrtkirk Mar 7, 2025
07eb975
cleanup
kgyrtkirk Mar 7, 2025
0230874
cleanup tests/etc
kgyrtkirk Mar 10, 2025
96fa16a
remove
kgyrtkirk Mar 10, 2025
39c4f42
cleanup
kgyrtkirk Mar 10, 2025
07b986c
fixes
kgyrtkirk Mar 10, 2025
af3beae
move doc
kgyrtkirk Mar 10, 2025
a21433e
fixes bug
kgyrtkirk Mar 10, 2025
e60eea5
Revert "fixes bug"
kgyrtkirk Mar 10, 2025
f27c023
set host
kgyrtkirk Mar 10, 2025
26857dc
override
kgyrtkirk Mar 11, 2025
6faff99
cleanup
kgyrtkirk Mar 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import org.apache.druid.client.BrokerServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.actions.TaskActionClient;
Expand Down Expand Up @@ -84,7 +84,7 @@ public class DartControllerContext implements ControllerContext
private final ObjectMapper jsonMapper;
private final DruidNode selfNode;
private final DartWorkerClient workerClient;
private final BrokerServerView serverView;
private final TimelineServerView serverView;
private final MemoryIntrospector memoryIntrospector;
private final ServiceMetricEvent.Builder metricBuilder;
private final ServiceEmitter emitter;
Expand All @@ -95,7 +95,7 @@ public DartControllerContext(
final DruidNode selfNode,
final DartWorkerClient workerClient,
final MemoryIntrospector memoryIntrospector,
final BrokerServerView serverView,
final TimelineServerView serverView,
final ServiceEmitter emitter
)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,28 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.druid.client.BrokerServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.msq.dart.worker.DartWorkerClient;
import org.apache.druid.msq.dart.worker.DartWorkerClientImpl;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.MemoryIntrospector;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.server.DruidNode;

public class DartControllerContextFactoryImpl implements DartControllerContextFactory
{
private final Injector injector;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final DruidNode selfNode;
private final ServiceClientFactory serviceClientFactory;
private final BrokerServerView serverView;
private final MemoryIntrospector memoryIntrospector;
private final ServiceEmitter emitter;
protected final Injector injector;
protected final ObjectMapper jsonMapper;
protected final ObjectMapper smileMapper;
protected final DruidNode selfNode;
protected final ServiceClientFactory serviceClientFactory;
protected final TimelineServerView serverView;
protected final MemoryIntrospector memoryIntrospector;
protected final ServiceEmitter emitter;

@Inject
public DartControllerContextFactoryImpl(
Expand All @@ -53,7 +53,7 @@ public DartControllerContextFactoryImpl(
@Self final DruidNode selfNode,
@EscalatedGlobal final ServiceClientFactory serviceClientFactory,
final MemoryIntrospector memoryIntrospector,
final BrokerServerView serverView,
final TimelineServerView serverView,
final ServiceEmitter emitter
)
{
Expand All @@ -74,7 +74,7 @@ public ControllerContext newContext(final String queryId)
injector,
jsonMapper,
selfNode,
new DartWorkerClient(queryId, serviceClientFactory, smileMapper, selfNode.getHostAndPortToUse()),
new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper, selfNode.getHostAndPortToUse()),
memoryIntrospector,
serverView,
emitter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,192 +19,21 @@

package org.apache.druid.msq.dart.worker;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import it.unimi.dsi.fastutil.Pair;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.msq.dart.controller.DartWorkerManager;
import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
import org.apache.druid.msq.dart.worker.http.DartWorkerResource;
import org.apache.druid.msq.exec.WorkerClient;
import org.apache.druid.msq.rpc.BaseWorkerClientImpl;
import org.apache.druid.rpc.FixedServiceLocator;
import org.apache.druid.rpc.IgnoreHttpResponseHandler;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.utils.CloseableUtils;
import org.jboss.netty.handler.codec.http.HttpMethod;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/**
* Dart implementation of {@link WorkerClient}. Uses the same {@link BaseWorkerClientImpl} as the task-based engine.
* Each instance of this class is scoped to a single query.
*/
public class DartWorkerClient extends BaseWorkerClientImpl
public interface DartWorkerClient extends WorkerClient
{
private static final Logger log = new Logger(DartWorkerClient.class);

private final String queryId;
private final ServiceClientFactory clientFactory;
private final ServiceRetryPolicy retryPolicy;

@Nullable
private final String controllerHost;

@GuardedBy("clientMap")
private final Map<String, Pair<ServiceClient, Closeable>> clientMap = new HashMap<>();

/**
* Create a worker client.
*
* @param queryId dart query ID. see {@link DartSqlEngine#CTX_DART_QUERY_ID}
* @param clientFactory service client factor
* @param smileMapper Smile object mapper
* @param controllerHost Controller host (see {@link DartWorkerResource#HEADER_CONTROLLER_HOST}) if this is a
* controller-to-worker client. Null if this is a worker-to-worker client.
*/
public DartWorkerClient(
final String queryId,
final ServiceClientFactory clientFactory,
final ObjectMapper smileMapper,
@Nullable final String controllerHost
)
{
super(smileMapper, SmileMediaTypes.APPLICATION_JACKSON_SMILE);
this.queryId = queryId;
this.clientFactory = clientFactory;
this.controllerHost = controllerHost;

if (controllerHost == null) {
// worker -> worker client. Retry HTTP 503 in case worker A starts up before worker B, and needs to
// contact it immediately.
this.retryPolicy = new DartWorkerRetryPolicy(true);
} else {
// controller -> worker client. Do not retry any HTTP error codes. If we retry HTTP 503 for controller -> worker,
// we can get stuck trying to contact workers that have exited.
this.retryPolicy = new DartWorkerRetryPolicy(false);
}
}

/**
 * Returns the client for the given worker, creating one via {@link #makeNewClient(WorkerId)} on first use.
 * Clients are cached per worker host:port.
 *
 * @param workerIdString string form of a {@link WorkerId}
 *
 * @throws DruidException defensive error if the worker ID carries a query ID other than this client's
 */
@Override
protected ServiceClient getClient(final String workerIdString)
{
  final WorkerId parsedWorkerId = WorkerId.fromString(workerIdString);
  final String workerQueryId = parsedWorkerId.getQueryId();

  if (!queryId.equals(workerQueryId)) {
    throw DruidException.defensive("Unexpected queryId[%s]. Expected queryId[%s]", workerQueryId, queryId);
  }

  final Pair<ServiceClient, Closeable> clientAndCloser;
  synchronized (clientMap) {
    clientAndCloser = clientMap.computeIfAbsent(
        parsedWorkerId.getHostAndPort(),
        ignored -> makeNewClient(parsedWorkerId)
    );
  }
  return clientAndCloser.left();
}

/**
 * Drops and closes the client held for one worker, if any. Used when that worker fails, so we stop trying to
 * contact it. Does nothing when no client is registered for the host.
 *
 * @param workerHost worker host:port
 */
public void closeClient(final String workerHost)
{
  synchronized (clientMap) {
    final Pair<ServiceClient, Closeable> removed = clientMap.remove(workerHost);
    if (removed == null) {
      return;
    }
    CloseableUtils.closeAndWrapExceptions(removed.right());
  }
}

/**
 * Closes every client that is still registered, then empties the client map. A failure to close one client is
 * logged as a warning and does not stop the remaining clients from being closed.
 */
@Override
public void close()
{
  synchronized (clientMap) {
    clientMap.forEach(
        (hostAndPort, clientAndCloser) -> CloseableUtils.closeAndSuppressExceptions(
            clientAndCloser.right(),
            e -> log.warn(e, "Failed to close client[%s]", hostAndPort)
        )
    );

    clientMap.clear();
  }
}
void closeClient(String hostAndPort);

/**
 * Stops a worker by sending a POST to its "/stop" path; the response body is ignored. Dart-only API, used by
 * the {@link DartWorkerManager}.
 *
 * @param workerId string form of the worker ID, resolved through {@link #getClient(String)}
 *
 * @return future for the asynchronous request
 */
public ListenableFuture<?> stopWorker(String workerId)
{
  final ServiceClient workerClient = getClient(workerId);
  final RequestBuilder stopRequest = new RequestBuilder(HttpMethod.POST, "/stop");
  return workerClient.asyncRequest(stopRequest, IgnoreHttpResponseHandler.INSTANCE);
}

/**
 * Builds the client for one worker. Called by {@link #getClient(String)} if a new one is needed.
 *
 * The returned pair holds the client to use for requests and the locator that is closed when the client is
 * discarded. For controller-to-worker use (non-null controller host) the client is wrapped so every request
 * carries the controller host header.
 */
private Pair<ServiceClient, Closeable> makeNewClient(final WorkerId workerId)
{
  final FixedServiceLocator locator = new FixedServiceLocator(ServiceLocation.fromUri(workerId.toUri()));
  final ServiceClient undecorated = clientFactory.makeClient(workerId.toString(), locator, retryPolicy);
  final ServiceClient client =
      controllerHost == null
      ? undecorated
      : new ControllerDecoratedClient(undecorated, controllerHost);

  return Pair.of(client, locator);
}

/**
 * {@link ServiceClient} wrapper that sets the {@link DartWorkerResource#HEADER_CONTROLLER_HOST} header on each
 * request before handing it to the wrapped client.
 */
private static class ControllerDecoratedClient implements ServiceClient
{
  private final ServiceClient wrapped;
  private final String hostHeaderValue;

  ControllerDecoratedClient(final ServiceClient delegate, final String controllerHost)
  {
    this.wrapped = delegate;
    this.hostHeaderValue = controllerHost;
  }

  /**
   * Adds the controller host header to the request, then forwards it to the wrapped client.
   */
  @Override
  public <IntermediateType, FinalType> ListenableFuture<FinalType> asyncRequest(
      final RequestBuilder requestBuilder,
      final HttpResponseHandler<IntermediateType, FinalType> handler
  )
  {
    final RequestBuilder decoratedRequest =
        requestBuilder.header(DartWorkerResource.HEADER_CONTROLLER_HOST, hostHeaderValue);
    return wrapped.asyncRequest(decoratedRequest, handler);
  }

  /**
   * Returns a new decorated client around the wrapped client with the given retry policy applied, keeping the
   * same controller host header value.
   */
  @Override
  public ServiceClient withRetryPolicy(final ServiceRetryPolicy retryPolicy)
  {
    final ServiceClient rewrapped = wrapped.withRetryPolicy(retryPolicy);
    return new ControllerDecoratedClient(rewrapped, hostHeaderValue);
  }
}
ListenableFuture<?> stopWorker(String workerId);
}
Loading