Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ source 'https://rubygems.org'
gem 'rake', '10.1.0'
gem 'rack', '~> 1.6'
gem 'minitest'
gem 'minitest-matchers'
gem "yard", "~> 0.9.20"
gem 'single_cov'

Expand Down
119 changes: 94 additions & 25 deletions lib/datadog/statsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,56 @@
module Datadog
class Statsd

class Telemetry
attr_accessor :metrics
Comment thread
hush-hush marked this conversation as resolved.
attr_accessor :events
attr_accessor :service_checks
attr_accessor :bytes_sent
attr_accessor :bytes_dropped
attr_accessor :packets_sent
attr_accessor :packets_dropped
attr_reader :estimate_max_size

def initialize(disabled, tags)
@disabled = disabled
@tags = tags
reset

# estimate_max_size is an estimation or the maximum size of the
# telemetry payload. Since we don't want our packet to go over
# 'max_buffer_bytes', we have to adjust with the size of the telemetry
# (and any tags used). The telemetry payload size will change depending
# on the actual value of metrics: metrics received, packet dropped,
# etc. This is why we add a 63bytes margin: 9 bytes for each of the 7
# telemetry metrics.
@estimate_max_size = @disabled ? 0 : flush().length + 9 * 7
end

def reset
@metrics = 0
@events = 0
@service_checks = 0
@bytes_sent = 0
@bytes_dropped = 0
@packets_sent = 0
@packets_dropped = 0
end

def flush
return '' if @disabled

# using shorthand syntax to reduce the garbage collection
return %Q(
datadog.dogstatsd.client.metrics:#{@metrics}|#{COUNTER_TYPE}|##{@tags}
datadog.dogstatsd.client.events:#{@events}|#{COUNTER_TYPE}|##{@tags}
datadog.dogstatsd.client.service_checks:#{@service_checks}|#{COUNTER_TYPE}|##{@tags}
datadog.dogstatsd.client.bytes_sent:#{@bytes_sent}|#{COUNTER_TYPE}|##{@tags}
datadog.dogstatsd.client.bytes_dropped:#{@bytes_dropped}|#{COUNTER_TYPE}|##{@tags}
datadog.dogstatsd.client.packets_sent:#{@packets_sent}|#{COUNTER_TYPE}|##{@tags}
datadog.dogstatsd.client.packets_dropped:#{@packets_dropped}|#{COUNTER_TYPE}|##{@tags})
Comment thread
remeh marked this conversation as resolved.
end
end

class Connection
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 8125
Expand All @@ -33,14 +83,23 @@ class Connection
# DogStatsd unix socket path. Not used by default.
attr_reader :socket_path

def initialize(telemetry)
@telemetry = telemetry
end

# Close the underlying socket
def close
@socket && @socket.close
end

def write(message)
@logger.debug { "Statsd: #{message}" } if @logger
send_message(message)
payload = message + @telemetry.flush()
send_message(payload)

@telemetry.reset
@telemetry.bytes_sent += payload.length
Comment thread
remeh marked this conversation as resolved.
@telemetry.packets_sent += 1
rescue StandardError => boom
# Try once to reconnect if the socket has been closed
retries ||= 1
Expand All @@ -57,6 +116,8 @@ def write(message)
end
end

@telemetry.bytes_dropped += payload.length
@telemetry.packets_dropped += 1
@logger.error { "Statsd: #{boom.class} #{boom}" } if @logger
nil
end
Expand All @@ -69,7 +130,8 @@ def socket
end

class UDPConnection < Connection
def initialize(host, port, logger)
def initialize(host, port, logger, telemetry)
super(telemetry)
@host = host || ENV.fetch('DD_AGENT_HOST', nil) || DEFAULT_HOST
@port = port || ENV.fetch('DD_DOGSTATSD_PORT', nil) || DEFAULT_PORT
@logger = logger
Expand All @@ -91,7 +153,8 @@ def send_message(message)
class UDSConnection < Connection
class BadSocketError < StandardError; end

def initialize(socket_path, logger)
def initialize(socket_path, logger, telemetry)
super(telemetry)
@socket_path = socket_path
@logger = logger
end
Expand Down Expand Up @@ -186,7 +249,8 @@ def reset
CRITICAL = 2
UNKNOWN = 3

MAX_EVENT_SIZE = 8 * 1024
DEFAULT_BUFFER_SIZE = 8 * 1_024
MAX_EVENT_SIZE = 8 * 1_024

COUNTER_TYPE = 'c'.freeze
GAUGE_TYPE = 'g'.freeze
Expand Down Expand Up @@ -227,23 +291,12 @@ def initialize(
port = nil,
namespace: nil,
tags: nil,
max_buffer_bytes: 8192,
max_buffer_bytes: DEFAULT_BUFFER_SIZE,
socket_path: nil,
logger: nil,
sample_rate: nil
sample_rate: nil,
disable_telemetry: false
)
if socket_path.nil?
@connection = UDPConnection.new(host, port, logger)
else
@connection = UDSConnection.new(socket_path, logger)
end
@logger = logger

@namespace = namespace
@prefix = @namespace ? "#{@namespace}.".freeze : nil

@sample_rate = sample_rate

unless tags.nil? or tags.is_a? Array or tags.is_a? Hash
raise ArgumentError, 'tags must be a Array<String> or a Hash'
end
Expand All @@ -254,7 +307,25 @@ def initialize(
# append the entity id to tags if DD_ENTITY_ID env var is not nil
@tags << 'dd.internal.entity_id:' + escape_tag_content(ENV.fetch('DD_ENTITY_ID', nil)) unless ENV.fetch('DD_ENTITY_ID', nil).nil?

@batch = Batch.new @connection, max_buffer_bytes
# init telemetry
transport_type = socket_path.nil? ? "udp": "uds"
telemetry_tags = (["client:ruby", "client_version:#{VERSION}", "client_transport:#{transport_type}"] + @tags).join(COMMA).freeze
@telemetry = Telemetry.new(disable_telemetry, telemetry_tags)

if socket_path.nil?
@connection = UDPConnection.new(host, port, logger, @telemetry)
else
@connection = UDSConnection.new(socket_path, logger, @telemetry)
end
@logger = logger

@namespace = namespace
@prefix = @namespace ? "#{@namespace}.".freeze : nil

@sample_rate = sample_rate

# we reduce max_buffer_bytes by a the rough estimate of the telemetry payload
@batch = Batch.new(@connection, (max_buffer_bytes - @telemetry.estimate_max_size))
end

# yield a new instance to a block and close it when done
Expand Down Expand Up @@ -413,6 +484,7 @@ def set(stat, value, opts=EMPTY_OPTIONS)
# @example Report a critical service check status
# $statsd.service_check('my.service.check', Statsd::CRITICAL, :tags=>['urgent'])
def service_check(name, status, opts=EMPTY_OPTIONS)
@telemetry.service_checks += 1
send_stat format_service_check(name, status, opts)
end

Expand All @@ -435,6 +507,7 @@ def service_check(name, status, opts=EMPTY_OPTIONS)
# @example Report an awful event:
# $statsd.event('Something terrible happened', 'The end is near if we do nothing', :alert_type=>'warning', :tags=>['end_of_times','urgent'])
def event(title, text, opts=EMPTY_OPTIONS)
@telemetry.events += 1
send_stat format_event(title, text, opts)
end

Expand Down Expand Up @@ -560,6 +633,7 @@ def escape_service_check_message(msg)
end

def send_stats(stat, delta, type, opts=EMPTY_OPTIONS)
@telemetry.metrics += 1
sample_rate = opts[:sample_rate] || @sample_rate || 1
if sample_rate == 1 or rand <= sample_rate
full_stat = ''.dup
Expand Down Expand Up @@ -587,17 +661,12 @@ def send_stats(stat, delta, type, opts=EMPTY_OPTIONS)
full_stat << '#'.freeze
full_stat << tags_string
end

send_stat(full_stat)
end
end

def send_stat(message)
if @batch.open?
@batch.add message
else
@connection.write(message)
end
@batch.open? ? @batch.add(message) : @connection.write(message)
end
end
end
47 changes: 47 additions & 0 deletions spec/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
SingleCov.setup :minitest
end

require "minitest/matchers"
require 'minitest/autorun'
require 'mocha/minitest'
require 'faker'
Expand All @@ -14,12 +15,54 @@
require 'datadog/statsd'
require 'logger'

class TelemetryMatcher

attr_accessor :text

def initialize(text, metrics, events, service_checks, bytes_sent, bytes_dropped, packets_sent, packets_dropped, transport)
telemetry = ["datadog.dogstatsd.client.metrics:#{metrics}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}",
"datadog.dogstatsd.client.events:#{events}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}",
"datadog.dogstatsd.client.service_checks:#{service_checks}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}",
"datadog.dogstatsd.client.bytes_sent:#{bytes_sent}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}",
"datadog.dogstatsd.client.bytes_dropped:#{bytes_dropped}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}",
"datadog.dogstatsd.client.packets_sent:#{packets_sent}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}",
"datadog.dogstatsd.client.packets_dropped:#{packets_dropped}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}",
].join("\n")
@text = "#{text}\n#{telemetry}"
@last_compare = ''
end

def length
@text.length
end

def matches?(subject)
@last_compare = subject
subject == @text
end

def failure_message_for_should
%(expected:
#{@text}
got:
#{@last_compare}
)
end
end

def equal_with_telemetry(text, metrics: 1, events: 0, service_checks: 0, bytes_sent: 0, bytes_dropped:0, packets_sent: 0, packets_dropped: 0, transport: "udp")
TelemetryMatcher.new(text, metrics, events, service_checks, bytes_sent, bytes_dropped, packets_sent, packets_dropped, transport)
end
MiniTest::Unit::TestCase.register_matcher :equal_with_telemetry, :equal_with_telemetry

class FakeUDPSocket
def initialize
@buffer = []
@error_on_send = nil
end

def send(message, *)
raise @error_on_send if @error_on_send
@buffer.push [message]
end

Expand All @@ -34,4 +77,8 @@ def to_s
def inspect
"<FakeUDPSocket: #{@buffer.inspect}>"
end

def error_on_send(err)
@error_on_send = err
end
end
Loading