Class: LogStash::Outputs::DatadogLogs
- Inherits: Base
- Class hierarchy: LogStash::Outputs::DatadogLogs < Base < Object
- Defined in:
- lib/logstash/outputs/datadog_logs.rb
Overview
DatadogLogs lets you send logs to Datadog based on LogStash events.
Defined Under Namespace
Classes: DatadogClient, DatadogHTTPClient, DatadogTCPClient, RetryableError
Constant Summary collapse
- DD_MAX_BATCH_LENGTH = 500
  (Respects the batch-length limit documented at docs.datadoghq.com/api/?lang=bash#logs)
- DD_MAX_BATCH_SIZE = 5000000
- DD_TRUNCATION_SUFFIX = "...TRUNCATED..."
Instance Method Summary collapse
-
#batch_http_events(encoded_events, max_batch_length, max_request_size) ⇒ Object
Group HTTP events in batches.
-
#close ⇒ Object
Logstash shutdown hook.
-
#format_http_event_batch(batched_events) ⇒ Object
Format HTTP events.
-
#format_tcp_event(payload, api_key, max_request_size) ⇒ Object
Format TCP event.
-
#gzip_compress(payload, compression_level) ⇒ Object
Compress logs with GZIP.
- #max(a, b) ⇒ Object
- #multi_receive(events) ⇒ Object
-
#new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port, use_compression, force_v1_routes, http_proxy) ⇒ Object
Build a new transport client.
-
#process_encoded_payload(payload) ⇒ Object
Process and send each encoded payload.
- #register ⇒ Object
-
#truncate(event, max_length) ⇒ Object
Truncate events over the provided max length, appending a marker when truncated.
Instance Method Details
#batch_http_events(encoded_events, max_batch_length, max_request_size) ⇒ Object
Group HTTP events in batches
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/logstash/outputs/datadog_logs.rb', line 95 def batch_http_events(encoded_events, max_batch_length, max_request_size) batches = [] current_batch = [] current_batch_size = 0 encoded_events.each_with_index do |event, i| encoded_event = event.last current_event_size = encoded_event.bytesize # If this unique log size is bigger than the request size, truncate it if current_event_size > max_request_size encoded_event = truncate(encoded_event, max_request_size) current_event_size = encoded_event.bytesize end if (i > 0 and i % max_batch_length == 0) or (current_batch_size + current_event_size > max_request_size) batches << current_batch current_batch = [] current_batch_size = 0 end current_batch_size += encoded_event.bytesize current_batch << encoded_event end batches << current_batch batches end |
#close ⇒ Object
Logstash shutdown hook
47 48 49 |
# File 'lib/logstash/outputs/datadog_logs.rb', line 47 def close @client.close end |
#format_http_event_batch(batched_events) ⇒ Object
Format HTTP events
90 91 92 |
# File 'lib/logstash/outputs/datadog_logs.rb', line 90 def format_http_event_batch(batched_events) "[#{batched_events.join(',')}]" end |
#format_tcp_event(payload, api_key, max_request_size) ⇒ Object
Format TCP event
81 82 83 84 85 86 87 |
# File 'lib/logstash/outputs/datadog_logs.rb', line 81 def format_tcp_event(payload, api_key, max_request_size) formatted_payload = "#{api_key} #{payload}" if (formatted_payload.bytesize > max_request_size) return truncate(formatted_payload, max_request_size) end formatted_payload end |
#gzip_compress(payload, compression_level) ⇒ Object
Compress logs with GZIP
136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/logstash/outputs/datadog_logs.rb', line 136 def gzip_compress(payload, compression_level) gz = StringIO.new gz.set_encoding("BINARY") z = Zlib::GzipWriter.new(gz, compression_level) begin z.write(payload) ensure z.close end gz.string end |
#max(a, b) ⇒ Object
131 132 133 |
# File 'lib/logstash/outputs/datadog_logs.rb', line 131 def max(a, b) a > b ? a : b end |
#multi_receive(events) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/logstash/outputs/datadog_logs.rb', line 53 def multi_receive(events) return if events.empty? encoded_events = @codec.multi_encode(events) begin if @use_http batches = batch_http_events(encoded_events, DD_MAX_BATCH_LENGTH, DD_MAX_BATCH_SIZE) batches.each do |batched_event| process_encoded_payload(format_http_event_batch(batched_event)) end else encoded_events.each do |encoded_event| process_encoded_payload(format_tcp_event(encoded_event.last, @api_key, DD_MAX_BATCH_SIZE)) end end rescue => e @logger.error("Uncaught processing exception in datadog forwarder #{e.message}") end end |
#new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port, use_compression, force_v1_routes, http_proxy) ⇒ Object
Build a new transport client
149 150 151 152 153 154 155 |
# File 'lib/logstash/outputs/datadog_logs.rb', line 149 def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port, use_compression, force_v1_routes, http_proxy) if use_http DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key, force_v1_routes, http_proxy else DatadogTCPClient.new logger, use_ssl, no_ssl_validation, host, port end end |
#process_encoded_payload(payload) ⇒ Object
Process and send each encoded payload
73 74 75 76 77 78 |
# File 'lib/logstash/outputs/datadog_logs.rb', line 73 def process_encoded_payload(payload) if @use_compression and @use_http payload = gzip_compress(payload, @compression_level) end @client.send_retries(payload, @max_retries, @max_backoff) end |
#register ⇒ Object
42 43 44 |
# File 'lib/logstash/outputs/datadog_logs.rb', line 42 def register @client = new_client(@logger, @api_key, @use_http, @use_ssl, @no_ssl_validation, @host, @port, @use_compression, @force_v1_routes, @http_proxy) end |
#truncate(event, max_length) ⇒ Object
Truncate events over the provided max length, appending a marker when truncated
122 123 124 125 126 127 128 129 |
# File 'lib/logstash/outputs/datadog_logs.rb', line 122 def truncate(event, max_length) if event.length > max_length event = event[0..max_length - 1] event[max(0, max_length - DD_TRUNCATION_SUFFIX.length)..max_length - 1] = DD_TRUNCATION_SUFFIX return event end event end |