Class: LogStash::Outputs::SumoLogic::Piler
- Inherits:
-
Object
- Object
- LogStash::Outputs::SumoLogic::Piler
- Includes:
- Common
- Defined in:
- lib/logstash/outputs/sumologic/piler.rb
Constant Summary
Constants included from Common
Common::CARBON2, Common::DEFAULT_LOG_FORMAT, Common::DEFLATE, Common::GRAPHITE, Common::GZIP, Common::LOG_TO_CONSOLE, Common::METRICS_NAME_PLACEHOLDER, Common::STATS_TAG
Instance Attribute Summary collapse
-
#is_pile ⇒ Object
readonly
Returns the value of attribute is_pile.
Instance Method Summary collapse
-
#initialize(queue, stats, config) ⇒ Piler
constructor
A new instance of Piler.
-
#input(entry) ⇒ Object
def stop.
-
#start ⇒ Object
def initialize.
-
#stop ⇒ Object
def start.
Methods included from Common
#log_dbg, #log_err, #log_info, #log_warn, #set_logger
Constructor Details
#initialize(queue, stats, config) ⇒ Piler
Returns a new instance of Piler.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/logstash/outputs/sumologic/piler.rb', line 13 def initialize(queue, stats, config) @interval = config["interval"] ||= 0 @pile_max = config["pile_max"] ||= 0 @queue = queue @stats = stats @stopping = Concurrent::AtomicBoolean.new(false) @is_pile = (@interval > 0 && @pile_max > 0) if (@is_pile) @pile = Array.new @pile_size = 0 @semaphore = Mutex.new end end |
Instance Attribute Details
#is_pile ⇒ Object (readonly)
Returns the value of attribute is_pile.
11 12 13 |
# File 'lib/logstash/outputs/sumologic/piler.rb', line 11 def is_pile @is_pile end |
Instance Method Details
#input(entry) ⇒ Object
def stop
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/logstash/outputs/sumologic/piler.rb', line 52 def input(entry) if (@stopping.true?) log_warn "piler is shutting down, message ignored", "message" => entry elsif (@is_pile) @semaphore.synchronize { if @pile_size + entry.bytesize > @pile_max @queue.enq(@pile.join($/)) @pile.clear @pile_size = 0 @stats.record_clear_pile() end @pile << entry @pile_size += entry.bytesize @stats.record_input(entry) } else @queue.enq(entry) end # if end |
#start ⇒ Object
def initialize
30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/logstash/outputs/sumologic/piler.rb', line 30 def start() @stopping.make_false() if (@is_pile) @piler_t = Thread.new { while @stopping.false? Stud.stoppable_sleep(@interval) { @stopping.true? } log_dbg("timeout, enqueue pile now") enq_and_clear() end # while } end # if end |
#stop ⇒ Object
def start
43 44 45 46 47 48 49 50 |
# File 'lib/logstash/outputs/sumologic/piler.rb', line 43 def stop() @stopping.make_true() if (@is_pile) log_info "shutting down piler..." @piler_t.join log_info "piler is fully shutted down" end end |