diff --git a/docs/configuration/index.md b/docs/configuration/index.md index f6ddebcb0b24..8df178323424 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -2019,6 +2019,8 @@ log4j config to route these logs to different sources based on the feed of the e |--------|-----------|--------| |`druid.emitter.logging.loggerClass`|The class used for logging.|`org.apache.druid.java.util.emitter.core.LoggingEmitter`| |`druid.emitter.logging.logLevel`|Choices: debug, info, warn, error. The log level at which message are logged.|info| +|`druid.emitter.logging.shouldFilterMetrics`|When true, only metrics listed in the allow list are emitted; non-metric events (e.g. alerts) are always emitted. When false, all events are logged (backward-compatible).|false| +|`druid.emitter.logging.allowedMetricsPath`|Path to a JSON file whose keys are the allowed metric names. Only used when `shouldFilterMetrics` is true. If null or empty, the bundled classpath resource `loggingEmitterAllowedMetrics.json` is used. If a path is set but the file is missing, a warning is logged and the emitter falls back to the default classpath resource.|null| #### HTTP emitter module diff --git a/processing/pom.xml b/processing/pom.xml index dcc9da8a5de6..3bf60b04387d 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -531,6 +531,9 @@ + + src/main/resources + ${project.build.directory}/hyperic-sigar-${sigar.base.version}/sigar-bin/lib diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java index 00d424b88fc4..217b24831749 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/Emitters.java @@ -140,6 +140,16 @@ static Map makeLoggingMap(Properties props) loggingMap.put( "logLevel", props.getProperty("org.apache.druid.java.util.emitter.logging.level", "debug") ); + if (props.containsKey("org.apache.druid.java.util.emitter.logging.shouldFilterMetrics")) { + loggingMap.put( + "shouldFilterMetrics", Boolean.parseBoolean(props.getProperty("org.apache.druid.java.util.emitter.logging.shouldFilterMetrics")) + ); + } + if (props.containsKey("org.apache.druid.java.util.emitter.logging.allowedMetricsPath")) { + loggingMap.put( + "allowedMetricsPath", props.getProperty("org.apache.druid.java.util.emitter.logging.allowedMetricsPath") + ); + } return loggingMap; } diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java index afc8e07e2c28..e53977adc8ca 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitter.java @@ -22,13 +22,26 @@ /** */ +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.slf4j.MarkerFactory; +import javax.annotation.Nullable; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.Map; +import java.util.Set; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,22 +49,101 @@ */ public class LoggingEmitter implements Emitter { + private static final Logger LOGGER = new Logger(LoggingEmitter.class); + private static final String DEFAULT_ALLOWED_METRICS_RESOURCE = "loggingEmitterAllowedMetrics.json"; + private final Logger log; private final Level level; private final ObjectMapper jsonMapper; + @Nullable + private final Set allowedMetrics; private final AtomicBoolean started = new AtomicBoolean(false); public LoggingEmitter(LoggingEmitterConfig config, ObjectMapper jsonMapper) { - this(new Logger(config.getLoggerClass()), Level.toLevel(config.getLogLevel()), jsonMapper); + this( + new Logger(config.getLoggerClass()), + Level.toLevel(config.getLogLevel()), + jsonMapper, + config.shouldFilterMetrics(), + config.getAllowedMetricsPath() + ); } public LoggingEmitter(Logger log, Level level, ObjectMapper jsonMapper) + { + this(log, level, jsonMapper, false, null); + } + + public LoggingEmitter( + Logger log, + Level level, + ObjectMapper jsonMapper, + boolean shouldFilterMetrics, + @Nullable String allowedMetricsPath + ) { this.log = log; this.level = level; this.jsonMapper = jsonMapper; + this.allowedMetrics = shouldFilterMetrics ? loadAllowedMetrics(allowedMetricsPath, jsonMapper) : null; + } + + /** + * Loads the allowed metric names from a JSON file. If the path is null or empty, + * loads from the bundled classpath resource (loggingEmitterAllowedMetrics.json). + * If a custom path is provided but the file is missing, logs a warning and falls + * back to the default classpath resource. + */ + private static Set loadAllowedMetrics(@Nullable String path, ObjectMapper jsonMapper) + { + final InputStream is = openAllowedMetricsStream(path); + try { + final Map metricsMap = jsonMapper.readValue(is, new TypeReference>() {}); + return Collections.unmodifiableSet(metricsMap.keySet()); + } + catch (IOException e) { + final String source = path == null || Strings.isNullOrEmpty(path) ? DEFAULT_ALLOWED_METRICS_RESOURCE : path; + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(e, "Allowed metrics file must be a JSON object with metric names as keys; failed to parse [%s]", source); + } + } + + /** + * Opens the allowed metrics configuration stream. Uses the bundled + * loggingEmitterAllowedMetrics.json classpath resource when path is null/empty. + * When a custom path is specified but the file is missing, logs a warning and + * falls back to the default classpath resource. + */ + private static InputStream openAllowedMetricsStream(@Nullable String path) + { + if (Strings.isNullOrEmpty(path)) { + LOGGER.info("Using default allowed metrics configuration from classpath resource [%s]", DEFAULT_ALLOWED_METRICS_RESOURCE); + return openDefaultAllowedMetricsResource(); + } + try { + final InputStream is = new FileInputStream(new File(path)); + LOGGER.info("Using allowed metrics configuration at [%s]", path); + return is; + } + catch (FileNotFoundException e) { + LOGGER.warn(e, "Allowed metrics file [%s] not found, falling back to default classpath resource [%s]", + path, DEFAULT_ALLOWED_METRICS_RESOURCE); + return openDefaultAllowedMetricsResource(); + } + } + + private static InputStream openDefaultAllowedMetricsResource() + { + final InputStream is = LoggingEmitter.class.getClassLoader().getResourceAsStream(DEFAULT_ALLOWED_METRICS_RESOURCE); + if (is == null) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.NOT_FOUND) + .build("Could not find default allowed metrics resource [%s] on classpath", DEFAULT_ALLOWED_METRICS_RESOURCE); + } + return is; } @Override @@ -95,6 +187,16 @@ public void emit(Event event) throw new RejectedExecutionException("Service not started."); } } + + // Allowlist filtering: only applies to ServiceMetricEvents. + // Non-metric events (alerts, etc.) always pass through. + if (allowedMetrics != null && event instanceof ServiceMetricEvent) { + final String metricName = ((ServiceMetricEvent) event).getMetric(); + if (!allowedMetrics.contains(metricName)) { + return; + } + } + try { switch (level) { case TRACE: diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java index 39b85126f773..8f7be9f6bdb1 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfig.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; +import javax.annotation.Nullable; import javax.validation.constraints.NotNull; /** @@ -35,6 +36,26 @@ public class LoggingEmitterConfig @JsonProperty private String logLevel = "info"; + /** + * When true, only metrics listed in the allowed metrics configuration are emitted. + * If {@link #allowedMetricsPath} is null/empty, the bundled default allowlist + * (loggingEmitterAllowedMetrics.json on the classpath) is used. If a path is + * provided, it is loaded from that file instead. + * Defaults to false (emit all metrics, backward-compatible behavior). + */ + @JsonProperty("shouldFilterMetrics") + private boolean shouldFilterMetrics = false; + + /** + * Optional path to a JSON file containing an array of allowed metric names. + * Only used when {@link #shouldFilterMetrics} is true. + * If null or empty, the bundled default resource (loggingEmitterAllowedMetrics.json) + * is loaded from the classpath. + */ + @JsonProperty + @Nullable + private String allowedMetricsPath = null; + public String getLoggerClass() { return loggerClass; @@ -45,12 +66,25 @@ public String getLogLevel() return logLevel; } + public boolean shouldFilterMetrics() + { + return shouldFilterMetrics; + } + + @Nullable + public String getAllowedMetricsPath() + { + return allowedMetricsPath; + } + @Override public String toString() { return "LoggingEmitterConfig{" + "loggerClass='" + loggerClass + '\'' + ", logLevel='" + logLevel + '\'' + + ", shouldFilterMetrics=" + shouldFilterMetrics + + ", allowedMetricsPath='" + allowedMetricsPath + '\'' + '}'; } } diff --git a/processing/src/main/resources/loggingEmitterAllowedMetrics.json b/processing/src/main/resources/loggingEmitterAllowedMetrics.json new file mode 100644 index 000000000000..ea6c2fe6b5ff --- /dev/null +++ b/processing/src/main/resources/loggingEmitterAllowedMetrics.json @@ -0,0 +1,249 @@ +{ + "compact/segmentAnalyzer/fetchAndProcessMillis": [], + "compact/task/count": [], + "compactTask/availableSlot/count": [], + "compactTask/maxSlot/count": [], + "coordinator/global/time": [], + "coordinator/time": [], + "groupBy/maxMergeDictionarySize": [], + "groupBy/maxSpilledBytes": [], + "groupBy/mergeDictionarySize": [], + "groupBy/spilledBytes": [], + "groupBy/spilledQueries": [], + "ingest/count": [], + "ingest/events/duplicate": [], + "ingest/events/messageGap": [], + "ingest/events/processed": [], + "ingest/events/processedWithError": [], + "ingest/events/thrownAway": [], + "ingest/events/unparseable": [], + "ingest/handoff/count": [], + "ingest/handoff/failed": [], + "ingest/handoff/time": [], + "ingest/input/bytes": [], + "ingest/kafka/avgLag": [], + "ingest/kafka/lag": [], + "ingest/kafka/maxLag": [], + "ingest/kafka/partitionLag": [], + "ingest/merge/cpu": [], + "ingest/merge/time": [], + "ingest/notices/queueSize": [], + "ingest/notices/time": [], + "ingest/pause/time": [], + "ingest/persists/backPressure": [], + "ingest/persists/count": [], + "ingest/persists/cpu": [], + "ingest/persists/failed": [], + "ingest/persists/time": [], + "ingest/rows/output": [], + "ingest/segments/count": [], + "ingest/sink/count": [], + "ingest/tombstones/count": [], + "interval/compacted/count": [], + "interval/skipCompact/count": [], + "interval/waitCompact/count": [], + "jetty/numOpenConnections": [], + "jetty/threadPool/busy": [], + "jetty/threadPool/idle": [], + "jetty/threadPool/isLowOnThreads": [], + "jetty/threadPool/max": [], + "jetty/threadPool/min": [], + "jetty/threadPool/queueSize": [], + "jetty/threadPool/ready": [], + "jetty/threadPool/total": [], + "jetty/threadPool/utilizationRate": [], + "jetty/threadPool/utilized": [], + "jvm/bufferpool/capacity": [], + "jvm/bufferpool/count": [], + "jvm/bufferpool/used": [], + "jvm/gc/count": [], + "jvm/gc/cpu": [], + "jvm/mem/committed": [], + "jvm/mem/init": [], + "jvm/mem/max": [], + "jvm/mem/used": [], + "jvm/pool/committed": [], + "jvm/pool/init": [], + "jvm/pool/max": [], + "jvm/pool/used": [], + "kafka/consumer/bytesConsumed": [], + "kafka/consumer/fetch": [], + "kafka/consumer/fetchLatencyAvg": [], + "kafka/consumer/fetchLatencyMax": [], + "kafka/consumer/fetchRate": [], + "kafka/consumer/fetchSizeAvg": [], + "kafka/consumer/fetchSizeMax": [], + "kafka/consumer/incomingBytes": [], + "kafka/consumer/outgoingBytes": [], + "kafka/consumer/recordsConsumed": [], + "kafka/consumer/recordsLag": [], + "kafka/consumer/recordsPerRequestAvg": [], + "kill/eligibleUnusedSegments/count": [], + "kill/pendingSegments/count": [], + "kill/task/count": [], + "killTask/availableSlot/count": [], + "killTask/maxSlot/count": [], + "mergeBuffer/acquisitionTimeNs": [], + "mergeBuffer/maxAcquisitionTimeNs": [], + "mergeBuffer/pendingRequests": [], + "mergeBuffer/queries": [], + "mergeBuffer/used": [], + "metadata/kill/audit/count": [], + "metadata/kill/compaction/count": [], + "metadata/kill/datasource/count": [], + "metadata/kill/rule/count": [], + "metadata/kill/supervisor/count": [], + "metadatacache/backfill/count": [], + "metadatacache/init/time": [], + "metadatacache/refresh/count": [], + "metadatacache/refresh/time": [], + "metadatacache/schemaPoll/count": [], + "metadatacache/schemaPoll/failed": [], + "metadatacache/schemaPoll/time": [], + "query/bytes": [], + "query/cache/delta/averageBytes": [], + "query/cache/delta/errors": [], + "query/cache/delta/evictions": [], + "query/cache/delta/hitRate": [], + "query/cache/delta/hits": [], + "query/cache/delta/misses": [], + "query/cache/delta/numEntries": [], + "query/cache/delta/put/error": [], + "query/cache/delta/put/ok": [], + "query/cache/delta/put/oversized": [], + "query/cache/delta/sizeBytes": [], + "query/cache/delta/timeouts": [], + "query/cache/total/averageBytes": [], + "query/cache/total/errors": [], + "query/cache/total/evictions": [], + "query/cache/total/hitRate": [], + "query/cache/total/hits": [], + "query/cache/total/misses": [], + "query/cache/total/numEntries": [], + "query/cache/total/put/error": [], + "query/cache/total/put/ok": [], + "query/cache/total/put/oversized": [], + "query/cache/total/sizeBytes": [], + "query/cache/total/timeouts": [], + "query/count": [], + "query/cpu/time": [], + "query/failed/count": [], + "query/interrupted/count": [], + "query/node/bytes": [], + "query/node/time": [], + "query/node/ttfb": [], + "query/segment/time": [], + "query/segmentAndCache/time": [], + "query/success/count": [], + "query/time": [], + "query/timeout/count": [], + "query/wait/time": [], + "schemacache/finalizedSchemaPayload/count": [], + "schemacache/finalizedSegmentMetadata/count": [], + "schemacache/inTransitSMQPublishedResults/count": [], + "schemacache/inTransitSMQResults/count": [], + "schemacache/realtime/count": [], + "segment/added/bytes": [], + "segment/assignSkipped/count": [], + "segment/assigned/count": [], + "segment/availableDeepStorageOnly/count": [], + "segment/compacted/bytes": [], + "segment/compacted/count": [], + "segment/count": [], + "segment/deleted/count": [], + "segment/dropQueue/count": [], + "segment/dropSkipped/count": [], + "segment/dropped/count": [], + "segment/loadQueue/assigned": [], + "segment/loadQueue/cancelled": [], + "segment/loadQueue/count": [], + "segment/loadQueue/failed": [], + "segment/loadQueue/size": [], + "segment/loadQueue/success": [], + "segment/max": [], + "segment/moveSkipped/count": [], + "segment/moved/bytes": [], + "segment/moved/count": [], + "segment/nuked/bytes": [], + "segment/overShadowed/count": [], + "segment/pendingDelete": [], + "segment/scan/active": [], + "segment/scan/pending": [], + "segment/size": [], + "segment/skipCompact/bytes": [], + "segment/skipCompact/count": [], + "segment/unavailable/count": [], + "segment/underReplicated/count": [], + "segment/unneeded/count": [], + "segment/unneededEternityTombstone/count": [], + "segment/used": [], + "segment/usedPercent": [], + "segment/waitCompact/bytes": [], + "segment/waitCompact/count": [], + "serverview/init/time": [], + "serverview/sync/healthy": [], + "serverview/sync/unstableTime": [], + "service/heartbeat": [], + "sqlQuery/bytes": [], + "sqlQuery/planningTimeMs": [], + "sqlQuery/time": [], + "sys/cpu": [], + "sys/disk/queue": [], + "sys/disk/read/count": [], + "sys/disk/read/size": [], + "sys/disk/transferTime": [], + "sys/disk/write/count": [], + "sys/disk/write/size": [], + "sys/fs/files/count": [], + "sys/fs/files/free": [], + "sys/fs/max": [], + "sys/fs/used": [], + "sys/mem/free": [], + "sys/mem/max": [], + "sys/mem/used": [], + "sys/net/read/dropped": [], + "sys/net/read/errors": [], + "sys/net/read/packets": [], + "sys/net/read/size": [], + "sys/net/write/collisions": [], + "sys/net/write/errors": [], + "sys/net/write/packets": [], + "sys/net/write/size": [], + "sys/storage/used": [], + "sys/swap/free": [], + "sys/swap/max": [], + "sys/swap/pageIn": [], + "sys/swap/pageOut": [], + "sys/tcpv4/activeOpens": [], + "sys/tcpv4/attemptFails": [], + "sys/tcpv4/estabResets": [], + "sys/tcpv4/in/errs": [], + "sys/tcpv4/in/segs": [], + "sys/tcpv4/out/rsts": [], + "sys/tcpv4/out/segs": [], + "sys/tcpv4/passiveOpens": [], + "sys/tcpv4/retrans/segs": [], + "sys/uptime": [], + "task/action/batch/attempts": [], + "task/action/batch/queueTime": [], + "task/action/batch/runTime": [], + "task/action/batch/size": [], + "task/action/failed/count": [], + "task/action/run/time": [], + "task/action/success/count": [], + "task/autoScaler/requiredCount": [], + "task/failed/count": [], + "task/pending/count": [], + "task/pending/time": [], + "task/run/time": [], + "task/running/count": [], + "task/segmentAvailability/wait/time": [], + "task/success/count": [], + "task/waiting/count": [], + "tier/historical/count": [], + "tier/replication/factor": [], + "tier/required/capacity": [], + "tier/total/capacity": [], + "zk/connected": [], + "zk/reconnect/time": [] +} \ No newline at end of file diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java index 9aa93d5d0f5a..b7189699b61d 100644 --- a/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterConfigTest.java @@ -38,6 +38,8 @@ public void testDefaults() ); Assert.assertEquals("getLoggerClass", LoggingEmitter.class.getName(), config.getLoggerClass()); Assert.assertEquals("getLogLevel", "info", config.getLogLevel()); + Assert.assertFalse("shouldFilterMetrics", config.shouldFilterMetrics()); + Assert.assertNull("getAllowedMetricsPath", config.getAllowedMetricsPath()); } @Test @@ -52,6 +54,8 @@ public void testDefaultsLegacy() Assert.assertEquals("getLoggerClass", LoggingEmitter.class.getName(), config.getLoggerClass()); Assert.assertEquals("getLogLevel", "debug", config.getLogLevel()); + Assert.assertFalse("shouldFilterMetrics", config.shouldFilterMetrics()); + Assert.assertNull("getAllowedMetricsPath", config.getAllowedMetricsPath()); } @Test @@ -60,6 +64,8 @@ public void testSettingEverything() final Properties props = new Properties(); props.setProperty("org.apache.druid.java.util.emitter.loggerClass", "Foo"); props.setProperty("org.apache.druid.java.util.emitter.logLevel", "INFO"); + props.setProperty("org.apache.druid.java.util.emitter.shouldFilterMetrics", "true"); + props.setProperty("org.apache.druid.java.util.emitter.allowedMetricsPath", "/tmp/allowedMetrics.json"); final ObjectMapper objectMapper = new ObjectMapper(); final LoggingEmitterConfig config = objectMapper.convertValue( @@ -69,6 +75,8 @@ public void testSettingEverything() Assert.assertEquals("getLoggerClass", "Foo", config.getLoggerClass()); Assert.assertEquals("getLogLevel", "INFO", config.getLogLevel()); + Assert.assertTrue("shouldFilterMetrics", config.shouldFilterMetrics()); + Assert.assertEquals("getAllowedMetricsPath", "/tmp/allowedMetrics.json", config.getAllowedMetricsPath()); } @Test @@ -77,6 +85,8 @@ public void testSettingEverythingLegacy() final Properties props = new Properties(); props.setProperty("org.apache.druid.java.util.emitter.logging.class", "Foo"); props.setProperty("org.apache.druid.java.util.emitter.logging.level", "INFO"); + props.setProperty("org.apache.druid.java.util.emitter.logging.shouldFilterMetrics", "true"); + props.setProperty("org.apache.druid.java.util.emitter.logging.allowedMetricsPath", "/custom/path.json"); final ObjectMapper objectMapper = new ObjectMapper(); final LoggingEmitterConfig config = objectMapper.convertValue( @@ -86,5 +96,7 @@ public void testSettingEverythingLegacy() Assert.assertEquals("getLoggerClass", "Foo", config.getLoggerClass()); Assert.assertEquals("getLogLevel", "INFO", config.getLogLevel()); + Assert.assertTrue("shouldFilterMetrics", config.shouldFilterMetrics()); + Assert.assertEquals("getAllowedMetricsPath", "/custom/path.json", config.getAllowedMetricsPath()); } } diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterTest.java new file mode 100644 index 000000000000..6aef81b04551 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/LoggingEmitterTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.core; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.emitter.service.UnitEvent; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; + +public class LoggingEmitterTest +{ + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + private List serializedObjects; + private ObjectMapper trackingMapper; + private LoggingEmitter emitter; + + @Before + public void setUp() + { + serializedObjects = new ArrayList<>(); + // A custom ObjectMapper that records every object passed to writeValueAsString. + // This lets us detect which events actually reach the logging step (i.e., were NOT + // filtered out by the allowlist). We use Level.WARN because the WARN case in emit() + // calls writeValueAsString unconditionally (no isWarnEnabled guard), making it a + // reliable probe for whether an event passed the allowlist check. + trackingMapper = new ObjectMapper() + { + @Override + public String writeValueAsString(Object value) throws JsonProcessingException + { + serializedObjects.add(value); + return super.writeValueAsString(value); + } + }; + } + + private LoggingEmitter createEmitter(boolean shouldFilterMetrics, String allowedMetricsPath) + { + emitter = new LoggingEmitter( + new Logger(LoggingEmitter.class), + LoggingEmitter.Level.WARN, + trackingMapper, + shouldFilterMetrics, + allowedMetricsPath + ); + emitter.start(); + return emitter; + } + + @After + public void tearDown() + { + if (emitter != null) { + emitter.close(); + emitter = null; + } + } + + /** + * Without filtering enabled, the emitter should log all events (backward compatibility). + */ + @Test + public void testEmitAllWhenFilteringDisabled() + { + createEmitter(false, null); + + emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); + emitter.emit(ServiceMetricEvent.builder().setMetric("jvm/mem/used", 512).build("test", "localhost")); + emitter.emit(ServiceMetricEvent.builder().setMetric("some/random/metric", 1).build("test", "localhost")); + + Assert.assertEquals("All events should be serialized (logged)", 3, serializedObjects.size()); + } + + /** + * With filtering enabled and no custom path, the default classpath resource + * (loggingEmitterAllowedMetrics.json) should be loaded. Metrics in the default + * list are emitted; unlisted metrics are dropped. + */ + @Test + public void testFilterWithDefaultResource() + { + createEmitter(true, null); + + // "query/time" is in the default allowed metrics list + emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); + // "some/unlisted/metric" is NOT in the default list + emitter.emit(ServiceMetricEvent.builder().setMetric("some/unlisted/metric", 1).build("test", "localhost")); + + Assert.assertEquals("Only the allowed metric should be serialized", 1, serializedObjects.size()); + } + + /** + * With filtering enabled and a custom file path, only metrics from that file are emitted. + */ + @Test + public void testFilterWithCustomFilePath() throws IOException + { + final File allowFile = createAllowlistFile("{\"query/time\": [], \"query/bytes\": []}"); + createEmitter(true, allowFile.getAbsolutePath()); + + emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); + emitter.emit(ServiceMetricEvent.builder().setMetric("jvm/mem/used", 512).build("test", "localhost")); + emitter.emit(ServiceMetricEvent.builder().setMetric("query/bytes", 2048).build("test", "localhost")); + + Assert.assertEquals("Only allowed metrics should be serialized", 2, serializedObjects.size()); + } + + /** + * Non-metric events (like UnitEvent) should always pass through the filter, + * even when filtering is enabled. + */ + @Test + public void testNonMetricEventsAlwaysPassThrough() throws IOException + { + final File allowFile = createAllowlistFile("{\"query/time\": []}"); + createEmitter(true, allowFile.getAbsolutePath()); + + // This is NOT a ServiceMetricEvent, so it should bypass the allowlist filter + emitter.emit(new UnitEvent("alerts", 42)); + + Assert.assertEquals("Non-metric events should bypass the allowlist filter", 1, serializedObjects.size()); + } + + /** + * When a custom path is specified but the file is missing, the emitter falls back + * to the default classpath resource and emits successfully. + */ + @Test + public void testMissingCustomPathFallsBackToDefault() + { + createEmitter(true, "/nonexistent/path/to/allowedMetrics.json"); + + // Fallback to default should allow "query/time" (in default list) and drop unlisted metrics + emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); + emitter.emit(ServiceMetricEvent.builder().setMetric("some/unlisted/metric", 1).build("test", "localhost")); + + Assert.assertEquals("Fallback to default should allow listed metrics only", 1, serializedObjects.size()); + } + + /** + * An empty allowlist should block all metric events but still pass non-metric events. + */ + @Test + public void testEmptyAllowlistBlocksAllMetrics() throws IOException + { + final File allowFile = createAllowlistFile("{}"); + createEmitter(true, allowFile.getAbsolutePath()); + + emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); + emitter.emit(new UnitEvent("alerts", 42)); + + Assert.assertEquals("Only non-metric event should pass through", 1, serializedObjects.size()); + } + + /** + * When shouldFilterMetrics is false, even if an allowedMetricsPath is provided, filtering is not applied. + */ + @Test + public void testFilterDisabledIgnoresPath() throws IOException + { + final File allowFile = createAllowlistFile("{\"query/time\": []}"); + createEmitter(false, allowFile.getAbsolutePath()); + + emitter.emit(ServiceMetricEvent.builder().setMetric("query/time", 100).build("test", "localhost")); + emitter.emit(ServiceMetricEvent.builder().setMetric("jvm/mem/used", 512).build("test", "localhost")); + + Assert.assertEquals("All events should pass when filtering is disabled", 2, serializedObjects.size()); + } + + private File createAllowlistFile(String jsonContent) throws IOException + { + final File file = tempFolder.newFile("allowedMetrics.json"); + try (Writer writer = new OutputStreamWriter(Files.newOutputStream(file.toPath()), StandardCharsets.UTF_8)) { + writer.write(jsonContent); + } + return file; + } +} diff --git a/processing/src/test/resources/loggingEmitterAllowedMetrics.json b/processing/src/test/resources/loggingEmitterAllowedMetrics.json new file mode 100644 index 000000000000..22a9d0329fc5 --- /dev/null +++ b/processing/src/test/resources/loggingEmitterAllowedMetrics.json @@ -0,0 +1 @@ +{"compact/segmentAnalyzer/fetchAndProcessMillis":[],"compact/task/count":[],"compactTask/availableSlot/count":[],"compactTask/maxSlot/count":[],"coordinator/global/time":[],"coordinator/time":[],"groupBy/maxMergeDictionarySize":[],"groupBy/maxSpilledBytes":[],"groupBy/mergeDictionarySize":[],"groupBy/spilledBytes":[],"groupBy/spilledQueries":[],"ingest/count":[],"ingest/events/duplicate":[],"ingest/events/messageGap":[],"ingest/events/processed":[],"ingest/events/processedWithError":[],"ingest/events/thrownAway":[],"ingest/events/unparseable":[],"ingest/handoff/count":[],"ingest/handoff/failed":[],"ingest/handoff/time":[],"ingest/input/bytes":[],"ingest/kafka/avgLag":[],"ingest/kafka/lag":[],"ingest/kafka/maxLag":[],"ingest/kafka/partitionLag":[],"ingest/merge/cpu":[],"ingest/merge/time":[],"ingest/notices/queueSize":[],"ingest/notices/time":[],"ingest/pause/time":[],"ingest/persists/backPressure":[],"ingest/persists/count":[],"ingest/persists/cpu":[],"ingest/persists/failed":[],"ingest/persists/time":[],"ingest/rows/output":[],"ingest/segments/count":[],"ingest/sink/count":[],"ingest/tombstones/count":[],"interval/compacted/count":[],"interval/skipCompact/count":[],"interval/waitCompact/count":[],"jetty/numOpenConnections":[],"jetty/threadPool/busy":[],"jetty/threadPool/idle":[],"jetty/threadPool/isLowOnThreads":[],"jetty/threadPool/max":[],"jetty/threadPool/min":[],"jetty/threadPool/queueSize":[],"jetty/threadPool/ready":[],"jetty/threadPool/total":[],"jetty/threadPool/utilizationRate":[],"jetty/threadPool/utilized":[],"jvm/bufferpool/capacity":[],"jvm/bufferpool/count":[],"jvm/bufferpool/used":[],"jvm/gc/count":[],"jvm/gc/cpu":[],"jvm/mem/committed":[],"jvm/mem/init":[],"jvm/mem/max":[],"jvm/mem/used":[],"jvm/pool/committed":[],"jvm/pool/init":[],"jvm/pool/max":[],"jvm/pool/used":[],"kafka/consumer/bytesConsumed":[],"kafka/consumer/fetch":[],"kafka/consumer/fetchLatencyAvg":[],"kafka/consumer/fetchLatencyMax":[],"kafka/consumer/fetchRate":[],"kafka/consumer/fetchSizeAvg":[],"kafka/consumer/fetchSizeMax":[],"kafka/consumer/incomingBytes":[],"kafka/consumer/outgoingBytes":[],"kafka/consumer/recordsConsumed":[],"kafka/consumer/recordsLag":[],"kafka/consumer/recordsPerRequestAvg":[],"kill/eligibleUnusedSegments/count":[],"kill/pendingSegments/count":[],"kill/task/count":[],"killTask/availableSlot/count":[],"killTask/maxSlot/count":[],"mergeBuffer/acquisitionTimeNs":[],"mergeBuffer/maxAcquisitionTimeNs":[],"mergeBuffer/pendingRequests":[],"mergeBuffer/queries":[],"mergeBuffer/used":[],"metadata/kill/audit/count":[],"metadata/kill/compaction/count":[],"metadata/kill/datasource/count":[],"metadata/kill/rule/count":[],"metadata/kill/supervisor/count":[],"metadatacache/backfill/count":[],"metadatacache/init/time":[],"metadatacache/refresh/count":[],"metadatacache/refresh/time":[],"metadatacache/schemaPoll/count":[],"metadatacache/schemaPoll/failed":[],"metadatacache/schemaPoll/time":[],"query/bytes":[],"query/cache/delta/averageBytes":[],"query/cache/delta/errors":[],"query/cache/delta/evictions":[],"query/cache/delta/hitRate":[],"query/cache/delta/hits":[],"query/cache/delta/misses":[],"query/cache/delta/numEntries":[],"query/cache/delta/put/error":[],"query/cache/delta/put/ok":[],"query/cache/delta/put/oversized":[],"query/cache/delta/sizeBytes":[],"query/cache/delta/timeouts":[],"query/cache/total/averageBytes":[],"query/cache/total/errors":[],"query/cache/total/evictions":[],"query/cache/total/hitRate":[],"query/cache/total/hits":[],"query/cache/total/misses":[],"query/cache/total/numEntries":[],"query/cache/total/put/error":[],"query/cache/total/put/ok":[],"query/cache/total/put/oversized":[],"query/cache/total/sizeBytes":[],"query/cache/total/timeouts":[],"query/count":[],"query/cpu/time":[],"query/failed/count":[],"query/interrupted/count":[],"query/node/bytes":[],"query/node/time":[],"query/node/ttfb":[],"query/segment/time":[],"query/segmentAndCache/time":[],"query/success/count":[],"query/time":[],"query/timeout/count":[],"query/wait/time":[],"schemacache/finalizedSchemaPayload/count":[],"schemacache/finalizedSegmentMetadata/count":[],"schemacache/inTransitSMQPublishedResults/count":[],"schemacache/inTransitSMQResults/count":[],"schemacache/realtime/count":[],"segment/added/bytes":[],"segment/assignSkipped/count":[],"segment/assigned/count":[],"segment/availableDeepStorageOnly/count":[],"segment/compacted/bytes":[],"segment/compacted/count":[],"segment/count":[],"segment/deleted/count":[],"segment/dropQueue/count":[],"segment/dropSkipped/count":[],"segment/dropped/count":[],"segment/loadQueue/assigned":[],"segment/loadQueue/cancelled":[],"segment/loadQueue/count":[],"segment/loadQueue/failed":[],"segment/loadQueue/size":[],"segment/loadQueue/success":[],"segment/max":[],"segment/moveSkipped/count":[],"segment/moved/bytes":[],"segment/moved/count":[],"segment/nuked/bytes":[],"segment/overShadowed/count":[],"segment/pendingDelete":[],"segment/scan/active":[],"segment/scan/pending":[],"segment/size":[],"segment/skipCompact/bytes":[],"segment/skipCompact/count":[],"segment/unavailable/count":[],"segment/underReplicated/count":[],"segment/unneeded/count":[],"segment/unneededEternityTombstone/count":[],"segment/used":[],"segment/usedPercent":[],"segment/waitCompact/bytes":[],"segment/waitCompact/count":[],"serverview/init/time":[],"serverview/sync/healthy":[],"serverview/sync/unstableTime":[],"service/heartbeat":[],"sqlQuery/bytes":[],"sqlQuery/planningTimeMs":[],"sqlQuery/time":[],"sys/cpu":[],"sys/disk/queue":[],"sys/disk/read/count":[],"sys/disk/read/size":[],"sys/disk/transferTime":[],"sys/disk/write/count":[],"sys/disk/write/size":[],"sys/fs/files/count":[],"sys/fs/files/free":[],"sys/fs/max":[],"sys/fs/used":[],"sys/mem/free":[],"sys/mem/max":[],"sys/mem/used":[],"sys/net/read/dropped":[],"sys/net/read/errors":[],"sys/net/read/packets":[],"sys/net/read/size":[],"sys/net/write/collisions":[],"sys/net/write/errors":[],"sys/net/write/packets":[],"sys/net/write/size":[],"sys/storage/used":[],"sys/swap/free":[],"sys/swap/max":[],"sys/swap/pageIn":[],"sys/swap/pageOut":[],"sys/tcpv4/activeOpens":[],"sys/tcpv4/attemptFails":[],"sys/tcpv4/estabResets":[],"sys/tcpv4/in/errs":[],"sys/tcpv4/in/segs":[],"sys/tcpv4/out/rsts":[],"sys/tcpv4/out/segs":[],"sys/tcpv4/passiveOpens":[],"sys/tcpv4/retrans/segs":[],"sys/uptime":[],"task/action/batch/attempts":[],"task/action/batch/queueTime":[],"task/action/batch/runTime":[],"task/action/batch/size":[],"task/action/failed/count":[],"task/action/run/time":[],"task/action/success/count":[],"task/autoScaler/requiredCount":[],"task/failed/count":[],"task/pending/count":[],"task/pending/time":[],"task/run/time":[],"task/running/count":[],"task/segmentAvailability/wait/time":[],"task/success/count":[],"task/waiting/count":[],"tier/historical/count":[],"tier/replication/factor":[],"tier/required/capacity":[],"tier/total/capacity":[],"zk/connected":[],"zk/reconnect/time":[]}