diff --git a/src/StackExchange.Metrics/Handlers/SignalFxMetricHandler.cs b/src/StackExchange.Metrics/Handlers/SignalFxMetricHandler.cs index a78a884..14a6239 100644 --- a/src/StackExchange.Metrics/Handlers/SignalFxMetricHandler.cs +++ b/src/StackExchange.Metrics/Handlers/SignalFxMetricHandler.cs @@ -123,7 +123,7 @@ private IMetricHandler GetHandler() if (_baseUri.Scheme == "udp") { - return new StatsdMetricHandler(_baseUri.Host, (ushort)_baseUri.Port); + return new BufferedStatsdMetricHandler(_baseUri.Host, (ushort)_baseUri.Port); } if (_baseUri.Scheme == Uri.UriSchemeHttp || _baseUri.Scheme == Uri.UriSchemeHttps) diff --git a/src/StackExchange.Metrics/Handlers/StatsdMetricHandler.cs b/src/StackExchange.Metrics/Handlers/StatsdMetricHandler.cs index d320d53..8695dc4 100644 --- a/src/StackExchange.Metrics/Handlers/StatsdMetricHandler.cs +++ b/src/StackExchange.Metrics/Handlers/StatsdMetricHandler.cs @@ -13,10 +13,127 @@ namespace StackExchange.Metrics.Handlers { + /// + /// Implements by sending data to a statsd endpoint. + /// + public class StatsdMetricHandler : IMetricHandler + { + string _host; + ushort _port; + IMetricHandler _activeHandler; + + /// + /// Constructs a new pointing at the specified host and port. + /// + /// + /// Host of a statsd endpoint. + /// + /// + /// Port of a stats endpoint. + /// + /// + /// If the host or port are set to null/0 this handler will be deactivated (every operation is a no-op). + /// Otherwise it sends data to a StatsD endpoint via UDP. + /// + public StatsdMetricHandler(string host, ushort port) + { + _host = host; + _port = port; + _activeHandler = GetHandler(); + } + + /// + /// Gets or sets the maximum number of payloads we can keep before we consider our buffers full. + /// + public long MaxPayloadCount + { + get + { + if (_activeHandler is BufferedStatsdMetricHandler bufferedHandler) + { + return bufferedHandler.MaxPayloadCount; + } + return 0; + } + set + { + if (_activeHandler is BufferedStatsdMetricHandler bufferedHandler) + { + bufferedHandler.MaxPayloadCount = value; + } + } + } + + /// + /// Host to which we should send metrics to. + /// + public string Host + { + get => _host; + set + { + _host = value; + var oldHandler = _activeHandler; + if (oldHandler != null) + { + oldHandler.Dispose(); + } + + _activeHandler = GetHandler(); + } + } + + /// + /// Port to which we should send metrics to. + /// + public ushort Port + { + get => _port; + set + { + _port = value; + var oldHandler = _activeHandler; + if (oldHandler != null) + { + oldHandler.Dispose(); + } + + _activeHandler = GetHandler(); + } + } + + /// + public IMetricReadingBatch BeginBatch() => _activeHandler.BeginBatch(); + + /// + public void SerializeMetadata(IEnumerable metadata) => _activeHandler.SerializeMetadata(metadata); + + /// + public void SerializeMetric(in MetricReading reading) => _activeHandler.SerializeMetric(reading); + + /// + public ValueTask FlushAsync(TimeSpan delayBetweenRetries, int maxRetries, Action afterSend, + Action exceptionHandler) => + _activeHandler?.FlushAsync(delayBetweenRetries, maxRetries, afterSend, exceptionHandler) ?? default(ValueTask); + + /// + public void Dispose() => _activeHandler.Dispose(); + + private IMetricHandler GetHandler() + { + if (_host == null || _port == 0) + { + return NoOpMetricHandler.Instance; + } + + return new BufferedStatsdMetricHandler(_host, _port); + } + } + /// /// Implements by sending data to a statsd endpoint. /// - public class StatsdMetricHandler : BufferedMetricHandler + internal class BufferedStatsdMetricHandler : BufferedMetricHandler { const int ValueDecimals = 5; static readonly byte[] s_counter = Encoding.UTF8.GetBytes("c"); @@ -34,16 +151,7 @@ public class StatsdMetricHandler : BufferedMetricHandler PayloadTypeMetadata _metricMetadata; PayloadTypeMetadata _metadataMetadata; - /// - /// Constructs a new pointing at the specified host and port. - /// - /// - /// Host of a statsd endpoint. - /// - /// - /// Port of a stats endpoint. - /// - public StatsdMetricHandler(string host, ushort port) + internal BufferedStatsdMetricHandler(string host, ushort port) { _host = host; _port = port; @@ -412,6 +520,7 @@ private static List> GetBufferList(in ReadOnlySequence sequence.First.GetArray() }; } + var list = new List>(); foreach (var b in sequence) { @@ -457,6 +566,7 @@ private static Socket CreateSocket(EndPoint endpoint) { socket.Bind(new IPEndPoint(IPAddress.IPv6Any, 0)); } + return socket; } } diff --git a/src/StackExchange.Metrics/MetricsCollectorExtensions.cs b/src/StackExchange.Metrics/MetricsCollectorExtensions.cs index 5e16af9..bd773e3 100644 --- a/src/StackExchange.Metrics/MetricsCollectorExtensions.cs +++ b/src/StackExchange.Metrics/MetricsCollectorExtensions.cs @@ -46,6 +46,9 @@ public async static Task DumpAsync(this MetricsCollector collector, TextWriter t case SignalFxMetricHandler signalFxHandler: await textWriter.WriteLineAsync(signalFxHandler.BaseUri?.AbsoluteUri ?? "null"); break; + case StatsdMetricHandler statsdHandler: + await textWriter.WriteLineAsync(statsdHandler.Host + ":" + statsdHandler.Port); + break; case LocalMetricHandler h: await textWriter.WriteLineAsync("Local"); break; diff --git a/tests/StackExchange.Metrics.Tests/StatsdMetricHandlerTests.cs b/tests/StackExchange.Metrics.Tests/StatsdMetricHandlerTests.cs index 82fee89..c390f1e 100644 --- a/tests/StackExchange.Metrics.Tests/StatsdMetricHandlerTests.cs +++ b/tests/StackExchange.Metrics.Tests/StatsdMetricHandlerTests.cs @@ -109,5 +109,27 @@ public static Task ReceiveStatsdOverUdpAsync(ushort port, ITestOutputHel resetEvent.Wait(); return tcs.Task; } + + [Fact] + public async Task DeactivatedUdpUri_Counter_ReceivesNoStatsd() + { + const ushort port = 1234; + var handler = new StatsdMetricHandler(null, port); + var utcNow = DateTime.UtcNow; + var reading = new MetricReading("test.metrics", MetricType.Counter, _rng.Next(), new Dictionary { ["host"] = "test!" }.ToImmutableDictionary(), utcNow); + + handler.SerializeMetric(reading); + + void AfterSendAssertion(AfterSendInfo i) + { + Assert.Equal(0, i.BytesWritten); + Assert.Equal("", i.Endpoint); + Assert.True(i.Successful); + } + + await handler.FlushAsync( + TimeSpan.Zero, 0, AfterSendAssertion, ex => _output.WriteLine(ex.ToString()) + ); + } } }