Class: LogStash::Outputs::JSONBatch

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::HttpClient, Stud::Buffer
Defined in:
lib/logstash/outputs/json_batch.rb

Instance Method Summary collapse

Instance Method Details

#flush(events, close = false) ⇒ Object



65
66
67
68
69
70
71
72
73
74
# File 'lib/logstash/outputs/json_batch.rb', line 65

# Converts a batch of events into hashes and sends them in a single request.
#
# NOTE(review): presumably invoked by Stud::Buffer when the buffer is
# flushed -- confirm against the Stud::Buffer contract.
#
# @param events [Enumerable] events to send; each must respond to +to_hash+
# @param close [Boolean] accepted as part of the signature; not used by this
#   method
# @return [Object] whatever +make_request+ returns
def flush(events, close=false)
  # The array of hashes that we push to Fusion as documents.
  documents = events.map(&:to_hash)

  make_request(documents)
end

#multi_receive(events) ⇒ Object



76
77
78
# File 'lib/logstash/outputs/json_batch.rb', line 76

# Hands every event in the batch to +buffer_receive+, one at a time and in
# order.
#
# @param events [#each] the events to buffer
# @return [Object] the value returned by +events.each+
def multi_receive(events)
  events.each do |incoming|
    buffer_receive(incoming)
  end
end

#receive(event, async_type = :background) ⇒ Object

This module currently does not support parallel requests, as that would circumvent the batching.



60
61
62
# File 'lib/logstash/outputs/json_batch.rb', line 60

# Adds a single event to the buffer by delegating to +buffer_receive+.
#
# @param event [Object] the event to buffer
# @param async_type [Symbol] accepted as part of the signature; not used by
#   this method
# @return [Object] whatever +buffer_receive+ returns
def receive(event, async_type = :background)
  buffer_receive(event)
end

#register ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/logstash/outputs/json_batch.rb', line 30

# Sets up the request-token queue, the counters, and the event buffer, then
# logs the effective settings.
#
# Reads @pool_max, @flush_size, @idle_flush_time, @logger, @url and
# @retry_individual, plus the +request_headers+ and +logger+ helpers.
#
# @return [Object] whatever +logger.info+ returns
def register
  # Handle this deprecated option. TODO: remove the option
  #@ssl_certificate_validation = @verify_ssl if @verify_ssl

  # Outstanding requests are counted with a token queue, which creates
  # backpressure: once the queue is empty no new requests may be sent, and
  # the client must put a token back on success.
  @request_tokens = SizedQueue.new(@pool_max)
  @pool_max.times { @request_tokens.push(true) }

  @total = 0
  @total_failed = 0
  @requests = []

  buffer_initialize(
    max_items: @flush_size,
    max_interval: @idle_flush_time,
    logger: @logger
  )

  logger.info(
    "Initialized json_batch with settings",
    flush_size: @flush_size,
    idle_flush_time: @idle_flush_time,
    request_tokens: @pool_max,
    url: @url,
    headers: request_headers,
    retry_individual: @retry_individual
  )
end