Class: LogStash::Outputs::SumoLogic::MessageQueue

Inherits:
Object
  • Object
show all
Includes:
Common
Defined in:
lib/logstash/outputs/sumologic/message_queue.rb

Constant Summary

Constants included from Common

Common::CARBON2, Common::DEFAULT_LOG_FORMAT, Common::DEFLATE, Common::GRAPHITE, Common::GZIP, Common::LOG_TO_CONSOLE, Common::METRICS_NAME_PLACEHOLDER, Common::STATS_TAG

Instance Method Summary collapse

Methods included from Common

#log_dbg, #log_err, #log_info, #log_warn, #set_logger

Constructor Details

#initialize(stats, config) ⇒ MessageQueue

Returns a new instance of MessageQueue.



10
11
12
13
14
15
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 10

def initialize(stats, config)
  @queue_max = (config["queue_max"] ||= 1) < 1 ? 1 : config["queue_max"]
  @queue = SizedQueue::new(@queue_max)
  log_info("initialize memory queue", :max => @queue_max)
  @stats = stats
end

Instance Method Details

#deqObject

def enq



28
29
30
31
32
33
34
35
36
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 28

def deq()
  obj = @queue.deq()
  @stats.record_deque(obj)
  log_dbg("dequeue",
    :objects_in_queue => size,
    :bytes_in_queue => @stats.current_queue_bytes,
    :size => obj.bytesize)
  obj
end

#drainObject

def deq



38
39
40
41
42
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 38

def drain()
  @queue.size.times.map {
    deq()
  }
end

#enq(obj) ⇒ Object

def initialize



17
18
19
20
21
22
23
24
25
26
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 17

def enq(obj)
  if (obj.bytesize > 0)
    @queue.enq(obj)
    @stats.record_enque(obj)
    log_dbg("enqueue",
      :objects_in_queue => size,
      :bytes_in_queue => @stats.current_queue_bytes,
      :size => obj.bytesize)
  end
end

#sizeObject

def drain



44
45
46
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 44

def size()
  @queue.size()
end