Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/api-reference/dynamic-configuration-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ Host: http://ROUTER_IP:ROUTER_PORT
"cloneServers": {},
"historicalTierAliases": {
"hot": ["hot_1", "hot_2"]
}
},
"coordinatingVersions": []

}
```
Expand Down
11 changes: 11 additions & 0 deletions docs/configuration/index.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ public class BrokerSegmentWatcherConfig
@JsonProperty
private boolean awaitInitializationOnStart = true;

/**
* If non-empty, only servers whose deploymentGroup is in this set are watched.
* Realtime servers (peons / indexers) bypass this filter unless
* {@link #strictRealtimeDeploymentGroupFilter} is true.
*/
@JsonProperty
private Set<String> watchedDeploymentGroups = null;

/**
* When true, the deploymentGroup filter is applied to realtime servers as well as historicals.
* Default false: realtime servers are always watched so red/black brokers can both query in-flight
* data during a rollover. Set true only for strict isolation (e.g. testing a realtime ingest change).
*/
@JsonProperty
private boolean strictRealtimeDeploymentGroupFilter = false;

public Set<String> getWatchedTiers()
{
return watchedTiers;
Expand All @@ -66,4 +82,14 @@ public boolean isAwaitInitializationOnStart()
{
return awaitInitializationOnStart;
}

public Set<String> getWatchedDeploymentGroups()
{
return watchedDeploymentGroups;
}

public boolean isStrictRealtimeDeploymentGroupFilter()
{
return strictRealtimeDeploymentGroupFilter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -124,6 +125,10 @@ public BrokerServerView(
return false;
}

if (!isDeploymentGroupAllowed(metadataAndSegment.lhs)) {
return false;
}

// Include realtime tasks only if they are watched
return metadataAndSegment.lhs.getType() != ServerType.INDEXER_EXECUTOR
|| segmentWatcherConfig.isWatchRealtimeTasks();
Expand Down Expand Up @@ -172,7 +177,7 @@ public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)
public CallbackAction serverAdded(DruidServer server)
{
// We don't track brokers in this view.
if (!server.getType().equals(ServerType.BROKER)) {
if (!server.getType().equals(ServerType.BROKER) && isDeploymentGroupAllowed(server.getMetadata())) {
addServer(server);
}
return CallbackAction.CONTINUE;
Expand Down Expand Up @@ -246,6 +251,30 @@ private void validateSegmentWatcherConfig(BrokerSegmentWatcherConfig watcherConf
&& watcherConfig.getIgnoredTiers().isEmpty()) {
throw new ISE("If configured, 'druid.broker.segment.ignoredTiers' must be non-empty");
}

if (watcherConfig.getWatchedDeploymentGroups() != null
&& watcherConfig.getWatchedDeploymentGroups().isEmpty()) {
throw new ISE("If configured, 'druid.broker.segment.watchedDeploymentGroups' must be non-empty");
}
}

/**
* Returns true if the server's deploymentGroup passes the watched filter, or if the server is
* a realtime server type and the strict-realtime toggle is off (the default).
*/
private boolean isDeploymentGroupAllowed(DruidServerMetadata server)
{
final boolean isRealtime =
server.getType() == ServerType.INDEXER_EXECUTOR || server.getType() == ServerType.REALTIME;
if (isRealtime && !segmentWatcherConfig.isStrictRealtimeDeploymentGroupFilter()) {
return true;
}

final Set<String> watched = segmentWatcherConfig.getWatchedDeploymentGroups();
if (watched != null && !watched.contains(server.getDeploymentGroup())) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Propagate deploymentGroup through inventory DruidServer construction

This filter depends on DruidServerMetadata.getDeploymentGroup(), but the normal discovery inventory path still builds DruidServer instances through DruidServer constructors that do not pass DruidNode.getDeploymentGroup() into DruidServerMetadata. In a real cluster, Historicals therefore arrive here with a null deploymentGroup, so setting druid.broker.segment.watchedDeploymentGroups filters out all Historicals; the same lost metadata also prevents coordinator resources from seeing active deployment groups. Please preserve the DruidNode deploymentGroup when constructing inventory DruidServer metadata.

return false;
}
return true;
}

private QueryableDruidServer addServer(DruidServer server)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public DruidServerMetadata getMetadata(
config.getStorageSize(),
serverTypeConfig.getServerType(),
config.getTier(),
config.getPriority()
config.getPriority(),
node.getDeploymentGroup()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,6 @@ public Optional<ChatHandler> get(final String key)

private DruidNode makeDruidNode(String key)
{
return new DruidNode(
key,
node.getHost(),
node.isBindOnHost(),
node.getPlaintextPort(),
node.getTlsPort(),
node.isEnablePlaintextPort(),
node.isEnableTlsPort()
);
return node.withService(key);
}
}
63 changes: 56 additions & 7 deletions server/src/main/java/org/apache/druid/server/DruidNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
Expand Down Expand Up @@ -99,6 +100,16 @@ public class DruidNode
@JsonProperty
private Map<String, String> labels;

/**
* Operator-set tag identifying which deployment group this node belongs to (e.g. red/black for R/B upgrades).
* Used by version-aware query routing on the broker and router. Null means "no group" and matches everywhere.
* Omitted from JSON when null so older consumers see unchanged output.
*/
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
private String deploymentGroup;

public DruidNode(
String serviceName,
String host,
Expand All @@ -109,7 +120,22 @@ public DruidNode(
boolean enableTlsPort
)
{
this(serviceName, host, bindOnHost, plaintextPort, null, tlsPort, enablePlaintextPort, enableTlsPort, null);
this(serviceName, host, bindOnHost, plaintextPort, null, tlsPort, enablePlaintextPort, enableTlsPort, null, null);
}

public DruidNode(
String serviceName,
String host,
boolean bindOnHost,
Integer plaintextPort,
Integer port,
Integer tlsPort,
Boolean enablePlaintextPort,
boolean enableTlsPort,
@Nullable Map<String, String> labels
)
{
this(serviceName, host, bindOnHost, plaintextPort, port, tlsPort, enablePlaintextPort, enableTlsPort, labels, null);
}

/**
Expand Down Expand Up @@ -138,7 +164,8 @@ public DruidNode(
@JacksonInject @Named("tlsServicePort") @JsonProperty("tlsPort") Integer tlsPort,
@JsonProperty("enablePlaintextPort") Boolean enablePlaintextPort,
@JsonProperty("enableTlsPort") boolean enableTlsPort,
@JsonProperty("labels") @Nullable Map<String, String> labels
@JsonProperty("labels") @Nullable Map<String, String> labels,
@JsonProperty("deploymentGroup") @Nullable String deploymentGroup
)
{
init(
Expand All @@ -149,7 +176,8 @@ public DruidNode(
tlsPort,
enablePlaintextPort == null || enablePlaintextPort.booleanValue(),
enableTlsPort,
labels
labels,
deploymentGroup
);
}

Expand All @@ -161,7 +189,8 @@ private void init(
Integer tlsPort,
boolean enablePlaintextPort,
boolean enableTlsPort,
Map<String, String> labels
Map<String, String> labels,
@Nullable String deploymentGroup
)
{
Preconditions.checkNotNull(serviceName);
Expand Down Expand Up @@ -222,6 +251,13 @@ private void init(
this.host = host;
this.bindOnHost = bindOnHost;
this.labels = labels;
this.deploymentGroup = deploymentGroup;
}

@Nullable
public String getDeploymentGroup()
{
return deploymentGroup;
}

@Nullable
Expand Down Expand Up @@ -277,7 +313,18 @@ public String getBuildRevision()

public DruidNode withService(String service)
{
return new DruidNode(service, host, bindOnHost, plaintextPort, tlsPort, enablePlaintextPort, enableTlsPort);
return new DruidNode(
service,
host,
bindOnHost,
plaintextPort,
null,
tlsPort,
enablePlaintextPort,
enableTlsPort,
labels,
deploymentGroup
);
}

public String getServiceScheme()
Expand Down Expand Up @@ -360,13 +407,14 @@ public boolean equals(Object o)
enableTlsPort == druidNode.enableTlsPort &&
Objects.equals(serviceName, druidNode.serviceName) &&
Objects.equals(host, druidNode.host) &&
Objects.equals(labels, druidNode.labels);
Objects.equals(labels, druidNode.labels) &&
Objects.equals(deploymentGroup, druidNode.deploymentGroup);
}

@Override
public int hashCode()
{
return Objects.hash(serviceName, host, port, plaintextPort, enablePlaintextPort, tlsPort, enableTlsPort, labels);
return Objects.hash(serviceName, host, port, plaintextPort, enablePlaintextPort, tlsPort, enableTlsPort, labels, deploymentGroup);
}

@Override
Expand All @@ -382,6 +430,7 @@ public String toString()
", tlsPort=" + tlsPort +
", enableTlsPort=" + enableTlsPort +
", labels=" + labels +
", deploymentGroup='" + deploymentGroup + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.server.coordination;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;

Expand All @@ -40,6 +41,22 @@ public class DruidServerMetadata
private final String tier;
private final ServerType type;
private final int priority;
@Nullable
private final String deploymentGroup;

public DruidServerMetadata(
String name,
@Nullable String hostAndPort,
@Nullable String hostAndTlsPort,
long maxSize,
@Nullable Long storageSize,
ServerType type,
String tier,
int priority
)
{
this(name, hostAndPort, hostAndTlsPort, maxSize, storageSize, type, tier, priority, null);
}

// Either hostAndPort or hostAndTlsPort would be null depending on the type of connection.
@JsonCreator
Expand All @@ -51,7 +68,8 @@ public DruidServerMetadata(
@JsonProperty("storageSize") @Nullable Long storageSize,
@JsonProperty("type") ServerType type,
@JsonProperty("tier") String tier,
@JsonProperty("priority") int priority
@JsonProperty("priority") int priority,
@JsonProperty("deploymentGroup") @Nullable String deploymentGroup
)
{
this.name = Preconditions.checkNotNull(name);
Expand All @@ -63,6 +81,7 @@ public DruidServerMetadata(
this.tier = tier;
this.type = type;
this.priority = priority;
this.deploymentGroup = deploymentGroup;
}

@JsonProperty
Expand Down Expand Up @@ -120,6 +139,19 @@ public int getPriority()
return priority;
}

/**
* Operator-set tag identifying the deployment group of this server (e.g. red/black for R/B upgrades).
* Null means unset. Used by version-aware query routing on the broker.
* Omitted from JSON when null so older consumers that don't know about this field see unchanged output.
*/
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getDeploymentGroup()
{
return deploymentGroup;
}

public boolean isSegmentReplicationTarget()
{
return type.isSegmentReplicationTarget();
Expand Down Expand Up @@ -163,13 +195,16 @@ public boolean equals(Object o)
if (type != that.type) {
return false;
}
return priority == that.priority;
if (priority != that.priority) {
return false;
}
return Objects.equals(deploymentGroup, that.deploymentGroup);
}

@Override
public int hashCode()
{
return Objects.hash(name, hostAndPort, hostAndTlsPort, maxSize, storageSize, tier, type, priority);
return Objects.hash(name, hostAndPort, hostAndTlsPort, maxSize, storageSize, tier, type, priority, deploymentGroup);
}

@Override
Expand All @@ -184,6 +219,7 @@ public String toString()
", tier='" + tier + '\'' +
", type=" + type +
", priority=" + priority +
", deploymentGroup='" + deploymentGroup + '\'' +
'}';
}
}
Loading
Loading