Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Breaking changes

* port / host / socket_path readers are now on statsd.connection
* port / host / tags / namespace can no longer be set on the instance to allow thread-safety [#87][] by [@grosser][]
* replace global logger with instance option [#90][] by [@grosser][]
* make format_service_check private [#89][] [@grosser][]
Expand Down
159 changes: 86 additions & 73 deletions lib/datadog/statsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,87 @@
# statsd = Datadog::Statsd.new 'localhost', 8125, :namespace => 'account'
# statsd.increment 'activate'
# @example Create a statsd client with global tags
# statsd = Datadog::Statsd.new 'localhost', 8125, :tags => 'tag1:true'
# statsd = Datadog::Statsd.new 'localhost', 8125, tags: 'tag1:true'
module Datadog
class Statsd

DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 8125
class Connection
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 8125

# StatsD host. Defaults to 127.0.0.1.
attr_reader :host

# StatsD port. Defaults to 8125.
attr_reader :port

# DogStatsd unix socket path. Not used by default.
attr_reader :socket_path

def initialize(host, port, socket_path, logger)
@host = host || DEFAULT_HOST
@port = port || DEFAULT_PORT
@socket_path = socket_path
@logger = logger
end

def write(message)
@logger.debug { "Statsd: #{message}" } if @logger
if @socket_path.nil?
socket.send(message, 0)
else
socket.sendmsg_nonblock(message)
end
rescue StandardError => boom
# Give up on this socket if it looks like it is bad
bad_socket = !@socket_path.nil? && (
boom.is_a?(Errno::ECONNREFUSED) ||
boom.is_a?(Errno::ECONNRESET) ||
boom.is_a?(Errno::ENOENT)
)
if bad_socket
@socket = nil
return
end

# Try once to reconnect if the socket has been closed
retries ||= 1
if retries <= 1 && boom.is_a?(IOError) && boom.message =~ /closed stream/i
retries += 1
begin
@socket = connect
retry
rescue StandardError => e
boom = e
end
end

@logger.error { "Statsd: #{boom.class} #{boom}" } if @logger
nil
end

# Close the underlying socket
def close
@socket.close
end

private

def socket
@socket ||= connect
end

def connect
if @socket_path.nil?
socket = UDPSocket.new
socket.connect(@host, @port)
else
socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
socket.connect(Socket.pack_sockaddr_un(@socket_path))
end
socket
end
end

# Create a dictionary to assign a key to every parameter's name, except for tags (treated differently)
# Goal: Simple and fast to add some other parameters
Expand Down Expand Up @@ -60,15 +135,6 @@ class Statsd
# A namespace to prepend to all statsd calls. Defaults to no namespace.
attr_reader :namespace

# StatsD host. Defaults to 127.0.0.1.
attr_reader :host

# StatsD port. Defaults to 8125.
attr_reader :port

# DogStatsd unix socket path. Not used by default.
attr_reader :socket_path

# Global tags to be added to every statsd call. Defaults to no tags.
attr_reader :tags

Expand All @@ -78,8 +144,8 @@ class Statsd
# Maximum buffer size in bytes before it is flushed
attr_reader :max_buffer_bytes

# Logger
attr_reader :logger
# Connection
attr_reader :connection

# @param [String] host your statsd host
# @param [Integer] port your statsd port
Expand All @@ -89,18 +155,16 @@ class Statsd
# @option [Integer] max_buffer_bytes max bytes to buffer when using #batch
# @option [String] socket_path unix socket path
def initialize(
host = DEFAULT_HOST,
port = DEFAULT_PORT,
host = nil,
port = nil,
namespace: nil,
tags: nil,
max_buffer_bytes: 8192,
socket_path: nil,
logger: nil
)
@host = host || DEFAULT_HOST
@port = port || DEFAULT_PORT
@connection = Connection.new(host, port, socket_path, logger)
@logger = logger
@socket_path = socket_path

@namespace = namespace
@prefix = @namespace ? "#{@namespace}.".freeze : nil
Expand Down Expand Up @@ -308,7 +372,7 @@ def batch

# Close the underlying socket
def close
@socket.close
@connection.close
end

private
Expand Down Expand Up @@ -447,66 +511,15 @@ def send_stat(message)
@buffer << message
@buffer_bytes += message_bytes
else
send_to_socket(message)
@connection.write(message)
end
end

def flush_buffer
return if @buffer_bytes == 0
send_to_socket(@buffer)
@connection.write(@buffer)
@buffer = String.new
@buffer_bytes = 0
end

def connect_to_socket
if @socket_path.nil?
socket = UDPSocket.new
socket.connect(@host, @port)
else
socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
socket.connect(Socket.pack_sockaddr_un(@socket_path))
end
socket
end

# TODO: rename to `socket` which breaks a lot of tests
def sock
@socket ||= connect_to_socket
end

def send_to_socket(message)
@logger.debug { "Statsd: #{message}" } if @logger
if @socket_path.nil?
sock.send(message, 0)
else
sock.sendmsg_nonblock(message)
end
rescue StandardError => boom
# Give up on this socket if it looks like it is bad
bad_socket = !@socket_path.nil? && (
boom.is_a?(Errno::ECONNREFUSED) ||
boom.is_a?(Errno::ECONNRESET) ||
boom.is_a?(Errno::ENOENT)
)
if bad_socket
@socket = nil
return
end

# Try once to reconnect if the socket has been closed
retries ||= 1
if retries <= 1 && boom.is_a?(IOError) && boom.message =~ /closed stream/i
retries += 1
begin
@socket = connect_to_socket
retry
rescue StandardError => e
boom = e
end
end

@logger.error { "Statsd: #{boom.class} #{boom}" } if @logger
nil
end
end
end
Loading