Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2022,7 +2022,7 @@ log4j config to route these logs to different sources based on the feed of the e
|`druid.emitter.logging.loggerClass`|The class used for logging.|`org.apache.druid.java.util.emitter.core.LoggingEmitter`|
|`druid.emitter.logging.logLevel`|Choices: debug, info, warn, error. The log level at which message are logged.|info|
|`druid.emitter.logging.shouldFilterMetrics`|When true, only metrics listed in the allow list are emitted; non-metric events (e.g. alerts) are always emitted. When false, all events are logged (backward-compatible).|false|
|`druid.emitter.logging.allowedMetricsPath`|Path to a JSON file whose keys are the allowed metric names. Only used when `shouldFilterMetrics` is true. If null or empty, the bundled classpath resource `defaultMetrics.json` is used. If a path is set but the file is missing, a warning is logged and the emitter falls back to the default classpath resource.|null|
|`druid.emitter.logging.metricSpecPath`|Path to a JSON file whose keys are the allowed metric names. Only used when `shouldFilterMetrics` is true. If null or empty, the bundled classpath resource `defaultMetrics.json` is used.|null|

#### HTTP emitter module

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
public Metrics(PrometheusEmitterConfig config)
{
String namespace = config.getNamespace();
String path = config.getDimensionMapPath();
String path = config.getMetricSpecPath().orElse(config.getDimensionMapPath());

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation Note

Invoking
PrometheusEmitterConfig.getDimensionMapPath
should be avoided because it has been deprecated.
boolean isAddHostAsLabel = config.isAddHostAsLabel();
boolean isAddServiceAsLabel = config.isAddServiceAsLabel();
Map<String, String> extraLabels = config.getExtraLabels();
Expand Down Expand Up @@ -136,7 +136,7 @@
InputStream is;
if (Strings.isNullOrEmpty(path)) {
log.info("Using default metric configuration");
is = this.getClass().getClassLoader().getResourceAsStream("defaultMetrics.json");
is = this.getClass().getClassLoader().getResourceAsStream(PrometheusEmitterConfig.DEFAULT_METRIC_SPEC_PATH);
} else {
log.info("Using metric configuration at [%s]", path);
is = new FileInputStream(new File(path));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.druid.emitter.prometheus;


import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.prometheus.client.CollectorRegistry;
Expand All @@ -30,24 +30,26 @@
import io.prometheus.client.exporter.PushGateway;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.AbstractFilteringEmitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.MetricAllowlistLoader;
import org.apache.druid.java.util.emitter.core.MetricAllowlistParser;
import org.apache.druid.java.util.emitter.core.MetricAllowlistParsers;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

/**
*
*/
public class PrometheusEmitter implements Emitter
public class PrometheusEmitter extends AbstractFilteringEmitter
{
private static final Logger log = new Logger(PrometheusEmitter.class);
private final Metrics metrics;
Expand All @@ -70,6 +72,23 @@

public PrometheusEmitter(PrometheusEmitterConfig config)
{
super(
config.isShouldFilterMetrics(),
config.isShouldFilterMetrics()
? config.getMetricSpecPath()
.or(() -> Optional.ofNullable(config.getDimensionMapPath()))

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation Note

Invoking
PrometheusEmitterConfig.getDimensionMapPath
should be avoided because it has been deprecated.
.map(path -> MetricAllowlistLoader.loadAllowlistFromFile(
new ObjectMapper(),
path,
MetricAllowlistParsers::parseMetricNameObject
))
.orElseGet(() -> MetricAllowlistLoader.loadAllowlistFromClasspath(
new ObjectMapper(),
PrometheusEmitterConfig.DEFAULT_METRIC_SPEC_PATH,
MetricAllowlistParsers::parseMetricNameObject
))
: Collections.emptySet()
);
this.config = config;
this.strategy = config.getStrategy();
metrics = new Metrics(config);
Expand Down Expand Up @@ -135,15 +154,20 @@
}

@Override
public void emit(Event event)
protected boolean shouldFilterEvent(final Event event)
{
if (event instanceof ServiceMetricEvent) {
emitMetric((ServiceMetricEvent) event);
}
return !(event instanceof ServiceMetricEvent)
|| shouldFilterOutMetric(((ServiceMetricEvent) event).getMetric());
}

private void emitMetric(ServiceMetricEvent metricEvent)
@Override
protected void emitFilteredEvent(final Event event)
{
// for events passed initial filering when it's diasabled
if (!(event instanceof ServiceMetricEvent)) {
return;
}
final ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
String name = metricEvent.getMetric();
String service = metricEvent.getService();
String host = metricEvent.getHost();
Expand Down Expand Up @@ -286,6 +310,12 @@
this.pushGateway = pushGateway;
}

@Override
public MetricAllowlistParser getMetricAllowlistParser()
{
return MetricAllowlistParsers::parseMetricNameObject;
}

/**
* Cleans up stale metrics that have not been updated within the configured TTL.
* This method is called periodically by the TTL scheduler when using the 'exporter' strategy with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.core.GlobalEmitterConfig;
import org.joda.time.Duration;

import javax.annotation.Nullable;
Expand All @@ -35,8 +36,9 @@
/**
*
*/
public class PrometheusEmitterConfig
public class PrometheusEmitterConfig extends GlobalEmitterConfig
{
public static final String DEFAULT_METRIC_SPEC_PATH = "defaultPrometheusMetrics.json";

static final Pattern PATTERN = Pattern.compile("[a-zA-Z_:][a-zA-Z0-9_:]*");

Expand All @@ -46,6 +48,11 @@ public class PrometheusEmitterConfig
@JsonProperty
private final String namespace;

/**
* Deprecated in favor of {@link GlobalEmitterConfig#getMetricSpecPath()} kept for backward compatibility.
* If metricSpecPath is not provided, the emitter falls back to this value.
*/
@Deprecated
@JsonProperty
@Nullable
private final String dimensionMapPath;
Expand Down Expand Up @@ -159,6 +166,10 @@ public String getNamespace()
return namespace;
}

/**
* Deprecated alias for {@link GlobalEmitterConfig#getMetricSpecPath()}.
*/
@Deprecated
public String getDimensionMapPath()
{
return dimensionMapPath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,19 @@ public void testMetricsConfigurationWithNonExistentMetric()
@Test
public void testMetricsConfigurationWithUnSupportedType()
{
PrometheusEmitterConfig config = new PrometheusEmitterConfig(null, "test_5", "src/test/resources/defaultInvalidMetricsTest.json", null, null, true, true, null, null, null, null);
PrometheusEmitterConfig config = new PrometheusEmitterConfig(
null,
"test_5",
"src/test/resources/defaultInvalidMetricsTest.json",
null,
null,
true,
true,
null,
null,
null,
null
);
ISE iseException = Assert.assertThrows(ISE.class, () -> {
new Metrics(config);
});
Expand All @@ -124,7 +136,19 @@ public void testMetricsConfigurationWithUnSupportedType()
@Test
public void testMetricsConfigurationWithTimerHistogramBuckets()
{
PrometheusEmitterConfig config = new PrometheusEmitterConfig(null, "test_6", "src/test/resources/defaultMetricsTest.json", null, null, true, true, null, null, null, null);
PrometheusEmitterConfig config = new PrometheusEmitterConfig(
null,
"test_6",
"src/test/resources/defaultMetricsTest.json",
null,
null,
true,
true,
null,
null,
null,
null
);
Metrics metrics = new Metrics(config);
DimensionsAndCollector dimensionsAndCollector = metrics.getByName("query/time", "historical");
Assert.assertNotNull(dimensionsAndCollector);
Expand All @@ -137,5 +161,4 @@ public void testMetricsConfigurationWithTimerHistogramBuckets()
double[] expectedHistogramBuckets = {10.0, 30.0, 60.0, 120.0, 200.0, 300.0};
Assert.assertArrayEquals(expectedHistogramBuckets, dimensionsAndCollector.getHistogramBuckets(), 0.0);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.emitter.service.UnitEvent;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -605,6 +606,49 @@ public void testCounterWithNegativeValue()
emitter.close();
}

@Test
public void testFilteringAllowsConfiguredMetricOnly() throws Exception
{
final PrometheusEmitterConfig config = new PrometheusEmitterConfig(
PrometheusEmitterConfig.Strategy.exporter,
"druid",
null,
0,
null,
false,
false,
null,
null,
false,
null
);
config.setShouldFilterMetrics(true);

final PrometheusEmitter emitter = new PrometheusEmitter(config);
Assert.assertFalse(emitter.shouldFilterOutMetric("query/time"));
Assert.assertTrue(emitter.shouldFilterOutMetric("some/unlisted/metric"));
emitter.emit(
ServiceMetricEvent.builder().setMetric("query/time", 10).setDimension("dataSource", "wikipedia").setDimension("type", "groupBy")
.build(ImmutableMap.of("service", "broker", "host", "druid.test.cn"))
);
emitter.emit(ServiceMetricEvent.builder().setMetric("some/unlisted/metric", 5).build("broker", "druid.test.cn"));
emitter.emit(new UnitEvent("alerts", 1));

Assert.assertEquals(
1.0,
CollectorRegistry.defaultRegistry.getSampleValue(
"druid_query_time_bucket",
new String[]{"dataSource", "type", "le"},
new String[]{"wikipedia", "groupBy", "0.1"}
),
0.0
);
Assert.assertNull(
CollectorRegistry.defaultRegistry.getSampleValue("druid_some_unlisted_metric")
);
emitter.close();
}

@After
public void tearDown()
{
Expand Down
6 changes: 6 additions & 0 deletions processing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,12 @@
</plugins>

<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>defaultMetrics.json</include>
</includes>
</resource>
<resource>
<directory>
${project.build.directory}/hyperic-sigar-${sigar.base.version}/sigar-bin/lib
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.emitter.core;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Optional;
import java.util.Set;

/**
* Base class for emitters that support metric-name-based filtering.
*
* <p>The base implementation provides a single filtering gate in {@link #emit(Event)} and then
* delegates actual emission to {@link #emitFilteredEvent(Event)}.
*/
public abstract class AbstractFilteringEmitter implements Emitter, MetricFilteringEmitter
{
private final boolean shouldFilterMetrics;
private final Set<String> allowedMetricNames;

protected AbstractFilteringEmitter(final boolean shouldFilterMetrics, final Set<String> allowedMetricNames)
{
this.shouldFilterMetrics = shouldFilterMetrics;
this.allowedMetricNames = Set.copyOf(allowedMetricNames);
}

protected static Set<String> loadAllowedMetricNames(
final boolean shouldFilterMetrics,
final ObjectMapper objectMapper,
final Optional<String> metricSpecPath,
final String defaultMetricSpecPath,
final MetricAllowlistParser parser
)
{
if (!shouldFilterMetrics) {
return Set.of();
}

return metricSpecPath
.map(path -> MetricAllowlistLoader.loadAllowlistFromFile(objectMapper, path, parser))
.orElseGet(() -> MetricAllowlistLoader.loadAllowlistFromClasspath(objectMapper, defaultMetricSpecPath, parser));
}

@Override
public final void emit(final Event event)
{
preEmit();
if (shouldFilterMetrics && shouldFilterEvent(event)) {
return;
}
emitFilteredEvent(event);
}

@Override
public boolean shouldFilterOutMetric(final String metricName)
{
return !allowedMetricNames.contains(metricName);
}

protected boolean isShouldFilterMetrics()
{
return shouldFilterMetrics;
}

protected Set<String> getAllowedMetricNames()
{
return allowedMetricNames;
}

/**
* Returns whether this event should be dropped before emission.
*
* <p>Implementations should apply emitter-specific event handling semantics here.
*/
protected abstract boolean shouldFilterEvent(Event event);

/**
* Hook for pre-emission checks that must run before metric filtering.
*/
protected void preEmit()
{
// default no-op
}

/**
* Emits an event that already passed filtering (or is not a metric event).
*/
protected abstract void emitFilteredEvent(Event event);
}
Loading
Loading