Class: Fluent::DatadogOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_datadog.rb

Defined Under Namespace

Classes: ConnectionFailure

Instance Method Summary collapse

Constructor Details

#initialize ⇒ DatadogOutput

Returns a new instance of DatadogOutput.



36
37
38
# File 'lib/fluent/plugin/out_datadog.rb', line 36

# Builds the plugin instance. No state is set up here; construction is
# delegated entirely to the superclass through the bare +super+ call.
def initialize
  super
end

Instance Method Details

#configure(conf) ⇒ Object



45
46
47
# File 'lib/fluent/plugin/out_datadog.rb', line 45

# Hands the configuration to the superclass without any extra processing.
#
# conf - the configuration object; the bare +super+ forwards it unchanged.
def configure(conf)
  super
end

#format(tag, time, record) ⇒ Object

This method is called when an event reaches Fluentd.



91
92
93
94
95
96
# File 'lib/fluent/plugin/out_datadog.rb', line 91

# Serializes a single event as a msgpack'ed [tag, time, record] triple.
#
# tag    - the event tag
# time   - the event time; anything responding to #to_f
# record - the event payload
#
# Returns the msgpack encoding of the triple.
def format(tag, time, record)
  # A Fluent::EventTime keeps only whole seconds once msgpack'ed, so the time
  # is turned into a float first; Time.at accepts that float later on.
  seconds = time.to_f
  [tag, seconds, record].to_msgpack
end

#get_attribute_tags(record, attribute_tags) ⇒ Object



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/fluent/plugin/out_datadog.rb', line 220

# Builds a comma-separated "key:value" tag string from attributes of +record+.
#
# record         - the event Hash to read attributes from (may be nil)
# attribute_tags - comma-separated entries, each either "key:path" or a bare
#                  "path" (the path then doubles as the tag key). A path may
#                  use "." to reach into nested hashes, e.g. "kubernetes.host".
#                  Surrounding whitespace and blank entries are ignored.
#
# Side effect: an entry whose key is "source" copies the resolved value into
# record["ddsource"], and one whose key is "hostname" copies it into
# record["host"]. Nothing is written when the attribute cannot be resolved,
# so an existing value is never replaced by nil.
#
# Returns the joined tag String ("" when nothing matched), or nil when either
# argument is nil.
def get_attribute_tags(record, attribute_tags)
  return nil if attribute_tags.nil? || record.nil?

  tags = []
  attribute_tags.split(",").each do |attribute_tag|
    keyattr = attribute_tag.strip.split(":")
    key = keyattr.first
    # Blank entries ("a,,b", trailing commas, a lone ":") carry no key.
    next if key.nil?

    # Exactly "key:path" names the tag explicitly; any other shape uses the
    # first segment as both the tag key and the attribute path.
    path = keyattr.length == 2 ? keyattr.last : key

    # Walk the dotted path strictly: a missing or non-Hash intermediate means
    # the attribute does not exist, rather than falling back to an outer level.
    value = path.split(".").reduce(record) do |node, part|
      break nil unless node.is_a?(Hash)
      node[part]
    end
    # Only scalar values make usable tags; containers would inject commas.
    next if value.nil? || value.is_a?(Hash) || value.is_a?(Array)

    tag_value = value.to_s
    tags.push("#{key}:#{tag_value}")

    if key == "source"
      record["ddsource"] = tag_value
    elsif key == "hostname"
      record["host"] = tag_value
    end
  end
  tags.join(",")
end

#get_container_tags(record) ⇒ Object

Collects Docker and Kubernetes tags for your logs using the `filter_kubernetes_metadata` plugin. For more information about the attribute names, see: github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter/blob/master/lib/fluent/plugin/filter_kubernetes_metadata.rb#L265



190
191
192
193
194
195
# File 'lib/fluent/plugin/out_datadog.rb', line 190

# Combines the Kubernetes and Docker tags of +record+ into one tag string.
#
# record - the event Hash handed to get_kubernetes_tags and get_docker_tags
#
# Each helper may return nil or an empty String when it has nothing to
# contribute; both are dropped so the result never carries a stray leading or
# trailing comma.
#
# Returns the comma-joined tags, or "" when neither helper produced any.
def get_container_tags(record)
  [
    get_kubernetes_tags(record),
    get_docker_tags(record)
  ].reject { |tags| tags.nil? || tags.empty? }.join(",")
end

#get_docker_tags(record) ⇒ Object



210
211
212
213
214
215
216
217
218
# File 'lib/fluent/plugin/out_datadog.rb', line 210

# Extracts the Docker tag from the "docker" section of +record+.
#
# record - the event Hash
#
# Returns "container_id:<id>" when the section holds a container id, "" when
# the section exists without one, and nil when there is no "docker" section
# (or it is nil).
def get_docker_tags(record)
  docker_section = record['docker'] if record.key?('docker')
  return nil if docker_section.nil?

  container_id = docker_section['container_id']
  collected = []
  collected << ("container_id:" + container_id) unless container_id.nil?
  collected.join(",")
end

#get_kubernetes_tags(record) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/fluent/plugin/out_datadog.rb', line 197

# Extracts Kubernetes tags from the "kubernetes" section of +record+.
#
# record - the event Hash
#
# Emits, in this order and only for fields that are present:
# image_name, container_name, kube_namespace, pod_name.
#
# Returns the comma-joined tags ("" when the section has none of the fields),
# or nil when there is no "kubernetes" section (or it is nil).
def get_kubernetes_tags(record)
  return nil unless record.key?('kubernetes')

  metadata = record['kubernetes']
  return nil if metadata.nil?

  # Tag prefix paired with the metadata field that supplies its value.
  tag_fields = [
    ["image_name:", 'container_image'],
    ["container_name:", 'container_name'],
    ["kube_namespace:", 'namespace_name'],
    ["pod_name:", 'pod_name']
  ]
  present = tag_fields.reject { |_prefix, field| metadata[field].nil? }
  present.map { |prefix, field| prefix + metadata[field] }.join(",")
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


49
50
51
# File 'lib/fluent/plugin/out_datadog.rb', line 49

# Always answers true.
# NOTE(review): presumably this is the hook Fluentd consults before running
# the plugin under multiple workers — confirm against the Fluentd plugin API.
def multi_workers_ready?
  true
end

#new_client ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/fluent/plugin/out_datadog.rb', line 53

# Opens a fresh connection to the Datadog intake.
#
# With @use_ssl set, connects to @host:@ssl_port and wraps the TCP socket in
# an SSL socket; otherwise returns a plain TCP socket to @host:@port.
#
# Returns the connected socket (OpenSSL::SSL::SSLSocket or TCPSocket).
# Raises whatever the socket or SSL layer raises when the connection or the
# handshake fails; the TCP socket is closed before the error propagates.
#
# NOTE(review): no verify_mode is configured on the SSL context here —
# confirm whether peer certificate verification is intended.
def new_client
  return TCPSocket.new(@host, @port) unless @use_ssl

  socket = TCPSocket.new(@host, @ssl_port)
  begin
    ssl_client = OpenSSL::SSL::SSLSocket.new(socket, OpenSSL::SSL::SSLContext.new)
    # Without sync_close, closing the SSL socket leaves the TCP socket open,
    # leaking one descriptor per reconnect.
    ssl_client.sync_close = true
    ssl_client.connect
    ssl_client
  rescue StandardError
    # The handshake failed, so nobody else holds this socket: release it here.
    begin
      socket.close
    rescue StandardError
      nil
    end
    raise
  end
end

#send_to_datadog(events) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/fluent/plugin/out_datadog.rb', line 153

# Writes each event to the Datadog connection, one at a time, while holding
# @my_mutex. On failure the connection is dropped and the write is retried.
#
# events - a collection responding to #each; every element is passed to
#          @client.write as-is.
#
# Raises ConnectionFailure once the retry budget (@max_retries) is exhausted.
# Note that the backoff sleep below happens inside the synchronize block, so
# other callers of this method wait for the whole retry sequence.
def send_to_datadog(events)
  @my_mutex.synchronize do
    events.each do |event|
      log.trace "Datadog plugin: about to send event=#{event}"
      retries = 0
      begin
        # Only logged from the third attempt on (retries > 1).
        log.info "New attempt to Datadog attempt=#{retries}" if retries > 1
        # Connect lazily; a previous failure leaves @client nil.
        @client ||= new_client
        @client.write(event)
      rescue => e
        # Drop the broken connection so the next attempt reconnects. The
        # inline rescue also covers @client being nil when new_client raised.
        @client.close rescue nil
        @client = nil

        if retries == 0
          # immediately retry, in case it's just a server-side close
          retries += 1
          retry
        end

        # A @max_retries of -1 means retry without limit.
        if retries < @max_retries || @max_retries == -1
          # Quadratic backoff: retries squared, capped at 30 seconds.
          a_couple_of_seconds = retries ** 2
          a_couple_of_seconds = 30 unless a_couple_of_seconds < 30
          retries += 1
          # NOTE(review): `max_retries` below is a method call, not the
          # instance variable — confirm the accessor is defined elsewhere.
          log.warn "Could not push event to Datadog, attempt=#{retries} max_attempts=#{max_retries} wait=#{a_couple_of_seconds}s error=#{e}"
          sleep a_couple_of_seconds
          retry
        end
        raise ConnectionFailure, "Could not push event to Datadog after #{retries} retries, #{e}"
      end
    end
  end
end

#shutdown ⇒ Object



82
83
84
85
86
87
88
# File 'lib/fluent/plugin/out_datadog.rb', line 82

# Stops the plugin: runs the superclass shutdown, clears @running (the flag
# the ping loop in #start polls), and closes the Datadog connection when one
# is open.
def shutdown
  super
  @running = false
  @client.close if @client
end

#start ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/out_datadog.rb', line 65

# Starts the plugin: runs the superclass start, creates the mutex used by
# send_to_datadog, and marks the plugin as running.
#
# When @tcp_ping_rate is positive, a background thread (kept in @timer) sends
# the "fp\n" message through send_to_datadog, sleeps @tcp_ping_rate seconds,
# and repeats for as long as @running stays true.
def start
  super
  @my_mutex = Mutex.new
  @running = true
  return unless @tcp_ping_rate > 0

  @timer = Thread.new do
    while @running
      send_to_datadog(["fp\n"])
      sleep(@tcp_ping_rate)
    end
  end
end

#write(chunk) ⇒ Object

NOTE: This method is called by an internal thread, not Fluentd’s main thread. ‘chunk’ is a buffer chunk that contains multiple formatted events.



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/fluent/plugin/out_datadog.rb', line 100

# Turns a buffer chunk into Datadog messages and sends them.
#
# chunk - a buffer chunk responding to #msgpack_each, yielding
#         (tag, time, record) for every buffered event.
#
# For each non-empty Hash record this:
#   * stamps ddsourcecategory / ddsource / ddtags from the plugin settings,
#   * stores the tag under @tag_key when @include_tag_key is set,
#   * fills @timestamp_key with an ISO8601 (millisecond) UTC time unless the
#     record already carries one,
#   * appends the attribute tags and the container tags to ddtags,
#   * emits "<api_key> <payload>\n", where the payload is the JSON dump of the
#     record (@use_json) or its stripped "message" field. In the latter mode
#     records without a "message" key are skipped.
#
# Records that are not Hashes, or are empty, are ignored. All messages are
# passed to send_to_datadog in a single call.
def write(chunk)
  messages = []

  chunk.msgpack_each do |tag, time, record|
    next unless record.is_a? Hash
    next if record.empty?

    record["ddsourcecategory"] = @dd_sourcecategory if @dd_sourcecategory
    record["ddsource"] = @dd_source if @dd_source
    record["ddtags"] = @dd_tags if @dd_tags
    record[@tag_key] = tag if @include_tag_key

    # If @timestamp_key already exists, we don't overwrite it.
    if @timestamp_key && record[@timestamp_key].nil? && time
      record[@timestamp_key] = Time.at(time).utc.iso8601(3)
    end

    # Either helper may return nil or "" when it has nothing to add.
    append_ddtags(record, get_attribute_tags(record, @dd_attribute_tags))
    append_ddtags(record, get_container_tags(record))

    if @use_json
      messages.push "#{api_key} " + Yajl.dump(record) + "\n"
    else
      next unless record.has_key? "message"
      # to_s keeps a non-String message (number, nil, ...) from raising.
      messages.push "#{api_key} " + record["message"].to_s.strip + "\n"
    end
  end
  send_to_datadog(messages)
end

# Appends +extra_tags+ to record["ddtags"], comma-separated; does nothing when
# +extra_tags+ is nil or empty.
def append_ddtags(record, extra_tags)
  return if extra_tags.nil? || extra_tags.empty?

  current = record["ddtags"]
  record["ddtags"] = current.nil? || current.empty? ? extra_tags : current + "," + extra_tags
end
private :append_ddtags