Class: LogStash::Outputs::DatadogLogs

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/outputs/datadog_logs.rb

Overview

DatadogLogs lets you send logs to Datadog based on LogStash events.

Defined Under Namespace

Classes: DatadogClient, DatadogHTTPClient, DatadogTCPClient, RetryableError

Constant Summary collapse

DD_MAX_BATCH_LENGTH =

Respect the limits documented at https://docs.datadoghq.com/api/?lang=bash#logs

500
DD_MAX_BATCH_SIZE =
5000000
DD_TRUNCATION_SUFFIX =
"...TRUNCATED..."

Instance Method Summary collapse

Instance Method Details

#batch_http_events(encoded_events, max_batch_length, max_request_size) ⇒ Object

Group HTTP events in batches



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/logstash/outputs/datadog_logs.rb', line 95

# Group encoded HTTP events into batches bounded by both an event count and a
# cumulative byte size.
#
# @param encoded_events [Array<Array>] entries whose last element is the
#   encoded event string (only +last+ is read from each entry)
# @param max_batch_length [Integer] maximum number of events in one batch
# @param max_request_size [Integer] maximum cumulative bytesize of one batch;
#   a single event larger than this is passed through +truncate+ first
# @return [Array<Array<String>>] batches in input order. No batch is ever
#   empty, and the result is +[]+ when +encoded_events+ is empty.
def batch_http_events(encoded_events, max_batch_length, max_request_size)
  batches = []
  current_batch = []
  current_batch_size = 0
  encoded_events.each do |event|
    encoded_event = event.last
    current_event_size = encoded_event.bytesize
    # If this single log is bigger than the request size, truncate it
    if current_event_size > max_request_size
      encoded_event = truncate(encoded_event, max_request_size)
      current_event_size = encoded_event.bytesize
    end

    # Count against the batch being built, not the global index, so a batch
    # that was started by a size-based split can still be filled completely.
    batch_full = current_batch.length >= max_batch_length
    over_size = current_batch_size + current_event_size > max_request_size
    # Never flush an empty batch: an event that is still over the size limit
    # after truncation simply goes alone into its own batch.
    if !current_batch.empty? && (batch_full || over_size)
      batches << current_batch
      current_batch = []
      current_batch_size = 0
    end

    current_batch_size += current_event_size
    current_batch << encoded_event
  end
  batches << current_batch unless current_batch.empty?
  batches
end

#close ⇒ Object

Logstash shutdown hook



47
48
49
# File 'lib/logstash/outputs/datadog_logs.rb', line 47

# Logstash shutdown hook: closes the transport client held in @client.
def close
  @client.close
end

#format_http_event_batch(batched_events) ⇒ Object

Format HTTP events



90
91
92
# File 'lib/logstash/outputs/datadog_logs.rb', line 90

# Format a batch of encoded HTTP events as a single bracketed,
# comma-separated string.
#
# @param batched_events [Array<String>] already-encoded events
# @return [String] the events joined with "," and wrapped in "[" and "]"
def format_http_event_batch(batched_events)
  joined = batched_events.join(',')
  "[#{joined}]"
end

#format_tcp_event(payload, api_key, max_request_size) ⇒ Object

Format TCP event



81
82
83
84
85
86
87
# File 'lib/logstash/outputs/datadog_logs.rb', line 81

# Format a single event for the TCP transport as "<api_key> <payload>".
#
# @param payload [String] encoded event
# @param api_key [String] key prefixed to the payload, separated by a space
# @param max_request_size [Integer] byte limit; a formatted line whose
#   bytesize exceeds it is passed through +truncate+
# @return [String] the formatted (possibly truncated) line
def format_tcp_event(payload, api_key, max_request_size)
  line = "#{api_key} #{payload}"
  return line unless line.bytesize > max_request_size

  truncate(line, max_request_size)
end

#gzip_compress(payload, compression_level) ⇒ Object

Compress logs with GZIP



136
137
138
139
140
141
142
143
144
145
146
# File 'lib/logstash/outputs/datadog_logs.rb', line 136

# Compress a payload with GZIP into an in-memory buffer.
#
# @param payload [String] data to compress
# @param compression_level [Integer] level handed to Zlib::GzipWriter
# @return [String] the gzip-compressed bytes
def gzip_compress(payload, compression_level)
  buffer = StringIO.new
  buffer.set_encoding("BINARY")
  # The block form closes the writer (flushing the gzip trailer) even if the
  # write raises.
  Zlib::GzipWriter.wrap(buffer, compression_level) { |writer| writer.write(payload) }
  buffer.string
end

#max(a, b) ⇒ Object



131
132
133
# File 'lib/logstash/outputs/datadog_logs.rb', line 131

# Return the larger of two values as compared with +>+.
#
# @param a [Comparable]
# @param b [Comparable]
# @return [Object] +a+ when a > b, otherwise +b+ (so +b+ on a tie)
def max(a, b)
  return a if a > b

  b
end

#multi_receive(events) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/logstash/outputs/datadog_logs.rb', line 53

# Encode a set of Logstash events with @codec and send them through the
# configured transport.
#
# With @use_http set, events are grouped by +batch_http_events+ and each batch
# is formatted and sent in turn; otherwise every event is formatted as one TCP
# line and sent individually. Any StandardError raised while formatting or
# sending is logged through @logger and not re-raised. Errors raised by the
# codec itself are not rescued here.
#
# @param events [Array] events to forward; nothing is done when empty
def multi_receive(events)
  return if events.empty?

  encoded_events = @codec.multi_encode(events)
  begin
    if @use_http
      batch_http_events(encoded_events, DD_MAX_BATCH_LENGTH, DD_MAX_BATCH_SIZE).each do |batch|
        http_body = format_http_event_batch(batch)
        process_encoded_payload(http_body)
      end
    else
      encoded_events.each do |encoded_pair|
        tcp_line = format_tcp_event(encoded_pair.last, @api_key, DD_MAX_BATCH_SIZE)
        process_encoded_payload(tcp_line)
      end
    end
  rescue => error
    @logger.error("Uncaught processing exception in datadog forwarder #{error.message}")
  end
end

#new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port, use_compression, force_v1_routes, http_proxy) ⇒ Object

Build a new transport client



149
150
151
152
153
154
155
# File 'lib/logstash/outputs/datadog_logs.rb', line 149

# Build a new transport client.
#
# Returns a DatadogHTTPClient when +use_http+ is truthy, otherwise a
# DatadogTCPClient. The TCP client only receives the logger, SSL settings,
# host and port; the remaining arguments are used by the HTTP client only.
def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port, use_compression, force_v1_routes, http_proxy)
  unless use_http
    return DatadogTCPClient.new(logger, use_ssl, no_ssl_validation, host, port)
  end

  DatadogHTTPClient.new(logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key, force_v1_routes, http_proxy)
end

#process_encoded_payload(payload) ⇒ Object

Process and send each encoded payload



73
74
75
76
77
78
# File 'lib/logstash/outputs/datadog_logs.rb', line 73

# Send one encoded payload through @client, gzip-compressing it first when
# both @use_compression and @use_http are set.
#
# @param payload [String] formatted payload to send
# @return [Object] whatever @client.send_retries returns
def process_encoded_payload(payload)
  compress = @use_compression && @use_http
  body = compress ? gzip_compress(payload, @compression_level) : payload
  @client.send_retries(body, @max_retries, @max_backoff)
end

#register ⇒ Object



42
43
44
# File 'lib/logstash/outputs/datadog_logs.rb', line 42

# Builds the transport client with new_client from the plugin's settings
# held in instance variables and stores it in @client.
def register
  @client = new_client(@logger, @api_key, @use_http, @use_ssl, @no_ssl_validation, @host, @port, @use_compression, @force_v1_routes, @http_proxy)
end

#truncate(event, max_length) ⇒ Object

Truncate events over the provided max length, appending a marker when truncated



122
123
124
125
126
127
128
129
# File 'lib/logstash/outputs/datadog_logs.rb', line 122

# Truncate an event that exceeds +max_length+ bytes, replacing its tail with
# DD_TRUNCATION_SUFFIX.
#
# Size is measured in bytes because the callers in this file decide whether
# to truncate by comparing +bytesize+ against the same limit; measuring in
# characters would leave multi-byte payloads over the limit untouched.
#
# @param event [String] encoded event
# @param max_length [Integer] maximum size in bytes
# @return [String] +event+ itself when it already fits; otherwise a new string
#   ending with the suffix and at most +max_length+ bytes long. When
#   +max_length+ is smaller than the suffix, the bare suffix is returned (and
#   is therefore longer than +max_length+).
def truncate(event, max_length)
  return event unless event.bytesize > max_length

  keep_bytes = max(0, max_length - DD_TRUNCATION_SUFFIX.bytesize)
  # A byte cut may land inside a multi-byte character; scrub drops the
  # resulting partial sequence so the result stays validly encoded.
  head = event.byteslice(0, keep_bytes).scrub('')
  head + DD_TRUNCATION_SUFFIX
end