Class: LogStash::Outputs::CloudWatchLogs::Buffer
- Inherits:
-
Object
- Object
- LogStash::Outputs::CloudWatchLogs::Buffer
- Defined in:
- lib/logstash/outputs/cloudwatchlogs.rb
Overview
This class buffers series of single item to batches and puts batches to a SizedQueue for consumption. A buffer includes an ongoing batch and an out queue. An item is added to the ongoing batch first, when the ongoing batch becomes to ready for consumption and then is added to out queue/emptified. An ongoing batch becomes to comsumption ready if the number of items is going to exceed max_batch_count, or the size of items is going to exceed max_batch_size, with the addition of one more item, or the batch has opend more than buffer_duration milliseconds and has at least one item.
Constant Summary collapse
- CLOSE_BATCH =
:close
Instance Attribute Summary collapse
-
#in_batch ⇒ Object
readonly
Returns the value of attribute in_batch.
-
#in_count ⇒ Object
readonly
Returns the value of attribute in_count.
-
#in_size ⇒ Object
readonly
Returns the value of attribute in_size.
-
#out_queue ⇒ Object
readonly
Returns the value of attribute out_queue.
Instance Method Summary collapse
-
#close ⇒ Object
Closes the buffer.
-
#deq(&proc) ⇒ Object
Deques ready for consumption batches.
-
#enq(item) ⇒ Object
Enques an item to buffer.
-
#initialize(options = {}) ⇒ Buffer
constructor
Creates a new buffer.
Constructor Details
#initialize(options = {}) ⇒ Buffer
Creates a new buffer
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 272 def initialize( = {}) @max_batch_count = .fetch(:max_batch_count) @max_batch_size = .fetch(:max_batch_size) @buffer_duration = .fetch(:buffer_duration) @out_queue_size = .fetch(:out_queue_size, 10) @logger = .fetch(:logger, nil) @size_of_item_proc = .fetch(:size_of_item_proc) @in_batch = Array.new @in_count = 0 @in_size = 0 @out_queue = SizedQueue.new(@out_queue_size) @batch_update_mutex = Mutex.new @last_batch_time = Time.now if @buffer_duration > 0 @scheduled_batcher = Thread.new do loop do sleep(@buffer_duration / 1000.0) enq(:scheduled) end end end end |
Instance Attribute Details
#in_batch ⇒ Object (readonly)
Returns the value of attribute in_batch.
269 270 271 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 269 def in_batch @in_batch end |
#in_count ⇒ Object (readonly)
Returns the value of attribute in_count.
269 270 271 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 269 def in_count @in_count end |
#in_size ⇒ Object (readonly)
Returns the value of attribute in_size.
269 270 271 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 269 def in_size @in_size end |
#out_queue ⇒ Object (readonly)
Returns the value of attribute out_queue.
269 270 271 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 269 def out_queue @out_queue end |
Instance Method Details
#close ⇒ Object
Closes the buffer
Adds current batch to the queue and adds CLOSE_BATCH to queue. Waits until consumer completes.
321 322 323 324 325 326 327 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 321 def close while @in_size != 0 do enq(:close) sleep(1) end @out_queue.enq(CLOSE_BATCH) end |
#deq(&proc) ⇒ Object
Deques ready for consumption batches
The caller blocks on this call until the buffer is closed.
332 333 334 335 336 337 338 339 340 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 332 def deq(&proc) loop do batch = @out_queue.deq if batch == CLOSE_BATCH break end proc.call(batch) end end |
#enq(item) ⇒ Object
Enques an item to buffer
-
If ongoing batch is not full with this addition, adds item to batch.
-
If ongoing batch is full with this addition, adds item to batch and add batch to out queue.
-
If ongoing batch is going to overflow with this addition, adds batch to out queue,
and then adds the item to the new batch
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 |
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 301 def enq(item) @batch_update_mutex.synchronize do if item == :scheduled || item == :close add_current_batch_to_out_queue(item) return end status = try_add_item(item) if status != 0 add_current_batch_to_out_queue(:add) if status == -1 try_add_item(item) end end end end |