diff --git a/docs/api-reference/dynamic-configuration-api.md b/docs/api-reference/dynamic-configuration-api.md index 259e50c88008..9f8bdd3beef8 100644 --- a/docs/api-reference/dynamic-configuration-api.md +++ b/docs/api-reference/dynamic-configuration-api.md @@ -110,7 +110,8 @@ Host: http://ROUTER_IP:ROUTER_PORT "cloneServers": {}, "historicalTierAliases": { "hot": ["hot_1", "hot_2"] - } + }, + "coordinatingVersions": [] } ``` diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 8f4e4a7dcbfe..03db0eb98604 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -716,6 +716,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.tlsPort`|TLS port for HTTPS connector, if [druid.enableTlsPort](../operations/tls-support.md) is set then this config will be used. If `druid.host` contains port then that port will be ignored. This should be a non-negative integer.|8281| |`druid.service`|The name of the service. This is used as a dimension when emitting metrics and alerts to differentiate between the various services.|`druid/coordinator`| |`druid.labels`|Optional JSON object of key-value pairs that define custom labels for the server. These labels are displayed in the web console under the "Services" tab. Example: `druid.labels={"location":"Airtrunk"}` or `druid.labels.location=Airtrunk`|`null`| +|`druid.deploymentGroup`|Optional tag identifying the deployment group this service belongs to (for example `red` or `black` during a red/black upgrade). Used by version-aware query routing on Brokers (`druid.broker.segment.watchedDeploymentGroups`) and Routers (`druid.router.acceptableDeploymentGroups`) to isolate query traffic between two parallel control planes that share the same metadata, deep storage, and discovery. Surfaced as the `deployment_group` column of `sys.servers`. 
Master services (Coordinator, Overlord) typically leave this unset so they can manage both groups during cutover.|`null`| ##### Coordinator operation @@ -812,6 +813,7 @@ The following table shows the dynamic configuration properties for the Coordinat |`turboLoadingNodes`| Experimental. List of Historical servers to place in turbo loading mode. These servers use a larger thread-pool to load segments faster but at the cost of query performance. For servers specified in `turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the coordinator uses the value of the respective `numLoadingThreads` instead.
Please use this config with caution. All servers should eventually be removed from this list once the segment loading on the respective historicals is finished. |none| |`cloneServers`| Experimental. Map from target Historical server to source Historical server which should be cloned by the target. The target Historical does not participate in regular segment assignment or balancing. Instead, the Coordinator mirrors any segment assignment made to the source Historical onto the target Historical, so that the target becomes an exact copy of the source. Segments on the target Historical do not count towards replica counts either. If the source disappears, the target remains in the last known state of the source server until removed from the configuration.
Use this config with caution. All servers should eventually be removed from this list once the desired state on the respective Historicals is achieved. |none| |`historicalTierAliases`|Map from a virtual tier name to the set of real Historical tier names it expands to. When a load/drop rule references a virtual alias tier, the Coordinator replaces it with its real tiers — each receiving the full replica count independently. The alias key itself is never loaded to directly. For example, `{"hot": ["hot_1", "hot_2"]}` causes a rule of `{"hot": 2}` to load 2 replicas on each of `hot_1` and `hot_2`; `hot` receives no direct assignment. An alias value tier with no servers raises the normal invalid-tier alert. If a rule already specifies an explicit replica count for a tier that also appears as an alias value, the explicit count takes precedence. Duplicate tier names within a set are ignored. A virtual alias tier cannot also be a physical tier.|none| +|`coordinatingVersions`|List of deployment groups for which the Coordinator enforces per-group Historical replication and handoff. When set, load/drop rules are applied independently to each listed group that has active servers in a tier, so each group receives the rule's required replicas. Servers outside the listed groups are not assigned new replicas and may have surplus replicas dropped. Empty disables per-group coordination.|none| ##### Smart segment loading @@ -961,6 +963,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro |`druid.tlsPort`|TLS port for HTTPS connector, if [druid.enableTlsPort](../operations/tls-support.md) is set then this config will be used. If `druid.host` contains port then that port will be ignored. This should be a non-negative Integer.|8290| |`druid.service`|The name of the service. 
This is used as a dimension when emitting metrics and alerts to differentiate between the various services.|`druid/overlord`| |`druid.labels`|Optional JSON object of key-value pairs that define custom labels for the server. These labels are displayed in the web console under the "Services" tab. Example: `druid.labels={"location":"Airtrunk"}` or `druid.labels.location=Airtrunk`|`null`| +|`druid.deploymentGroup`|Optional tag identifying the deployment group this service belongs to (for example `red` or `black` during a red/black upgrade). Used by version-aware query routing on Brokers (`druid.broker.segment.watchedDeploymentGroups`) and Routers (`druid.router.acceptableDeploymentGroups`) to isolate query traffic between two parallel control planes that share the same metadata, deep storage, and discovery. Surfaced as the `deployment_group` column of `sys.servers`. Master services (Coordinator, Overlord) typically leave this unset so they can manage both groups during cutover.|`null`| ##### Overlord operations @@ -1314,6 +1317,7 @@ These Middle Manager and Peon configurations can be defined in the `middleManage |`druid.tlsPort`|TLS port for HTTPS connector, if [druid.enableTlsPort](../operations/tls-support.md) is set then this config will be used. If `druid.host` contains port then that port will be ignored. This should be a non-negative Integer.|8291| |`druid.service`|The name of the service. This is used as a dimension when emitting metrics and alerts to differentiate between the various services|`druid/middlemanager`| |`druid.labels`|Optional JSON object of key-value pairs that define custom labels for the server. These labels are displayed in the web console under the "Services" tab. Example: `druid.labels={"location":"Airtrunk"}` or `druid.labels.location=Airtrunk`|`null`| +|`druid.deploymentGroup`|Optional tag identifying the deployment group this service belongs to (for example `red` or `black` during a red/black upgrade). 
Used by version-aware query routing on Brokers (`druid.broker.segment.watchedDeploymentGroups`) and Routers (`druid.router.acceptableDeploymentGroups`) to isolate query traffic between two parallel control planes that share the same metadata, deep storage, and discovery. Surfaced as the `deployment_group` column of `sys.servers`. Master services (Coordinator, Overlord) typically leave this unset so they can manage both groups during cutover.|`null`| #### Middle Manager configuration @@ -1443,6 +1447,7 @@ For most types of tasks, `SegmentWriteOutMediumFactory` can be configured per-ta |`druid.tlsPort`|TLS port for HTTPS connector, if [druid.enableTlsPort](../operations/tls-support.md) is set then this config will be used. If `druid.host` contains port then that port will be ignored. This should be a non-negative Integer.|8283| |`druid.service`|The name of the service. This is used as a dimension when emitting metrics and alerts to differentiate between the various services|`druid/indexer`| |`druid.labels`|Optional JSON object of key-value pairs that define custom labels for the server. These labels are displayed in the web console under the "Services" tab. Example: `druid.labels={"location":"Airtrunk"}` or `druid.labels.location=Airtrunk`|`null`| +|`druid.deploymentGroup`|Optional tag identifying the deployment group this service belongs to (for example `red` or `black` during a red/black upgrade). Used by version-aware query routing on Brokers (`druid.broker.segment.watchedDeploymentGroups`) and Routers (`druid.router.acceptableDeploymentGroups`) to isolate query traffic between two parallel control planes that share the same metadata, deep storage, and discovery. Surfaced as the `deployment_group` column of `sys.servers`. 
Master services (Coordinator, Overlord) typically leave this unset so they can manage both groups during cutover.|`null`| #### Indexer general configuration @@ -1540,6 +1545,7 @@ These Historical configurations can be defined in the `historical/runtime.proper |`druid.tlsPort`|TLS port for HTTPS connector, if [druid.enableTlsPort](../operations/tls-support.md) is set then this config will be used. If `druid.host` contains port then that port will be ignored. This should be a non-negative Integer.|8283| |`druid.service`|The name of the service. This is used as a dimension when emitting metrics and alerts to differentiate between the various services|`druid/historical`| |`druid.labels`|Optional JSON object of key-value pairs that define custom labels for the server. These labels are displayed in the web console under the "Services" tab. Example: `druid.labels={"location":"Airtrunk"}` or `druid.labels.location=Airtrunk`|`null`| +|`druid.deploymentGroup`|Optional tag identifying the deployment group this service belongs to (for example `red` or `black` during a red/black upgrade). Used by version-aware query routing on Brokers (`druid.broker.segment.watchedDeploymentGroups`) and Routers (`druid.router.acceptableDeploymentGroups`) to isolate query traffic between two parallel control planes that share the same metadata, deep storage, and discovery. Surfaced as the `deployment_group` column of `sys.servers`. Master services (Coordinator, Overlord) typically leave this unset so they can manage both groups during cutover.|`null`| #### Historical general configuration @@ -1655,6 +1661,7 @@ These Broker configurations can be defined in the `broker/runtime.properties` fi |`druid.tlsPort`|TLS port for HTTPS connector, if [druid.enableTlsPort](../operations/tls-support.md) is set then this config will be used. If `druid.host` contains port then that port will be ignored. This should be a non-negative Integer.|8282| |`druid.service`|The name of the service. 
This is used as a dimension when emitting metrics and alerts to differentiate between the various services|`druid/broker`| |`druid.labels`|Optional JSON object of key-value pairs that define custom labels for the server. These labels are displayed in the web console under the "Services" tab. Example: `druid.labels={"location":"Airtrunk"}` or `druid.labels.location=Airtrunk`|`null`| +|`druid.deploymentGroup`|Optional tag identifying the deployment group this service belongs to (for example `red` or `black` during a red/black upgrade). Used by version-aware query routing on Brokers (`druid.broker.segment.watchedDeploymentGroups`) and Routers (`druid.router.acceptableDeploymentGroups`) to isolate query traffic between two parallel control planes that share the same metadata, deep storage, and discovery. Surfaced as the `deployment_group` column of `sys.servers`. Master services (Coordinator, Overlord) typically leave this unset so they can manage both groups during cutover.|`null`| #### Query configuration @@ -1911,6 +1918,8 @@ See [cache configuration](#cache-configuration) for how to configure cache setti |`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none| |`druid.broker.segment.watchRealtimeTasks`|Boolean|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. When `watchRealtimeTasks` is true, the Broker watches for segment announcements from both Historicals and realtime processes. 
To configure a broker to exclude segments served by realtime processes, set `watchRealtimeTasks` to false. |true| |`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true| +|`druid.broker.segment.watchedDeploymentGroups`|List of strings|Restricts the Broker's segment view to data servers whose `druid.deploymentGroup` matches one of the listed values. Realtime servers (peons and indexers) bypass this filter unless `druid.broker.segment.strictRealtimeDeploymentGroupFilter` is true. Used to isolate query traffic during red/black upgrades while still allowing both clusters to query in-flight realtime data. Empty means no filtering.|none| +|`druid.broker.segment.strictRealtimeDeploymentGroupFilter`|Boolean|When true, `druid.broker.segment.watchedDeploymentGroups` also applies to realtime servers (peons, indexers). Default false: realtime servers are always watched regardless of deployment group, so red and black brokers can both serve queries against in-flight ingestion during a rollover.|false| ## Metrics monitors @@ -2280,6 +2289,7 @@ Supported query contexts: |`druid.tlsPort`|TLS port for HTTPS connector, if [druid.enableTlsPort](../operations/tls-support.md) is set then this config will be used. If `druid.host` contains port then that port will be ignored. This should be a non-negative Integer.|9088| |`druid.service`|The name of the service. This is used as a dimension when emitting metrics and alerts to differentiate between the various services|`druid/router`| |`druid.labels`|Optional JSON object of key-value pairs that define custom labels for the server. These labels are displayed in the web console under the "Services" tab. 
Example: `druid.labels={"location":"Airtrunk"}` or `druid.labels.location=Airtrunk`|`null`| +|`druid.deploymentGroup`|Optional tag identifying the deployment group this service belongs to (for example `red` or `black` during a red/black upgrade). Used by version-aware query routing on Brokers (`druid.broker.segment.watchedDeploymentGroups`) and Routers (`druid.router.acceptableDeploymentGroups`) to isolate query traffic between two parallel control planes that share the same metadata, deep storage, and discovery. Surfaced as the `deployment_group` column of `sys.servers`. Master services (Coordinator, Overlord) typically leave this unset so they can manage both groups during cutover.|`null`| #### Runtime configuration @@ -2291,6 +2301,7 @@ Supported query contexts: |`druid.router.pollPeriod`|How often to poll for new rules.|`PT1M`| |`druid.router.sql.enable`|Enable routing of SQL queries using strategies. When`true`, the Router uses the strategies defined in `druid.router.strategies` to determine the broker service for a given SQL query. When `false`, the Router uses the `defaultBrokerServiceName`.|`false`| |`druid.router.strategies`|Please see [Router Strategies](../design/router.md#router-strategies) for details.|`[{"type":"timeBoundary"},{"type":"priority"}]`| +|`druid.router.acceptableDeploymentGroups`|If non-empty, the Router only routes queries to Brokers whose `druid.deploymentGroup` is in this set. Brokers without a deployment group tag match only when this is empty. When a query's serviceName has no matching Broker after filtering, the selector returns no server and the request fails with a clear error (a warning is logged). Use during red/black upgrades to direct user traffic to one cluster's Brokers while the other cluster is still discoverable.|none| |`druid.router.avatica.balancer.type`|Class to use for balancing Avatica queries across Brokers. 
Please see [Avatica Query Balancing](../design/router.md#avatica-query-balancing).|`rendezvousHash`| |`druid.router.managementProxy.enabled`|Enables the Router's [management proxy](../design/router.md#router-as-management-proxy) functionality.|false| |`druid.router.http.numConnections`|Size of connection pool for the Router to connect to Broker processes. If there are more queries than this number that all need to speak to the same process, then they will queue up.|`20`| diff --git a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java index 5f3dbd3abe7d..cd89f722edf0 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java +++ b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java @@ -42,6 +42,22 @@ public class BrokerSegmentWatcherConfig @JsonProperty private boolean awaitInitializationOnStart = true; + /** + * If non-empty, only servers whose deploymentGroup is in this set are watched. + * Realtime servers (peons / indexers) bypass this filter unless + * {@link #strictRealtimeDeploymentGroupFilter} is true. + */ + @JsonProperty + private Set watchedDeploymentGroups = null; + + /** + * When true, the deploymentGroup filter is applied to realtime servers as well as historicals. + * Default false: realtime servers are always watched so red/black brokers can both query in-flight + * data during a rollover. Set true only for strict isolation (e.g. testing a realtime ingest change). 
+ */ + @JsonProperty + private boolean strictRealtimeDeploymentGroupFilter = false; + public Set getWatchedTiers() { return watchedTiers; @@ -66,4 +82,14 @@ public boolean isAwaitInitializationOnStart() { return awaitInitializationOnStart; } + + public Set getWatchedDeploymentGroups() + { + return watchedDeploymentGroups; + } + + public boolean isStrictRealtimeDeploymentGroupFilter() + { + return strictRealtimeDeploymentGroupFilter; + } } diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 61a85cd4c2ce..a888e1ebbcde 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -124,6 +125,10 @@ public BrokerServerView( return false; } + if (!isDeploymentGroupAllowed(metadataAndSegment.lhs)) { + return false; + } + // Include realtime tasks only if they are watched return metadataAndSegment.lhs.getType() != ServerType.INDEXER_EXECUTOR || segmentWatcherConfig.isWatchRealtimeTasks(); @@ -172,7 +177,7 @@ public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) public CallbackAction serverAdded(DruidServer server) { // We don't track brokers in this view. 
- if (!server.getType().equals(ServerType.BROKER)) { + if (!server.getType().equals(ServerType.BROKER) && isDeploymentGroupAllowed(server.getMetadata())) { addServer(server); } return CallbackAction.CONTINUE; @@ -246,6 +251,30 @@ private void validateSegmentWatcherConfig(BrokerSegmentWatcherConfig watcherConf && watcherConfig.getIgnoredTiers().isEmpty()) { throw new ISE("If configured, 'druid.broker.segment.ignoredTiers' must be non-empty"); } + + if (watcherConfig.getWatchedDeploymentGroups() != null + && watcherConfig.getWatchedDeploymentGroups().isEmpty()) { + throw new ISE("If configured, 'druid.broker.segment.watchedDeploymentGroups' must be non-empty"); + } + } + + /** + * Returns true if the server's deploymentGroup passes the watched filter, or if the server is + * a realtime server type and the strict-realtime toggle is off (the default). + */ + private boolean isDeploymentGroupAllowed(DruidServerMetadata server) + { + final boolean isRealtime = + server.getType() == ServerType.INDEXER_EXECUTOR || server.getType() == ServerType.REALTIME; + if (isRealtime && !segmentWatcherConfig.isStrictRealtimeDeploymentGroupFilter()) { + return true; + } + + final Set watched = segmentWatcherConfig.getWatchedDeploymentGroups(); + if (watched != null && !watched.contains(server.getDeploymentGroup())) { + return false; + } + return true; } private QueryableDruidServer addServer(DruidServer server) diff --git a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java index e030cd8cd2ab..433c1525bddd 100644 --- a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java +++ b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java @@ -87,7 +87,8 @@ public DruidServerMetadata getMetadata( config.getStorageSize(), serverTypeConfig.getServerType(), config.getTier(), - config.getPriority() + config.getPriority(), + node.getDeploymentGroup() ); } diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/ServiceAnnouncingChatHandlerProvider.java b/server/src/main/java/org/apache/druid/segment/realtime/ServiceAnnouncingChatHandlerProvider.java index da3975e4545a..6eb746ebdcef 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/ServiceAnnouncingChatHandlerProvider.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/ServiceAnnouncingChatHandlerProvider.java @@ -119,14 +119,6 @@ public Optional get(final String key) private DruidNode makeDruidNode(String key) { - return new DruidNode( - key, - node.getHost(), - node.isBindOnHost(), - node.getPlaintextPort(), - node.getTlsPort(), - node.isEnablePlaintextPort(), - node.isEnableTlsPort() - ); + return node.withService(key); } } diff --git a/server/src/main/java/org/apache/druid/server/DruidNode.java b/server/src/main/java/org/apache/druid/server/DruidNode.java index 820d8d32a08a..82047eebe982 100644 --- a/server/src/main/java/org/apache/druid/server/DruidNode.java +++ b/server/src/main/java/org/apache/druid/server/DruidNode.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; @@ -99,6 +100,16 @@ public class DruidNode @JsonProperty private Map labels; + /** + * Operator-set tag identifying which deployment group this node belongs to (e.g. red/black for R/B upgrades). + * Used by version-aware query routing on the broker and router. Null means "no group" and matches everywhere. + * Omitted from JSON when null so older consumers see unchanged output. 
+ */ + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + private String deploymentGroup; + public DruidNode( String serviceName, String host, @@ -109,7 +120,22 @@ public DruidNode( boolean enableTlsPort ) { - this(serviceName, host, bindOnHost, plaintextPort, null, tlsPort, enablePlaintextPort, enableTlsPort, null); + this(serviceName, host, bindOnHost, plaintextPort, null, tlsPort, enablePlaintextPort, enableTlsPort, null, null); + } + + public DruidNode( + String serviceName, + String host, + boolean bindOnHost, + Integer plaintextPort, + Integer port, + Integer tlsPort, + Boolean enablePlaintextPort, + boolean enableTlsPort, + @Nullable Map labels + ) + { + this(serviceName, host, bindOnHost, plaintextPort, port, tlsPort, enablePlaintextPort, enableTlsPort, labels, null); } /** @@ -138,7 +164,8 @@ public DruidNode( @JacksonInject @Named("tlsServicePort") @JsonProperty("tlsPort") Integer tlsPort, @JsonProperty("enablePlaintextPort") Boolean enablePlaintextPort, @JsonProperty("enableTlsPort") boolean enableTlsPort, - @JsonProperty("labels") @Nullable Map labels + @JsonProperty("labels") @Nullable Map labels, + @JsonProperty("deploymentGroup") @Nullable String deploymentGroup ) { init( @@ -149,7 +176,8 @@ public DruidNode( tlsPort, enablePlaintextPort == null || enablePlaintextPort.booleanValue(), enableTlsPort, - labels + labels, + deploymentGroup ); } @@ -161,7 +189,8 @@ private void init( Integer tlsPort, boolean enablePlaintextPort, boolean enableTlsPort, - Map labels + Map labels, + @Nullable String deploymentGroup ) { Preconditions.checkNotNull(serviceName); @@ -222,6 +251,13 @@ private void init( this.host = host; this.bindOnHost = bindOnHost; this.labels = labels; + this.deploymentGroup = deploymentGroup; + } + + @Nullable + public String getDeploymentGroup() + { + return deploymentGroup; } @Nullable @@ -277,7 +313,18 @@ public String getBuildRevision() public DruidNode withService(String service) { - return new DruidNode(service, 
host, bindOnHost, plaintextPort, tlsPort, enablePlaintextPort, enableTlsPort); + return new DruidNode( + service, + host, + bindOnHost, + plaintextPort, + null, + tlsPort, + enablePlaintextPort, + enableTlsPort, + labels, + deploymentGroup + ); } public String getServiceScheme() @@ -360,13 +407,14 @@ public boolean equals(Object o) enableTlsPort == druidNode.enableTlsPort && Objects.equals(serviceName, druidNode.serviceName) && Objects.equals(host, druidNode.host) && - Objects.equals(labels, druidNode.labels); + Objects.equals(labels, druidNode.labels) && + Objects.equals(deploymentGroup, druidNode.deploymentGroup); } @Override public int hashCode() { - return Objects.hash(serviceName, host, port, plaintextPort, enablePlaintextPort, tlsPort, enableTlsPort, labels); + return Objects.hash(serviceName, host, port, plaintextPort, enablePlaintextPort, tlsPort, enableTlsPort, labels, deploymentGroup); } @Override @@ -382,6 +430,7 @@ public String toString() ", tlsPort=" + tlsPort + ", enableTlsPort=" + enableTlsPort + ", labels=" + labels + + ", deploymentGroup='" + deploymentGroup + '\'' + '}'; } } diff --git a/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java b/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java index 31a62a59846f..0e3568c674e8 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java +++ b/server/src/main/java/org/apache/druid/server/coordination/DruidServerMetadata.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordination; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; @@ -40,6 +41,22 @@ public class DruidServerMetadata private final String tier; private final ServerType type; private final int priority; + @Nullable + private final String deploymentGroup; + + public 
DruidServerMetadata( + String name, + @Nullable String hostAndPort, + @Nullable String hostAndTlsPort, + long maxSize, + @Nullable Long storageSize, + ServerType type, + String tier, + int priority + ) + { + this(name, hostAndPort, hostAndTlsPort, maxSize, storageSize, type, tier, priority, null); + } // Either hostAndPort or hostAndTlsPort would be null depending on the type of connection. @JsonCreator @@ -51,7 +68,8 @@ public DruidServerMetadata( @JsonProperty("storageSize") @Nullable Long storageSize, @JsonProperty("type") ServerType type, @JsonProperty("tier") String tier, - @JsonProperty("priority") int priority + @JsonProperty("priority") int priority, + @JsonProperty("deploymentGroup") @Nullable String deploymentGroup ) { this.name = Preconditions.checkNotNull(name); @@ -63,6 +81,7 @@ public DruidServerMetadata( this.tier = tier; this.type = type; this.priority = priority; + this.deploymentGroup = deploymentGroup; } @JsonProperty @@ -120,6 +139,19 @@ public int getPriority() return priority; } + /** + * Operator-set tag identifying the deployment group of this server (e.g. red/black for R/B upgrades). + * Null means unset. Used by version-aware query routing on the broker. + * Omitted from JSON when null so older consumers that don't know about this field see unchanged output. 
+ */ + @Nullable + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public String getDeploymentGroup() + { + return deploymentGroup; + } + public boolean isSegmentReplicationTarget() { return type.isSegmentReplicationTarget(); @@ -163,13 +195,16 @@ public boolean equals(Object o) if (type != that.type) { return false; } - return priority == that.priority; + if (priority != that.priority) { + return false; + } + return Objects.equals(deploymentGroup, that.deploymentGroup); } @Override public int hashCode() { - return Objects.hash(name, hostAndPort, hostAndTlsPort, maxSize, storageSize, tier, type, priority); + return Objects.hash(name, hostAndPort, hostAndTlsPort, maxSize, storageSize, tier, type, priority, deploymentGroup); } @Override @@ -184,6 +219,7 @@ public String toString() ", tier='" + tier + '\'' + ", type=" + type + ", priority=" + priority + + ", deploymentGroup='" + deploymentGroup + '\'' + '}'; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 66652961e367..671db379de59 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -84,6 +84,15 @@ public class CoordinatorDynamicConfig */ private final Map> historicalTierAliases; + /** + * Deployment groups the coordinator enforces per-group replication and handoff for. + * When non-empty, the coordinator ensures {@code requiredReplicas} per listed group rather than + * per tier, and the handoff endpoint requires at least one server in each listed group (that has + * online servers) to serve the segment before declaring handoff complete. + * An empty set disables this behavior and restores default tier-wide replication. 
+ */ + private final Set coordinatingVersions; + /** * Stale pending segments belonging to the data sources in this list are not killed by {@code * KillStalePendingSegments}. In other words, segments in these data sources are "protected". @@ -135,7 +144,8 @@ public CoordinatorDynamicConfig( @JsonProperty("debugDimensions") @Nullable Map debugDimensions, @JsonProperty("turboLoadingNodes") @Nullable Set turboLoadingNodes, @JsonProperty("cloneServers") @Nullable Map cloneServers, - @JsonProperty("historicalTierAliases") @Nullable Map> historicalTierAliases + @JsonProperty("historicalTierAliases") @Nullable Map> historicalTierAliases, + @JsonProperty("coordinatingVersions") @Nullable Set coordinatingVersions ) { this.markSegmentAsUnusedDelayMillis = @@ -183,6 +193,7 @@ public CoordinatorDynamicConfig( this.cloneServers = Configs.valueOrDefault(cloneServers, Map.of()); this.historicalTierAliases = Configs.valueOrDefault(historicalTierAliases, Map.of()); + this.coordinatingVersions = Configs.valueOrDefault(coordinatingVersions, Set.of()); final Set aliasKeys = this.historicalTierAliases.keySet(); for (Set mappedTiers : this.historicalTierAliases.values()) { if (!Sets.intersection(mappedTiers, aliasKeys).isEmpty()) { @@ -364,6 +375,12 @@ public Map> getHistoricalTierAliases() return historicalTierAliases; } + @JsonProperty + public Set getCoordinatingVersions() + { + return coordinatingVersions; + } + /** * List of servers to put in turbo-loading mode. These servers will use a larger thread pool to load * segments. This causes decreases the average time taken to load segments. 
However, this also means less resources @@ -398,6 +415,7 @@ public String toString() ", turboLoadingNodes=" + turboLoadingNodes + ", cloneServers=" + cloneServers + ", historicalTierAliases=" + historicalTierAliases + + ", coordinatingVersions=" + coordinatingVersions + '}'; } @@ -435,7 +453,8 @@ public boolean equals(Object o) && Objects.equals(turboLoadingNodes, that.turboLoadingNodes) && Objects.equals(debugDimensions, that.debugDimensions) && Objects.equals(cloneServers, that.cloneServers) - && Objects.equals(historicalTierAliases, that.historicalTierAliases); + && Objects.equals(historicalTierAliases, that.historicalTierAliases) + && Objects.equals(coordinatingVersions, that.coordinatingVersions); } @Override @@ -460,7 +479,8 @@ public int hashCode() debugDimensions, turboLoadingNodes, cloneServers, - historicalTierAliases + historicalTierAliases, + coordinatingVersions ); } @@ -518,6 +538,7 @@ public static class Builder private Set turboLoadingNodes; private Map cloneServers; private Map> historicalTierAliases; + private Set coordinatingVersions; public Builder() { @@ -543,7 +564,8 @@ public Builder( @JsonProperty("debugDimensions") @Nullable Map debugDimensions, @JsonProperty("turboLoadingNodes") @Nullable Set turboLoadingNodes, @JsonProperty("cloneServers") @Nullable Map cloneServers, - @JsonProperty("historicalTierAliases") @Nullable Map> historicalTierAliases + @JsonProperty("historicalTierAliases") @Nullable Map> historicalTierAliases, + @JsonProperty("coordinatingVersions") @Nullable Set coordinatingVersions ) { this.markSegmentAsUnusedDelayMillis = markSegmentAsUnusedDelayMillis; @@ -565,6 +587,7 @@ public Builder( this.turboLoadingNodes = turboLoadingNodes; this.cloneServers = cloneServers; this.historicalTierAliases = historicalTierAliases; + this.coordinatingVersions = coordinatingVersions; } public Builder withMarkSegmentAsUnusedDelayMillis(long leadingTimeMillis) @@ -669,6 +692,12 @@ public Builder withHistoricalTierAliases(Map> historicalTier 
return this; } + public Builder withCoordinatingVersions(Set coordinatingVersions) + { + this.coordinatingVersions = coordinatingVersions; + return this; + } + /** * Builds a CoordinatoryDynamicConfig using either the configured values, or * the default value if not configured. @@ -697,7 +726,8 @@ public CoordinatorDynamicConfig build() debugDimensions, turboLoadingNodes, cloneServers, - historicalTierAliases + historicalTierAliases, + coordinatingVersions ); } @@ -730,7 +760,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) valueOrDefault(debugDimensions, defaults.getDebugDimensions()), valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes()), valueOrDefault(cloneServers, defaults.getCloneServers()), - valueOrDefault(historicalTierAliases, defaults.getHistoricalTierAliases()) + valueOrDefault(historicalTierAliases, defaults.getHistoricalTierAliases()), + valueOrDefault(coordinatingVersions, defaults.getCoordinatingVersions()) ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java index 5e3bcfd31de5..a65befa7d39c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.NavigableSet; import java.util.Set; +import java.util.TreeSet; import java.util.stream.Collectors; /** @@ -46,6 +47,11 @@ public class DruidCluster private final Set realtimes; private final Map> historicals; private final Map> managedHistoricals; + /** + * Managed historicals indexed by tier and then by deployment group. Only servers with a non-null + * deploymentGroup appear here. Used to support per-group replication and handoff enforcement. 
+ */ + private final Map>> managedHistoricalsByTierAndGroup; private final Set brokers; private final List allManagedServers; @@ -70,6 +76,7 @@ private DruidCluster( return CollectionUtils.newTreeSet(Comparator.naturalOrder(), managedServers); } ); + this.managedHistoricalsByTierAndGroup = initManagedHistoricalsByTierAndGroup(); this.brokers = Collections.unmodifiableSet(brokers); this.allManagedServers = initAllManagedServers(); } @@ -111,6 +118,73 @@ public NavigableSet getManagedHistoricalsByTier(String tier) return managedHistoricals.get(tier); } + /** + * Returns the distinct non-null deployment groups present among managed historicals in the given tier. + */ + public Set getDeploymentGroupsForTier(String tier) + { + final Map> groupMap = managedHistoricalsByTierAndGroup.get(tier); + return groupMap == null ? Collections.emptySet() : groupMap.keySet(); + } + + /** + * Returns managed historicals in the given tier that belong to the given deployment group. + * Returns an empty set if no servers for that (tier, group) pair exist. + */ + public NavigableSet getManagedHistoricalsByTierAndGroup(String tier, String group) + { + final Map> groupMap = managedHistoricalsByTierAndGroup.get(tier); + if (groupMap == null) { + return Collections.emptyNavigableSet(); + } + final NavigableSet servers = groupMap.get(group); + return servers == null ? Collections.emptyNavigableSet() : servers; + } + + /** + * Returns managed historicals in the given tier whose {@code deploymentGroup} is null or is not + * present in {@code coordinatingVersions}. These are the "uncoordinated" servers whose replica + * counts roll up to the tier-wide {@link ReplicaCountKey}; they must still be visited by drop / + * cancellation passes when the rest of the tier is operating in per-group mode. 
+ */ + public NavigableSet getUncoordinatedManagedHistoricalsByTier( + String tier, + Set coordinatingVersions + ) + { + final NavigableSet all = managedHistoricals.get(tier); + if (all == null || all.isEmpty()) { + return Collections.emptyNavigableSet(); + } + if (coordinatingVersions == null || coordinatingVersions.isEmpty()) { + return all; + } + final NavigableSet filtered = new TreeSet<>(Comparator.naturalOrder()); + for (ServerHolder server : all) { + final String group = server.getServer().getMetadata().getDeploymentGroup(); + if (group == null || !coordinatingVersions.contains(group)) { + filtered.add(server); + } + } + return filtered; + } + + private Map>> initManagedHistoricalsByTierAndGroup() + { + final Map>> result = new HashMap<>(); + managedHistoricals.forEach((tier, servers) -> { + for (ServerHolder server : servers) { + final String group = server.getServer().getMetadata().getDeploymentGroup(); + if (group != null) { + result.computeIfAbsent(tier, t -> new HashMap<>()) + .computeIfAbsent(group, g -> new TreeSet<>(Comparator.naturalOrder())) + .add(server); + } + } + }); + return Collections.unmodifiableMap(result); + } + public List getAllManagedServers() { return allManagedServers; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java index e3791c9dc233..9676a0ecac10 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java @@ -31,6 +31,9 @@ import org.apache.druid.server.coordinator.stats.Stats; import org.joda.time.Duration; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; import java.util.Set; /** @@ -60,9 +63,27 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) return params; } - 
params.getDruidCluster().getManagedHistoricals().forEach( - (tier, servers) -> new TierSegmentBalancer(tier, servers, maxSegmentsToMove, params).run() - ); + final Set coordinatingVersions = params.getCoordinatorDynamicConfig().getCoordinatingVersions(); + params.getDruidCluster().getManagedHistoricals().forEach((tier, servers) -> { + if (coordinatingVersions.isEmpty()) { + new TierSegmentBalancer(tier, servers, maxSegmentsToMove, params).run(); + } else { + // Partition tier servers by deployment group so segments never move across groups. + // Servers with no deploymentGroup form their own partition keyed under the empty string. + final Map> serversByGroup = partitionByDeploymentGroup(servers); + int remainingGroups = serversByGroup.size(); + int remainingSegmentsToMove = maxSegmentsToMove; + for (final Set groupServers : serversByGroup.values()) { + final int groupMaxSegmentsToMove = + (remainingSegmentsToMove + remainingGroups - 1) / remainingGroups; + if (groupMaxSegmentsToMove > 0) { + new TierSegmentBalancer(tier, groupServers, groupMaxSegmentsToMove, params).run(); + remainingSegmentsToMove -= groupMaxSegmentsToMove; + } + --remainingGroups; + } + } + }); CoordinatorRunStats runStats = params.getCoordinatorStats(); params.getBalancerStrategy() @@ -123,4 +144,13 @@ private Pair getNumHistoricalsAndSegments(DruidCluster cluster return Pair.of(numHistoricals, numSegments); } + private static Map> partitionByDeploymentGroup(Set servers) + { + final Map> byGroup = new LinkedHashMap<>(); + for (final ServerHolder server : servers) { + final String group = server.getServer().getMetadata().getDeploymentGroup(); + byGroup.computeIfAbsent(group == null ? 
"" : group, g -> new LinkedHashSet<>()).add(server); + } + return byGroup; + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/ReplicaCountKey.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/ReplicaCountKey.java new file mode 100644 index 000000000000..8fc41ce779e1 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/ReplicaCountKey.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.loading; + +import javax.annotation.Nullable; +import java.util.Set; + +/** + * Map key used by {@link SegmentReplicaCountMap}. A null {@code group} represents tier-wide + * tracking; a non-null {@code group} represents a specific deployment group within the tier + * (used when the coordinator is enforcing per-group replication via {@code coordinatingVersions}). 
+ */ +public record ReplicaCountKey(String tier, @Nullable String group) +{ + public static ReplicaCountKey forTier(String tier) + { + return new ReplicaCountKey(tier, null); + } + + /** + * Returns a (tier, group) key when {@code group} is non-null and present in + * {@code coordinatingVersions}; otherwise a plain tier-wide key. + */ + public static ReplicaCountKey from(String tier, @Nullable String group, Set coordinatingVersions) + { + if (group != null && coordinatingVersions.contains(group)) { + return new ReplicaCountKey(tier, group); + } + return new ReplicaCountKey(tier, null); + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java index cc007449b578..5123b6d84414 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java @@ -48,12 +48,25 @@ public class RoundRobinServerSelector { private final Map tierToServers = new HashMap<>(); + private final Map> tierGroupToServers = new HashMap<>(); - public RoundRobinServerSelector(DruidCluster cluster) + public RoundRobinServerSelector(DruidCluster cluster, Set coordinatingVersions) { cluster.getManagedHistoricals().forEach( (tier, servers) -> tierToServers.put(tier, new CircularServerList(servers)) ); + + if (!coordinatingVersions.isEmpty()) { + cluster.getManagedHistoricals().keySet().forEach(tier -> { + for (String group : coordinatingVersions) { + final var groupServers = cluster.getManagedHistoricalsByTierAndGroup(tier, group); + if (!groupServers.isEmpty()) { + tierGroupToServers.computeIfAbsent(tier, t -> new HashMap<>()) + .put(group, new CircularServerList(groupServers)); + } + } + }); + } } /** @@ -70,6 +83,20 @@ public Iterator getServersInTierToLoadSegment(String tier, DataSeg return new 
EligibleServerIterator(segment, iterator); } + /** + * Returns an iterator over servers in the given tier and deployment group that are eligible to + * load the given segment. + */ + public Iterator getServersInTierAndGroupToLoadSegment(String tier, String group, DataSegment segment) + { + final Map groupMap = tierGroupToServers.get(tier); + if (groupMap == null) { + return Collections.emptyIterator(); + } + final CircularServerList list = groupMap.get(group); + return list == null ? Collections.emptyIterator() : new EligibleServerIterator(segment, list); + } + /** * Iterator over servers in a tier that are eligible to load a given segment. */ diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java index d89c009f4726..3d5d3c903253 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java @@ -42,6 +42,7 @@ public class SegmentLoadingConfig private final boolean useRoundRobinSegmentAssignment; private final Map> historicalTierAliases; + private final Set coordinatingVersions; /** * Creates a new SegmentLoadingConfig with recomputed coordinator config values @@ -67,7 +68,8 @@ public static SegmentLoadingConfig create(CoordinatorDynamicConfig dynamicConfig 60, true, numBalancerThreads, - dynamicConfig.getHistoricalTierAliases() + dynamicConfig.getHistoricalTierAliases(), + dynamicConfig.getCoordinatingVersions() ); } else { // Use the configured values @@ -77,7 +79,8 @@ public static SegmentLoadingConfig create(CoordinatorDynamicConfig dynamicConfig dynamicConfig.getReplicantLifetime(), dynamicConfig.isUseRoundRobinSegmentAssignment(), dynamicConfig.getBalancerComputeThreads(), - dynamicConfig.getHistoricalTierAliases() + dynamicConfig.getHistoricalTierAliases(), + 
dynamicConfig.getCoordinatingVersions() ); } } @@ -88,7 +91,8 @@ private SegmentLoadingConfig( int maxLifetimeInLoadQueue, boolean useRoundRobinSegmentAssignment, int balancerComputeThreads, - Map> historicalTierAliases + Map> historicalTierAliases, + Set coordinatingVersions ) { this.maxSegmentsInLoadQueue = maxSegmentsInLoadQueue; @@ -97,6 +101,7 @@ private SegmentLoadingConfig( this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment; this.balancerComputeThreads = balancerComputeThreads; this.historicalTierAliases = historicalTierAliases; + this.coordinatingVersions = coordinatingVersions; } public int getMaxSegmentsInLoadQueue() @@ -128,4 +133,9 @@ public Map> getHistoricalTierAliases() { return historicalTierAliases; } + + public Set getCoordinatingVersions() + { + return coordinatingVersions; + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java index 241759f8b951..b4e2028b5f7d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java @@ -27,36 +27,40 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; /** - * Contains a mapping from tier to {@link SegmentReplicaCount}s. + * Contains a mapping from {@link ReplicaCountKey} to {@link SegmentReplicaCount}s. *

* Used by the {@link StrategicSegmentAssigner} to make assignment decisions. */ public class SegmentReplicaCountMap { - private final Map> replicaCounts = new HashMap<>(); + private final Map> replicaCounts = new HashMap<>(); - static SegmentReplicaCountMap create(DruidCluster cluster) + static SegmentReplicaCountMap create(DruidCluster cluster, Set coordinatingVersions) { final SegmentReplicaCountMap replicaCountMap = new SegmentReplicaCountMap(); - replicaCountMap.initReplicaCounts(cluster); + replicaCountMap.initReplicaCounts(cluster, coordinatingVersions); return replicaCountMap; } - private void initReplicaCounts(DruidCluster cluster) + private void initReplicaCounts(DruidCluster cluster, Set coordinatingVersions) { cluster.getManagedHistoricals().forEach( (tier, historicals) -> historicals.forEach( serverHolder -> { + final String group = serverHolder.getServer().getMetadata().getDeploymentGroup(); + final ReplicaCountKey key = ReplicaCountKey.from(tier, group, coordinatingVersions); + // Add segments already loaded on this server for (DataSegment segment : serverHolder.getServedSegments()) { - computeIfAbsent(segment.getId(), tier).incrementLoaded(); + computeIfAbsent(segment.getId(), key).incrementLoaded(); } // Add segments queued for load, drop or move on this server serverHolder.getQueuedSegments().forEach( - (segment, state) -> computeIfAbsent(segment.getId(), tier) + (segment, state) -> computeIfAbsent(segment.getId(), key) .incrementQueued(state) ); } @@ -66,7 +70,7 @@ private void initReplicaCounts(DruidCluster cluster) cluster.getBrokers().forEach(broker -> { final ImmutableDruidServer server = broker.getServer(); for (DataSegment segment : server.iterateAllSegments()) { - computeIfAbsent(segment.getId(), server.getTier()) + computeIfAbsent(segment.getId(), ReplicaCountKey.forTier(server.getTier())) .incrementLoadedOnNonHistoricalServer(); } }); @@ -74,16 +78,16 @@ private void initReplicaCounts(DruidCluster cluster) 
cluster.getRealtimes().forEach(realtime -> { final ImmutableDruidServer server = realtime.getServer(); for (DataSegment segment : server.iterateAllSegments()) { - computeIfAbsent(segment.getId(), server.getTier()) + computeIfAbsent(segment.getId(), ReplicaCountKey.forTier(server.getTier())) .incrementLoadedOnNonHistoricalServer(); } }); } - SegmentReplicaCount get(SegmentId segmentId, String tier) + SegmentReplicaCount get(SegmentId segmentId, ReplicaCountKey key) { SegmentReplicaCount count = replicaCounts.getOrDefault(segmentId, Collections.emptyMap()) - .get(tier); + .get(key); return count == null ? new SegmentReplicaCount() : count; } @@ -95,10 +99,10 @@ SegmentReplicaCount getTotal(SegmentId segmentId) return total; } - public SegmentReplicaCount computeIfAbsent(SegmentId segmentId, String tier) + public SegmentReplicaCount computeIfAbsent(SegmentId segmentId, ReplicaCountKey key) { return replicaCounts.computeIfAbsent(segmentId, s -> new HashMap<>()) - .computeIfAbsent(tier, t -> new SegmentReplicaCount()); + .computeIfAbsent(key, t -> new SegmentReplicaCount()); } public SegmentReplicationStatus toReplicationStatus() diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatus.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatus.java index 7121642f25ed..298a00d3e11b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatus.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatus.java @@ -36,9 +36,9 @@ public class SegmentReplicationStatus { private final Map totalReplicaCounts; - private final Map> replicaCountsInTier; + private final Map> replicaCountsInTier; - public SegmentReplicationStatus(Map> replicaCountsInTier) + public SegmentReplicationStatus(Map> replicaCountsInTier) { this.replicaCountsInTier = ImmutableMap.copyOf(replicaCountsInTier); @@ -64,16 +64,16 @@ public Map> 
getTierToDatasourceToUnderReplicated( final Map> tierToUnderReplicated = new HashMap<>(); for (DataSegment segment : usedSegments) { - final Map tierToReplicaCount = replicaCountsInTier.get(segment.getId()); + final Map tierToReplicaCount = replicaCountsInTier.get(segment.getId()); if (tierToReplicaCount == null) { continue; } - tierToReplicaCount.forEach((tier, counts) -> { + tierToReplicaCount.forEach((key, counts) -> { final int underReplicated = ignoreMissingServers ? counts.missing() : counts.missingAndLoadable(); if (underReplicated >= 0) { Object2LongOpenHashMap datasourceToUnderReplicated = (Object2LongOpenHashMap) - tierToUnderReplicated.computeIfAbsent(tier, ds -> new Object2LongOpenHashMap<>()); + tierToUnderReplicated.computeIfAbsent(key.tier(), ds -> new Object2LongOpenHashMap<>()); datasourceToUnderReplicated.addTo(segment.getDataSource(), underReplicated); } }); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java index d4fbfd82e316..f96a1e56418c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java @@ -34,6 +34,7 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import java.util.ArrayList; import java.util.Comparator; @@ -42,6 +43,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; @@ -65,6 +67,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler private final boolean useRoundRobinAssignment; private final Map> historicalTierAliases; + private final Set 
coordinatingVersions; private final Map> datasourceToInvalidLoadTiers = new HashMap<>(); private final Map tierToHistoricalCount = new HashMap<>(); @@ -84,10 +87,11 @@ public StrategicSegmentAssigner( this.cluster = cluster; this.strategy = strategy; this.loadQueueManager = loadQueueManager; - this.replicaCountMap = SegmentReplicaCountMap.create(cluster); + this.coordinatingVersions = loadingConfig.getCoordinatingVersions(); + this.replicaCountMap = SegmentReplicaCountMap.create(cluster, coordinatingVersions); this.replicationThrottler = createReplicationThrottler(cluster, loadingConfig); this.useRoundRobinAssignment = loadingConfig.isUseRoundRobinSegmentAssignment(); - this.serverSelector = useRoundRobinAssignment ? new RoundRobinServerSelector(cluster) : null; + this.serverSelector = useRoundRobinAssignment ? new RoundRobinServerSelector(cluster, coordinatingVersions) : null; this.historicalTierAliases = loadingConfig.getHistoricalTierAliases(); cluster.getManagedHistoricals().forEach( @@ -182,7 +186,9 @@ private boolean moveSegment(DataSegment segment, ServerHolder serverA, ServerHol if (serverA.isLoadingSegment(segment)) { // Cancel the load on serverA and load on serverB instead if (serverA.cancelOperation(SegmentAction.LOAD, segment)) { - int loadedCountOnTier = replicaCountMap.get(segment.getId(), tier) + final String srcGroup = serverA.getServer().getMetadata().getDeploymentGroup(); + final ReplicaCountKey moveKey = ReplicaCountKey.from(tier, srcGroup, coordinatingVersions); + int loadedCountOnTier = replicaCountMap.get(segment.getId(), moveKey) .loadedNotDropping(); if (loadedCountOnTier >= 1) { return replicateSegment(segment, serverB); @@ -233,16 +239,39 @@ public void replicateSegment(DataSegment segment, Map tierToRep final Map effectiveTierToReplicaCount = expandWithAliases(tierToReplicaCount); final Set allTiersInCluster = Sets.newHashSet(cluster.getTierNames()); + // Pre-compute active deployment groups per tier once; used in both the required-count 
loop and the + // update loop below to avoid calling Sets.intersection twice per tier per segment. + final Map> tierToActiveGroups = new HashMap<>(); + if (!coordinatingVersions.isEmpty()) { + for (String tier : allTiersInCluster) { + final Set groups = Sets.intersection(coordinatingVersions, cluster.getDeploymentGroupsForTier(tier)); + if (!groups.isEmpty()) { + tierToActiveGroups.put(tier, groups); + } + } + } + if (effectiveTierToReplicaCount.isEmpty()) { // Track the counts for a segment even if it requires 0 replicas on all tiers - replicaCountMap.computeIfAbsent(segment.getId(), DruidServer.DEFAULT_TIER); + replicaCountMap.computeIfAbsent(segment.getId(), ReplicaCountKey.forTier(DruidServer.DEFAULT_TIER)); } else { // Identify empty tiers and determine total required replicas effectiveTierToReplicaCount.forEach((tier, requiredReplicas) -> { reportTierCapacityStats(segment, requiredReplicas, tier); - SegmentReplicaCount replicaCount = replicaCountMap.computeIfAbsent(segment.getId(), tier); - replicaCount.setRequired(requiredReplicas, tierToHistoricalCount.getOrDefault(tier, 0)); + final Set groupsInTier = tierToActiveGroups.get(tier); + if (groupsInTier == null) { + SegmentReplicaCount replicaCount = replicaCountMap.computeIfAbsent(segment.getId(), ReplicaCountKey.forTier(tier)); + replicaCount.setRequired(requiredReplicas, tierToHistoricalCount.getOrDefault(tier, 0)); + } else { + // For each group present in this tier, set required replicas under the (tier, group) key. 
+ for (String group : groupsInTier) { + final ReplicaCountKey key = ReplicaCountKey.from(tier, group, coordinatingVersions); + final int groupServerCount = cluster.getManagedHistoricalsByTierAndGroup(tier, group).size(); + replicaCountMap.computeIfAbsent(segment.getId(), key) + .setRequired(requiredReplicas, groupServerCount); + } + } if (!allTiersInCluster.contains(tier)) { datasourceToInvalidLoadTiers.computeIfAbsent(segment.getDataSource(), ds -> new HashSet<>()) @@ -264,34 +293,68 @@ public void replicateSegment(DataSegment segment, Map tierToRep // Update replicas in every tier int dropsQueued = 0; for (String tier : allTiersInCluster) { + final int requiredReplicas = effectiveTierToReplicaCount.getOrDefault(tier, 0); + final Set groupsInTier = tierToActiveGroups.get(tier); + if (groupsInTier != null) { + for (String group : groupsInTier) { + dropsQueued += updateReplicasInTier( + segment, + tier, + group, + requiredReplicas, + replicaSurplus - dropsQueued + ); + } + // Also process uncoordinated servers in the tier (deploymentGroup null or not in + // coordinatingVersions). Their replicas roll up to the tier-wide ReplicaCountKey, so + // skipping this leg leaves their loads/drops unmanaged when the tier has at least one + // coordinated group active. The required replica count is 0 for the tier-wide scope when + // any group is coordinated, so this branch is effectively a "drop / cancel surplus" pass. + dropsQueued += updateReplicasInTier( + segment, + tier, + null, + 0, + replicaSurplus - dropsQueued + ); + continue; + } dropsQueued += updateReplicasInTier( segment, tier, - effectiveTierToReplicaCount.getOrDefault(tier, 0), + null, + requiredReplicas, replicaSurplus - dropsQueued ); } } /** - * Queues load or drop operations on this tier based on the required - * number of replicas and the current state. + * Queues load or drop operations on this tier (or a single deployment group within it) based on + * the required number of replicas and the current state. 
+ *

+ * When {@code group} is non-null, the scope is restricted to that group: replica counts are read + * under the (tier, group) composite key, eligible servers are restricted to that group, and the + * round-robin selector picks only from that group's servers. Throttling and stats keys still use + * the plain tier name. *

* The {@code maxReplicasToDrop} helps to maintain the required level of * replication in the cluster. This ensures that segment read concurrency does * not suffer during a tier shift or load rule change. *

- * Returns the number of new drop operations queued on this tier. + * Returns the number of new drop operations queued. */ private int updateReplicasInTier( DataSegment segment, String tier, + @Nullable String group, int requiredReplicas, int maxReplicasToDrop ) { - final SegmentReplicaCount replicaCountOnTier - = replicaCountMap.get(segment.getId(), tier); + // group is non-null only when it is in coordinatingVersions; from() falls back to tier-wide if not. + final ReplicaCountKey key = ReplicaCountKey.from(tier, group, coordinatingVersions); + final SegmentReplicaCount replicaCountOnTier = replicaCountMap.get(segment.getId(), key); final int projectedReplicas = replicaCountOnTier.loadedNotDropping() + replicaCountOnTier.loading() @@ -305,8 +368,18 @@ private int updateReplicasInTier( return 0; } - final SegmentStatusInTier segmentStatus = - new SegmentStatusInTier(segment, cluster.getManagedHistoricalsByTier(tier)); + // When group is null but the tier has at least one coordinated deployment group, restrict + // tier-wide processing to uncoordinated servers only — otherwise group-scoped servers would + // be re-processed by the tier-wide pass and double-counted. 
+ final NavigableSet serversInScope; + if (group != null) { + serversInScope = cluster.getManagedHistoricalsByTierAndGroup(tier, group); + } else if (!coordinatingVersions.isEmpty() && !cluster.getDeploymentGroupsForTier(tier).isEmpty()) { + serversInScope = cluster.getUncoordinatedManagedHistoricalsByTier(tier, coordinatingVersions); + } else { + serversInScope = cluster.getManagedHistoricalsByTier(tier); + } + final SegmentStatusInTier segmentStatus = new SegmentStatusInTier(segment, serversInScope); // Cancel all moves in this tier if it does not need to have replicas if (shouldCancelMoves) { @@ -324,7 +397,7 @@ private int updateReplicasInTier( int numReplicasToLoad = replicaDeficit - cancelledDrops; if (numReplicasToLoad > 0) { int numLoadedReplicas = replicaCountOnTier.loadedNotDropping() + cancelledDrops; - int numLoadsQueued = loadReplicas(numReplicasToLoad, numLoadedReplicas, segment, tier, segmentStatus); + int numLoadsQueued = loadReplicas(numReplicasToLoad, numLoadedReplicas, segment, tier, group, segmentStatus); incrementStat(Stats.Segments.ASSIGNED, segment, tier, numLoadsQueued); } } @@ -385,7 +458,7 @@ public void broadcastSegment(DataSegment segment) // Update required replica counts tierToRequiredReplicas.object2IntEntrySet().fastForEach( - entry -> replicaCountMap.computeIfAbsent(segment.getId(), entry.getKey()) + entry -> replicaCountMap.computeIfAbsent(segment.getId(), ReplicaCountKey.forTier(entry.getKey())) .setRequired(entry.getIntValue(), entry.getIntValue()) ); @@ -527,13 +600,16 @@ private int dropReplicasFromServers( } /** - * Queues load of {@code numToLoad} replicas of the segment on a tier. + * Queues load of {@code numToLoad} replicas of the segment. When {@code group} is non-null, + * the round-robin selector is restricted to servers in that group; otherwise it ranges over + * the whole tier. 
*/ private int loadReplicas( int numToLoad, int numLoadedReplicas, DataSegment segment, String tier, + @Nullable String group, SegmentStatusInTier segmentStatus ) { @@ -550,10 +626,14 @@ private int loadReplicas( return 0; } - final Iterator serverIterator = - useRoundRobinAssignment - ? serverSelector.getServersInTierToLoadSegment(tier, segment) - : strategy.findServersToLoadSegment(segment, eligibleServers); + final Iterator serverIterator; + if (useRoundRobinAssignment) { + serverIterator = group == null + ? serverSelector.getServersInTierToLoadSegment(tier, segment) + : serverSelector.getServersInTierAndGroupToLoadSegment(tier, group, segment); + } else { + serverIterator = strategy.findServersToLoadSegment(segment, eligibleServers); + } if (!serverIterator.hasNext()) { incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, "No strategic server", segment, tier); return 0; diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 5a3f5decf138..7b3d5174f58c 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -56,6 +56,7 @@ import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.rpc.indexing.SegmentUpdateResponse; import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.rules.LoadRule; import org.apache.druid.server.coordinator.rules.Rule; @@ -121,6 +122,7 @@ public class DataSourcesResource private final AuthorizerMapper authorizerMapper; private final DruidCoordinator coordinator; private final AuditManager auditManager; + private final CoordinatorConfigManager coordinatorConfigManager; @Inject public DataSourcesResource( @@ -130,7 +132,8 
@@ public DataSourcesResource( OverlordClient overlordClient, AuthorizerMapper authorizerMapper, DruidCoordinator coordinator, - AuditManager auditManager + AuditManager auditManager, + CoordinatorConfigManager coordinatorConfigManager ) { this.serverInventoryView = serverInventoryView; @@ -140,6 +143,7 @@ public DataSourcesResource( this.authorizerMapper = authorizerMapper; this.coordinator = coordinator; this.auditManager = auditManager; + this.coordinatorConfigManager = coordinatorConfigManager; } @GET @@ -934,6 +938,7 @@ public Response isHandOffComplete( // A segment that is not eligible for load will never be handed off boolean eligibleForLoad = false; + LoadRule matchingLoadRule = null; for (Rule rule : rules) { final boolean applies; if (rule.isIntervalBased()) { @@ -947,7 +952,10 @@ public Response isHandOffComplete( applies = rule.appliesTo(segment, now); } if (applies) { - eligibleForLoad = rule instanceof LoadRule && ((LoadRule) rule).shouldMatchingSegmentBeLoaded(); + if (rule instanceof LoadRule && ((LoadRule) rule).shouldMatchingSegmentBeLoaded()) { + eligibleForLoad = true; + matchingLoadRule = (LoadRule) rule; + } break; } } @@ -973,11 +981,32 @@ public Response isHandOffComplete( Iterable servedSegmentsInInterval = prepareServedSegmentsInInterval(timeline, theInterval); - if (isSegmentLoaded(servedSegmentsInInterval, descriptor)) { - return Response.ok(true).build(); + if (!isSegmentLoaded(servedSegmentsInInterval, descriptor)) { + return Response.ok(false).build(); + } + + // When coordinatingVersions is configured, additionally verify per-group coverage. + final Set coordinatingVersions = coordinatorConfigManager == null + ? 
Set.of() + : coordinatorConfigManager.getCurrentDynamicConfig().getCoordinatingVersions(); + if (!coordinatingVersions.isEmpty() && matchingLoadRule != null) { + final Map> activeGroupsByTier = + computeActiveDeploymentGroupsByTier(coordinatingVersions); + for (Map.Entry tierEntry : matchingLoadRule.getTieredReplicants().entrySet()) { + if (tierEntry.getValue() <= 0) { + continue; + } + final Set activeGroups = + activeGroupsByTier.getOrDefault(tierEntry.getKey(), Set.of()); + for (String group : activeGroups) { + if (!isSegmentLoadedForDeploymentGroup(servedSegmentsInInterval, descriptor, tierEntry.getKey(), group)) { + return Response.ok(false).build(); + } + } + } } - return Response.ok(false).build(); + return Response.ok(true).build(); } catch (Exception e) { log.error(e, "Error while handling hand off check request"); @@ -1010,4 +1039,54 @@ static boolean isSegmentLoaded(Iterable servedSegments return false; } + /** + * Builds a {@code tier -> activeGroups} map in a single pass over the server inventory. Only + * segment-replication-target servers whose {@code deploymentGroup} is in + * {@code coordinatingVersions} contribute. Tiers with no active groups are absent from the + * returned map. Groups with no online servers do not block handoff. + */ + private Map> computeActiveDeploymentGroupsByTier(Set coordinatingVersions) + { + final Map> activeGroupsByTier = new HashMap<>(); + for (DruidServer server : serverInventoryView.getInventory()) { + if (!server.getType().isSegmentReplicationTarget()) { + continue; + } + final String group = server.getMetadata().getDeploymentGroup(); + if (group == null || !coordinatingVersions.contains(group)) { + continue; + } + activeGroupsByTier.computeIfAbsent(server.getTier(), t -> new HashSet<>()).add(group); + } + return activeGroupsByTier; + } + + /** + * Returns true if at least one segment-replication-target server in the given {@code tier} and + * {@code group} serves the segment described by {@code descriptor}. 
The tier check is required + * because the same {@code deploymentGroup} name may appear across multiple tiers, so a server + * outside the rule-required tier must not satisfy the per-tier handoff check. + */ + static boolean isSegmentLoadedForDeploymentGroup( + Iterable servedSegments, + SegmentDescriptor descriptor, + String tier, + String group + ) + { + for (ImmutableSegmentLoadInfo segmentLoadInfo : servedSegments) { + if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval()) + && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() == descriptor.getPartitionNumber() + && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0 + && segmentLoadInfo.getServers().stream().anyMatch( + s -> s.isSegmentReplicationTarget() + && tier.equals(s.getTier()) + && group.equals(s.getDeploymentGroup()) + )) { + return true; + } + } + return false; + } + } diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index e39aabf984c7..7409c627b3bf 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -542,6 +542,154 @@ public void testEmptyIgnoredTiersConfig() throws Exception setupViews(null, Collections.emptySet(), true); } + @Test(expected = ISE.class) + public void testEmptyWatchedDeploymentGroupsConfig() throws Exception + { + setupViewsWithDeploymentGroupConfig(Collections.emptySet(), false); + } + + @Test + public void testWatchedDeploymentGroupsExcludesNonMatchingHistorical() throws Exception + { + segmentViewInitLatch = new CountDownLatch(1); + // black historical adds 1 segment; red historical's add is filtered out before reaching the latch + segmentAddedLatch = new CountDownLatch(1); + segmentRemovedLatch = new CountDownLatch(0); + + setupViewsWithDeploymentGroupConfig(ImmutableSet.of("black"), false); + + 
final DruidServer blackHistorical = setupDataServerWithDeploymentGroup( + ServerType.HISTORICAL, + "default_tier", + "black-historical:1", + 0, + "black" + ); + final DruidServer redHistorical = setupDataServerWithDeploymentGroup( + ServerType.HISTORICAL, + "default_tier", + "red-historical:1", + 0, + "red" + ); + + final DataSegment segment = dataSegmentWithIntervalAndVersion("2024-01-01/P1D", "v1"); + baseView.addSegment(blackHistorical, segment); + baseView.addSegment(redHistorical, segment); + + Assert.assertTrue(awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(awaitLatch(segmentAddedLatch)); + + TimelineLookup timeline = brokerServerView.getTimeline( + new TableDataSource(segment.getDataSource()) + ).get(); + List> holders = timeline.lookup(segment.getInterval()); + Assert.assertEquals(1, holders.size()); + ServerSelector selector = holders.get(0).getObject().iterator().next().getObject(); + // Only the black historical should be selectable. + Assert.assertEquals( + List.of(blackHistorical.getMetadata()), + selector.getAllServers(CloneQueryMode.EXCLUDECLONES) + ); + } + + @Test + public void testWatchedDeploymentGroupsAllowsRealtimeByDefault() throws Exception + { + segmentViewInitLatch = new CountDownLatch(1); + // both realtimes are watched (bypass), black historical adds 1 = 3 total + segmentAddedLatch = new CountDownLatch(3); + segmentRemovedLatch = new CountDownLatch(0); + + setupViewsWithDeploymentGroupConfig(ImmutableSet.of("black"), false); + + final DruidServer blackHistorical = setupDataServerWithDeploymentGroup( + ServerType.HISTORICAL, + "default_tier", + "black-historical:1", + 0, + "black" + ); + final DruidServer redPeon = setupDataServerWithDeploymentGroup( + ServerType.INDEXER_EXECUTOR, + null, + "red-peon:1", + 0, + "red" + ); + final DruidServer blackPeon = setupDataServerWithDeploymentGroup( + ServerType.INDEXER_EXECUTOR, + null, + "black-peon:1", + 0, + "black" + ); + + final DataSegment historicalSegment = 
dataSegmentWithIntervalAndVersion("2024-01-01/P1D", "v1"); + final DataSegment redPeonSegment = dataSegmentWithIntervalAndVersion("2024-01-02/P1D", "v1"); + final DataSegment blackPeonSegment = dataSegmentWithIntervalAndVersion("2024-01-03/P1D", "v1"); + baseView.addSegment(blackHistorical, historicalSegment); + baseView.addSegment(redPeon, redPeonSegment); + baseView.addSegment(blackPeon, blackPeonSegment); + + Assert.assertTrue(awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(awaitLatch(segmentAddedLatch)); + + Set serverNames = brokerServerView.getDruidServerMetadatas().stream() + .map(DruidServerMetadata::getName) + .collect(Collectors.toSet()); + Assert.assertTrue(serverNames.contains(blackHistorical.getName())); + Assert.assertTrue(serverNames.contains(redPeon.getName())); + Assert.assertTrue(serverNames.contains(blackPeon.getName())); + } + + @Test + public void testStrictRealtimeDeploymentGroupFilterExcludesRealtime() throws Exception + { + segmentViewInitLatch = new CountDownLatch(1); + // strict mode: only black historical + black peon = 2; red peon is filtered. 
+ segmentAddedLatch = new CountDownLatch(2); + segmentRemovedLatch = new CountDownLatch(0); + + setupViewsWithDeploymentGroupConfig(ImmutableSet.of("black"), true); + + final DruidServer blackHistorical = setupDataServerWithDeploymentGroup( + ServerType.HISTORICAL, + "default_tier", + "black-historical:1", + 0, + "black" + ); + final DruidServer redPeon = setupDataServerWithDeploymentGroup( + ServerType.INDEXER_EXECUTOR, + null, + "red-peon:1", + 0, + "red" + ); + final DruidServer blackPeon = setupDataServerWithDeploymentGroup( + ServerType.INDEXER_EXECUTOR, + null, + "black-peon:1", + 0, + "black" + ); + + baseView.addSegment(blackHistorical, dataSegmentWithIntervalAndVersion("2024-01-01/P1D", "v1")); + baseView.addSegment(redPeon, dataSegmentWithIntervalAndVersion("2024-01-02/P1D", "v1")); + baseView.addSegment(blackPeon, dataSegmentWithIntervalAndVersion("2024-01-03/P1D", "v1")); + + Assert.assertTrue(awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(awaitLatch(segmentAddedLatch)); + + Set serverNames = brokerServerView.getDruidServerMetadatas().stream() + .map(DruidServerMetadata::getName) + .collect(Collectors.toSet()); + Assert.assertTrue(serverNames.contains(blackHistorical.getName())); + Assert.assertTrue(serverNames.contains(blackPeon.getName())); + Assert.assertFalse("red peon must be filtered under strict mode", serverNames.contains(redPeon.getName())); + } + @Test public void testDifferentTierStrategiesForHistoricalAndRealtimeServers() throws Exception { @@ -651,6 +799,21 @@ private DruidServer setupDruidServer(ServerType serverType, String tier, String return druidServer; } + private DruidServer setupDataServerWithDeploymentGroup( + ServerType serverType, + String tier, + String name, + int priority, + String deploymentGroup + ) + { + final DruidServer druidServer = new DruidServer( + new DruidServerMetadata(name, name, null, 1000000, null, serverType, tier, priority, deploymentGroup) + ); + baseView.addServer(druidServer); + return druidServer; 
+ } + private boolean awaitLatch(CountDownLatch latch) throws InterruptedException { return latch.await(AWAIT_SECONDS, TimeUnit.SECONDS); @@ -701,6 +864,29 @@ private void setupViews(TierSelectorStrategy historicalStrategy, TierSelectorStr setupViews(historicalStrategy, realtimeStrategy, new BrokerSegmentWatcherConfig()); } + private void setupViewsWithDeploymentGroupConfig(Set watchedDeploymentGroups, boolean strict) + throws InterruptedException + { + setupViews( + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), + new BrokerSegmentWatcherConfig() + { + @Override + public Set getWatchedDeploymentGroups() + { + return watchedDeploymentGroups; + } + + @Override + public boolean isStrictRealtimeDeploymentGroupFilter() + { + return strict; + } + } + ); + } + private void setupViews(Set watchedTiers, Set ignoredTiers, boolean watchRealtimeTasks) throws InterruptedException { setupViews( diff --git a/server/src/test/java/org/apache/druid/segment/realtime/ServiceAnnouncingChatHandlerProviderTest.java b/server/src/test/java/org/apache/druid/segment/realtime/ServiceAnnouncingChatHandlerProviderTest.java index 05fb11e4b620..9aa5fc1f9635 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/ServiceAnnouncingChatHandlerProviderTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/ServiceAnnouncingChatHandlerProviderTest.java @@ -88,12 +88,8 @@ private void testRegistrationWithAnnounce(boolean useThreeArgConstructor) ChatHandler testChatHandler = new TestChatHandler(); Capture captured = Capture.newInstance(); - EasyMock.expect(node.getHost()).andReturn(TEST_HOST); - EasyMock.expect(node.isBindOnHost()).andReturn(false); - EasyMock.expect(node.getPlaintextPort()).andReturn(TEST_PORT); - EasyMock.expect(node.isEnablePlaintextPort()).andReturn(true); - EasyMock.expect(node.isEnableTlsPort()).andReturn(false); - 
EasyMock.expect(node.getTlsPort()).andReturn(-1); + final DruidNode serviceNode = new DruidNode(TEST_SERVICE_NAME, TEST_HOST, false, TEST_PORT, null, true, false); + EasyMock.expect(node.withService(TEST_SERVICE_NAME)).andReturn(serviceNode); serviceAnnouncer.announce(EasyMock.capture(captured)); replayAll(); @@ -117,12 +113,7 @@ private void testRegistrationWithAnnounce(boolean useThreeArgConstructor) captured.reset(); resetAll(); - EasyMock.expect(node.getHost()).andReturn(TEST_HOST); - EasyMock.expect(node.isBindOnHost()).andReturn(false); - EasyMock.expect(node.getPlaintextPort()).andReturn(TEST_PORT); - EasyMock.expect(node.isEnablePlaintextPort()).andReturn(true); - EasyMock.expect(node.getTlsPort()).andReturn(-1); - EasyMock.expect(node.isEnableTlsPort()).andReturn(false); + EasyMock.expect(node.withService(TEST_SERVICE_NAME)).andReturn(serviceNode); serviceAnnouncer.unannounce(EasyMock.capture(captured)); replayAll(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java index 656862d70222..29c2f2537321 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java @@ -21,6 +21,7 @@ import org.apache.druid.client.DruidServer; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.timeline.DataSegment; @@ -129,4 +130,52 @@ public void testIsEmpty() Assert.assertFalse(clusterBuilder.build().isEmpty()); Assert.assertTrue(emptyCluster.isEmpty()); } + + @Test + public void testGetDeploymentGroupsForTier_multipleGroups() + { + final ServerHolder redServer = serverHolderWithGroup("tier1", "red"); + final 
ServerHolder blueServer = serverHolderWithGroup("tier1", "blue"); + final DruidCluster cluster = DruidCluster.builder().add(redServer).add(blueServer).build(); + + final Set groups = cluster.getDeploymentGroupsForTier("tier1"); + Assert.assertEquals(Set.of("red", "blue"), groups); + } + + @Test + public void testGetDeploymentGroupsForTier_nullGroupExcluded() + { + // Servers without a deploymentGroup are not returned by getDeploymentGroupsForTier + final ServerHolder ungrouped = new ServerHolder( + new DruidServer("h1", "h1", null, 100L, null, ServerType.HISTORICAL, "tier1", 0) + .toImmutableDruidServer(), + new TestLoadQueuePeon() + ); + final DruidCluster cluster = DruidCluster.builder().add(ungrouped).build(); + + Assert.assertTrue(cluster.getDeploymentGroupsForTier("tier1").isEmpty()); + } + + @Test + public void testGetManagedHistoricalsByTierAndGroup() + { + final ServerHolder redServer = serverHolderWithGroup("tier1", "red"); + final ServerHolder blueServer = serverHolderWithGroup("tier1", "blue"); + final DruidCluster cluster = DruidCluster.builder().add(redServer).add(blueServer).build(); + + Assert.assertEquals(Set.of(redServer), cluster.getManagedHistoricalsByTierAndGroup("tier1", "red")); + Assert.assertEquals(Set.of(blueServer), cluster.getManagedHistoricalsByTierAndGroup("tier1", "blue")); + Assert.assertTrue(cluster.getManagedHistoricalsByTierAndGroup("tier1", "green").isEmpty()); + } + + private static ServerHolder serverHolderWithGroup(String tier, String group) + { + final DruidServerMetadata metadata = new DruidServerMetadata( + group + "-host", group + "-host", null, 100L, null, ServerType.HISTORICAL, tier, 0, group + ); + return new ServerHolder( + new DruidServer(metadata).toImmutableDruidServer(), + new TestLoadQueuePeon() + ); + } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java index 
17f4757650fe..e2bf13393ffe 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java @@ -19,11 +19,13 @@ package org.apache.druid.server.coordinator.duty; +import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.client.DruidServer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.DruidCluster; @@ -286,6 +288,36 @@ public void testMaxSegmentsToMoveIsHonored() Assert.assertEquals(1, holder3.getPeon().getSegmentsToLoad().size()); } + @Test + public void testMaxSegmentsToMoveIsHonoredAcrossDeploymentGroups() + { + final ServerHolder redHolder1 = createHolder(createHistorical("red1", "red"), segment1, segment2); + final ServerHolder redHolder2 = createHolder(createHistorical("red2", "red")); + final ServerHolder blueHolder1 = createHolder(createHistorical("blue1", "blue"), segment3, segment4); + final ServerHolder blueHolder2 = createHolder(createHistorical("blue2", "blue")); + + final CoordinatorDynamicConfig dynamicConfig = + CoordinatorDynamicConfig.builder() + .withSmartSegmentLoading(false) + .withMaxSegmentsToMove(1) + .withCoordinatingVersions(ImmutableSet.of("red", "blue")) + .build(); + final DruidCoordinatorRuntimeParams params = + defaultRuntimeParamsBuilder(redHolder1, redHolder2, blueHolder1, blueHolder2) + .withDynamicConfigs(dynamicConfig) + .build(); + + final CoordinatorRunStats stats = runBalancer(params); + final long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1") + + stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource2"); + + Assert.assertEquals(1L, totalMoved); + Assert.assertEquals( + 1, + redHolder2.getPeon().getSegmentsToLoad().size() + blueHolder2.getPeon().getSegmentsToLoad().size() + ); + } + @Test public void testMoveForMultipleDatasources() { @@ -310,6 +342,73 @@ public void testMoveForMultipleDatasources() Assert.assertEquals(2L, totalMoved); } + @Test + public void testBalancerDoesNotMoveSegmentsAcrossDeploymentGroups() + { + // Two red servers (one full, one empty) and two blue servers (one full, one empty). + // With coordinatingVersions configured, balancing must stay within each group: + // segments from server1 may only move to server2 (red), segments from server3 may only move to server4 (blue). + final DruidServer redServer1 = new DruidServer( + new DruidServerMetadata("red1", "red1", null, 100L, null, ServerType.HISTORICAL, "normal", 0, "red") + ); + final DruidServer redServer2 = new DruidServer( + new DruidServerMetadata("red2", "red2", null, 100L, null, ServerType.HISTORICAL, "normal", 0, "red") + ); + final DruidServer blueServer1 = new DruidServer( + new DruidServerMetadata("blue1", "blue1", null, 100L, null, ServerType.HISTORICAL, "normal", 0, "blue") + ); + final DruidServer blueServer2 = new DruidServer( + new DruidServerMetadata("blue2", "blue2", null, 100L, null, ServerType.HISTORICAL, "normal", 0, "blue") + ); + + // Put segment1+segment2 on redServer1 only, and segment3+segment4 on blueServer1 only. 
+ final ServerHolder redHolder1 = createHolder(redServer1, segment1, segment2); + final ServerHolder redHolder2 = createHolder(redServer2); + final ServerHolder blueHolder1 = createHolder(blueServer1, segment3, segment4); + final ServerHolder blueHolder2 = createHolder(blueServer2); + + final CoordinatorDynamicConfig dynamicConfig = + CoordinatorDynamicConfig.builder() + .withSmartSegmentLoading(false) + .withMaxSegmentsToMove(4) + .withCoordinatingVersions(ImmutableSet.of("red", "blue")) + .build(); + DruidCoordinatorRuntimeParams params = + defaultRuntimeParamsBuilder(redHolder1, redHolder2, blueHolder1, blueHolder2) + .withDynamicConfigs(dynamicConfig) + .withBalancerStrategy(balancerStrategy) + .withSegmentAssignerUsing(loadQueueManager) + .build(); + + runBalancer(params); + + // Segments queued for load on red2 must only contain datasource1 (segment1/segment2 originated there). + for (DataSegment loaded : redHolder2.getPeon().getSegmentsToLoad()) { + Assert.assertEquals( + "Red destination must not receive blue segments", + "datasource1", + loaded.getDataSource() + ); + } + // Segments queued for load on blue2 must only contain datasource2 (segment3/segment4 originated there). + for (DataSegment loaded : blueHolder2.getPeon().getSegmentsToLoad()) { + Assert.assertEquals( + "Blue destination must not receive red segments", + "datasource2", + loaded.getDataSource() + ); + } + // Red servers should never receive blue segments and vice versa. 
+ Assert.assertTrue( + redHolder1.getPeon().getSegmentsToLoad().stream() + .noneMatch(s -> "datasource2".equals(s.getDataSource())) + ); + Assert.assertTrue( + blueHolder1.getPeon().getSegmentsToLoad().stream() + .noneMatch(s -> "datasource1".equals(s.getDataSource())) + ); + } + private CoordinatorRunStats runBalancer(DruidCoordinatorRuntimeParams params) { params = new BalanceSegments(Duration.standardMinutes(1)).run(params); @@ -364,6 +463,13 @@ private ServerHolder createHolder( ); } + private DruidServer createHistorical(String name, String deploymentGroup) + { + return new DruidServer( + new DruidServerMetadata(name, name, null, 100L, null, ServerType.HISTORICAL, "normal", 0, deploymentGroup) + ); + } + private DataSegment createHourlySegment(String datasource, DateTime start, String version) { return new DataSegment( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java index 131750d9581b..2660a67dda4f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java @@ -28,6 +28,7 @@ import org.apache.druid.client.ImmutableDruidServerTests; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.metadata.MetadataRuleManager; +import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.DruidCluster; import org.apache.druid.server.coordinator.DruidCoordinator; @@ -289,6 +290,9 @@ private static void mockDruidServer( EasyMock.expect(druidServer.getTier()).andReturn(tier).anyTimes(); EasyMock.expect(druidServer.getCurrSize()).andReturn(currentSize).anyTimes(); EasyMock.expect(druidServer.getMaxSize()).andReturn(maxSize).anyTimes(); + 
EasyMock.expect(druidServer.getMetadata()) + .andReturn(new DruidServerMetadata(name, name, null, maxSize, null, serverType, tier, 0, null)) + .anyTimes(); ImmutableDruidServerTests.expectSegments(druidServer, segments); EasyMock.expect(druidServer.getHost()).andReturn(name).anyTimes(); EasyMock.expect(druidServer.getType()).andReturn(serverType).anyTimes(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java index cbec62ee0e21..f432fe8e0693 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java @@ -33,6 +33,7 @@ import java.util.Collections; import java.util.Iterator; +import java.util.Set; public class RoundRobinServerSelectorTest { @@ -64,7 +65,7 @@ public void testSingleIterator() .builder() .addTier(TIER, serverXL, serverM, serverXS, serverL) .build(); - final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster); + final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster, Set.of()); // Verify that only eligible servers are returned in order of available size Iterator pickedServers = selector.getServersInTierToLoadSegment(TIER, segment); @@ -91,7 +92,7 @@ public void testNextIteratorContinuesFromSamePosition() .builder() .addTier(TIER, serverXL, serverM, serverXS, serverL) .build(); - final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster); + final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster, Set.of()); // Verify that only eligible servers are returned in order of available size Iterator pickedServers = selector.getServersInTierToLoadSegment(TIER, segment); @@ -113,7 +114,7 @@ public void testNextIteratorContinuesFromSamePosition() public void 
testNoServersInTier() { DruidCluster cluster = DruidCluster.builder().addTier(TIER).build(); - final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster); + final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster, Set.of()); Iterator eligibleServers = selector.getServersInTierToLoadSegment(TIER, segment); Assert.assertFalse(eligibleServers.hasNext()); @@ -129,7 +130,7 @@ public void testNoEligibleServerInTier() createHistorical("server3", 10), createHistorical("server4", 20) ).build(); - final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster); + final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster, Set.of()); // Verify that only eligible servers are returned in order of available size Iterator eligibleServers = selector.getServersInTierToLoadSegment(TIER, segment); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssignerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssignerTest.java new file mode 100644 index 000000000000..a6cf85fa458e --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssignerTest.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.loading; + +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.client.DruidServer; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.segment.TestDataSource; +import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.DruidCluster; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.ServerHolder; +import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy; +import org.apache.druid.server.coordinator.rules.ForeverLoadRule; +import org.apache.druid.server.coordinator.rules.LoadRule; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Stats; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NoneShardSpec; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Tests for deployment-group-aware segment assignment in {@link StrategicSegmentAssigner}. 
+ */ +@RunWith(Parameterized.class) +public class StrategicSegmentAssignerTest +{ + private static final String TIER = "tier1"; + private static final String GROUP_RED = "red"; + private static final String GROUP_BLUE = "blue"; + + private final boolean useRoundRobinAssignment; + private final AtomicInteger serverId = new AtomicInteger(); + + private SegmentLoadQueueManager loadQueueManager; + private ListeningExecutorService exec; + private CostBalancerStrategy balancerStrategy; + + @Parameterized.Parameters(name = "useRoundRobin = {0}") + public static List getTestParams() + { + return Arrays.asList(true, false); + } + + public StrategicSegmentAssignerTest(boolean useRoundRobinAssignment) + { + this.useRoundRobinAssignment = useRoundRobinAssignment; + } + + @Before + public void setUp() + { + loadQueueManager = new SegmentLoadQueueManager(null, null); + exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "StrategicSegmentAssignerTest-%d")); + balancerStrategy = new CostBalancerStrategy(exec); + } + + @After + public void tearDown() + { + exec.shutdown(); + } + + @Test + public void testSingleGroupTier_noCoordinatingVersions_behaviorUnchanged() + { + final DataSegment segment = createSegment(); + final ServerHolder server1 = createServer(TIER, null); + final ServerHolder server2 = createServer(TIER, null); + DruidCluster cluster = DruidCluster.builder().addTier(TIER, server1, server2).build(); + + CoordinatorRunStats stats = runRule( + loadForever(ImmutableMap.of(TIER, 1)), + segment, + cluster, + Collections.emptySet(), + segment + ); + + Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER, TestDataSource.WIKI)); + Assert.assertEquals(1, server1.getLoadingSegments().size() + server2.getLoadingSegments().size()); + } + + @Test + public void testTwoGroupTier_eachGroupGetsRequiredReplicas() + { + // Two groups with two servers each; rule requires 1 replica in the tier. 
+ // With coordinatingVersions active, each group should receive 1 replica independently. + final DataSegment segment = createSegment(); + final ServerHolder redServer1 = createServer(TIER, GROUP_RED); + final ServerHolder redServer2 = createServer(TIER, GROUP_RED); + final ServerHolder blueServer1 = createServer(TIER, GROUP_BLUE); + final ServerHolder blueServer2 = createServer(TIER, GROUP_BLUE); + DruidCluster cluster = DruidCluster + .builder() + .addTier(TIER, redServer1, redServer2, blueServer1, blueServer2) + .build(); + + CoordinatorRunStats stats = runRule( + loadForever(ImmutableMap.of(TIER, 1)), + segment, + cluster, + Set.of(GROUP_RED, GROUP_BLUE), + segment + ); + + // 1 replica per group = 2 total assignments, both reported under the same tier. + Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER, TestDataSource.WIKI)); + } + + @Test + public void testTwoGroupTier_segmentAlreadyLoadedInBothGroups_noAdditionalAssignment() + { + final DataSegment segment = createSegment(); + final ServerHolder redServer = createServer(TIER, GROUP_RED, segment); + final ServerHolder blueServer = createServer(TIER, GROUP_BLUE, segment); + DruidCluster cluster = DruidCluster.builder().addTier(TIER, redServer, blueServer).build(); + + CoordinatorRunStats stats = runRule( + loadForever(ImmutableMap.of(TIER, 1)), + segment, + cluster, + Set.of(GROUP_RED, GROUP_BLUE), + segment + ); + + Assert.assertFalse(stats.hasStat(Stats.Segments.ASSIGNED)); + } + + @Test + public void testTwoGroupTier_segmentLoadedOnlyInOneGroup_assignsToMissingGroup() + { + final DataSegment segment = createSegment(); + // Red already has the segment; blue does not. 
+ final ServerHolder redServer = createServer(TIER, GROUP_RED, segment); + final ServerHolder blueServer = createServer(TIER, GROUP_BLUE); + DruidCluster cluster = DruidCluster.builder().addTier(TIER, redServer, blueServer).build(); + + CoordinatorRunStats stats = runRule( + loadForever(ImmutableMap.of(TIER, 1)), + segment, + cluster, + Set.of(GROUP_RED, GROUP_BLUE), + segment + ); + + Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER, TestDataSource.WIKI)); + Assert.assertEquals(0, redServer.getLoadingSegments().size()); + Assert.assertEquals(1, blueServer.getLoadingSegments().size()); + } + + @Test + public void testTwoGroupTier_onlyOneGroupInCoordinatingVersions_tierWideBehaviorForOther() + { + // Only "red" is in coordinatingVersions. "blue" servers exist but are not coordinated, + // so the tier falls back to a single tier-wide replica count of 1. + final DataSegment segment = createSegment(); + final ServerHolder redServer = createServer(TIER, GROUP_RED); + final ServerHolder blueServer = createServer(TIER, GROUP_BLUE); + DruidCluster cluster = DruidCluster.builder().addTier(TIER, redServer, blueServer).build(); + + CoordinatorRunStats stats = runRule( + loadForever(ImmutableMap.of(TIER, 1)), + segment, + cluster, + Set.of(GROUP_RED), // only red is coordinated — intersection has 1 entry, no multi-group expansion + segment + ); + + // Intersection of coordinatingVersions and tier groups yields only {"red"}, which is a single + // group — same as the tier-wide path. One replica is assigned across the whole tier. + Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER, TestDataSource.WIKI)); + } + + private CoordinatorRunStats runRule( + LoadRule rule, + DataSegment segment, + DruidCluster cluster, + Set coordinatingVersions, + DataSegment... 
usedSegments + ) + { + DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams + .builder() + .withDruidCluster(cluster) + .withBalancerStrategy(balancerStrategy) + .withUsedSegments(usedSegments) + .withDynamicConfigs( + CoordinatorDynamicConfig.builder() + .withSmartSegmentLoading(false) + .withUseRoundRobinSegmentAssignment(useRoundRobinAssignment) + .withCoordinatingVersions(coordinatingVersions) + .build() + ) + .withSegmentAssignerUsing(loadQueueManager) + .build(); + + rule.run(segment, params.getSegmentAssigner()); + return params.getCoordinatorStats(); + } + + private ServerHolder createServer(String tier, String deploymentGroup, DataSegment... loadedSegments) + { + final int id = serverId.incrementAndGet(); + final String name = "hist_" + tier + "_" + id; + DruidServer server = new DruidServer( + new DruidServerMetadata(name, name, null, 10L << 30, null, ServerType.HISTORICAL, tier, 0, deploymentGroup) + ); + for (DataSegment segment : loadedSegments) { + server.addDataSegment(segment); + } + return new ServerHolder(server.toImmutableDruidServer(), new TestLoadQueuePeon()); + } + + private static LoadRule loadForever(Map tieredReplicants) + { + return new ForeverLoadRule(tieredReplicants, null); + } + + private static DataSegment createSegment() + { + return DataSegment.builder() + .dataSource(TestDataSource.WIKI) + .interval(Intervals.of("2024-01-01/2024-01-02")) + .version("1") + .shardSpec(NoneShardSpec.instance()) + .size(100) + .build(); + } +} diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index f56344ec12ec..ccd3a51177e4 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -281,6 +281,7 @@ public void testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources() 
null, ImmutableSet.of("host1"), null, + null, null ); Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty()); @@ -308,6 +309,7 @@ public void testConstructorWithSpecificDataSourcesToKillShouldNotKillUnusedSegme null, ImmutableSet.of("host1"), null, + null, null ); Assert.assertEquals(ImmutableSet.of("test1"), config.getSpecificDataSourcesToKillUnusedSegmentsIn()); diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java index 8b117834d363..332e9922ad0d 100644 --- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java @@ -53,6 +53,8 @@ import org.apache.druid.segment.TestDataSource; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordinator.CoordinatorConfigManager; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.rules.CannotMatchBehavior; import org.apache.druid.server.coordinator.rules.ExactProjectionPartialLoadMatcher; @@ -177,7 +179,8 @@ public void setUp() overlordClient, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - auditManager + auditManager, + null ); } @@ -289,7 +292,7 @@ public Access authorize(AuthenticationResult authenticationResult1, Resource res }; DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, overlordClient, authMapper, null, auditManager); + new DataSourcesResource(inventoryView, null, null, overlordClient, authMapper, null, auditManager, null); Response response = dataSourcesResource.getQueryableDataSources("full", null, request); Set result = (Set) response.getEntity(); @@ -667,7 +670,8 @@ public void 
testIsHandOffComplete() null, null, null, - auditManager + auditManager, + null ); // test dropped @@ -752,7 +756,8 @@ public void testIsHandOffCompleteSegmentNotInMetadataReturnsTrue() null, null, null, - auditManager + auditManager, + null ); EasyMock.expect(databaseRuleManager.getRulesWithDefault(TestDataSource.WIKI)) .andReturn(ImmutableList.of(partialRule)) @@ -794,7 +799,8 @@ public void testIsHandOffCompleteForcesMetadataRefreshOnSnapshotMiss() null, null, null, - auditManager + auditManager, + null ); String interval = "2013-01-01T01:00:00Z/2013-01-01T02:00:00Z"; DataSegment segment = buildHandoffSegment(TestDataSource.WIKI, Intervals.of(interval), "v1", 1); @@ -842,7 +848,8 @@ public void testIsHandOffCompleteWithPartialLoadRuleFallThrough() null, null, null, - auditManager + auditManager, + null ); String interval = "2013-01-01T01:00:00Z/2013-01-01T02:00:00Z"; @@ -893,7 +900,8 @@ public void testIsHandOffCompleteWithPartialLoadRuleMatcherResolves() null, null, null, - auditManager + auditManager, + null ); String interval = "2013-01-01T01:00:00Z/2013-01-01T02:00:00Z"; @@ -1882,7 +1890,7 @@ public void testGetDatasourceLoadstatusFull() EasyMock.replay(segmentsMetadataManager, druidCoordinator); DataSourcesResource dataSourcesResource = - new DataSourcesResource(null, segmentsMetadataManager, null, null, null, druidCoordinator, auditManager); + new DataSourcesResource(null, segmentsMetadataManager, null, null, null, druidCoordinator, auditManager, null); Response response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, null, "full", null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1941,7 +1949,7 @@ public void testGetDatasourceLoadstatusFullAndComputeUsingClusterView() EasyMock.replay(segmentsMetadataManager, druidCoordinator); DataSourcesResource dataSourcesResource = - new DataSourcesResource(null, segmentsMetadataManager, null, null, null, druidCoordinator, 
auditManager); + new DataSourcesResource(null, segmentsMetadataManager, null, null, null, druidCoordinator, auditManager, null); Response response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, null, "full", "computeUsingClusterView"); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1982,6 +1990,348 @@ private DataSegment createSegment(Interval interval, String version, int partiti ); } + // --- Deployment group tests --- + + @Test + public void testIsSegmentLoadedForDeploymentGroup_segmentServedByMatchingGroup() + { + final Interval interval = Intervals.of("2011-04-01/2011-04-02"); + final DruidServerMetadata redServer = createServerMetadataWithGroup("red-host", ServerType.HISTORICAL, "red"); + Assert.assertTrue( + DataSourcesResource.isSegmentLoadedForDeploymentGroup( + Collections.singletonList(new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 1), + Sets.newHashSet(redServer) + )), + new SegmentDescriptor(interval, "v1", 1), + "tier1", + "red" + ) + ); + } + + @Test + public void testIsSegmentLoadedForDeploymentGroup_segmentServedByWrongGroup() + { + final Interval interval = Intervals.of("2011-04-01/2011-04-02"); + final DruidServerMetadata blueServer = createServerMetadataWithGroup("blue-host", ServerType.HISTORICAL, "blue"); + Assert.assertFalse( + DataSourcesResource.isSegmentLoadedForDeploymentGroup( + Collections.singletonList(new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 1), + Sets.newHashSet(blueServer) + )), + new SegmentDescriptor(interval, "v1", 1), + "tier1", + "red" + ) + ); + } + + @Test + public void testIsSegmentLoadedForDeploymentGroup_nonReplicationTargetIgnored() + { + final Interval interval = Intervals.of("2011-04-01/2011-04-02"); + // A realtime server in "red" group should not satisfy the check since it's not a replication target + final DruidServerMetadata realtimeRed = createServerMetadataWithGroup("rt-host", ServerType.REALTIME, 
"red"); + Assert.assertFalse( + DataSourcesResource.isSegmentLoadedForDeploymentGroup( + Collections.singletonList(new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 1), + Sets.newHashSet(realtimeRed) + )), + new SegmentDescriptor(interval, "v1", 1), + "tier1", + "red" + ) + ); + } + + @Test + public void testIsHandOffCompleteWithDeploymentGroups_onlyOneGroupServes_returnsFalse() + { + // Timeline only contains the segment served by "red"; "blue" is online but doesn't have it. + final MetadataRuleManager ruleManager = EasyMock.createMock(MetadataRuleManager.class); + final CoordinatorConfigManager configManager = EasyMock.createMock(CoordinatorConfigManager.class); + final CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig.builder() + .withCoordinatingVersions(ImmutableSet.of("red", "blue")) + .build(); + EasyMock.expect(configManager.getCurrentDynamicConfig()).andReturn(dynamicConfig).anyTimes(); + + final Rule loadRule = new IntervalLoadRule( + Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), + ImmutableMap.of("tier1", 1), + null + ); + EasyMock.expect(ruleManager.getRulesWithDefault(TestDataSource.WIKI)) + .andReturn(ImmutableList.of(loadRule)) + .once(); + + final String interval = "2013-01-02T01:00:00Z/2013-01-02T02:00:00Z"; + final SegmentLoadInfo redLoad = new SegmentLoadInfo(createSegment(Intervals.of(interval), "v1", 1)); + redLoad.addServer(createServerMetadataWithGroup("red-host", ServerType.HISTORICAL, "red")); + + final VersionedIntervalTimeline timeline = + new VersionedIntervalTimeline<>(null) + { + @Override + public List> lookupWithIncompletePartitions(Interval i) + { + final PartitionHolder holder = + new PartitionHolder<>(new NumberedPartitionChunk<>(1, 1, redLoad)); + return ImmutableList.of(new TimelineObjectHolder<>(Intervals.of(interval), "v1", holder)); + } + }; + + final DruidServer redServer = EasyMock.createMock(DruidServer.class); + EasyMock.expect(redServer.getTier()).andReturn("tier1").anyTimes(); + 
EasyMock.expect(redServer.getType()).andReturn(ServerType.HISTORICAL).anyTimes(); + EasyMock.expect(redServer.getMetadata()).andReturn( + createServerMetadataWithGroup("red-host", ServerType.HISTORICAL, "red") + ).anyTimes(); + final DruidServer blueServer = EasyMock.createMock(DruidServer.class); + EasyMock.expect(blueServer.getTier()).andReturn("tier1").anyTimes(); + EasyMock.expect(blueServer.getType()).andReturn(ServerType.HISTORICAL).anyTimes(); + EasyMock.expect(blueServer.getMetadata()).andReturn( + createServerMetadataWithGroup("blue-host", ServerType.HISTORICAL, "blue") + ).anyTimes(); + + EasyMock.expect(inventoryView.getTimeline(new TableDataSource(TestDataSource.WIKI))) + .andReturn(timeline) + .once(); + EasyMock.expect(inventoryView.getInventory()) + .andReturn(ImmutableList.of(redServer, blueServer)) + .once(); + + EasyMock.replay(ruleManager, configManager, inventoryView, redServer, blueServer); + + final DataSourcesResource resource = new DataSourcesResource( + inventoryView, + segmentsMetadataManager, + ruleManager, + null, + null, + null, + auditManager, + configManager + ); + + final Response response = resource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1"); + Assert.assertFalse((boolean) response.getEntity()); + + EasyMock.verify(ruleManager, configManager, inventoryView, redServer, blueServer); + } + + @Test + public void testIsHandOffCompleteWithDeploymentGroups_bothGroupsServe_returnsTrue() + { + // Both "red" and "blue" servers serve the segment — handoff is complete. 
+ final MetadataRuleManager ruleManager = EasyMock.createMock(MetadataRuleManager.class); + final CoordinatorConfigManager configManager = EasyMock.createMock(CoordinatorConfigManager.class); + final CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig.builder() + .withCoordinatingVersions(ImmutableSet.of("red", "blue")) + .build(); + EasyMock.expect(configManager.getCurrentDynamicConfig()).andReturn(dynamicConfig).anyTimes(); + + final Rule loadRule = new IntervalLoadRule( + Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), + ImmutableMap.of("tier1", 1), + null + ); + EasyMock.expect(ruleManager.getRulesWithDefault(TestDataSource.WIKI)) + .andReturn(ImmutableList.of(loadRule)) + .once(); + + final String interval = "2013-01-02T01:00:00Z/2013-01-02T02:00:00Z"; + // Single SegmentLoadInfo served by both groups — models a segment replicated to both fleets. + final SegmentLoadInfo bothLoad = new SegmentLoadInfo(createSegment(Intervals.of(interval), "v1", 1)); + bothLoad.addServer(createServerMetadataWithGroup("red-host", ServerType.HISTORICAL, "red")); + bothLoad.addServer(createServerMetadataWithGroup("blue-host", ServerType.HISTORICAL, "blue")); + + final VersionedIntervalTimeline timeline = + new VersionedIntervalTimeline<>(null) + { + @Override + public List> lookupWithIncompletePartitions(Interval i) + { + final PartitionHolder holder = + new PartitionHolder<>(new NumberedPartitionChunk<>(1, 1, bothLoad)); + return ImmutableList.of(new TimelineObjectHolder<>(Intervals.of(interval), "v1", holder)); + } + }; + + final DruidServer redServer = EasyMock.createMock(DruidServer.class); + EasyMock.expect(redServer.getTier()).andReturn("tier1").anyTimes(); + EasyMock.expect(redServer.getType()).andReturn(ServerType.HISTORICAL).anyTimes(); + EasyMock.expect(redServer.getMetadata()).andReturn( + createServerMetadataWithGroup("red-host", ServerType.HISTORICAL, "red") + ).anyTimes(); + final DruidServer blueServer = 
EasyMock.createMock(DruidServer.class); + EasyMock.expect(blueServer.getTier()).andReturn("tier1").anyTimes(); + EasyMock.expect(blueServer.getType()).andReturn(ServerType.HISTORICAL).anyTimes(); + EasyMock.expect(blueServer.getMetadata()).andReturn( + createServerMetadataWithGroup("blue-host", ServerType.HISTORICAL, "blue") + ).anyTimes(); + + EasyMock.expect(inventoryView.getTimeline(new TableDataSource(TestDataSource.WIKI))) + .andReturn(timeline) + .once(); + EasyMock.expect(inventoryView.getInventory()) + .andReturn(ImmutableList.of(redServer, blueServer)) + .once(); + + EasyMock.replay(ruleManager, configManager, inventoryView, redServer, blueServer); + + final DataSourcesResource resource = new DataSourcesResource( + inventoryView, + segmentsMetadataManager, + ruleManager, + null, + null, + null, + auditManager, + configManager + ); + + final Response response = resource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1"); + Assert.assertTrue((boolean) response.getEntity()); + + EasyMock.verify(ruleManager, configManager, inventoryView, redServer, blueServer); + } + + @Test + public void testIsHandOffCompleteWithDeploymentGroups_groupWithNoServersDoesNotBlock() + { + // "red" group has a server but "blue" has none — blue should not block handoff + final MetadataRuleManager ruleManager = EasyMock.createMock(MetadataRuleManager.class); + final CoordinatorConfigManager configManager = EasyMock.createMock(CoordinatorConfigManager.class); + final CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig.builder() + .withCoordinatingVersions(ImmutableSet.of("red", "blue")) + .build(); + EasyMock.expect(configManager.getCurrentDynamicConfig()).andReturn(dynamicConfig).anyTimes(); + + final Rule loadRule = new IntervalLoadRule( + Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), + ImmutableMap.of("tier1", 1), + null + ); + EasyMock.expect(ruleManager.getRulesWithDefault(TestDataSource.WIKI)) + .andReturn(ImmutableList.of(loadRule)) + .once(); 
+ + final String interval = "2013-01-02T01:00:00Z/2013-01-02T02:00:00Z"; + final SegmentLoadInfo redLoad = new SegmentLoadInfo(createSegment(Intervals.of(interval), "v1", 1)); + redLoad.addServer(createServerMetadataWithGroup("red-host", ServerType.HISTORICAL, "red")); + + final VersionedIntervalTimeline timeline = + new VersionedIntervalTimeline<>(null) + { + @Override + public List> lookupWithIncompletePartitions(Interval i) + { + final PartitionHolder holder = + new PartitionHolder<>(new NumberedPartitionChunk<>(1, 1, redLoad)); + return ImmutableList.of(new TimelineObjectHolder<>(Intervals.of(interval), "v1", holder)); + } + }; + + // Only "red" server is online; "blue" is listed in coordinatingVersions but has no online server + final DruidServer redServer = EasyMock.createMock(DruidServer.class); + EasyMock.expect(redServer.getTier()).andReturn("tier1").anyTimes(); + EasyMock.expect(redServer.getType()).andReturn(ServerType.HISTORICAL).anyTimes(); + EasyMock.expect(redServer.getMetadata()).andReturn( + createServerMetadataWithGroup("red-host", ServerType.HISTORICAL, "red") + ).anyTimes(); + + EasyMock.expect(inventoryView.getTimeline(new TableDataSource(TestDataSource.WIKI))) + .andReturn(timeline) + .once(); + EasyMock.expect(inventoryView.getInventory()) + .andReturn(ImmutableList.of(redServer)) + .once(); + + EasyMock.replay(ruleManager, configManager, inventoryView, redServer); + + final DataSourcesResource resource = new DataSourcesResource( + inventoryView, + segmentsMetadataManager, + ruleManager, + null, + null, + null, + auditManager, + configManager + ); + + // Only "red" active; "red" has segment. "blue" absent — handoff complete. 
+ final Response response = resource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1"); + Assert.assertTrue((boolean) response.getEntity()); + + EasyMock.verify(ruleManager, configManager, inventoryView, redServer); + } + + @Test + public void testIsHandOffCompleteWithDeploymentGroups_emptyGroupsDefaultBehavior() + { + // coordinatingVersions is empty — existing single-group behavior unchanged + final MetadataRuleManager ruleManager = EasyMock.createMock(MetadataRuleManager.class); + final CoordinatorConfigManager configManager = EasyMock.createMock(CoordinatorConfigManager.class); + final CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig.builder().build(); + EasyMock.expect(configManager.getCurrentDynamicConfig()).andReturn(dynamicConfig).anyTimes(); + + final Rule loadRule = new IntervalLoadRule( + Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), + ImmutableMap.of("tier1", 1), + null + ); + EasyMock.expect(ruleManager.getRulesWithDefault(TestDataSource.WIKI)) + .andReturn(ImmutableList.of(loadRule)) + .once(); + + final String interval = "2013-01-02T01:00:00Z/2013-01-02T02:00:00Z"; + final SegmentLoadInfo segLoad = new SegmentLoadInfo(createSegment(Intervals.of(interval), "v1", 1)); + segLoad.addServer(createHistoricalServerMetadata("hist-host")); + + final VersionedIntervalTimeline timeline = + new VersionedIntervalTimeline<>(null) + { + @Override + public List> lookupWithIncompletePartitions(Interval i) + { + final PartitionHolder holder = + new PartitionHolder<>(new NumberedPartitionChunk<>(1, 1, segLoad)); + return ImmutableList.of(new TimelineObjectHolder<>(Intervals.of(interval), "v1", holder)); + } + }; + + EasyMock.expect(inventoryView.getTimeline(new TableDataSource(TestDataSource.WIKI))) + .andReturn(timeline) + .once(); + + EasyMock.replay(ruleManager, configManager, inventoryView); + + final DataSourcesResource resource = new DataSourcesResource( + inventoryView, + segmentsMetadataManager, + ruleManager, + null, + 
null, + null, + auditManager, + configManager + ); + + final Response response = resource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1"); + Assert.assertTrue((boolean) response.getEntity()); + + EasyMock.verify(ruleManager, configManager, inventoryView); + } + + private DruidServerMetadata createServerMetadataWithGroup(String name, ServerType type, String group) + { + return new DruidServerMetadata(name, name, null, 10000, null, type, "tier1", 1, group); + } + private void prepareRequestForAudit() { EasyMock.expect(request.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn("author").anyTimes(); diff --git a/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java b/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java index 59251ab4c704..c2a1611ed6f0 100644 --- a/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java +++ b/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java @@ -135,6 +135,20 @@ private Server findServerInner(final Pair selected) Server server = selected == null ? null : selected.rhs; if (server == null) { + // Fail-closed when a deployment-group filter is configured: do not fall back to a cached + // backup, since the cached broker may now be outside the acceptable deployment groups + // (re-announced with a different tag, or removed while still cached). Clear any stale + // entries instead so a recovery later starts from a clean slate. + if (hostSelector.isDeploymentGroupFilterEnabled()) { + log.warn( + "No server found for serviceName[%s] under deployment-group filter; " + + "skipping backup fallback to preserve red/black isolation.", + serviceName + ); + serverBackup.remove(serviceName); + return null; + } + log.error( "No server found for serviceName[%s]. 
Using backup", serviceName diff --git a/services/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java b/services/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java index e18863853794..14e97bf8d84e 100644 --- a/services/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java +++ b/services/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; +import java.util.Set; /** */ @@ -60,6 +61,13 @@ public class TieredBrokerConfig new PriorityTieredBrokerSelectorStrategy(0, 1) ); + /** + * If non-empty, the router only routes to brokers whose deploymentGroup is in this set. + * Null (unset) means no filtering; an explicitly configured empty set is rejected at startup. A broker with no deploymentGroup tag matches only when this is unset. + */ + @JsonProperty + private Set acceptableDeploymentGroups = null; + // tier, public LinkedHashMap getTierToBrokerMap() { @@ -89,4 +97,9 @@ public List getStrategies() { return ImmutableList.copyOf(strategies); } + + public Set getAcceptableDeploymentGroups() + { + return acceptableDeploymentGroups; + } } diff --git a/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java b/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java index 6a20f3bb418d..96dcaf70caf3 100644 --- a/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java +++ b/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java @@ -31,6 +31,7 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.NodeRole; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; @@ -46,6 +47,7 @@ import java.util.HashMap;
import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -124,12 +126,19 @@ public void start() return; } + final Set acceptable = tierConfig.getAcceptableDeploymentGroups(); + if (acceptable != null && acceptable.isEmpty()) { + throw new ISE("If configured, 'druid.router.acceptableDeploymentGroups' must be non-empty"); + } + for (Map.Entry entry : tierConfig.getTierToBrokerMap().entrySet()) { servers.put(entry.getValue(), new NodesHolder()); } DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeRole(NodeRole.BROKER); druidNodeDiscovery.registerListener( + // The deploymentGroup filter is applied only on add; if a broker's tag changes in place, + // discovery emits remove + add, so the new tag is re-evaluated on the next add event. new DruidNodeDiscovery.Listener() { @Override @@ -137,6 +146,15 @@ public void nodesAdded(Collection nodes) { nodes.forEach( (node) -> { + if (!isDeploymentGroupAllowed(node)) { + log.debug( + "Excluding broker[%s] with deploymentGroup[%s] (acceptable=%s)", + node.getDruidNode().getHostAndPortToUse(), + node.getDruidNode().getDeploymentGroup(), + tierConfig.getAcceptableDeploymentGroups() + ); + return; + } NodesHolder nodesHolder = servers.get(node.getDruidNode().getServiceName()); if (nodesHolder != null) { nodesHolder.add(node.getDruidNode().getHostAndPortToUse(), TO_SERVER.apply(node)); @@ -182,6 +200,17 @@ public String getDefaultServiceName() return tierConfig.getDefaultBrokerServiceName(); } + /** + * Returns true if a deployment-group filter is configured. When enabled, callers must avoid + * any cached/backup routing that could surface a broker which has since fallen outside the + * acceptable groups; the filter is intended to be fail-closed. 
+ */ + public boolean isDeploymentGroupFilterEnabled() + { + final Set acceptable = tierConfig.getAcceptableDeploymentGroups(); + return acceptable != null && !acceptable.isEmpty(); + } + public Pair select(final Query query) { synchronized (lock) { @@ -266,7 +295,26 @@ private Pair getServerPair(String brokerServiceName) nodesHolder = servers.get(tierConfig.getDefaultBrokerServiceName()); } - return new Pair<>(brokerServiceName, nodesHolder.pick()); + Server picked = nodesHolder.pick(); + if (picked == null && tierConfig.getAcceptableDeploymentGroups() != null + && !tierConfig.getAcceptableDeploymentGroups().isEmpty()) { + log.warn( + "No brokers available for serviceName[%s] after applying deploymentGroup filter[%s]. " + + "Check that brokers with a matching deploymentGroup are running.", + brokerServiceName, + tierConfig.getAcceptableDeploymentGroups() + ); + } + return new Pair<>(brokerServiceName, picked); + } + + private boolean isDeploymentGroupAllowed(DiscoveryDruidNode node) + { + final Set acceptable = tierConfig.getAcceptableDeploymentGroups(); + if (acceptable == null || acceptable.isEmpty()) { + return true; + } + return acceptable.contains(node.getDruidNode().getDeploymentGroup()); } public Pair selectForSql(SqlQuery sqlQuery) diff --git a/services/src/test/java/org/apache/druid/server/router/QueryHostFinderTest.java b/services/src/test/java/org/apache/druid/server/router/QueryHostFinderTest.java index 35aa5f534639..513fe2060900 100644 --- a/services/src/test/java/org/apache/druid/server/router/QueryHostFinderTest.java +++ b/services/src/test/java/org/apache/druid/server/router/QueryHostFinderTest.java @@ -72,37 +72,79 @@ public int getPort() return 0; } }; + } + @After + public void tearDown() + { + EasyMock.verify(brokerSelector); + } + + @Test + public void testFindServer() + { EasyMock.expect(brokerSelector.select(EasyMock.anyObject(Query.class))).andReturn( Pair.of("service", server) ); EasyMock.replay(brokerSelector); + + final Server server = 
newQueryHostFinder().findServer(newQuery()); + + Assert.assertEquals("foo", server.getHost()); } - @After - public void tearDown() + @Test + public void testFindServerUsesBackupWhenDeploymentGroupFilterIsDisabled() { - EasyMock.verify(brokerSelector); + EasyMock.expect(brokerSelector.select(EasyMock.anyObject(Query.class))).andReturn( + Pair.of("service", server) + ); + EasyMock.expect(brokerSelector.select(EasyMock.anyObject(Query.class))).andReturn( + Pair.of("service", null) + ); + EasyMock.expect(brokerSelector.isDeploymentGroupFilterEnabled()).andReturn(false); + EasyMock.replay(brokerSelector); + + final QueryHostFinder queryHostFinder = newQueryHostFinder(); + + Assert.assertSame(server, queryHostFinder.findServer(newQuery())); + Assert.assertSame(server, queryHostFinder.findServer(newQuery())); } @Test - public void testFindServer() + public void testFindServerSkipsBackupWhenDeploymentGroupFilterIsEnabled() + { + EasyMock.expect(brokerSelector.select(EasyMock.anyObject(Query.class))).andReturn( + Pair.of("service", server) + ); + EasyMock.expect(brokerSelector.select(EasyMock.anyObject(Query.class))).andReturn( + Pair.of("service", null) + ); + EasyMock.expect(brokerSelector.isDeploymentGroupFilterEnabled()).andReturn(true); + EasyMock.replay(brokerSelector); + + final QueryHostFinder queryHostFinder = newQueryHostFinder(); + + Assert.assertSame(server, queryHostFinder.findServer(newQuery())); + Assert.assertNull(queryHostFinder.findServer(newQuery())); + } + + private QueryHostFinder newQueryHostFinder() { - QueryHostFinder queryRunner = new QueryHostFinder( + return new QueryHostFinder( brokerSelector, new RendezvousHashAvaticaConnectionBalancer() ); + } - Server server = queryRunner.findServer( - new TimeBoundaryQuery( - new TableDataSource("test"), - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-08-31/2011-09-01"))), - null, - null, - null - ) + private Query newQuery() + { + return new TimeBoundaryQuery( + new 
TableDataSource("test"), + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-08-31/2011-09-01"))), + null, + null, + null ); - - Assert.assertEquals("foo", server.getHost()); } } diff --git a/services/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorDeploymentGroupTest.java b/services/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorDeploymentGroupTest.java new file mode 100644 index 000000000000..f8d8bb6d365f --- /dev/null +++ b/services/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorDeploymentGroupTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.router; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.client.DruidServer; +import org.apache.druid.client.selector.Server; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscovery; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.Druids; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.server.DruidNode; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Set; + +public class TieredBrokerHostSelectorDeploymentGroupTest +{ + private TieredBrokerHostSelector brokerSelector; + private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + + @After + public void tearDown() + { + if (brokerSelector != null) { + brokerSelector.stop(); + } + EasyMock.verify(druidNodeDiscoveryProvider); + } + + @Test + public void testFilterExcludesBrokersWithNonMatchingDeploymentGroup() + { + final DiscoveryDruidNode blackBroker = makeBroker("black-broker", "blackHost", "black"); + final DiscoveryDruidNode redBroker = makeBroker("black-broker", "redHost", "red"); + final DiscoveryDruidNode untaggedBroker = makeBroker("black-broker", "untaggedHost", null); + + setupSelector(ImmutableSet.of("black"), blackBroker, redBroker, untaggedBroker); + + final Pair<String, Server> picked = brokerSelector.select(simpleQuery()); + Assert.assertEquals("black-broker", picked.lhs); + Assert.assertEquals("blackHost:8080",
picked.rhs.getHost()); + + // Round-robin should keep returning the only matching broker. + Assert.assertEquals("blackHost:8080", brokerSelector.select(simpleQuery()).rhs.getHost()); + } + + @Test + public void testFilterUnsetIncludesAllBrokers() + { + final DiscoveryDruidNode b1 = makeBroker("default-broker", "host1", "black"); + final DiscoveryDruidNode b2 = makeBroker("default-broker", "host2", "red"); + + setupSelector(null, b1, b2); + + final Set<String> seenHosts = ImmutableSet.of( + brokerSelector.select(simpleQuery()).rhs.getHost(), + brokerSelector.select(simpleQuery()).rhs.getHost() + ); + Assert.assertEquals(ImmutableSet.of("host1:8080", "host2:8080"), seenHosts); + } + + @Test + public void testFilterEliminatingAllBrokersReturnsNullServer() + { + final DiscoveryDruidNode redBroker = makeBroker("default-broker", "redHost", "red"); + + setupSelector(ImmutableSet.of("black"), redBroker); + + final Pair<String, Server> picked = brokerSelector.select(simpleQuery()); + Assert.assertEquals("default-broker", picked.lhs); + Assert.assertNull("Filter should fail closed when no broker matches", picked.rhs); + } + + private DiscoveryDruidNode makeBroker(String serviceName, String host, String deploymentGroup) + { + return new DiscoveryDruidNode( + new DruidNode(serviceName, host, false, 8080, null, null, true, false, null, deploymentGroup), + NodeRole.BROKER, + ImmutableMap.of() + ); + } + + private void setupSelector(Set<String> acceptableDeploymentGroups, DiscoveryDruidNode...
brokers) + { + druidNodeDiscoveryProvider = EasyMock.createStrictMock(DruidNodeDiscoveryProvider.class); + + final Collection<DiscoveryDruidNode> brokerSet = Arrays.asList(brokers); + final DruidNodeDiscovery druidNodeDiscovery = new DruidNodeDiscovery() + { + @Override + public Collection<DiscoveryDruidNode> getAllNodes() + { + return brokerSet; + } + + @Override + public void registerListener(Listener listener) + { + listener.nodesAdded(ImmutableList.copyOf(brokerSet)); + listener.nodeViewInitialized(); + } + }; + + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.BROKER)) + .andReturn(druidNodeDiscovery); + EasyMock.replay(druidNodeDiscoveryProvider); + + final String defaultBrokerName = brokers[0].getDruidNode().getServiceName(); + brokerSelector = new TieredBrokerHostSelector( + new NoopRuleManager(), + new TieredBrokerConfig() + { + @Override + public LinkedHashMap<String, String> getTierToBrokerMap() + { + return new LinkedHashMap<>(ImmutableMap.of(DruidServer.DEFAULT_TIER, defaultBrokerName)); + } + + @Override + public String getDefaultBrokerServiceName() + { + return defaultBrokerName; + } + + @Override + public Set<String> getAcceptableDeploymentGroups() + { + return acceptableDeploymentGroups; + } + }, + druidNodeDiscoveryProvider, + ImmutableList.of() + ); + brokerSelector.start(); + } + + private TimeseriesQuery simpleQuery() + { + return Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .granularity("all") + .aggregators(Collections.singletonList(new CountAggregatorFactory("rows"))) + .intervals(Collections.singletonList(Intervals.of("2024-01-01/2024-01-02"))) + .build(); + } + + private static class NoopRuleManager extends CoordinatorRuleManager + { + NoopRuleManager() + { + super(null, null); + } + + // Returning false short-circuits select() to the default-lookup path, which is what + // these tests want to exercise — the filter is applied when nodes are added to the holder.
+ @Override + public boolean isStarted() + { + return false; + } + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 3d3d2a65791f..d623c9af6d39 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -196,6 +196,7 @@ public class SystemSchema extends AbstractSchema .add("labels", ColumnType.STRING) .add("available_processors", ColumnType.LONG) .add("total_memory", ColumnType.LONG) + .add("deployment_group", ColumnType.STRING) .build(); static final RowSignature SERVER_SEGMENTS_SIGNATURE = RowSignature @@ -701,7 +702,8 @@ private Object[] buildRowForNonDataServer(DiscoveryDruidNode discoveryDruidNode) node.getBuildRevision(), node.getLabels() == null ? null : JacksonUtils.writeValueAsString(jsonMapper, node.getLabels()), (long) discoveryDruidNode.getAvailableProcessors(), - discoveryDruidNode.getTotalMemory() + discoveryDruidNode.getTotalMemory(), + node.getDeploymentGroup() }; } @@ -730,7 +732,8 @@ private Object[] buildRowForNonDataServerWithLeadership( node.getBuildRevision(), node.getLabels() == null ? null : JacksonUtils.writeValueAsString(jsonMapper, node.getLabels()), (long) discoveryDruidNode.getAvailableProcessors(), - discoveryDruidNode.getTotalMemory() + discoveryDruidNode.getTotalMemory(), + node.getDeploymentGroup() }; } @@ -771,7 +774,8 @@ private Object[] buildRowForDiscoverableDataServer( node.getBuildRevision(), node.getLabels() == null ? 
null : JacksonUtils.writeValueAsString(jsonMapper, node.getLabels()), (long) discoveryDruidNode.getAvailableProcessors(), - discoveryDruidNode.getTotalMemory() + discoveryDruidNode.getTotalMemory(), + node.getDeploymentGroup() }; } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index bb381b18718f..a85ecb011db1 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -588,7 +588,7 @@ public void testGetTableMap() final SystemSchema.ServersTable serversTable = (SystemSchema.ServersTable) schema.getTableMap().get("servers"); final RelDataType serverRowType = serversTable.getRowType(new JavaTypeFactoryImpl()); final List<RelDataTypeField> serverFields = serverRowType.getFieldList(); - Assert.assertEquals(16, serverFields.size()); + Assert.assertEquals(17, serverFields.size()); Assert.assertEquals("server", serverFields.get(0).getName()); Assert.assertEquals(SqlTypeName.VARCHAR, serverFields.get(0).getType().getSqlTypeName()); @@ -1241,7 +1241,8 @@ private Object[] createExpectedRow( buildRevision, labels, availableProcessors, - totalMemory + totalMemory, + null }; } diff --git a/website/.spelling index 28701817f362..93a5476bccfb 100644 --- a/website/.spelling +++ b/website/.spelling @@ -321,6 +321,7 @@ consumerProperties cron csv customizable +cutover dataset datasets datasketches