Class: ScalyrThreaded::ScalyrOut

Inherits:
Fluent::Plugin::Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_scalyr_threaded.rb

Instance Method Summary collapse

Instance Method Details

#build_add_events_body(chunk) ⇒ Object



320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
# File 'lib/fluent/plugin/out_scalyr_threaded.rb', line 320

# Split a buffer chunk into Scalyr addEvents request payloads.
#
# Every record becomes one Scalyr event. Events are packed into requests so that
# the summed JSON size of a request's events stays within @max_request_buffer; an
# event that exceeds the limit on its own is sent in a request by itself. Each
# request lists exactly the scalyr threads (one per tag) used by its own events.
# An empty chunk yields no requests.
#
# Records are modified in place: a "logfile" attribute is added when missing.
#
# @param chunk buffer chunk whose msgpack_each yields [tag, sec, nsec, record]
# @return [Array<Hash>] one create_request result per request to send
def build_add_events_body( chunk )
  requests = []
  events = []
  current_threads = {} # tag => scalyr thread id, for the events currently in +events+
  total_bytes = 0      # summed JSON byte size of the events currently in +events+

  chunk.msgpack_each do |(tag, sec, nsec, record)|
    timestamp = to_nanos( sec, nsec )
    thread_id = 0

    @sync.synchronize do
      # ensure timestamp is at least 1 nanosecond greater than the last one
      timestamp = [timestamp, @last_timestamp + 1].max
      @last_timestamp = timestamp

      # get thread id or add a new one if we haven't seen this tag before
      if @thread_ids.key?( tag )
        thread_id = @thread_ids[tag]
      else
        thread_id = @next_id
        @thread_ids[tag] = thread_id
        @next_id += 1
      end
    end

    # add a logfile field if one doesn't exist
    record["logfile"] = "/fluentd/#{tag}" unless record.key?( "logfile" )

    event = { :thread => thread_id.to_s, :ts => timestamp, :attrs => record }
    event_json = event_json_for( event, tag, sec, nsec )

    # Start a new request if this event would push the current one over the limit.
    # The non-empty check guarantees every request carries at least one event.
    if !events.empty? && total_bytes + event_json.bytesize > @max_request_buffer
      requests << create_request( events, current_threads )
      events = []
      current_threads = {}
      total_bytes = 0
    end

    events << event
    # register the thread with the request that actually carries this event
    current_threads[tag] = thread_id
    total_bytes += event_json.bytesize
  end

  # flush whatever is left over (nothing to send for an empty remainder)
  requests << create_request( events, current_threads ) unless events.empty?
  requests
end

# Serialize +event+ to JSON (internal helper of build_add_events_body).
#
# If serialization fails because of string encoding problems, the original record
# is routed to the @ERROR label, and the event's attributes are replaced by a copy
# in which every string has been coerced to valid UTF-8.
#
# @return [String] JSON for the (possibly sanitized) event
def event_json_for( event, tag, sec, nsec )
  event.to_json
rescue JSON::GeneratorError, Encoding::UndefinedConversionError => e
  $log.warn "#{e.class}: #{e.message}"

  # Send the faulty event to a label @ERROR block and allow to handle it there (output to exceptions file for ex)
  record = event[:attrs]
  router.emit_error_event( tag, Fluent::EventTime.new( sec, nsec ), record, e )

  # Build a sanitized copy instead of mutating the record handed to the error stream
  event[:attrs] = record.each_with_object( {} ) do |(key, value), clean|
    $log.debug "\t#{key} (#{value.encoding.name}): '#{value}'" if value.is_a?( String )
    clean[key] = utf8_scrubbed( value )
  end
  event.to_json
end

# Return +value+ with every contained String converted to valid UTF-8 (internal helper).
#
# UTF-8 and binary strings are relabelled as UTF-8 and their invalid byte sequences
# replaced with "<?>". String#encode cannot do this, because it is a no-op when
# source and target encodings match. Strings in other encodings are transcoded.
# Hashes and Arrays are processed recursively; other values are returned unchanged.
def utf8_scrubbed( value )
  case value
  when String
    if value.encoding == Encoding::UTF_8 || value.encoding == Encoding::ASCII_8BIT
      value.dup.force_encoding( Encoding::UTF_8 ).scrub( "<?>" )
    else
      value.encode( "UTF-8", :invalid => :replace, :undef => :replace, :replace => "<?>" )
    end
  when Hash
    value.each_with_object( {} ) { |(k, v), out| out[k] = utf8_scrubbed( v ) }
  when Array
    value.map { |v| utf8_scrubbed( v ) }
  else
    value
  end
end

#compat_parameters_default_chunk_keyObject

Support for Fluentd version 0.14.0:



62
63
64
# File 'lib/fluent/plugin/out_scalyr_threaded.rb', line 62

# Default chunk key handed to the compat_parameters helper for pre-0.14 configs:
# the empty string, i.e. no chunk key.
def compat_parameters_default_chunk_key
  ''
end

#configure(conf) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/fluent/plugin/out_scalyr_threaded.rb', line 70

# Validate and normalize the plugin configuration.
#
# Besides the standard fluentd configuration (via +super+), this:
# - migrates pre-0.14 buffer parameters,
# - warns about sizes Scalyr may reject,
# - resolves force_message_encoding,
# - expands server_attributes,
# - optionally adds the local hostname as serverHost,
# - builds the addEvents endpoint URI.
#
# @param conf the <match> configuration element for this plugin
def configure( conf )
  if conf.elements('buffer').empty?
    $log.warn "Pre 0.14.0 configuration file detected.  Please consider updating your configuration file"
  end

  compat_parameters_buffer( conf, default_chunk_key: '' )

  super

  if @buffer.chunk_limit_size > 1024*1024
    $log.warn "Buffer chunk size is greater than 1Mb.  This may result in requests being rejected by Scalyr"
  end

  if @max_request_buffer > (1024*1024*3)
    $log.warn "Maximum request buffer > 3Mb.  This may result in requests being rejected by Scalyr"
  end

  @message_encoding = nil
  if @force_message_encoding.to_s != ''
    begin
      @message_encoding = Encoding.find( @force_message_encoding )
      $log.debug "Forcing message encoding to '#{@force_message_encoding}'"
    rescue ArgumentError
      $log.warn "No encoding '#{@force_message_encoding}' found.  Ignoring"
    end
  end

  # A server_attributes value that consists solely of #{expr} is replaced by the
  # result of evaluating expr. Every other value, including non-strings, is kept
  # unchanged (previously non-String values were silently dropped).
  # SECURITY: eval runs arbitrary Ruby taken from the config file with the
  # privileges of the fluentd process -- the configuration must be trusted.
  if @server_attributes
    @server_attributes = @server_attributes.each_with_object( {} ) do |(key, value), expanded|
      m = value.is_a?( String ) ? /\A\#{(.*)}\z/.match( value ) : nil
      expanded[key] = m ? eval( m[1] ) : value
    end
  end

  # See if we should use the hostname as the server_attributes.serverHost
  if @use_hostname_for_serverhost
    @server_attributes ||= {}
    # Only set serverHost if it isn't configured already. Use a string key, because
    # keys coming from the config file are strings.
    @server_attributes['serverHost'] = Socket.gethostname unless @server_attributes.key?( 'serverHost' )
  end

  # Ensure a trailing slash. Build a new string instead of appending in place, so a
  # frozen or shared configured value is never mutated.
  @scalyr_server = "#{@scalyr_server}/" unless @scalyr_server.end_with?( '/' )
  @add_events_uri = URI( @scalyr_server + "addEvents" )

  # NOTE: multiple flush threads are currently not rejected. To restrict this plugin to a
  # single flush thread (so requests always have incrementing timestamps), re-enable:
  # raise Fluent::ConfigError, "num_threads is currently limited to 1. You specified #{@buffer_config.flush_thread_count}." if @buffer_config.flush_thread_count > 1
end

#create_request(events, current_threads) ⇒ Object



417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
# File 'lib/fluent/plugin/out_scalyr_threaded.rb', line 417

# Assemble the payload for one Scalyr addEvents request.
#
# @param events [Array<Hash>] Scalyr event hashes for this request
# @param current_threads [Hash] tag => thread id for the threads used by +events+
# @return [Hash] :body holds the JSON payload, :record_count the number of events
def create_request( events, current_threads )
  thread_defs = current_threads.map do |tag, id|
    { :id => id.to_s, :name => "Fluentd: #{tag}" }
  end

  payload = {
    :token => @api_write_token,
    :client_timestamp => to_millis( Fluent::Engine.now ).to_s,
    :session => @session,
    :events => events,
    :threads => thread_defs
  }
  # sessionInfo is only attached when server attributes are configured
  payload[:sessionInfo] = @server_attributes if @server_attributes

  { :body => payload.to_json, :record_count => events.size }
end

#format(tag, time, record) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/fluent/plugin/out_scalyr_threaded.rb', line 154

# Convert an incoming event into the msgpack form stored in the buffer.
#
# Normalizes +time+ to a Fluent::EventTime and, if configured, moves
# @message_field to "message". It then applies force_message_encoding /
# replace_invalid_utf8 to a String message.
#
# @param tag [String] event tag
# @param time [Fluent::EventTime, Integer, Float, nil] event time; nil means "now"
# @param record [Hash] event record; modified in place
# @return msgpack of [tag, sec, nsec, record]
def format( tag, time, record )
  begin
    time = Fluent::Engine.now if time.nil?

    # handle timestamps that are not EventTime types
    if time.is_a?( Integer )
      time = Fluent::EventTime.new( time )
    elsif time.is_a?( Float )
      whole, frac = time.divmod( 1 ) # integer and fractional components
      time = Fluent::EventTime.new( whole.to_i, (frac * 10**9).to_i )
    end

    if @message_field != "message" && record.key?( @message_field )
      if record.key? "message"
        $log.warn "Overwriting log record field 'message'.  You are seeing this warning because in your fluentd config file you have configured the '#{@message_field}' field to be converted to the 'message' field, but the log record already contains a field called 'message' and this is now being overwritten."
      end
      record["message"] = record.delete( @message_field )
    end

    message = record["message"]
    # only String messages carry an encoding; anything else is passed through untouched
    if @message_encoding && message.is_a?( String )
      if @replace_invalid_utf8 && @message_encoding == Encoding::UTF_8
        record["message"] =
          if message.encoding == Encoding::UTF_8 || message.encoding == Encoding::ASCII_8BIT
            # String#encode is a no-op when source and target are both UTF-8, and from
            # binary it would replace every non-ASCII byte; relabel and scrub instead.
            message.dup.force_encoding( Encoding::UTF_8 ).scrub( "<?>" )
          else
            message.encode( "UTF-8", :invalid => :replace, :undef => :replace, :replace => "<?>" )
          end
      else
        message.force_encoding( @message_encoding )
      end
    end

    [tag, time.sec, time.nsec, record].to_msgpack

  rescue JSON::GeneratorError
    $log.warn "Unable to format message due to JSON::GeneratorError.  Record is:\n\t#{record.to_s}"
    raise
  end
end

#formatted_to_msgpack_binaryObject



66
67
68
# File 'lib/fluent/plugin/out_scalyr_threaded.rb', line 66

# Tells fluentd that #format already produces msgpack-encoded binary data
# (#format ends with +to_msgpack+).
def formatted_to_msgpack_binary
  true
end

#handle_response(response) ⇒ Object



281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/fluent/plugin/out_scalyr_threaded.rb', line 281

# Inspect a Scalyr addEvents HTTP response and raise if the request failed.
#
# @param response HTTP response; +code+ is a String (e.g. "200") and +body+ is
#   expected to be a JSON object with a "status" field
# @raise [ScalyrThreaded::Client4xxError] for any 4xx HTTP status code
# @raise [ScalyrThreaded::ClientError] when the status contains "/client/"
# @raise [ScalyrThreaded::ServerError] when the body is not a JSON object with a
#   "status" field, for any other non-success status, or when the status is
#   "success" but the HTTP code is not 200
# A status containing "discardBuffer" is only logged; nothing is raised.
def handle_response( response )
  $log.debug "Response Code: #{response.code}"
  $log.debug "Response Body: #{response.body}"

  response_hash =
    begin
      JSON.parse( response.body )
    rescue StandardError
      { "status" => "Invalid JSON response from server" }
    end

  # Valid JSON need not be an object ("[]", "null", "\"ok\""); only an object can
  # carry a "status" field, so anything else is treated like a missing status.
  unless response_hash.is_a?( Hash ) && response_hash.key?( "status" )
    $log.debug "JSON response does not contain status message"
    raise ScalyrThreaded::ServerError, "JSON response does not contain status message"
  end

  status = response_hash["status"]

  # 4xx codes are handled separately
  raise ScalyrThreaded::Client4xxError, status if response.code =~ /^4\d\d/

  if status != "success"
    if status =~ /discardBuffer/
      $log.warn "Received 'discardBuffer' message from server.  Buffer dropped."
    elsif status =~ %r"/client/"i
      raise ScalyrThreaded::ClientError, status
    else
      # don't check specifically for server, we assume all non-client errors are server errors
      raise ScalyrThreaded::ServerError, status
    end
  elsif !response.code.include?( "200" ) # response code is a string, not an int
    raise ScalyrThreaded::ServerError
  end
end

#post_request(uri, body) ⇒ Object



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# File 'lib/fluent/plugin/out_scalyr_threaded.rb', line 239

# POST a JSON payload to +uri+ over HTTPS and return the HTTP response.
#
# When @compression_type is 'deflate' or 'bz2', the body is compressed and a
# matching Content-Encoding header is sent. For any other value the body is sent
# uncompressed and no Content-Encoding header is added. (Previously an empty
# Content-Encoding header was sent for an unrecognised compression type.)
#
# @param uri [URI::Generic] endpoint; only host, port and path are used
# @param body [String] JSON request body
# @return [Net::HTTPResponse]
def post_request( uri, body )
  https = Net::HTTP.new( uri.host, uri.port )
  https.use_ssl = true

  # verify peers to prevent potential MITM attacks
  if @ssl_verify_peer
    https.ca_file = @ssl_ca_bundle_path
    https.verify_mode = OpenSSL::SSL::VERIFY_PEER
    https.verify_depth = @ssl_verify_depth
  end

  # use compression if enabled; +encoding+ stays nil unless a body was actually compressed
  encoding = nil
  case @compression_type
  when 'deflate'
    encoding = 'deflate'
    body = Zlib::Deflate.deflate( body, @compression_level )
  when 'bz2'
    encoding = 'bz2'
    io = StringIO.new
    bz2 = RBzip2.default_adapter::Compressor.new io
    bz2.write body
    bz2.close
    body = io.string
  end

  post = Net::HTTP::Post.new( uri.path )
  post.add_field( 'Content-Type', 'application/json' )
  post.add_field( 'Content-Encoding', encoding ) if encoding
  post.body = body

  https.request( post )
end

#startObject



140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/fluent/plugin/out_scalyr_threaded.rb', line 140

# Plugin startup: log this plugin's id and initialize the per-session state.
def start
  super
  $log.info "Scalyr Threaded Fluentd Plugin ID - #{plugin_id}"

  # A fresh session id is generated each time start runs for this plugin instance
  # (one instance per <match> section that uses this output).
  @session = SecureRandom.uuid

  # Everything below is guarded by @sync.
  @sync = Mutex.new
  @thread_ids = {}      # tag => scalyr thread id
  @next_id = 1          # next thread id to hand out in this session
  @last_timestamp = 0   # most recent event timestamp issued, in ns since epoch
end

#to_millis(timestamp) ⇒ Object

Explicit function to convert to milliseconds. Keeping the conversion in one place makes it easier to maintain if/when Fluentd supports higher-than-second resolutions.



235
236
237
# File 'lib/fluent/plugin/out_scalyr_threaded.rb', line 235

# Convert a time object exposing +sec+ and +nsec+ into integer milliseconds
# since the epoch (sub-millisecond precision is truncated).
def to_millis( timestamp )
  whole_ms = timestamp.sec * 1_000
  whole_ms + timestamp.nsec / 1_000_000
end

#to_nanos(seconds, nsec) ⇒ Object

Explicit function to convert to nanoseconds. Keeping the conversion in one place makes it easier to maintain if/when Fluentd supports higher-than-second resolutions.



229
230
231
# File 'lib/fluent/plugin/out_scalyr_threaded.rb', line 229

# Combine whole seconds and a nanosecond remainder into total nanoseconds.
def to_nanos( seconds, nsec )
  nanos_per_second = 1_000_000_000
  seconds * nanos_per_second + nsec
end

#write(chunk) ⇒ Object

Called by Fluentd when a chunk of log messages is ready to be written.



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/fluent/plugin/out_scalyr_threaded.rb', line 197

# Fluentd output entry point: turn a buffer chunk into one or more addEvents
# requests and post them to Scalyr one after another.
#
# For each request, an OpenSSL::SSL::SSLError or ScalyrThreaded::Client4xxError
# is logged and that request is dropped without being raised. Other errors from
# post_request/handle_response propagate to the caller. A JSON::GeneratorError is
# logged and re-raised.
def write( chunk )
  $log.debug "Size of chunk is: #{chunk.size}"
  requests = build_add_events_body( chunk )
  $log.debug "Chunk split into #{requests.size} request(s)."

  requests.each_with_index do |request, index|
    $log.debug "Request #{index + 1}/#{requests.size}: #{request[:body].bytesize} bytes"
    begin
      response = post_request( @add_events_uri, request[:body] )
      handle_response( response )
    rescue OpenSSL::SSL::SSLError => e
      $log.warn "SSL certificate verification failed.  Please make sure your certificate bundle is configured correctly and points to a valid file. You can configure this with the ssl_ca_bundle_path configuration option. The current value of ssl_ca_bundle_path is '#{@ssl_ca_bundle_path}'" if e.message.include?( "certificate verify failed" )
      $log.warn e.message
      $log.warn "Discarding buffer chunk without retrying or logging to <secondary>"
    rescue ScalyrThreaded::Client4xxError => e
      $log.warn "4XX status code received for request #{index + 1}/#{requests.size}.  Discarding buffer without retrying or logging.\n\t#{response.code} - #{e.message}\n\tChunk Size: #{chunk.size}\n\tLog messages this request: #{request[:record_count]}\n\tJSON payload size: #{request[:body].bytesize}\n\tSample: #{request[:body][0,1024]}..."
    end
  end
rescue JSON::GeneratorError
  $log.warn "Unable to format message due to JSON::GeneratorError."
  raise
end