Class: LogStash::Outputs::CloudWatchLogs::Buffer

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/cloudwatchlogs.rb

Overview

This class buffers series of single item to batches and puts batches to a SizedQueue for consumption. A buffer includes an ongoing batch and an out queue. An item is added to the ongoing batch first, when the ongoing batch becomes to ready for consumption and then is added to out queue/emptified. An ongoing batch becomes to comsumption ready if the number of items is going to exceed max_batch_count, or the size of items is going to exceed max_batch_size, with the addition of one more item, or the batch has opend more than buffer_duration milliseconds and has at least one item.

Constant Summary collapse

CLOSE_BATCH =
:close

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Buffer

Creates a new buffer



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 272

def initialize(options = {})
  @max_batch_count = options.fetch(:max_batch_count)
  @max_batch_size = options.fetch(:max_batch_size)
  @buffer_duration = options.fetch(:buffer_duration)
  @out_queue_size = options.fetch(:out_queue_size, 10)
  @logger = options.fetch(:logger, nil)
  @size_of_item_proc = options.fetch(:size_of_item_proc)
  @in_batch = Array.new
  @in_count = 0
  @in_size = 0
  @out_queue = SizedQueue.new(@out_queue_size)
  @batch_update_mutex = Mutex.new
  @last_batch_time = Time.now
  if @buffer_duration > 0
    @scheduled_batcher = Thread.new do
      loop do
        sleep(@buffer_duration / 1000.0)
        enq(:scheduled)
      end
    end
  end
end

Instance Attribute Details

#in_batchObject (readonly)

Returns the value of attribute in_batch.



269
270
271
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 269

def in_batch
  @in_batch
end

#in_countObject (readonly)

Returns the value of attribute in_count.



269
270
271
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 269

def in_count
  @in_count
end

#in_sizeObject (readonly)

Returns the value of attribute in_size.



269
270
271
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 269

def in_size
  @in_size
end

#out_queueObject (readonly)

Returns the value of attribute out_queue.



269
270
271
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 269

def out_queue
  @out_queue
end

Instance Method Details

#closeObject

Closes the buffer

Adds current batch to the queue and adds CLOSE_BATCH to queue. Waits until consumer completes.



321
322
323
324
325
326
327
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 321

def close
  while @in_size != 0 do
    enq(:close)
    sleep(1)
  end
  @out_queue.enq(CLOSE_BATCH)
end

#deq(&proc) ⇒ Object

Deques ready for consumption batches

The caller blocks on this call until the buffer is closed.



332
333
334
335
336
337
338
339
340
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 332

def deq(&proc)
  loop do
    batch = @out_queue.deq
    if batch == CLOSE_BATCH
      break
    end
    proc.call(batch)
  end
end

#enq(item) ⇒ Object

Enques an item to buffer

  • If ongoing batch is not full with this addition, adds item to batch.

  • If ongoing batch is full with this addition, adds item to batch and add batch to out queue.

  • If ongoing batch is going to overflow with this addition, adds batch to out queue,

and then adds the item to the new batch



301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 301

def enq(item)
  @batch_update_mutex.synchronize do
    if item == :scheduled || item == :close
      add_current_batch_to_out_queue(item)
      return
    end
    status = try_add_item(item)
    if status != 0
      add_current_batch_to_out_queue(:add)
      if status == -1
        try_add_item(item)
      end
    end
  end
end