diff --git a/docs/configuration/index.md b/docs/configuration/index.md index c31cf0747cba..d2c9afe81fa9 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -2022,7 +2022,7 @@ log4j config to route these logs to different sources based on the feed of the e |`druid.emitter.logging.loggerClass`|The class used for logging.|`org.apache.druid.java.util.emitter.core.LoggingEmitter`| |`druid.emitter.logging.logLevel`|Choices: debug, info, warn, error. The log level at which message are logged.|info| |`druid.emitter.logging.shouldFilterMetrics`|When true, only metrics listed in the allow list are emitted; non-metric events (e.g. alerts) are always emitted. When false, all events are logged (backward-compatible).|false| -|`druid.emitter.logging.allowedMetricsPath`|Path to a JSON file whose keys are the allowed metric names. Only used when `shouldFilterMetrics` is true. If null or empty, the bundled classpath resource `defaultMetrics.json` is used. If a path is set but the file is missing, a warning is logged and the emitter falls back to the default classpath resource.|null| +|`druid.emitter.logging.metricSpecPath`|Path to a JSON file whose keys are the allowed metric names. Only used when `shouldFilterMetrics` is true. If null or empty, the bundled classpath resource `defaultMetrics.json` is used.|null| #### HTTP emitter module diff --git a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/Metrics.java b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/Metrics.java index 4edb76e7d84c..c396ac186544 100644 --- a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/Metrics.java +++ b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/Metrics.java @@ -66,7 +66,7 @@ public DimensionsAndCollector getByName(String name, String service) public Metrics(PrometheusEmitterConfig config) { String namespace = config.getNamespace(); - String path = config.getDimensionMapPath(); + String path = config.getMetricSpecPath().orElse(config.getDimensionMapPath()); boolean isAddHostAsLabel = config.isAddHostAsLabel(); boolean isAddServiceAsLabel = config.isAddServiceAsLabel(); Map extraLabels = config.getExtraLabels(); @@ -136,7 +136,7 @@ private Map readConfig(String path) InputStream is; if (Strings.isNullOrEmpty(path)) { log.info("Using default metric configuration"); - is = this.getClass().getClassLoader().getResourceAsStream("defaultMetrics.json"); + is = this.getClass().getClassLoader().getResourceAsStream(PrometheusEmitterConfig.DEFAULT_METRIC_SPEC_PATH); } else { log.info("Using metric configuration at [%s]", path); is = new FileInputStream(new File(path)); diff --git a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java index 60af27fcd4d1..80169ffcea00 100644 --- a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java +++ b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java @@ -19,7 +19,7 @@ package org.apache.druid.emitter.prometheus; - +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import io.prometheus.client.CollectorRegistry; @@ -30,24 +30,26 @@ import io.prometheus.client.exporter.PushGateway; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.emitter.core.Emitter; +import org.apache.druid.java.util.emitter.core.AbstractFilteringEmitter; import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.MetricAllowlistLoader; +import org.apache.druid.java.util.emitter.core.MetricAllowlistParser; +import org.apache.druid.java.util.emitter.core.MetricAllowlistParsers; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; -/** - * - */ -public class PrometheusEmitter implements Emitter +public class PrometheusEmitter extends AbstractFilteringEmitter { private static final Logger log = new Logger(PrometheusEmitter.class); private final Metrics metrics; @@ -70,6 +72,23 @@ static PrometheusEmitter of(PrometheusEmitterConfig config) public PrometheusEmitter(PrometheusEmitterConfig config) { + super( + config.isShouldFilterMetrics(), + config.isShouldFilterMetrics() + ? config.getMetricSpecPath() + .or(() -> Optional.ofNullable(config.getDimensionMapPath())) + .map(path -> MetricAllowlistLoader.loadAllowlistFromFile( + new ObjectMapper(), + path, + MetricAllowlistParsers::parseMetricNameObject + )) + .orElseGet(() -> MetricAllowlistLoader.loadAllowlistFromClasspath( + new ObjectMapper(), + PrometheusEmitterConfig.DEFAULT_METRIC_SPEC_PATH, + MetricAllowlistParsers::parseMetricNameObject + )) + : Collections.emptySet() + ); this.config = config; this.strategy = config.getStrategy(); metrics = new Metrics(config); @@ -135,15 +154,20 @@ private static URL createURLSneakily(final String urlString) } @Override - public void emit(Event event) + protected boolean shouldFilterEvent(final Event event) { - if (event instanceof ServiceMetricEvent) { - emitMetric((ServiceMetricEvent) event); - } + return !(event instanceof ServiceMetricEvent) + || shouldFilterOutMetric(((ServiceMetricEvent) event).getMetric()); } - private void emitMetric(ServiceMetricEvent metricEvent) + @Override + protected void emitFilteredEvent(final Event event) { + // for events passed initial filering when it's diasabled + if (!(event instanceof ServiceMetricEvent)) { + return; + } + final ServiceMetricEvent metricEvent = (ServiceMetricEvent) event; String name = metricEvent.getMetric(); String service = metricEvent.getService(); String host = metricEvent.getHost(); @@ -286,6 +310,12 @@ public void setPushGateway(PushGateway pushGateway) this.pushGateway = pushGateway; } + @Override + public MetricAllowlistParser getMetricAllowlistParser() + { + return MetricAllowlistParsers::parseMetricNameObject; + } + /** * Cleans up stale metrics that have not been updated within the configured TTL. * This method is called periodically by the TTL scheduler when using the 'exporter' strategy with diff --git a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java index 56def5c4c2d8..16d6fd3d54fe 100644 --- a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java +++ b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.core.GlobalEmitterConfig; import org.joda.time.Duration; import javax.annotation.Nullable; @@ -35,8 +36,9 @@ /** * */ -public class PrometheusEmitterConfig +public class PrometheusEmitterConfig extends GlobalEmitterConfig { + public static final String DEFAULT_METRIC_SPEC_PATH = "defaultPrometheusMetrics.json"; static final Pattern PATTERN = Pattern.compile("[a-zA-Z_:][a-zA-Z0-9_:]*"); @@ -46,6 +48,11 @@ public class PrometheusEmitterConfig @JsonProperty private final String namespace; + /** + * Deprecated in favor of {@link GlobalEmitterConfig#getMetricSpecPath()} kept for backward compatibility. + * If metricSpecPath is not provided, the emitter falls back to this value. + */ + @Deprecated @JsonProperty @Nullable private final String dimensionMapPath; @@ -159,6 +166,10 @@ public String getNamespace() return namespace; } + /** + * Deprecated alias for {@link GlobalEmitterConfig#getMetricSpecPath()}. + */ + @Deprecated public String getDimensionMapPath() { return dimensionMapPath; diff --git a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/prometheus-emitter/src/main/resources/defaultPrometheusMetrics.json similarity index 100% rename from extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json rename to extensions-contrib/prometheus-emitter/src/main/resources/defaultPrometheusMetrics.json diff --git a/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/MetricsTest.java b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/MetricsTest.java index 7139e446ab4f..572aceb884a3 100644 --- a/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/MetricsTest.java +++ b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/MetricsTest.java @@ -114,7 +114,19 @@ public void testMetricsConfigurationWithNonExistentMetric() @Test public void testMetricsConfigurationWithUnSupportedType() { - PrometheusEmitterConfig config = new PrometheusEmitterConfig(null, "test_5", "src/test/resources/defaultInvalidMetricsTest.json", null, null, true, true, null, null, null, null); + PrometheusEmitterConfig config = new PrometheusEmitterConfig( + null, + "test_5", + "src/test/resources/defaultInvalidMetricsTest.json", + null, + null, + true, + true, + null, + null, + null, + null + ); ISE iseException = Assert.assertThrows(ISE.class, () -> { new Metrics(config); }); @@ -124,7 +136,19 @@ public void testMetricsConfigurationWithUnSupportedType() @Test public void testMetricsConfigurationWithTimerHistogramBuckets() { - PrometheusEmitterConfig config = new PrometheusEmitterConfig(null, "test_6", "src/test/resources/defaultMetricsTest.json", null, null, true, true, null, null, null, null); + PrometheusEmitterConfig config = new PrometheusEmitterConfig( + null, + "test_6", + "src/test/resources/defaultMetricsTest.json", + null, + null, + true, + true, + null, + null, + null, + null + ); Metrics metrics = new Metrics(config); DimensionsAndCollector dimensionsAndCollector = metrics.getByName("query/time", "historical"); Assert.assertNotNull(dimensionsAndCollector); @@ -137,5 +161,4 @@ public void testMetricsConfigurationWithTimerHistogramBuckets() double[] expectedHistogramBuckets = {10.0, 30.0, 60.0, 120.0, 200.0, 300.0}; Assert.assertArrayEquals(expectedHistogramBuckets, dimensionsAndCollector.getHistogramBuckets(), 0.0); } - } diff --git a/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java index 2158260afb3b..fb6bda4939a3 100644 --- a/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java +++ b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.emitter.service.UnitEvent; import org.easymock.EasyMock; import org.junit.After; import org.junit.Assert; @@ -605,6 +606,49 @@ public void testCounterWithNegativeValue() emitter.close(); } + @Test + public void testFilteringAllowsConfiguredMetricOnly() throws Exception + { + final PrometheusEmitterConfig config = new PrometheusEmitterConfig( + PrometheusEmitterConfig.Strategy.exporter, + "druid", + null, + 0, + null, + false, + false, + null, + null, + false, + null + ); + config.setShouldFilterMetrics(true); + + final PrometheusEmitter emitter = new PrometheusEmitter(config); + Assert.assertFalse(emitter.shouldFilterOutMetric("query/time")); + Assert.assertTrue(emitter.shouldFilterOutMetric("some/unlisted/metric")); + emitter.emit( + ServiceMetricEvent.builder().setMetric("query/time", 10).setDimension("dataSource", "wikipedia").setDimension("type", "groupBy") + .build(ImmutableMap.of("service", "broker", "host", "druid.test.cn")) + ); + emitter.emit(ServiceMetricEvent.builder().setMetric("some/unlisted/metric", 5).build("broker", "druid.test.cn")); + emitter.emit(new UnitEvent("alerts", 1)); + + Assert.assertEquals( + 1.0, + CollectorRegistry.defaultRegistry.getSampleValue( + "druid_query_time_bucket", + new String[]{"dataSource", "type", "le"}, + new String[]{"wikipedia", "groupBy", "0.1"} + ), + 0.0 + ); + Assert.assertNull( + CollectorRegistry.defaultRegistry.getSampleValue("druid_some_unlisted_metric") + ); + emitter.close(); + } + @After public void tearDown() { diff --git a/processing/pom.xml b/processing/pom.xml index dcc9da8a5de6..38c507ba85b4 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -531,6 +531,12 @@ + + src/main/resources + + defaultMetrics.json + + ${project.build.directory}/hyperic-sigar-${sigar.base.version}/sigar-bin/lib diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/AbstractFilteringEmitter.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/AbstractFilteringEmitter.java new file mode 100644 index 000000000000..3534095c5ab5 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/AbstractFilteringEmitter.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.core; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.Optional; +import java.util.Set; + +/** + * Base class for emitters that support metric-name-based filtering. + * + *

The base implementation provides a single filtering gate in {@link #emit(Event)} and then + * delegates actual emission to {@link #emitFilteredEvent(Event)}. + */ +public abstract class AbstractFilteringEmitter implements Emitter, MetricFilteringEmitter +{ + private final boolean shouldFilterMetrics; + private final Set allowedMetricNames; + + protected AbstractFilteringEmitter(final boolean shouldFilterMetrics, final Set allowedMetricNames) + { + this.shouldFilterMetrics = shouldFilterMetrics; + this.allowedMetricNames = Set.copyOf(allowedMetricNames); + } + + protected static Set loadAllowedMetricNames( + final boolean shouldFilterMetrics, + final ObjectMapper objectMapper, + final Optional metricSpecPath, + final String defaultMetricSpecPath, + final MetricAllowlistParser parser + ) + { + if (!shouldFilterMetrics) { + return Set.of(); + } + + return metricSpecPath + .map(path -> MetricAllowlistLoader.loadAllowlistFromFile(objectMapper, path, parser)) + .orElseGet(() -> MetricAllowlistLoader.loadAllowlistFromClasspath(objectMapper, defaultMetricSpecPath, parser)); + } + + @Override + public final void emit(final Event event) + { + preEmit(); + if (shouldFilterMetrics && shouldFilterEvent(event)) { + return; + } + emitFilteredEvent(event); + } + + @Override + public boolean shouldFilterOutMetric(final String metricName) + { + return !allowedMetricNames.contains(metricName); + } + + protected boolean isShouldFilterMetrics() + { + return shouldFilterMetrics; + } + + protected Set getAllowedMetricNames() + { + return allowedMetricNames; + } + + /** + * Returns whether this event should be dropped before emission. + * + *

Implementations should apply emitter-specific event handling semantics here. + */ + protected abstract boolean shouldFilterEvent(Event event); + + /** + * Hook for pre-emission checks that must run before metric filtering. + */ + protected void preEmit() + { + // default no-op + } + + /** + * Emits an event that already passed filtering (or is not a metric event). + */ + protected abstract void emitFilteredEvent(Event event); +} diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/BaseHttpEmittingConfig.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/BaseHttpEmittingConfig.java index 44ecaf9d334b..21ab6feb378b 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/core/BaseHttpEmittingConfig.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/BaseHttpEmittingConfig.java @@ -27,8 +27,9 @@ import javax.validation.constraints.Min; import java.util.concurrent.TimeUnit; -public class BaseHttpEmittingConfig +public class BaseHttpEmittingConfig extends GlobalEmitterConfig { + public static final String DEFAULT_METRIC_SPEC_PATH = "defaultMetrics.json"; public static final long DEFAULT_FLUSH_MILLIS = 60 * 1000; public static final int DEFAULT_FLUSH_COUNTS = 500; @@ -181,6 +182,8 @@ protected String toStringBase() ", contentEncoding=" + contentEncoding + ", batchQueueSizeLimit=" + batchQueueSizeLimit + ", httpTimeoutAllowanceFactor=" + httpTimeoutAllowanceFactor + - ", minHttpTimeoutMillis=" + minHttpTimeoutMillis; + ", minHttpTimeoutMillis=" + minHttpTimeoutMillis + + ", shouldFilterMetrics=" + isShouldFilterMetrics() + + ", metricSpecPath='" + getMetricSpecPath() + '\''; } } diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java index 217b24831749..f7e19613da85 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java @@ -126,6 +126,18 @@ static Map makeHttpMap(Properties props) Float.parseFloat(props.getProperty("org.apache.druid.java.util.emitter.http.minHttpTimeoutMillis")) ); } + if (props.containsKey("org.apache.druid.java.util.emitter.shouldFilterMetrics")) { + httpMap.put( + "shouldFilterMetrics", + Boolean.parseBoolean(props.getProperty("org.apache.druid.java.util.emitter.shouldFilterMetrics")) + ); + } + if (props.containsKey("org.apache.druid.java.util.emitter.metricSpecPath")) { + httpMap.put( + "metricSpecPath", + props.getProperty("org.apache.druid.java.util.emitter.metricSpecPath") + ); + } return httpMap; } @@ -145,9 +157,9 @@ static Map makeLoggingMap(Properties props) "shouldFilterMetrics", Boolean.parseBoolean(props.getProperty("org.apache.druid.java.util.emitter.logging.shouldFilterMetrics")) ); } - if (props.containsKey("org.apache.druid.java.util.emitter.logging.allowedMetricsPath")) { + if (props.containsKey("org.apache.druid.java.util.emitter.logging.metricSpecPath")) { loggingMap.put( - "allowedMetricsPath", props.getProperty("org.apache.druid.java.util.emitter.logging.allowedMetricsPath") + "metricSpecPath", props.getProperty("org.apache.druid.java.util.emitter.logging.metricSpecPath") ); } return loggingMap; diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/GlobalEmitterConfig.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/GlobalEmitterConfig.java new file mode 100644 index 000000000000..369caf783711 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/GlobalEmitterConfig.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.core; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Optional; + +/** + * Shared metric-filtering configuration for emitter implementations. + */ +public class GlobalEmitterConfig +{ + /** + * When true, only metrics listed in the allowed metrics configuration are emitted. + * If {@link #metricSpecPath} is null/empty, the bundled default allowlist + * (`defaultMetrics.json` on the classpath) is used. If a path is provided, + * it is loaded from that file instead. + * Defaults to false (emit all metrics). + */ + @JsonProperty + private boolean shouldFilterMetrics; + + /** + * Optional path to a JSON file containing a JSON object keyed by allowed metric names, + * for example `{"query/time": [], "jvm/gc/cpu": []}`. + * Only used when {@link #shouldFilterMetrics} is true. + * If null or empty, the bundled default resource (`defaultMetrics.json`) is loaded + * from the classpath. + */ + @JsonProperty + private String metricSpecPath; + + public boolean isShouldFilterMetrics() + { + return shouldFilterMetrics; + } + + public Optional getMetricSpecPath() + { + return Optional.ofNullable(metricSpecPath); + } + + public void setShouldFilterMetrics(boolean shouldFilterMetrics) + { + this.shouldFilterMetrics = shouldFilterMetrics; + } + + public void setMetricSpecPath(String metricSpecPath) + { + this.metricSpecPath = metricSpecPath; + } +} diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/HttpEmitterConfig.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/HttpEmitterConfig.java index bf1817fd5c5c..43446288ba01 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/core/HttpEmitterConfig.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/HttpEmitterConfig.java @@ -52,6 +52,8 @@ public HttpEmitterConfig(BaseHttpEmittingConfig base, String recipientBaseUrl) this.batchQueueSizeLimit = base.batchQueueSizeLimit; this.httpTimeoutAllowanceFactor = base.httpTimeoutAllowanceFactor; this.minHttpTimeoutMillis = base.minHttpTimeoutMillis; + this.setShouldFilterMetrics(base.isShouldFilterMetrics()); + this.setMetricSpecPath(base.getMetricSpecPath().orElse(null)); } public String getRecipientBaseUrl() diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java index 5006e025ed3c..9c83ab700048 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java @@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.ListenableFuture; import org.asynchttpclient.RequestBuilder; @@ -59,7 +60,7 @@ import java.util.concurrent.locks.LockSupport; import java.util.zip.GZIPOutputStream; -public class HttpPostEmitter implements Flushable, Closeable, Emitter +public class HttpPostEmitter extends AbstractFilteringEmitter implements Flushable, Closeable { private static final int MAX_EVENT_SIZE = 1023 * 1024; // Set max size slightly less than 1M to allow for metadata @@ -152,6 +153,16 @@ public HttpPostEmitter(HttpEmitterConfig config, AsyncHttpClient client) public HttpPostEmitter(HttpEmitterConfig config, AsyncHttpClient client, ObjectMapper jsonMapper) { + super( + config.isShouldFilterMetrics(), + loadAllowedMetricNames( + config.isShouldFilterMetrics(), + jsonMapper, + config.getMetricSpecPath(), + BaseHttpEmittingConfig.DEFAULT_METRIC_SPEC_PATH, + MetricAllowlistParsers::parseMetricNameObject + ) + ); batchingStrategy = config.getBatchingStrategy(); final int batchOverhead = batchingStrategy.batchStartLength() + batchingStrategy.batchEndLength(); Preconditions.checkArgument( @@ -221,7 +232,14 @@ private boolean isTerminated() } @Override - public void emit(Event event) + protected boolean shouldFilterEvent(final Event event) + { + return event instanceof ServiceMetricEvent + && shouldFilterOutMetric(((ServiceMetricEvent) event).getMetric()); + } + + @Override + protected void emitFilteredEvent(final Event event) { emitAndReturnBatch(event); } @@ -279,6 +297,12 @@ private byte[] eventToBytes(Event event) } } + @Override + public MetricAllowlistParser getMetricAllowlistParser() + { + return MetricAllowlistParsers::parseMetricNameObject; + } + private void writeLargeEvent(byte[] eventBytes) { // It's better to drop the oldest, not latest event, but dropping the oldest is not easy to implement, because diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java index 17392515dd64..466431525f3c 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java @@ -19,13 +19,8 @@ package org.apache.druid.java.util.emitter.core; -/** - */ - -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; -import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; @@ -33,31 +28,16 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.slf4j.MarkerFactory; -import javax.annotation.Nullable; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.util.Collections; -import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -/** - */ -public class LoggingEmitter implements Emitter +public class LoggingEmitter extends AbstractFilteringEmitter { - private static final Logger LOGGER = new Logger(LoggingEmitter.class); - private static final String DEFAULT_ALLOWED_METRICS_RESOURCE = "defaultMetrics.json"; - private final Logger log; private final Level level; private final ObjectMapper jsonMapper; - @Nullable - private final Set allowedMetrics; - private final AtomicBoolean started = new AtomicBoolean(false); public LoggingEmitter(LoggingEmitterConfig config, ObjectMapper jsonMapper) @@ -66,14 +46,14 @@ public LoggingEmitter(LoggingEmitterConfig config, ObjectMapper jsonMapper) new Logger(config.getLoggerClass()), Level.toLevel(config.getLogLevel()), jsonMapper, - config.shouldFilterMetrics(), - config.getAllowedMetricsPath() + config.isShouldFilterMetrics(), + config.getMetricSpecPath().orElse(null) ); } public LoggingEmitter(Logger log, Level level, ObjectMapper jsonMapper) { - this(log, level, jsonMapper, false, null); + this(log, level, jsonMapper, false, ""); } public LoggingEmitter( @@ -81,68 +61,30 @@ public LoggingEmitter( Level level, ObjectMapper jsonMapper, boolean shouldFilterMetrics, - @Nullable String allowedMetricsPath + String metricSpecPath ) { - this.log = log; - this.level = level; - this.jsonMapper = jsonMapper; - this.allowedMetrics = shouldFilterMetrics ? loadAllowedMetrics(allowedMetricsPath, jsonMapper) : null; - } - - /** - * Loads the allowed metric names from a JSON file. If the path is null or empty, - * loads from the bundled classpath resource (defaultMetrics.json). If a custom - * path is provided but the file is missing, logs a warning and falls back to - * the default classpath resource. - */ - private static Set loadAllowedMetrics(@Nullable String path, ObjectMapper jsonMapper) - { - final InputStream is = openAllowedMetricsStream(path); - try { - final Map metricsMap = jsonMapper.readValue(is, new TypeReference>() {}); - return Collections.unmodifiableSet(metricsMap.keySet()); - } - catch (IOException e) { - final String source = path == null || Strings.isNullOrEmpty(path) ? DEFAULT_ALLOWED_METRICS_RESOURCE : path; - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build(e, "Allowed metrics file must be a JSON object with metric names as keys; failed to parse [%s]", source); - } - } - - /** - * Opens the allowed metrics configuration stream. Uses classpath resource when - * path is null/empty. When a custom path is specified but the file is missing, - * logs a warning and falls back to the default classpath resource. - */ - private static InputStream openAllowedMetricsStream(@Nullable String path) - { - if (Strings.isNullOrEmpty(path)) { - LOGGER.info("Using default allowed metrics configuration from classpath resource [%s]", DEFAULT_ALLOWED_METRICS_RESOURCE); - return openDefaultAllowedMetricsResource(); - } - try { - final InputStream is = new FileInputStream(new File(path)); - LOGGER.info("Using allowed metrics configuration at [%s]", path); - return is; - } - catch (FileNotFoundException e) { - LOGGER.warn(e, "Allowed metrics file [%s] not found, falling back to default classpath resource [%s]", - path, DEFAULT_ALLOWED_METRICS_RESOURCE); - return openDefaultAllowedMetricsResource(); - } + this( + log, + level, + jsonMapper, + shouldFilterMetrics, + loadAllowedMetricNames( + shouldFilterMetrics, + jsonMapper, + Strings.isNullOrEmpty(metricSpecPath) ? Optional.empty() : Optional.of(metricSpecPath), + LoggingEmitterConfig.DEFAULT_METRIC_SPEC_PATH, + MetricAllowlistParsers::parseMetricNameObject + ) + ); } - private static InputStream openDefaultAllowedMetricsResource() + public LoggingEmitter(Logger log, Level level, ObjectMapper jsonMapper, boolean shouldFilterMetrics, Set metricNames) { - final InputStream is = LoggingEmitter.class.getClassLoader().getResourceAsStream(DEFAULT_ALLOWED_METRICS_RESOURCE); - if (is == null) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.NOT_FOUND) - .build("Could not find default allowed metrics resource [%s] on classpath", DEFAULT_ALLOWED_METRICS_RESOURCE); - } - return is; + super(shouldFilterMetrics, metricNames); + this.log = log; + this.level = level; + this.jsonMapper = jsonMapper; } @Override @@ -179,23 +121,25 @@ public void start() } @Override - public void emit(Event event) + protected boolean shouldFilterEvent(final Event event) + { + return event instanceof ServiceMetricEvent + && shouldFilterOutMetric(((ServiceMetricEvent) event).getMetric()); + } + + @Override + protected void preEmit() { synchronized (started) { if (!started.get()) { throw new RejectedExecutionException("Service not started."); } } + } - // Allowlist filtering: only applies to ServiceMetricEvents. - // Non-metric events (alerts, etc.) always pass through. - if (allowedMetrics != null && event instanceof ServiceMetricEvent) { - final String metricName = ((ServiceMetricEvent) event).getMetric(); - if (!allowedMetrics.contains(metricName)) { - return; - } - } - + @Override + protected void emitFilteredEvent(final Event event) + { try { switch (level) { case TRACE: @@ -271,9 +215,16 @@ public String toString() return "LoggingEmitter{" + "log=" + log + ", level=" + level + + ", shouldFilterMetrics=" + isShouldFilterMetrics() + '}'; } + @Override + public MetricAllowlistParser getMetricAllowlistParser() + { + return MetricAllowlistParsers::parseMetricNameObject; + } + public enum Level { TRACE, diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java index 7aa3d5797498..c09e630e5d3a 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java @@ -21,13 +21,14 @@ import com.fasterxml.jackson.annotation.JsonProperty; -import javax.annotation.Nullable; import javax.validation.constraints.NotNull; /** */ -public class LoggingEmitterConfig +public class LoggingEmitterConfig extends GlobalEmitterConfig { + public static final String DEFAULT_METRIC_SPEC_PATH = "defaultMetrics.json"; + @NotNull @JsonProperty private String loggerClass = LoggingEmitter.class.getName(); @@ -36,26 +37,6 @@ public class LoggingEmitterConfig @JsonProperty private String logLevel = "info"; - /** - * When true, only metrics listed in the allowed metrics configuration are emitted. - * If {@link #allowedMetricsPath} is null/empty, the bundled default allowlist - * (defaultMetrics.json on the classpath) is used. If a path is provided, - * it is loaded from that file instead. - * Defaults to false (emit all metrics, backward-compatible behavior). - */ - @JsonProperty("shouldFilterMetrics") - private boolean shouldFilterMetrics = false; - - /** - * Optional path to a JSON file containing an array of allowed metric names. - * Only used when {@link #shouldFilterMetrics} is true. - * If null or empty, the bundled default resource (defaultMetrics.json) is loaded - * from the classpath, mirroring how the Prometheus emitter loads its defaultMetrics.json. - */ - @JsonProperty - @Nullable - private String allowedMetricsPath = null; - public String getLoggerClass() { return loggerClass; @@ -66,25 +47,14 @@ public String getLogLevel() return logLevel; } - public boolean shouldFilterMetrics() - { - return shouldFilterMetrics; - } - - @Nullable - public String getAllowedMetricsPath() - { - return allowedMetricsPath; - } - @Override public String toString() { return "LoggingEmitterConfig{" + "loggerClass='" + loggerClass + '\'' + ", logLevel='" + logLevel + '\'' + - ", shouldFilterMetrics=" + shouldFilterMetrics + - ", allowedMetricsPath='" + allowedMetricsPath + '\'' + + ", shouldFilterMetrics=" + isShouldFilterMetrics() + + ", metricSpecPath='" + getMetricSpecPath() + '\'' + '}'; } } diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/MetricAllowlistLoader.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/MetricAllowlistLoader.java new file mode 100644 index 000000000000..5da08c3bdf72 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/MetricAllowlistLoader.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.core; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import org.apache.druid.error.DruidException; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Set; + +public final class MetricAllowlistLoader +{ + + public static Set loadAllowlistFromFile( + final ObjectMapper mapper, + final String allowlistPath, + final MetricAllowlistParser parser + ) + { + validateAllowlistPath(allowlistPath); + + final File allowlistFile = new File(allowlistPath); + + try (final InputStream is = new FileInputStream(allowlistFile)) { + return parseAllowlist(mapper, is, allowlistFile.getPath(), parser); + } + catch (FileNotFoundException e) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.NOT_FOUND) + .build("Metric spec file path [%s] was not found.", allowlistFile.getPath()); + } + catch (IOException e) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Failed to parse metric spec file path [%s].", allowlistFile.getPath()); + } + } + + public static Set loadAllowlistFromClasspath( + final ObjectMapper mapper, + final String resourcePath, + final MetricAllowlistParser parser + ) + { + validateAllowlistPath(resourcePath); + + final InputStream classpathInputStream = MetricAllowlistLoader.class.getClassLoader().getResourceAsStream(resourcePath); + if (classpathInputStream == null) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.NOT_FOUND) + .build("Metric spec file path [%s] was not found.", resourcePath); + } + + try (final InputStream is = classpathInputStream) { + return parseAllowlist(mapper, is, resourcePath, parser); + } + catch (IOException e) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Failed to parse metric spec file path [%s].", resourcePath); + } + } + + private static Set parseAllowlist( + final ObjectMapper mapper, + final InputStream inputStream, + final String source, + final MetricAllowlistParser parser + ) throws IOException + { + return parser.parse(mapper.readTree(inputStream), source); + } + + private static void validateAllowlistPath(final String allowlistPath) + { + if (Strings.isNullOrEmpty(allowlistPath)) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build("Metric spec file path was empty, value [%s].", allowlistPath); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/MetricAllowlistParser.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/MetricAllowlistParser.java new file mode 100644 index 000000000000..6d9cc6828f30 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/MetricAllowlistParser.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.core; + +import com.fasterxml.jackson.databind.JsonNode; + +import java.util.Set; + +/** + * Parses a metric specification document into a set of metric names that can be used for filtering. + * + *

The expected JSON shape is parser-dependent and is implemented by each parser implementation. + */ +public interface MetricAllowlistParser +{ + /** + * Parses metric configuration from the provided JSON node. + * + * @param metricConfig root JSON node of the metric specification + * @param source source identifier used for error reporting (for example, a path) + * + * @return metric names extracted from the configuration + */ + Set parse(JsonNode metricConfig, String source); +} diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/MetricAllowlistParsers.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/MetricAllowlistParsers.java new file mode 100644 index 000000000000..b3195b4350aa --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/MetricAllowlistParsers.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.core; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.error.DruidException; + +import java.util.Set; + +public final class MetricAllowlistParsers +{ + + public static Set parseMetricNameObject(JsonNode metricConfig, String source) + { + if (!metricConfig.isObject()) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build("Metric allowlist file [%s] must be a JSON object of metric names.", source); + } + final ImmutableSet.Builder metricNames = ImmutableSet.builder(); + metricConfig.fieldNames().forEachRemaining(metricNames::add); + return metricNames.build(); + } +} diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/MetricFilteringEmitter.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/MetricFilteringEmitter.java new file mode 100644 index 000000000000..6b186048bd85 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/MetricFilteringEmitter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.core; + +/** + * Contract for emitters that can filter metric events based on metric name. + */ +public interface MetricFilteringEmitter +{ + /** + * Returns whether a metric should be filtered out (not emitted). + * + * @param metricName metric name from {@link org.apache.druid.java.util.emitter.service.ServiceMetricEvent} + * + * @return {@code true} when the metric should be dropped + */ + boolean shouldFilterOutMetric(String metricName); + + /** + * Returns the parser used to load metric names from the configured metric specification. + */ + MetricAllowlistParser getMetricAllowlistParser(); +} diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitter.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitter.java index 1aec8be0e4ca..497ddc9c8dca 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitter.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitter.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.asynchttpclient.AsyncHttpClient; import java.io.Closeable; @@ -37,7 +38,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; -public class ParametrizedUriEmitter implements Flushable, Closeable, Emitter +public class ParametrizedUriEmitter extends AbstractFilteringEmitter implements Flushable, Closeable { private static final Logger log = new Logger(ParametrizedUriEmitter.class); private static final Set ONLY_FEED_PARAM = ImmutableSet.of("feed"); @@ -85,6 +86,16 @@ public ParametrizedUriEmitter( UriExtractor uriExtractor ) { + super( + config.isShouldFilterMetrics(), + loadAllowedMetricNames( + config.isShouldFilterMetrics(), + jsonMapper, + config.getMetricSpecPath(), + ParametrizedUriEmitterConfig.DEFAULT_METRIC_SPEC_PATH, + MetricAllowlistParsers::parseMetricNameObject + ) + ); this.config = config; this.client = client; this.jsonMapper = jsonMapper; @@ -115,7 +126,14 @@ public void start() } @Override - public void emit(Event event) + protected boolean shouldFilterEvent(final Event event) + { + return event instanceof ServiceMetricEvent + && shouldFilterOutMetric(((ServiceMetricEvent) event).getMetric()); + } + + @Override + protected void emitFilteredEvent(final Event event) { try { URI uri = uriExtractor.apply(event); @@ -206,4 +224,11 @@ public String toString() ", config=" + config + '}'; } + + @Override + public MetricAllowlistParser getMetricAllowlistParser() + { + return MetricAllowlistParsers::parseMetricNameObject; + } + } diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitterConfig.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitterConfig.java index 9982f134dbc6..49ce61504448 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitterConfig.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitterConfig.java @@ -23,8 +23,9 @@ import javax.validation.constraints.NotNull; -public class ParametrizedUriEmitterConfig +public class ParametrizedUriEmitterConfig extends GlobalEmitterConfig { + public static final String DEFAULT_METRIC_SPEC_PATH = BaseHttpEmittingConfig.DEFAULT_METRIC_SPEC_PATH; private static final BaseHttpEmittingConfig DEFAULT_HTTP_EMITTING_CONFIG = new BaseHttpEmittingConfig(); @NotNull @@ -50,6 +51,8 @@ public String toString() return "ParametrizedUriEmitterConfig{" + "recipientBaseUrlPattern='" + recipientBaseUrlPattern + '\'' + ", httpEmittingConfig=" + httpEmittingConfig + + ", shouldFilterMetrics=" + isShouldFilterMetrics() + + ", metricSpecPath='" + getMetricSpecPath() + '\'' + '}'; } } diff --git a/processing/src/main/resources/defaultMetrics.json b/processing/src/main/resources/defaultMetrics.json index c4647a459c14..4e419b2bb93f 100644 --- a/processing/src/main/resources/defaultMetrics.json +++ b/processing/src/main/resources/defaultMetrics.json @@ -1,4 +1,19 @@ { + "cgroup/cpu/cores_quota": [], + "cgroup/cpu/shares": [], + "cgroup/cpu/usage/sys/percentage": [], + "cgroup/cpu/usage/total/percentage": [], + "cgroup/cpu/usage/user/percentage": [], + "cgroup/cpuset/cpu_count": [], + "cgroup/cpuset/effective_cpu_count": [], + "cgroup/cpuset/effective_mems_count": [], + "cgroup/cpuset/mems_count": [], + "cgroup/disk/read/count": [], + "cgroup/disk/read/size": [], + "cgroup/disk/write/count": [], + "cgroup/disk/write/size": [], + "cgroup/memory/limit/bytes": [], + "cgroup/memory/usage/bytes": [], "compact/segmentAnalyzer/fetchAndProcessMillis": [], "compact/task/count": [], "compactTask/availableSlot/count": [], @@ -11,8 +26,11 @@ "groupBy/spilledBytes": [], "groupBy/spilledQueries": [], "ingest/count": [], + "ingest/events/avgMessageGap": [], "ingest/events/duplicate": [], + "ingest/events/maxMessageGap": [], "ingest/events/messageGap": [], + "ingest/events/minMessageGap": [], "ingest/events/processed": [], "ingest/events/processedWithError": [], "ingest/events/thrownAway": [], @@ -22,9 +40,18 @@ "ingest/handoff/time": [], "ingest/input/bytes": [], "ingest/kafka/avgLag": [], + "ingest/kafka/avgLag/time": [], + "ingest/kafka/fetchOffsets/time": [], "ingest/kafka/lag": [], + "ingest/kafka/lag/time": [], "ingest/kafka/maxLag": [], + "ingest/kafka/maxLag/time": [], "ingest/kafka/partitionLag": [], + "ingest/kinesis/avgLag/time": [], + "ingest/kinesis/fetchOffsets/time": [], + "ingest/kinesis/lag/time": [], + "ingest/kinesis/maxLag/time": [], + "ingest/kinesis/partitionLag/time": [], "ingest/merge/cpu": [], "ingest/merge/time": [], "ingest/notices/queueSize": [], @@ -38,6 +65,8 @@ "ingest/rows/output": [], "ingest/segments/count": [], "ingest/rows/published": [], + "ingest/shuffle/bytes": [], + "ingest/shuffle/requests": [], "ingest/sink/count": [], "ingest/tombstones/count": [], "interval/compacted/count": [], @@ -67,18 +96,6 @@ "jvm/pool/init": [], "jvm/pool/max": [], "jvm/pool/used": [], - "kafka/consumer/bytesConsumed": [], - "kafka/consumer/fetch": [], - "kafka/consumer/fetchLatencyAvg": [], - "kafka/consumer/fetchLatencyMax": [], - "kafka/consumer/fetchRate": [], - "kafka/consumer/fetchSizeAvg": [], - "kafka/consumer/fetchSizeMax": [], - "kafka/consumer/incomingBytes": [], - "kafka/consumer/outgoingBytes": [], - "kafka/consumer/recordsConsumed": [], - "kafka/consumer/recordsLag": [], - "kafka/consumer/recordsPerRequestAvg": [], "kill/eligibleUnusedSegments/count": [], "kill/pendingSegments/count": [], "kill/task/count": [], @@ -94,38 +111,12 @@ "metadata/kill/datasource/count": [], "metadata/kill/rule/count": [], "metadata/kill/supervisor/count": [], - "metadatacache/backfill/count": [], "metadatacache/init/time": [], - "metadatacache/refresh/count": [], - "metadatacache/refresh/time": [], - "metadatacache/schemaPoll/count": [], - "metadatacache/schemaPoll/failed": [], "metadatacache/schemaPoll/time": [], + "query/byteLimit/exceeded/count": [], "query/bytes": [], - "query/cache/delta/averageBytes": [], - "query/cache/delta/errors": [], - "query/cache/delta/evictions": [], - "query/cache/delta/hitRate": [], - "query/cache/delta/hits": [], - "query/cache/delta/misses": [], - "query/cache/delta/numEntries": [], - "query/cache/delta/put/error": [], - "query/cache/delta/put/ok": [], - "query/cache/delta/put/oversized": [], - "query/cache/delta/sizeBytes": [], - "query/cache/delta/timeouts": [], - "query/cache/total/averageBytes": [], - "query/cache/total/errors": [], - "query/cache/total/evictions": [], - "query/cache/total/hitRate": [], - "query/cache/total/hits": [], - "query/cache/total/misses": [], - "query/cache/total/numEntries": [], - "query/cache/total/put/error": [], - "query/cache/total/put/ok": [], - "query/cache/total/put/oversized": [], - "query/cache/total/sizeBytes": [], - "query/cache/total/timeouts": [], + "query/cache/memcached/delta": [], + "query/cache/memcached/total": [], "query/count": [], "query/cpu/time": [], "query/failed/count": [], @@ -133,17 +124,21 @@ "query/node/bytes": [], "query/node/time": [], "query/node/ttfb": [], + "query/priority": [], + "query/resultCache/hit": [], + "query/rowLimit/exceeded/count": [], "query/segment/time": [], "query/segmentAndCache/time": [], + "query/segments/count": [], "query/success/count": [], "query/time": [], "query/timeout/count": [], "query/wait/time": [], - "schemacache/finalizedSchemaPayload/count": [], - "schemacache/finalizedSegmentMetadata/count": [], - "schemacache/inTransitSMQPublishedResults/count": [], - "schemacache/inTransitSMQResults/count": [], - "schemacache/realtime/count": [], + "s3/upload/part/queueSize": [], + "s3/upload/part/queuedTime": [], + "s3/upload/part/time": [], + "s3/upload/total/bytes": [], + "s3/upload/total/time": [], "segment/added/bytes": [], "segment/assignSkipped/count": [], "segment/assigned/count": [], @@ -155,21 +150,62 @@ "segment/dropQueue/count": [], "segment/dropSkipped/count": [], "segment/dropped/count": [], + "segment/kill/jobsProcessed/count": [], + "segment/kill/queueProcess/time": [], + "segment/kill/queueReset/time": [], + "segment/kill/skippedIntervals/count": [], + "segment/kill/unusedIntervals/count": [], + "segment/killed/deepStorage/count": [], + "segment/killed/metadataStore/count": [], "segment/loadQueue/assigned": [], "segment/loadQueue/cancelled": [], "segment/loadQueue/count": [], "segment/loadQueue/failed": [], "segment/loadQueue/size": [], "segment/loadQueue/success": [], + "segment/loading/rateKbps": [], "segment/max": [], + "segment/metadataCache/dataSource/deleted": [], + "segment/metadataCache/deleted": [], + "segment/metadataCache/interval/count": [], + "segment/metadataCache/pending/count": [], + "segment/metadataCache/pending/deleted": [], + "segment/metadataCache/pending/skipped": [], + "segment/metadataCache/pending/updated": [], + "segment/metadataCache/skipped": [], + "segment/metadataCache/sync/time": [], + "segment/metadataCache/transactions/readOnly": [], + "segment/metadataCache/transactions/readWrite": [], + "segment/metadataCache/transactions/writeOnly": [], + "segment/metadataCache/used/count": [], + "segment/metadataCache/used/stale": [], + "segment/metadataCache/used/updated": [], "segment/moveSkipped/count": [], "segment/moved/bytes": [], "segment/moved/count": [], "segment/nuked/bytes": [], "segment/overShadowed/count": [], + "segment/pending/count": [], "segment/pendingDelete": [], + "segment/rowCount/avg": [], + "segment/rowCount/range/count": [], "segment/scan/active": [], "segment/scan/pending": [], + "segment/schemaCache/backfill/count": [], + "segment/schemaCache/dataSource/removed": [], + "segment/schemaCache/deepStorageOnly/count": [], + "segment/schemaCache/deepStorageOnly/refresh/time": [], + "segment/schemaCache/pendingBackfill/count": [], + "segment/schemaCache/poll/count": [], + "segment/schemaCache/poll/failed": [], + "segment/schemaCache/realtime/count": [], + "segment/schemaCache/refresh/count": [], + "segment/schemaCache/refresh/time": [], + "segment/schemaCache/refreshSkipped/count": [], + "segment/schemaCache/rowSignature/changed": [], + "segment/schemaCache/rowSignature/column/count": [], + "segment/schemaCache/used/count": [], + "segment/schemaCache/usedFingerprint/count": [], "segment/size": [], "segment/skipCompact/bytes": [], "segment/skipCompact/count": [], @@ -178,6 +214,8 @@ "segment/unneeded/count": [], "segment/unneededEternityTombstone/count": [], "segment/used": [], + "segment/used/count": [], + "segment/used/deepStorageOnly/count": [], "segment/usedPercent": [], "segment/waitCompact/bytes": [], "segment/waitCompact/count": [], @@ -185,10 +223,16 @@ "serverview/init/time": [], "serverview/sync/healthy": [], "serverview/sync/unstableTime": [], - "service/heartbeat": [], "sqlQuery/bytes": [], "sqlQuery/planningTimeMs": [], "sqlQuery/time": [], + "subquery/byteLimit/count": [], + "subquery/bytes": [], + "subquery/fallback/count": [], + "subquery/fallback/insufficientType/count": [], + "subquery/fallback/unknownReason/count": [], + "subquery/rowLimit/count": [], + "subquery/rows": [], "sys/cpu": [], "sys/disk/queue": [], "sys/disk/read/count": [], @@ -234,6 +278,7 @@ "task/action/run/time": [], "task/action/success/count": [], "task/autoScaler/requiredCount": [], + "task/autoScaler/scaleActionTime": [], "task/failed/count": [], "task/pending/count": [], "task/pending/time": [], @@ -242,10 +287,24 @@ "task/segmentAvailability/wait/time": [], "task/success/count": [], "task/waiting/count": [], + "task/waiting/time": [], + "taskSlot/blacklisted/count": [], + "taskSlot/idle/count": [], + "taskSlot/lazy/count": [], + "taskSlot/total/count": [], + "taskSlot/used/count": [], "tier/historical/count": [], "tier/replication/factor": [], "tier/required/capacity": [], "tier/total/capacity": [], + "worker/task/assigned/count": [], + "worker/task/completed/count": [], + "worker/task/failed/count": [], + "worker/task/running/count": [], + "worker/task/success/count": [], + "worker/taskSlot/idle/count": [], + "worker/taskSlot/total/count": [], + "worker/taskSlot/used/count": [], "zk/connected": [], "zk/reconnect/time": [] -} \ No newline at end of file +} diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterConfigTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterConfigTest.java index ebc213303db4..46e1e42699fb 100644 --- a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterConfigTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterConfigTest.java @@ -136,6 +136,21 @@ public void testSettingEverythingLegacy() Assert.assertEquals(100, config.getMinHttpTimeoutMillis()); } + @Test + public void testSettingMetricFilteringLegacy() + { + final Properties props = new Properties(); + props.setProperty("org.apache.druid.java.util.emitter.http.url", "http://example.com/"); + props.setProperty("org.apache.druid.java.util.emitter.shouldFilterMetrics", "true"); + props.setProperty("org.apache.druid.java.util.emitter.metricSpecPath", "/tmp/http-metrics.json"); + + final ObjectMapper objectMapper = new ObjectMapper(); + final HttpEmitterConfig config = objectMapper.convertValue(Emitters.makeHttpMap(props), HttpEmitterConfig.class); + + Assert.assertTrue(config.isShouldFilterMetrics()); + Assert.assertEquals("/tmp/http-metrics.json", config.getMetricSpecPath().orElse(null)); + } + @Test public void testMemoryLimits() { diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java index 987de0b3c5e7..f8369176c635 100644 --- a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java @@ -21,6 +21,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.primitives.Ints; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.emitter.service.UnitEvent; import org.asynchttpclient.ListenableFuture; import org.asynchttpclient.Request; import org.asynchttpclient.Response; @@ -30,11 +32,13 @@ import java.io.IOException; import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; public class HttpPostEmitterTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() { @Override @@ -95,4 +99,45 @@ public void testRecoveryEmitAndReturnBatch() Assert.assertEquals(2, emitter.getTotalEmittedEvents()); } } + + @Test + public void testFilteringAllowsConfiguredMetricAndNonMetricEvent() throws Exception + { + final HttpEmitterConfig.Builder builder = new HttpEmitterConfig.Builder("http://foo.bar") + .setFlushMillis(100) + .setFlushCount(4) + .setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS) + .setBatchingStrategy(BatchingStrategy.ONLY_EVENTS) + .setMaxBatchSize(1024 * 1024) + .setBatchQueueSizeLimit(1000); + builder.setShouldFilterMetrics(true); + + final HttpEmitterConfig config = builder.build(); + final List payloads = new ArrayList<>(); + httpClient.setGoHandler( + new GoHandler() + { + @Override + protected ListenableFuture go(final Request request) + { + payloads.add(StandardCharsets.UTF_8.decode(request.getByteBufferData().slice()).toString()); + return GoHandlers.immediateFuture(EmitterTest.okResponse()); + } + } + ); + + try (final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper())) { + emitter.start(); + emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); + emitter.emit(ServiceMetricEvent.builder().setMetric("some/unlisted/metric", 200).build("test", "localhost")); + emitter.emit(new UnitEvent("alerts", 1)); + emitter.flush(); + + final String payload = String.join("", payloads); + Assert.assertTrue(payload.contains("\"metric\":\"query/time\"")); + Assert.assertTrue(payload.contains("\"feed\":\"alerts\"")); + Assert.assertFalse(payload.contains("\"metric\":\"some/unlisted/metric\"")); + Assert.assertEquals(2, emitter.getTotalEmittedEvents()); + } + } } diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterAllowlistConfigTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterAllowlistConfigTest.java new file mode 100644 index 000000000000..e266572c1a7e --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterAllowlistConfigTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.core; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; + +public class LoggingEmitterAllowlistConfigTest +{ + @Test + public void testUsesDefaultAllowlistWhenCustomPathIsNotProvided() + { + final Map configMap = new HashMap<>(); + configMap.put("shouldFilterMetrics", true); + final LoggingEmitterConfig config = new ObjectMapper().convertValue(configMap, LoggingEmitterConfig.class); + + final LoggingEmitter emitter = new LoggingEmitter(config, new ObjectMapper()); + Assert.assertNotNull(emitter); + Assert.assertTrue(emitter.getAllowedMetricNames().contains("jvm/gc/cpu")); + Assert.assertTrue(emitter.getAllowedMetricNames().contains("query/time")); + } + + @Test + public void testReadsCustomAllowlistAsMetricObject() throws IOException + { + final Path allowlist = Files.createTempFile("allowlist-object", ".json"); + Files.writeString(allowlist, "{\"jvm/gc/cpu\": [], \"jvm/gc/count\": []}"); + + final Map configMap = new HashMap<>(); + configMap.put("shouldFilterMetrics", true); + configMap.put("metricSpecPath", allowlist.toAbsolutePath().toString()); + final LoggingEmitterConfig config = new ObjectMapper().convertValue(configMap, LoggingEmitterConfig.class); + + final LoggingEmitter emitter = new LoggingEmitter(config, new ObjectMapper()); + Assert.assertNotNull(emitter); + Assert.assertEquals(2, emitter.getAllowedMetricNames().size()); + Assert.assertTrue(emitter.getAllowedMetricNames().contains("jvm/gc/cpu")); + Assert.assertTrue(emitter.getAllowedMetricNames().contains("jvm/gc/count")); + Assert.assertFalse(emitter.getAllowedMetricNames().contains("query/time")); + } +} diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java index b7189699b61d..bda6314320da 100644 --- a/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java @@ -38,8 +38,8 @@ public void testDefaults() ); Assert.assertEquals("getLoggerClass", LoggingEmitter.class.getName(), config.getLoggerClass()); Assert.assertEquals("getLogLevel", "info", config.getLogLevel()); - Assert.assertFalse("shouldFilterMetrics", config.shouldFilterMetrics()); - Assert.assertNull("getAllowedMetricsPath", config.getAllowedMetricsPath()); + Assert.assertFalse("isShouldFilterMetrics", config.isShouldFilterMetrics()); + Assert.assertFalse("getMetricSpecPath", config.getMetricSpecPath().isPresent()); } @Test @@ -54,8 +54,8 @@ public void testDefaultsLegacy() Assert.assertEquals("getLoggerClass", LoggingEmitter.class.getName(), config.getLoggerClass()); Assert.assertEquals("getLogLevel", "debug", config.getLogLevel()); - Assert.assertFalse("shouldFilterMetrics", config.shouldFilterMetrics()); - Assert.assertNull("getAllowedMetricsPath", config.getAllowedMetricsPath()); + Assert.assertFalse("isShouldFilterMetrics", config.isShouldFilterMetrics()); + Assert.assertFalse("getMetricSpecPath", config.getMetricSpecPath().isPresent()); } @Test @@ -65,7 +65,7 @@ public void testSettingEverything() props.setProperty("org.apache.druid.java.util.emitter.loggerClass", "Foo"); props.setProperty("org.apache.druid.java.util.emitter.logLevel", "INFO"); props.setProperty("org.apache.druid.java.util.emitter.shouldFilterMetrics", "true"); - props.setProperty("org.apache.druid.java.util.emitter.allowedMetricsPath", "/tmp/allowedMetrics.json"); + props.setProperty("org.apache.druid.java.util.emitter.metricSpecPath", "/tmp/logging-metrics.json"); final ObjectMapper objectMapper = new ObjectMapper(); final LoggingEmitterConfig config = objectMapper.convertValue( @@ -75,8 +75,8 @@ public void testSettingEverything() Assert.assertEquals("getLoggerClass", "Foo", config.getLoggerClass()); Assert.assertEquals("getLogLevel", "INFO", config.getLogLevel()); - Assert.assertTrue("shouldFilterMetrics", config.shouldFilterMetrics()); - Assert.assertEquals("getAllowedMetricsPath", "/tmp/allowedMetrics.json", config.getAllowedMetricsPath()); + Assert.assertTrue("isShouldFilterMetrics", config.isShouldFilterMetrics()); + Assert.assertEquals("getMetricSpecPath", "/tmp/logging-metrics.json", config.getMetricSpecPath().orElse(null)); } @Test @@ -86,7 +86,7 @@ public void testSettingEverythingLegacy() props.setProperty("org.apache.druid.java.util.emitter.logging.class", "Foo"); props.setProperty("org.apache.druid.java.util.emitter.logging.level", "INFO"); props.setProperty("org.apache.druid.java.util.emitter.logging.shouldFilterMetrics", "true"); - props.setProperty("org.apache.druid.java.util.emitter.logging.allowedMetricsPath", "/custom/path.json"); + props.setProperty("org.apache.druid.java.util.emitter.logging.metricSpecPath", "/tmp/logging-metrics.json"); final ObjectMapper objectMapper = new ObjectMapper(); final LoggingEmitterConfig config = objectMapper.convertValue( @@ -96,7 +96,7 @@ public void testSettingEverythingLegacy() Assert.assertEquals("getLoggerClass", "Foo", config.getLoggerClass()); Assert.assertEquals("getLogLevel", "INFO", config.getLogLevel()); - Assert.assertTrue("shouldFilterMetrics", config.shouldFilterMetrics()); - Assert.assertEquals("getAllowedMetricsPath", "/custom/path.json", config.getAllowedMetricsPath()); + Assert.assertTrue("isShouldFilterMetrics", config.isShouldFilterMetrics()); + Assert.assertEquals("getMetricSpecPath", "/tmp/logging-metrics.json", config.getMetricSpecPath().orElse(null)); } } diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterTest.java index b29cf44ef41b..23d55d4b9b8b 100644 --- a/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterTest.java @@ -21,10 +21,10 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.emitter.service.UnitEvent; -import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -47,7 +47,6 @@ public class LoggingEmitterTest private List serializedObjects; private ObjectMapper trackingMapper; - private LoggingEmitter emitter; @Before public void setUp() @@ -71,7 +70,7 @@ public String writeValueAsString(Object value) throws JsonProcessingException private LoggingEmitter createEmitter(boolean shouldFilterMetrics, String allowedMetricsPath) { - emitter = new LoggingEmitter( + final LoggingEmitter emitter = new LoggingEmitter( new Logger(LoggingEmitter.class), LoggingEmitter.Level.WARN, trackingMapper, @@ -82,28 +81,19 @@ private LoggingEmitter createEmitter(boolean shouldFilterMetrics, String allowed return emitter; } - @After - public void tearDown() - { - if (emitter != null) { - emitter.close(); - emitter = null; - } - } - /** * Without filtering enabled, the emitter should log all events (backward compatibility). */ @Test public void testEmitAllWhenFilteringDisabled() { - createEmitter(false, null); - - emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); - emitter.emit(ServiceMetricEvent.builder().setMetric("jvm/mem/used", 512).build("test", "localhost")); - emitter.emit(ServiceMetricEvent.builder().setMetric("some/random/metric", 1).build("test", "localhost")); + try (LoggingEmitter emitter = createEmitter(false, null)) { + emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); + emitter.emit(ServiceMetricEvent.builder().setMetric("jvm/mem/used", 512).build("test", "localhost")); + emitter.emit(ServiceMetricEvent.builder().setMetric("some/random/metric", 1).build("test", "localhost")); - Assert.assertEquals("All events should be serialized (logged)", 3, serializedObjects.size()); + Assert.assertEquals("All events should be serialized (logged)", 3, serializedObjects.size()); + } } /** @@ -114,14 +104,12 @@ public void testEmitAllWhenFilteringDisabled() @Test public void testFilterWithDefaultResource() { - createEmitter(true, null); - - // "query/time" is in the default allowed metrics list - emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); - // "some/unlisted/metric" is NOT in the default list - emitter.emit(ServiceMetricEvent.builder().setMetric("some/unlisted/metric", 1).build("test", "localhost")); + try (LoggingEmitter emitter = createEmitter(true, null)) { + emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); + emitter.emit(ServiceMetricEvent.builder().setMetric("some/unlisted/metric", 1).build("test", "localhost")); - Assert.assertEquals("Only the allowed metric should be serialized", 1, serializedObjects.size()); + Assert.assertEquals("Only the allowed metric should be serialized", 1, serializedObjects.size()); + } } /** @@ -131,13 +119,13 @@ public void testFilterWithDefaultResource() public void testFilterWithCustomFilePath() throws IOException { final File allowFile = createAllowlistFile("{\"query/time\": [], \"query/bytes\": []}"); - createEmitter(true, allowFile.getAbsolutePath()); - - emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); - emitter.emit(ServiceMetricEvent.builder().setMetric("jvm/mem/used", 512).build("test", "localhost")); - emitter.emit(ServiceMetricEvent.builder().setMetric("query/bytes", 2048).build("test", "localhost")); + try (LoggingEmitter emitter = createEmitter(true, allowFile.getAbsolutePath())) { + emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); + emitter.emit(ServiceMetricEvent.builder().setMetric("jvm/mem/used", 512).build("test", "localhost")); + emitter.emit(ServiceMetricEvent.builder().setMetric("query/bytes", 2048).build("test", "localhost")); - Assert.assertEquals("Only allowed metrics should be serialized", 2, serializedObjects.size()); + Assert.assertEquals("Only allowed metrics should be serialized", 2, serializedObjects.size()); + } } /** @@ -148,12 +136,11 @@ public void testFilterWithCustomFilePath() throws IOException public void testNonMetricEventsAlwaysPassThrough() throws IOException { final File allowFile = createAllowlistFile("{\"query/time\": []}"); - createEmitter(true, allowFile.getAbsolutePath()); - - // This is NOT a ServiceMetricEvent, so it should bypass the allowlist filter - emitter.emit(new UnitEvent("alerts", 42)); + try (LoggingEmitter emitter = createEmitter(true, allowFile.getAbsolutePath())) { + emitter.emit(new UnitEvent("alerts", 42)); - Assert.assertEquals("Non-metric events should bypass the allowlist filter", 1, serializedObjects.size()); + Assert.assertEquals("Non-metric events should bypass the allowlist filter", 1, serializedObjects.size()); + } } /** @@ -161,15 +148,9 @@ public void testNonMetricEventsAlwaysPassThrough() throws IOException * to the default classpath resource and emits successfully. */ @Test - public void testMissingCustomPathFallsBackToDefault() + public void testMissingCustomPathThrows() { - createEmitter(true, "/nonexistent/path/to/allowedMetrics.json"); - - // Fallback to default should allow "query/time" (in default list) and drop unlisted metrics - emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); - emitter.emit(ServiceMetricEvent.builder().setMetric("some/unlisted/metric", 1).build("test", "localhost")); - - Assert.assertEquals("Fallback to default should allow listed metrics only", 1, serializedObjects.size()); + Assert.assertThrows(DruidException.class, () -> createEmitter(true, "/nonexistent/path/to/allowedMetrics.json")); } /** @@ -179,12 +160,12 @@ public void testMissingCustomPathFallsBackToDefault() public void testEmptyAllowlistBlocksAllMetrics() throws IOException { final File allowFile = createAllowlistFile("{}"); - createEmitter(true, allowFile.getAbsolutePath()); + try (LoggingEmitter emitter = createEmitter(true, allowFile.getAbsolutePath())) { + emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); + emitter.emit(new UnitEvent("alerts", 42)); - emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); - emitter.emit(new UnitEvent("alerts", 42)); - - Assert.assertEquals("Only non-metric event should pass through", 1, serializedObjects.size()); + Assert.assertEquals("Only non-metric event should pass through", 1, serializedObjects.size()); + } } /** @@ -194,12 +175,12 @@ public void testEmptyAllowlistBlocksAllMetrics() throws IOException public void testFilterDisabledIgnoresPath() throws IOException { final File allowFile = createAllowlistFile("{\"query/time\": []}"); - createEmitter(false, allowFile.getAbsolutePath()); + try (LoggingEmitter emitter = createEmitter(false, allowFile.getAbsolutePath())) { + emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); + emitter.emit(ServiceMetricEvent.builder().setMetric("jvm/mem/used", 512).build("test", "localhost")); - emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); - emitter.emit(ServiceMetricEvent.builder().setMetric("jvm/mem/used", 512).build("test", "localhost")); - - Assert.assertEquals("All events should pass when filtering is disabled", 2, serializedObjects.size()); + Assert.assertEquals("All events should pass when filtering is disabled", 2, serializedObjects.size()); + } } private File createAllowlistFile(String jsonContent) throws IOException diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/MetricAllowlistParsersTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/MetricAllowlistParsersTest.java new file mode 100644 index 000000000000..eea9300e6a4f --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/MetricAllowlistParsersTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.core; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.error.DruidException; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Set; + +public class MetricAllowlistParsersTest +{ + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Test + public void testParseMetricNameObject() + throws Exception + { + final JsonNode jsonNode = MAPPER.readTree("{\"query/time\":[],\"jvm/gc/cpu\":[]}"); + + final Set metricNames = MetricAllowlistParsers.parseMetricNameObject(jsonNode, "test-source"); + + Assert.assertEquals(2, metricNames.size()); + Assert.assertTrue(metricNames.contains("query/time")); + Assert.assertTrue(metricNames.contains("jvm/gc/cpu")); + } + + @Test + public void testParseMetricNameObjectRequiresJsonObject() + throws Exception + { + final JsonNode jsonNode = MAPPER.readTree("[\"query/time\"]"); + + final DruidException exception = Assert.assertThrows( + DruidException.class, + () -> MetricAllowlistParsers.parseMetricNameObject(jsonNode, "test-source") + ); + + Assert.assertEquals( + "Metric allowlist file [test-source] must be a JSON object of metric names.", + exception.getMessage() + ); + } +} diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitterTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitterTest.java index c5dc26432fe0..b41e5faee0cf 100644 --- a/processing/src/test/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitterTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/ParametrizedUriEmitterTest.java @@ -20,10 +20,12 @@ package org.apache.druid.java.util.emitter.core; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.emitter.service.UnitEvent; import org.asynchttpclient.ListenableFuture; import org.asynchttpclient.Request; @@ -62,6 +64,11 @@ public void tearDown() } private Emitter parametrizedEmmiter(String uriPattern) throws Exception + { + return parametrizedEmmiter(uriPattern, ImmutableMap.of()); + } + + private Emitter parametrizedEmmiter(String uriPattern, Map extraProps) throws Exception { final Properties props = new Properties(); props.setProperty("org.apache.druid.java.util.emitter.type", "parametrized"); @@ -70,6 +77,9 @@ private Emitter parametrizedEmmiter(String uriPattern) throws Exception "org.apache.druid.java.util.emitter.httpEmitting.flushTimeOut", String.valueOf(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS) ); + for (Map.Entry entry : extraProps.entrySet()) { + props.setProperty(entry.getKey(), entry.getValue()); + } lifecycle = new Lifecycle(); Emitter emitter = Emitters.create(props, httpClient, lifecycle); Assert.assertEquals(ParametrizedUriEmitter.class, emitter.getClass()); @@ -219,4 +229,43 @@ public void failEmitMalformedEvent() throws Exception ); } } + + @Test + public void testFilteringAllowsConfiguredMetricAndNonMetricEvent() throws Exception + { + final Emitter emitter = parametrizedEmmiter( + "http://example.com/{feed}", + ImmutableMap.of( + "org.apache.druid.java.util.emitter.shouldFilterMetrics", "true" + ) + ); + + final Map requests = new HashMap<>(); + httpClient.setGoHandler( + new GoHandler() + { + @Override + protected ListenableFuture go(final Request request) throws Exception + { + requests.put( + request.getUrl(), + JSON_MAPPER.readTree(StandardCharsets.UTF_8.decode(request.getByteBufferData().slice()).toString()) + ); + return GoHandlers.immediateFuture(EmitterTest.okResponse()); + } + }.times(2) + ); + + emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); + emitter.emit(ServiceMetricEvent.builder().setMetric("some/unlisted/metric", 200).build("test", "localhost")); + emitter.emit(new UnitEvent("alerts", 1)); + emitter.flush(); + + Assert.assertTrue(httpClient.succeeded()); + final JsonNode metricsPayload = requests.get("http://example.com/metrics"); + Assert.assertNotNull(metricsPayload); + Assert.assertEquals(1, metricsPayload.size()); + Assert.assertEquals("query/time", metricsPayload.get(0).get("metric").asText()); + Assert.assertNotNull(requests.get("http://example.com/alerts")); + } }