Class: LogStash::Outputs::JSONBatch
- Inherits:
-
Base
- Object
- Base
- LogStash::Outputs::JSONBatch
- Includes:
- PluginMixins::HttpClient, Stud::Buffer
- Defined in:
- lib/logstash/outputs/json_batch.rb
Instance Method Summary collapse
- #flush(events, close = false) ⇒ Object
- #multi_receive(events) ⇒ Object
-
#receive(event, async_type = :background) ⇒ Object
This module currently does not support parallel requests as that would circumvent the batching.
- #register ⇒ Object
Instance Method Details
#flush(events, close = false) ⇒ Object
65 66 67 68 69 70 71 72 73 74 |
# File 'lib/logstash/outputs/json_batch.rb', line 65 def flush(events, close=false) documents = [] #this is the array of hashes that we push to Fusion as documents events.each do |event| document = event.to_hash() documents.push(document) end make_request(documents) end |
#multi_receive(events) ⇒ Object
76 77 78 |
# File 'lib/logstash/outputs/json_batch.rb', line 76 def multi_receive(events) events.each {|event| buffer_receive(event)} end |
#receive(event, async_type = :background) ⇒ Object
This module currently does not support parallel requests as that would circumvent the batching
60 61 62 |
# File 'lib/logstash/outputs/json_batch.rb', line 60 def receive(event, async_type=:background) buffer_receive(event) end |
#register ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/logstash/outputs/json_batch.rb', line 30 def register # Handle this deprecated option. TODO: remove the option #@ssl_certificate_validation = @verify_ssl if @verify_ssl # We count outstanding requests with this queue # This queue tracks the requests to create backpressure # When this queue is empty no new requests may be sent, # tokens must be added back by the client on success @request_tokens = SizedQueue.new(@pool_max) @pool_max.times {|t| @request_tokens << true } @total = 0 @total_failed = 0 @requests = Array.new buffer_initialize( :max_items => @flush_size, :max_interval => @idle_flush_time, :logger => @logger ) logger.info("Initialized json_batch with settings", :flush_size => @flush_size, :idle_flush_time => @idle_flush_time, :request_tokens => @pool_max, :url => @url, :headers => request_headers, :retry_individual => @retry_individual) end |