Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 59 additions & 44 deletions lib/datadog/statsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,23 @@ class Statsd

# Create a dictionary to assign a key to every parameter's name, except for tags (treated differently)
# Goal: Simple and fast to add some other parameters
OPTS_KEYS = [
['date_happened', 'd'],
['hostname', 'h'],
['aggregation_key', 'k'],
['priority', 'p'],
['source_type_name', 's'],
['alert_type', 't']
]
OPTS_KEYS = {
:date_happened => :d,
:hostname => :h,
:aggregation_key => :k,
:priority => :p,
:source_type_name => :s,
:alert_type => :t,
}

# Service check options
SC_OPT_KEYS = [
['timestamp', 'd:'],
['hostname', 'h:'],
['tags', '#'],
['message', 'm:']
]
SC_OPT_KEYS = {
:timestamp => 'd:'.freeze,
:hostname => 'h:'.freeze,
:tags => '#'.freeze,
:message => 'm:'.freeze,
}

OK = 0
WARNING = 1
CRITICAL = 2
Expand Down Expand Up @@ -90,15 +91,15 @@ def initialize(host = DEFAULT_HOST, port = DEFAULT_PORT, opts = {}, max_buffer_s

def namespace=(namespace) #:nodoc:
@namespace = namespace
@prefix = namespace.nil? ? nil : "#{namespace}."
@prefix = namespace.nil? ? nil : "#{namespace}.".freeze
end

def host=(host) #:nodoc:
@host = host || '127.0.0.1'
@host = host || DEFAULT_HOST
end

def port=(port) #:nodoc:
@port = port || 8125
@port = port || DEFAULT_PORT
end

def tags=(tags) #:nodoc:
Expand Down Expand Up @@ -234,23 +235,24 @@ def service_check(name, status, opts={})
service_check_string = format_service_check(name, status, opts)
send_to_socket service_check_string
end

def format_service_check(name, status, opts={})
sc_string = "_sc|#{name}|#{status}"

SC_OPT_KEYS.each do |name_key|
if opts[name_key[0].to_sym]
if name_key[0] == 'tags'
tags = opts[:tags].map {|tag| escape_tag_content(tag) }
tags = "#{tags.join(",")}" unless tags.empty?
sc_string << "|##{tags}"
elsif name_key[0] == 'message'
message = remove_pipes(opts[:message])
escaped_message = escape_service_check_message(message)
sc_string << "|m:#{escaped_message}"
else
value = remove_pipes(opts[name_key[0].to_sym])
sc_string << "|#{name_key[1]}#{value}"
end
SC_OPT_KEYS.each do |key, shorthand_key|
next unless opts[key]

if key == :tags
tags = opts[:tags].map {|tag| escape_tag_content(tag) }
tags = "#{tags.join(COMMA)}" unless tags.empty?
sc_string << "|##{tags}"
elsif key == :message
message = remove_pipes(opts[:message])
escaped_message = escape_service_check_message(message)
sc_string << "|m:#{escaped_message}"
else
value = remove_pipes(opts[key])
sc_string << "|#{shorthand_key}#{value}"
end
end
return sc_string
Expand Down Expand Up @@ -303,38 +305,51 @@ def format_event(title, text, opts={})

# We construct the string to be sent by adding '|key:value' parts to it when needed
# All pipes ('|') in the metadata are removed. Title and Text can keep theirs
OPTS_KEYS.each do |name_key|
if name_key[0] != 'tags' && opts[name_key[0].to_sym]
value = remove_pipes(opts[name_key[0].to_sym])
event_string_data << "|#{name_key[1]}:#{value}"
OPTS_KEYS.each do |key, shorthand_key|
if key != :tags && opts[key]
value = remove_pipes(opts[key])
event_string_data << "|#{shorthand_key}:#{value}"
end
end

# Tags are joined and added as last part to the string to be sent
full_tags = (tags + (opts[:tags] || [])).map {|tag| escape_tag_content(tag) }
unless full_tags.empty?
event_string_data << "|##{full_tags.join(',')}"
event_string_data << "|##{full_tags.join(COMMA)}"
end

raise "Event #{title} payload is too big (more that 8KB), event discarded" if event_string_data.length > 8 * 1024
raise "Event #{title} payload is too big (more that 8KB), event discarded" if event_string_data.length > 8192 # 8 * 1024 = 8192
return event_string_data
end

private

NEW_LINE = "\n".freeze
ESC_NEW_LINE = "\\n".freeze
COMMA = ",".freeze
BLANK = "".freeze
PIPE = "|".freeze
DOT = ".".freeze
DOUBLE_COLON = "::".freeze
UNDERSCORE = "_".freeze

private_constant :NEW_LINE, :ESC_NEW_LINE, :COMMA, :BLANK, :PIPE, :DOT,
:DOUBLE_COLON, :UNDERSCORE

def escape_event_content(msg)
msg.gsub "\n", "\\n"
msg.gsub NEW_LINE, ESC_NEW_LINE
end

def escape_tag_content(tag)
remove_pipes(tag).gsub ",", ""
remove_pipes(tag).gsub COMMA, BLANK
end

def remove_pipes(msg)
msg.gsub "|", ""
msg.gsub PIPE, BLANK
end

def escape_service_check_message(msg)
msg.gsub('m:', 'm\:').gsub "\n", "\\n"
escape_event_content(msg).gsub('m:'.freeze, 'm\:'.freeze)
end

def time_since(stat, start, opts)
Expand All @@ -345,10 +360,10 @@ def send_stats(stat, delta, type, opts={})
sample_rate = opts[:sample_rate] || 1
if sample_rate == 1 or rand < sample_rate
# Replace Ruby module scoping with '.' and reserved chars (: | @) with underscores.
stat = stat.to_s.gsub('::', '.').tr(':|@', '_')
stat = stat.to_s.gsub(DOUBLE_COLON, DOT).tr(':|@'.freeze, UNDERSCORE)
rate = "|@#{sample_rate}" unless sample_rate == 1
ts = (tags || []) + (opts[:tags] || []).map {|tag| escape_tag_content(tag)}
tags = "|##{ts.join(",")}" unless ts.empty?
tags = "|##{ts.join(COMMA)}" unless ts.empty?
send_stat "#{@prefix}#{stat}:#{delta}|#{type}#{rate}#{tags}"
end
end
Expand All @@ -361,7 +376,7 @@ def send_to_buffer(message)
end

def flush_buffer()
send_to_socket(@buffer.join("\n"))
send_to_socket(@buffer.join(NEW_LINE))
@buffer = Array.new
end

Expand Down