Skip to content
Open
4 changes: 3 additions & 1 deletion docs/development/extensions-core/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ To use these APIs, ensure you have read and write permissions for the CONFIG res

#### Get dynamic configuration

Retrieves the current dynamic execution config for the Kubernetes task runner.
> Prior to Druid 37.0.0, this API will return an empty value when the dynamic config has not been updated via the POST method below. This has since changed to always reflect the dynamic config that will be used by the task runner to create K8s jobs.

Retrieve the current execution config used by the Kubernetes task runner.
Returns a JSON object with the dynamic configuration properties.

##### URL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.k8s.overlord.execution.PodTemplateSelectStrategy;
import org.joda.time.Period;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;

Expand All @@ -48,6 +49,12 @@ public KubernetesTaskRunnerEffectiveConfig(
this.dynamicConfigSupplier = dynamicConfigSupplier;
}

@Nullable
public KubernetesTaskRunnerDynamicConfig getDynamicConfig()
{
return dynamicConfigSupplier == null ? null : dynamicConfigSupplier.get();
}

@Override
public String getNamespace()
{
Expand Down Expand Up @@ -203,4 +210,3 @@ public PodTemplateSelectStrategy getPodTemplateSelectStrategy()
return dynamicConfigSupplier.get().getPodTemplateSelectStrategy();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,25 @@ public class DefaultKubernetesTaskRunnerDynamicConfig implements KubernetesTaskR

@JsonCreator
public DefaultKubernetesTaskRunnerDynamicConfig(
@Nullable
@JsonProperty("podTemplateSelectStrategy") PodTemplateSelectStrategy podTemplateSelectStrategy,
@Nullable
@JsonProperty("capacity") Integer capacity
)
{
this.podTemplateSelectStrategy = podTemplateSelectStrategy;
this.capacity = capacity;
}

@Nullable
@Override
@JsonProperty
public PodTemplateSelectStrategy getPodTemplateSelectStrategy()
{
return podTemplateSelectStrategy;
}

@Nullable
@Override
@JsonProperty
public Integer getCapacity()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerEffectiveConfig;
import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.apache.druid.server.security.AuthorizationUtils;
import org.joda.time.Interval;
Expand All @@ -43,7 +44,6 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
* Resource that manages Kubernetes-specific execution configurations for running tasks.
Expand All @@ -57,16 +57,18 @@ public class KubernetesTaskExecutionConfigResource
private static final Logger log = new Logger(KubernetesTaskExecutionConfigResource.class);
private final JacksonConfigManager configManager;
private final AuditManager auditManager;
private AtomicReference<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef = null;
private final KubernetesTaskRunnerEffectiveConfig effectiveConfig;

@Inject
public KubernetesTaskExecutionConfigResource(
final JacksonConfigManager configManager,
final AuditManager auditManager
final AuditManager auditManager,
final KubernetesTaskRunnerEffectiveConfig effectiveConfig
)
{
this.configManager = configManager;
this.auditManager = auditManager;
this.effectiveConfig = effectiveConfig;
}

/**
Expand All @@ -84,12 +86,13 @@ public Response setExecutionConfig(
@Context final HttpServletRequest req
)
{
KubernetesTaskRunnerDynamicConfig currentConfig = getDynamicConfig();
final KubernetesTaskRunnerDynamicConfig persistedDynamicConfig = effectiveConfig.getDynamicConfig();
KubernetesTaskRunnerDynamicConfig mergedConfig = dynamicConfig;

if (currentConfig != null) {
mergedConfig = currentConfig.merge(dynamicConfig);
if (persistedDynamicConfig != null) {
mergedConfig = persistedDynamicConfig.merge(dynamicConfig);
}

final ConfigManager.SetResult setResult = configManager.set(
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
mergedConfig,
Expand Down Expand Up @@ -154,14 +157,14 @@ public Response getExecutionConfigHistory(
@ResourceFilters(ConfigResourceFilter.class)
public Response getExecutionConfig()
{
return Response.ok(getDynamicConfig()).build();
return Response.ok(getEffectiveConfig()).build();
}

private KubernetesTaskRunnerDynamicConfig getDynamicConfig()
private KubernetesTaskRunnerDynamicConfig getEffectiveConfig()
{
if (dynamicConfigRef == null) {
dynamicConfigRef = configManager.watch(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, KubernetesTaskRunnerDynamicConfig.class);
}
return dynamicConfigRef.get();
return new DefaultKubernetesTaskRunnerDynamicConfig(
effectiveConfig.getPodTemplateSelectStrategy(),
effectiveConfig.getCapacity()
);
}
}
Loading
Loading