Class: LogStash::ShutdownWatcher
- Inherits:
- Object
  - Object
  - LogStash::ShutdownWatcher
- Includes:
- Util::Loggable
- Defined in:
- lib/logstash/shutdown_watcher.rb
Constant Summary
- CHECK_EVERY = 1 (second)
- REPORT_EVERY = 5 (checks)
- ABORT_AFTER = 3 (stalled reports)
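To make the default timing concrete, here is a minimal arithmetic sketch. The constant values come from the summary above; the helper names `seconds_per_report` and `seconds_until_abort` are hypothetical and not part of Logstash. The figures are approximate because they ignore the initial sleep in `#start` and the time each check takes.

```ruby
# Constants as documented for LogStash::ShutdownWatcher.
CHECK_EVERY  = 1 # seconds between checks
REPORT_EVERY = 5 # checks per report
ABORT_AFTER  = 3 # consecutive stalled reports before a forced exit

# Hypothetical helper: approximate seconds between two reports.
def seconds_per_report(check_every = CHECK_EVERY, report_every = REPORT_EVERY)
  check_every * report_every
end

# Hypothetical helper: approximate minimum seconds of checking before a
# forced exit, assuming every report finds the shutdown stalled.
def seconds_until_abort(check_every = CHECK_EVERY, report_every = REPORT_EVERY, abort_after = ABORT_AFTER)
  seconds_per_report(check_every, report_every) * abort_after
end

puts seconds_per_report  # prints 5
puts seconds_until_abort # prints 15
```

With the defaults, a report is produced about every 5 seconds, and a forced exit (only when unsafe shutdown is enabled) needs roughly 15 seconds of continuously stalled reports.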
Instance Attribute Summary
- #abort_threshold ⇒ Object (readonly)
  Returns the value of attribute abort_threshold.
- #cycle_period ⇒ Object (readonly)
  Returns the value of attribute cycle_period.
- #report_every ⇒ Object (readonly)
  Returns the value of attribute report_every.
Class Method Summary
- .start(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ Object
- .unsafe_shutdown=(boolean) ⇒ Object
- .unsafe_shutdown? ⇒ Boolean
Instance Method Summary
- #attempts_count ⇒ Object
- #force_exit ⇒ Object
- #initialize(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ ShutdownWatcher (constructor)
  A new instance of ShutdownWatcher.
- #logger ⇒ Object
- #pipeline_report_snapshot ⇒ Object
- #shutdown_stalled? ⇒ Boolean
  A pipeline shutdown is stalled if at least REPORT_EVERY reports have been created, the inflight event count is constant or monotonically increasing, there are worker threads running which aren’t blocked on SizedQueue pop/push, and the stalled thread list is constant in the previous REPORT_EVERY reports.
- #start ⇒ Object
- #stop! ⇒ Object
- #stopped? ⇒ Boolean
Methods included from Util::Loggable
Constructor Details
#initialize(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ ShutdownWatcher
Returns a new instance of ShutdownWatcher.
    # File 'lib/logstash/shutdown_watcher.rb', line 15
    def initialize(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
      @pipeline = pipeline
      @cycle_period = cycle_period
      @report_every = report_every
      @abort_threshold = abort_threshold
      @reports = []
      @attempts_count = Concurrent::AtomicFixnum.new(0)
      @running = Concurrent::AtomicBoolean.new(false)
    end
Instance Attribute Details
#abort_threshold ⇒ Object (readonly)
Returns the value of attribute abort_threshold.
    # File 'lib/logstash/shutdown_watcher.rb', line 13
    def abort_threshold
      @abort_threshold
    end
#cycle_period ⇒ Object (readonly)
Returns the value of attribute cycle_period.
    # File 'lib/logstash/shutdown_watcher.rb', line 13
    def cycle_period
      @cycle_period
    end
#report_every ⇒ Object (readonly)
Returns the value of attribute report_every.
    # File 'lib/logstash/shutdown_watcher.rb', line 13
    def report_every
      @report_every
    end
Class Method Details
.start(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ Object
    # File 'lib/logstash/shutdown_watcher.rb', line 33
    def self.start(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
      controller = self.new(pipeline, cycle_period, report_every, abort_threshold)
      Thread.new(controller) { |controller| controller.start }
    end
.unsafe_shutdown=(boolean) ⇒ Object
    # File 'lib/logstash/shutdown_watcher.rb', line 25
    def self.unsafe_shutdown=(boolean)
      @unsafe_shutdown = boolean
    end
.unsafe_shutdown? ⇒ Boolean
    # File 'lib/logstash/shutdown_watcher.rb', line 29
    def self.unsafe_shutdown?
      @unsafe_shutdown
    end
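The accessor pair above stores the flag in a class-level instance variable. The sketch below reproduces that pattern on a stand-in class (`WatcherSketch` is a hypothetical name, not part of Logstash) to show one consequence: until the setter is called, the predicate returns `nil` rather than `false`, which is still falsy in a condition.

```ruby
# Hypothetical stand-in class that mirrors the accessor pair shown above.
class WatcherSketch
  def self.unsafe_shutdown=(boolean)
    @unsafe_shutdown = boolean
  end

  def self.unsafe_shutdown?
    @unsafe_shutdown
  end
end

before = WatcherSketch.unsafe_shutdown? # unset instance variable reads as nil
WatcherSketch.unsafe_shutdown = true
after = WatcherSketch.unsafe_shutdown?

p before # prints nil
p after  # prints true
```

Because the flag lives on the class, it is shared by every watcher instance of that class.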
Instance Method Details
#attempts_count ⇒ Object
    # File 'lib/logstash/shutdown_watcher.rb', line 42
    def attempts_count
      @attempts_count.value
    end
#force_exit ⇒ Object
    # File 'lib/logstash/shutdown_watcher.rb', line 111
    def force_exit
      exit(-1)
    end
#logger ⇒ Object
    # File 'lib/logstash/shutdown_watcher.rb', line 38
    def logger
      self.class.logger
    end
#pipeline_report_snapshot ⇒ Object
    # File 'lib/logstash/shutdown_watcher.rb', line 87
    def pipeline_report_snapshot
      @pipeline.reporter.snapshot
    end
#shutdown_stalled? ⇒ Boolean
A pipeline shutdown is stalled if:
- at least REPORT_EVERY reports have been created
- the inflight event count is constant or monotonically increasing
- there are worker threads running which aren’t blocked on SizedQueue pop/push
- the stalled thread list is constant in the previous REPORT_EVERY reports
    # File 'lib/logstash/shutdown_watcher.rb', line 96
    def shutdown_stalled?
      return false unless @reports.size == @report_every #
      # is stalled if inflight count is either constant or increasing
      stalled_event_count = @reports.each_cons(2).all? do |prev_report, next_report|
        prev_report.inflight_count <= next_report.inflight_count
      end
      if stalled_event_count
        @reports.each_cons(2).all? do |prev_report, next_report|
          prev_report.stalling_threads == next_report.stalling_threads
        end
      else
        false
      end
    end
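The check above can be tried in isolation with plain data. The following is a minimal sketch, not the Logstash implementation: `Report` is a hypothetical stand-in for the pipeline's report snapshot (whose real type is not shown on this page), and `stalled?` is a hypothetical free function that applies the same two pairwise comparisons.

```ruby
# Hypothetical stand-in for a report snapshot; only the two fields that
# the stall check reads are modelled.
Report = Struct.new(:inflight_count, :stalling_threads)

# Same logic as the method above, written against plain arguments.
def stalled?(reports, report_every)
  return false unless reports.size == report_every

  pairs = reports.each_cons(2)
  # The inflight count must never decrease between consecutive reports...
  pairs.all? { |prev, nxt| prev.inflight_count <= nxt.inflight_count } &&
    # ...and the stalling threads must be identical between them.
    pairs.all? { |prev, nxt| prev.stalling_threads == nxt.stalling_threads }
end

stuck    = Array.new(3) { Report.new(10, ["worker-1"]) }
draining = [Report.new(10, ["worker-1"]), Report.new(6, ["worker-1"]), Report.new(2, ["worker-1"])]
shifting = [Report.new(10, ["worker-1"]), Report.new(10, ["worker-2"]), Report.new(10, ["worker-2"])]

p stalled?(stuck, 3)          # prints true
p stalled?(draining, 3)       # prints false (events are still leaving the pipeline)
p stalled?(shifting, 3)       # prints false (the stalling threads changed)
p stalled?(stuck.first(2), 3) # prints false (not enough reports yet)
```

A constant inflight count counts as stalled because the comparison is `<=`, which matches the "constant or increasing" comment in the source.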
#start ⇒ Object
    # File 'lib/logstash/shutdown_watcher.rb', line 54
    def start
      sleep(@cycle_period)
      cycle_number = 0
      stalled_count = 0
      running!
      Stud.interval(@cycle_period) do
        @attempts_count.increment
        break if stopped?
        break unless @pipeline.thread.alive?
        @reports << pipeline_report_snapshot
        @reports.delete_at(0) if @reports.size > @report_every # expire old report
        if cycle_number == (@report_every - 1) # it's report time!
          logger.warn(@reports.last.to_s)

          if shutdown_stalled?
            logger.error("The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.") if stalled_count == 0
            stalled_count += 1

            if self.class.unsafe_shutdown? && @abort_threshold == stalled_count
              logger.fatal("Forcefully quitting logstash..")
              force_exit()
              break
            end
          else
            stalled_count = 0
          end
        end
        cycle_number = (cycle_number + 1) % @report_every
      end
    ensure
      stop!
    end
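The counting logic in the loop above is easier to follow without the sleeping and logging. The sketch below is a hypothetical, sleep-free model of it: `simulate` is not part of Logstash, and its `stalled_at_report` argument (one boolean per report time) stands in for the result of `shutdown_stalled?`. It returns the check number at which a forced exit would happen, or `nil` if none happens.

```ruby
# Hypothetical model of the cycle and stall counters in #start.
def simulate(stalled_at_report, report_every: 5, abort_threshold: 3, unsafe: true)
  cycle_number = 0
  stalled_count = 0
  check = 0
  outcomes = stalled_at_report.each # external enumerator over report outcomes

  loop do
    check += 1
    if cycle_number == report_every - 1 # report time
      stalled = outcomes.next # raises StopIteration when exhausted; loop then ends
      if stalled
        stalled_count += 1
        return check if unsafe && abort_threshold == stalled_count
      else
        stalled_count = 0 # any healthy report resets the streak
      end
    end
    cycle_number = (cycle_number + 1) % report_every
  end
  nil
end

p simulate([true, true, true])                     # prints 15
p simulate([true, true, false, true, true, true])  # prints 30
p simulate([true, true, true], unsafe: false)      # prints nil
```

The second call shows that the stalled reports must be consecutive: a single non-stalled report resets the count, so the forced exit moves from check 15 to check 30.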
#stop! ⇒ Object
    # File 'lib/logstash/shutdown_watcher.rb', line 46
    def stop!
      @running.make_false
    end
#stopped? ⇒ Boolean
    # File 'lib/logstash/shutdown_watcher.rb', line 50
    def stopped?
      @running.false?
    end