Skip to content

Commit 055cda2

Browse files
authored
Merge pull request #94 from grosser/grosser/threads
extract connection for separation of concerns
2 parents aeea888 + 6123046 commit 055cda2

3 files changed

Lines changed: 199 additions & 203 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### Breaking changes
66

7+
* port / host / socket_path readers are now on statsd.connection
78
* port / host / tags / namespace can no longer be set on the instance to allow thread-safety [#87][] by [@grosser][]
89
* replace global logger with instance option [#90][] by [@grosser][]
910
* make format_service_check private [#89][] [@grosser][]

lib/datadog/statsd.rb

Lines changed: 86 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,87 @@
1616
# statsd = Datadog::Statsd.new 'localhost', 8125, :namespace => 'account'
1717
# statsd.increment 'activate'
1818
# @example Create a statsd client with global tags
19-
# statsd = Datadog::Statsd.new 'localhost', 8125, :tags => 'tag1:true'
19+
# statsd = Datadog::Statsd.new 'localhost', 8125, tags: 'tag1:true'
2020
module Datadog
2121
class Statsd
2222

23-
DEFAULT_HOST = '127.0.0.1'
24-
DEFAULT_PORT = 8125
23+
class Connection
24+
DEFAULT_HOST = '127.0.0.1'
25+
DEFAULT_PORT = 8125
26+
27+
# StatsD host. Defaults to 127.0.0.1.
28+
attr_reader :host
29+
30+
# StatsD port. Defaults to 8125.
31+
attr_reader :port
32+
33+
# DogStatsd unix socket path. Not used by default.
34+
attr_reader :socket_path
35+
36+
def initialize(host, port, socket_path, logger)
37+
@host = host || DEFAULT_HOST
38+
@port = port || DEFAULT_PORT
39+
@socket_path = socket_path
40+
@logger = logger
41+
end
42+
43+
def write(message)
44+
@logger.debug { "Statsd: #{message}" } if @logger
45+
if @socket_path.nil?
46+
socket.send(message, 0)
47+
else
48+
socket.sendmsg_nonblock(message)
49+
end
50+
rescue StandardError => boom
51+
# Give up on this socket if it looks like it is bad
52+
bad_socket = !@socket_path.nil? && (
53+
boom.is_a?(Errno::ECONNREFUSED) ||
54+
boom.is_a?(Errno::ECONNRESET) ||
55+
boom.is_a?(Errno::ENOENT)
56+
)
57+
if bad_socket
58+
@socket = nil
59+
return
60+
end
61+
62+
# Try once to reconnect if the socket has been closed
63+
retries ||= 1
64+
if retries <= 1 && boom.is_a?(IOError) && boom.message =~ /closed stream/i
65+
retries += 1
66+
begin
67+
@socket = connect
68+
retry
69+
rescue StandardError => e
70+
boom = e
71+
end
72+
end
73+
74+
@logger.error { "Statsd: #{boom.class} #{boom}" } if @logger
75+
nil
76+
end
77+
78+
# Close the underlying socket
79+
def close
80+
@socket.close
81+
end
82+
83+
private
84+
85+
def socket
86+
@socket ||= connect
87+
end
88+
89+
def connect
90+
if @socket_path.nil?
91+
socket = UDPSocket.new
92+
socket.connect(@host, @port)
93+
else
94+
socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
95+
socket.connect(Socket.pack_sockaddr_un(@socket_path))
96+
end
97+
socket
98+
end
99+
end
25100

26101
# Create a dictionary to assign a key to every parameter's name, except for tags (treated differently)
27102
# Goal: Simple and fast to add some other parameters
@@ -60,15 +135,6 @@ class Statsd
60135
# A namespace to prepend to all statsd calls. Defaults to no namespace.
61136
attr_reader :namespace
62137

63-
# StatsD host. Defaults to 127.0.0.1.
64-
attr_reader :host
65-
66-
# StatsD port. Defaults to 8125.
67-
attr_reader :port
68-
69-
# DogStatsd unix socket path. Not used by default.
70-
attr_reader :socket_path
71-
72138
# Global tags to be added to every statsd call. Defaults to no tags.
73139
attr_reader :tags
74140

@@ -78,8 +144,8 @@ class Statsd
78144
# Maximum buffer size in bytes before it is flushed
79145
attr_reader :max_buffer_bytes
80146

81-
# Logger
82-
attr_reader :logger
147+
# Connection
148+
attr_reader :connection
83149

84150
# @param [String] host your statsd host
85151
# @param [Integer] port your statsd port
@@ -89,18 +155,16 @@ class Statsd
89155
# @option [Integer] max_buffer_bytes max bytes to buffer when using #batch
90156
# @option [String] socket_path unix socket path
91157
def initialize(
92-
host = DEFAULT_HOST,
93-
port = DEFAULT_PORT,
158+
host = nil,
159+
port = nil,
94160
namespace: nil,
95161
tags: nil,
96162
max_buffer_bytes: 8192,
97163
socket_path: nil,
98164
logger: nil
99165
)
100-
@host = host || DEFAULT_HOST
101-
@port = port || DEFAULT_PORT
166+
@connection = Connection.new(host, port, socket_path, logger)
102167
@logger = logger
103-
@socket_path = socket_path
104168

105169
@namespace = namespace
106170
@prefix = @namespace ? "#{@namespace}.".freeze : nil
@@ -308,7 +372,7 @@ def batch
308372

309373
# Close the underlying socket
310374
def close
311-
@socket.close
375+
@connection.close
312376
end
313377

314378
private
@@ -447,66 +511,15 @@ def send_stat(message)
447511
@buffer << message
448512
@buffer_bytes += message_bytes
449513
else
450-
send_to_socket(message)
514+
@connection.write(message)
451515
end
452516
end
453517

454518
def flush_buffer
455519
return if @buffer_bytes == 0
456-
send_to_socket(@buffer)
520+
@connection.write(@buffer)
457521
@buffer = String.new
458522
@buffer_bytes = 0
459523
end
460-
461-
def connect_to_socket
462-
if @socket_path.nil?
463-
socket = UDPSocket.new
464-
socket.connect(@host, @port)
465-
else
466-
socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
467-
socket.connect(Socket.pack_sockaddr_un(@socket_path))
468-
end
469-
socket
470-
end
471-
472-
# TODO: rename to `socket` which breaks a lot of tests
473-
def sock
474-
@socket ||= connect_to_socket
475-
end
476-
477-
def send_to_socket(message)
478-
@logger.debug { "Statsd: #{message}" } if @logger
479-
if @socket_path.nil?
480-
sock.send(message, 0)
481-
else
482-
sock.sendmsg_nonblock(message)
483-
end
484-
rescue StandardError => boom
485-
# Give up on this socket if it looks like it is bad
486-
bad_socket = !@socket_path.nil? && (
487-
boom.is_a?(Errno::ECONNREFUSED) ||
488-
boom.is_a?(Errno::ECONNRESET) ||
489-
boom.is_a?(Errno::ENOENT)
490-
)
491-
if bad_socket
492-
@socket = nil
493-
return
494-
end
495-
496-
# Try once to reconnect if the socket has been closed
497-
retries ||= 1
498-
if retries <= 1 && boom.is_a?(IOError) && boom.message =~ /closed stream/i
499-
retries += 1
500-
begin
501-
@socket = connect_to_socket
502-
retry
503-
rescue StandardError => e
504-
boom = e
505-
end
506-
end
507-
508-
@logger.error { "Statsd: #{boom.class} #{boom}" } if @logger
509-
nil
510-
end
511524
end
512525
end

0 commit comments

Comments
 (0)