Class: LogStash::Outputs::SumoLogic::Piler

Inherits:
Object
  • Object
show all
Includes:
Common
Defined in:
lib/logstash/outputs/sumologic/piler.rb

Constant Summary

Constants included from Common

Common::CARBON2, Common::DEFAULT_LOG_FORMAT, Common::DEFLATE, Common::GRAPHITE, Common::GZIP, Common::LOG_TO_CONSOLE, Common::METRICS_NAME_PLACEHOLDER, Common::STATS_TAG

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Common

#log_dbg, #log_err, #log_info, #log_warn, #set_logger

Constructor Details

#initialize(queue, stats, config) ⇒ Piler

Returns a new instance of Piler.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/logstash/outputs/sumologic/piler.rb', line 13

def initialize(queue, stats, config)
  
  @interval = config["interval"] ||= 0
  @pile_max = config["pile_max"] ||= 0
  @queue = queue
  @stats = stats
  @stopping = Concurrent::AtomicBoolean.new(false)
  @is_pile = (@interval > 0 && @pile_max > 0)

  if (@is_pile)
    @pile = Array.new
    @pile_size = 0
    @semaphore = Mutex.new
  end

end

Instance Attribute Details

#is_pileObject (readonly)

Returns the value of attribute is_pile.



11
12
13
# File 'lib/logstash/outputs/sumologic/piler.rb', line 11

def is_pile
  @is_pile
end

Instance Method Details

#input(entry) ⇒ Object

def stop



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/logstash/outputs/sumologic/piler.rb', line 52

def input(entry)
  if (@stopping.true?)
    log_warn "piler is shutting down, message ignored", "message" => entry
  elsif (@is_pile)
    @semaphore.synchronize {
      if @pile_size + entry.bytesize > @pile_max
        @queue.enq(@pile.join($/))
        @pile.clear
        @pile_size = 0
        @stats.record_clear_pile()
      end
      @pile << entry
      @pile_size += entry.bytesize
      @stats.record_input(entry)
    }
  else
    @queue.enq(entry)
  end # if
end

#startObject

def initialize



30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/logstash/outputs/sumologic/piler.rb', line 30

def start()
  @stopping.make_false()
  if (@is_pile)
    @piler_t = Thread.new { 
      while @stopping.false?
        Stud.stoppable_sleep(@interval) { @stopping.true? }
        log_dbg("timeout, enqueue pile now")
        enq_and_clear()
      end # while
    }
  end # if
end

#stopObject

def start



43
44
45
46
47
48
49
50
# File 'lib/logstash/outputs/sumologic/piler.rb', line 43

def stop()
  @stopping.make_true()
  if (@is_pile)
    log_info "shutting down piler..."
    @piler_t.join
    log_info "piler is fully shutted down"
  end
end