Class: Fluent::DatadogOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_datadog.rb

Defined Under Namespace

Classes: ConnectionFailure

Instance Method Summary collapse

Constructor Details

#initializeDatadogOutput

Returns a new instance of DatadogOutput.



34
35
36
# File 'lib/fluent/plugin/out_datadog.rb', line 34

def initialize
  super
end

Instance Method Details

#clientObject



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/fluent/plugin/out_datadog.rb', line 47

def client
  @_socket ||= if @use_ssl
    context    = OpenSSL::SSL::SSLContext.new
    socket     = TCPSocket.new @host, @ssl_port
    ssl_client = OpenSSL::SSL::SSLSocket.new socket, context
    ssl_client.connect
  else
    socket = TCPSocket.new @host, @port
  end

  return @_socket

end

#configure(conf) ⇒ Object



43
44
45
# File 'lib/fluent/plugin/out_datadog.rb', line 43

def configure(conf)
  super
end

#format(tag, time, record) ⇒ Object

This method is called when an event reaches Fluentd.



103
104
105
# File 'lib/fluent/plugin/out_datadog.rb', line 103

def format(tag, time, record)
  return [tag, record].to_msgpack
end

#init_socket(socket) ⇒ Object

not used for now…



62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/fluent/plugin/out_datadog.rb', line 62

def init_socket(socket)
  socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)

  begin
    socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, 3)
    socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, 3)
    socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, 10)
  rescue
    log.info "DatadogOutput: Fallback on socket options during initialization"
  end

  return socket
end

#send_to_datadog(data) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/fluent/plugin/out_datadog.rb', line 141

def send_to_datadog(data)
  @my_mutex.synchronize do
    retries = 0
    begin
      log.trace "Send nb_event=#{data.size} events to Datadog"

      # Check the connectivity and write messages
      log.info "New attempt to Datadog attempt=#{retries}" if retries > 0

      data.each do |event|
        log.trace "Datadog plugin: about to send event=#{event}"
        client.write(event)
      end

      # Handle some failures
    rescue Errno::EHOSTUNREACH, Errno::ECONNREFUSED, Errno::ETIMEDOUT, Errno::EPIPE => e

      if retries < @max_retries || max_retries == -1
        @_socket = nil
        a_couple_of_seconds = retries ** 2
        a_couple_of_seconds = 30 unless a_couple_of_seconds < 30
        retries += 1
        log.warn "Could not push logs to Datadog, attempt=#{retries} max_attempts=#{max_retries} wait=#{a_couple_of_seconds}s error=#{e.message}"
        sleep a_couple_of_seconds
        retry
      end
      raise ConnectionFailure, "Could not push logs to Datadog after #{retries} retries, #{e.message}"
    end
  end
end

#shutdownObject



94
95
96
97
98
99
100
# File 'lib/fluent/plugin/out_datadog.rb', line 94

def shutdown
  super
  @running = false
  if @_socket
    @_socket.close()
  end
end

#startObject



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/fluent/plugin/out_datadog.rb', line 76

def start
  super
  @my_mutex = Mutex.new
  @running = true

  if @tcp_ping_rate > 0
    @timer = Thread.new do
      while @running do
        messages = Array.new
        messages.push("fp\n")
        send_to_datadog(messages)
        sleep(15)
      end
    end
  end

end

#write(chunk) ⇒ Object

NOTE! This method is called by internal thread, not Fluentd’s main thread. ‘chunk’ is a buffer chunk that includes multiple formatted events.



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/fluent/plugin/out_datadog.rb', line 109

def write(chunk)
  messages = Array.new
  log.trace "Datadog plugin: received chunck: #{chunk}"
  chunk.msgpack_each do |tag, record|
    next unless record.is_a? Hash
    next if record.empty?

    log.trace "Datadog plugin: received record: #{record}"

    if @dd_sourcecategory
      record["ddsourcecategory"] = @dd_sourcecategory
    end
    if @dd_source
      record["ddsource"] = @dd_source
    end
    if @dd_tags
      record["ddtags"] = @dd_tags
    end
    if @include_tag_key
      record[@tag_key] = tag
    end
    if @use_json
      messages.push "#{api_key} " + Yajl.dump(record) + "\n"
    else
      next unless record.has_key? "message"
      messages.push "#{api_key} " + record["message"].rstrip() + "\n"
    end
  end
  send_to_datadog(messages)

end