Class: LogStash::ShutdownWatcher
- Inherits:
- Object
- Includes:
- Util::Loggable
- Defined in:
- lib/logstash/shutdown_watcher.rb
Constant Summary
- CHECK_EVERY = 1 (second)
- REPORT_EVERY = 5 (checks)
- ABORT_AFTER = 3 (stalled reports)
Instance Attribute Summary
- #abort_threshold ⇒ Object (readonly): Returns the value of attribute abort_threshold.
- #cycle_period ⇒ Object (readonly): Returns the value of attribute cycle_period.
- #report_every ⇒ Object (readonly): Returns the value of attribute report_every.
Class Method Summary
- .start(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ Object
- .unsafe_shutdown=(boolean) ⇒ Object
- .unsafe_shutdown? ⇒ Boolean
Instance Method Summary
- #force_exit ⇒ Object
- #initialize(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ ShutdownWatcher (constructor): A new instance of ShutdownWatcher.
- #logger ⇒ Object
- #pipeline_report_snapshot ⇒ Object
- #shutdown_stalled? ⇒ Boolean: A pipeline shutdown is stalled if at least REPORT_EVERY reports have been created, the inflight event count is constant or increasing across those reports, there are worker threads running which aren’t blocked on SizedQueue pop/push, and the stalled thread list is constant in the previous REPORT_EVERY reports.
- #start ⇒ Object
Methods included from Util::Loggable
Constructor Details
#initialize(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ ShutdownWatcher
Returns a new instance of ShutdownWatcher.
    # File 'lib/logstash/shutdown_watcher.rb', line 13
    def initialize(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
      @pipeline = pipeline
      @cycle_period = cycle_period
      @report_every = report_every
      @abort_threshold = abort_threshold
      @reports = []
    end
Instance Attribute Details
#abort_threshold ⇒ Object (readonly)
Returns the value of attribute abort_threshold.
    # File 'lib/logstash/shutdown_watcher.rb', line 11
    def abort_threshold
      @abort_threshold
    end
#cycle_period ⇒ Object (readonly)
Returns the value of attribute cycle_period.
    # File 'lib/logstash/shutdown_watcher.rb', line 11
    def cycle_period
      @cycle_period
    end
#report_every ⇒ Object (readonly)
Returns the value of attribute report_every.
    # File 'lib/logstash/shutdown_watcher.rb', line 11
    def report_every
      @report_every
    end
Class Method Details
.start(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ Object
    # File 'lib/logstash/shutdown_watcher.rb', line 29
    def self.start(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
      controller = self.new(pipeline, cycle_period, report_every, abort_threshold)
      Thread.new(controller) { |controller| controller.start }
    end
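Because the last expression in .start is Thread.new, the method returns the watcher thread itself, which the caller can join or inspect. The following is a minimal sketch of that hand-off only. TinyWatcher and its limit parameter are hypothetical stand-ins invented for illustration and are not part of Logstash.

```ruby
# Hypothetical stand-in that only models the start/Thread hand-off.
class TinyWatcher
  attr_reader :ticks

  def initialize(limit)
    @limit = limit
    @ticks = 0
  end

  # Build an instance, then run its #start on a new thread.
  # The Thread object is the return value of this class method.
  def self.start(limit)
    watcher = new(limit)
    Thread.new(watcher) { |w| w.start }
  end

  # Stand-in for the real watch loop: count up to the limit and stop.
  def start
    @ticks += 1 while @ticks < @limit
    self
  end
end

thread = TinyWatcher.start(3)
result = thread.value # waits for the thread, then returns the block's result
puts thread.class
puts result.ticks
```

Passing the instance as an argument to Thread.new, rather than closing over a local variable, hands the thread its own block parameter. The original method does the same with controller.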
.unsafe_shutdown=(boolean) ⇒ Object
    # File 'lib/logstash/shutdown_watcher.rb', line 21
    def self.unsafe_shutdown=(boolean)
      @unsafe_shutdown = boolean
    end
.unsafe_shutdown? ⇒ Boolean
    # File 'lib/logstash/shutdown_watcher.rb', line 25
    def self.unsafe_shutdown?
      @unsafe_shutdown
    end
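The two class methods above store the flag in an instance variable on the class object itself. A small sketch of that pattern follows, using a hypothetical Watcher class rather than LogStash::ShutdownWatcher. One consequence worth noting: until the setter is called, the predicate returns nil rather than false, which is still falsy in a condition.

```ruby
# Hypothetical stand-in class; it is not LogStash::ShutdownWatcher.
class Watcher
  # Stores the flag on the class object (a class-level instance variable).
  def self.unsafe_shutdown=(boolean)
    @unsafe_shutdown = boolean
  end

  # Returns whatever was stored, or nil if nothing was ever assigned.
  def self.unsafe_shutdown?
    @unsafe_shutdown
  end
end

before = Watcher.unsafe_shutdown? # never assigned, so nil (falsy)
Watcher.unsafe_shutdown = true
after = Watcher.unsafe_shutdown?

puts before.inspect
puts after.inspect
```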
Instance Method Details
#force_exit ⇒ Object
    # File 'lib/logstash/shutdown_watcher.rb', line 90
    def force_exit
      exit(-1)
    end
#logger ⇒ Object
    # File 'lib/logstash/shutdown_watcher.rb', line 34
    def logger
      self.class.logger
    end
#pipeline_report_snapshot ⇒ Object
    # File 'lib/logstash/shutdown_watcher.rb', line 66
    def pipeline_report_snapshot
      @pipeline.reporter.snapshot
    end
#shutdown_stalled? ⇒ Boolean
A pipeline shutdown is stalled if
- at least REPORT_EVERY reports have been created
- the inflight event count is constant or increasing across those reports
- there are worker threads running which aren’t blocked on SizedQueue pop/push
- the stalled thread list is constant in the previous REPORT_EVERY reports
    # File 'lib/logstash/shutdown_watcher.rb', line 75
    def shutdown_stalled?
      return false unless @reports.size == @report_every
      #
      # is stalled if inflight count is either constant or increasing
      stalled_event_count = @reports.each_cons(2).all? do |prev_report, next_report|
        prev_report.inflight_count <= next_report.inflight_count
      end
      if stalled_event_count
        @reports.each_cons(2).all? do |prev_report, next_report|
          prev_report.stalling_threads == next_report.stalling_threads
        end
      else
        false
      end
    end
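The check above can be exercised outside Logstash with plain data. The sketch below mirrors its logic under stated assumptions: Report is a hypothetical Struct that models only the two fields the method reads, stalled? is an illustrative free function, and the thread lists are plain arrays of names, which may not match the shape of the real snapshot.

```ruby
# Hypothetical stand-in for a pipeline report snapshot; only the two
# fields that the stall check reads are modeled here.
Report = Struct.new(:inflight_count, :stalling_threads)

# Mirrors the logic of shutdown_stalled? on a plain array of reports.
def stalled?(reports, report_every)
  # The window of reports must be full before any verdict is given.
  return false unless reports.size == report_every

  pairs = reports.each_cons(2)
  # The in-flight count never decreases between consecutive reports...
  count_not_draining = pairs.all? { |prev, nxt| prev.inflight_count <= nxt.inflight_count }
  # ...and the same threads are reported as stalling every time.
  same_threads = pairs.all? { |prev, nxt| prev.stalling_threads == nxt.stalling_threads }

  count_not_draining && same_threads
end

stuck    = Array.new(5) { Report.new(10, ["worker-1"]) }
draining = [10, 8, 6, 4, 2].map { |n| Report.new(n, ["worker-1"]) }

puts stalled?(stuck, 5)          # constant count, same threads
puts stalled?(draining, 5)       # count keeps dropping
puts stalled?(stuck.first(3), 5) # window not yet full
```

Only the first call reports a stall. A pipeline whose in-flight count keeps dropping is still making progress, and a window that is not yet full is never treated as stalled.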
#start ⇒ Object
    # File 'lib/logstash/shutdown_watcher.rb', line 38
    def start
      sleep(@cycle_period)
      cycle_number = 0
      stalled_count = 0
      Stud.interval(@cycle_period) do
        break unless @pipeline.thread.alive?
        @reports << pipeline_report_snapshot
        @reports.delete_at(0) if @reports.size > @report_every # expire old report
        if cycle_number == (@report_every - 1) # it's report time!
          logger.warn(@reports.last.to_s)

          if shutdown_stalled?
            logger.error("The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.") if stalled_count == 0
            stalled_count += 1

            if self.class.unsafe_shutdown? && @abort_threshold == stalled_count
              logger.fatal("Forcefully quitting logstash..")
              force_exit()
              break
            end
          else
            stalled_count = 0
          end
        end
        cycle_number = (cycle_number + 1) % @report_every
      end
    end
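The interaction between the cycle counter and the abort threshold in #start is easier to follow as arithmetic. The sketch below replays only the counters, without sleeping, logging, or touching a pipeline. checks_until_abort, max_checks, and the stalled_at block are hypothetical names introduced for illustration, and the sketch assumes unsafe shutdown is enabled, since otherwise the loop never forces an exit.

```ruby
# Replays the counter logic of #start and returns the number of checks
# performed before a forced exit would be triggered, or nil if the
# threshold is never reached. `stalled_at` is a hypothetical block that
# answers whether the shutdown looks stalled at a given report number.
def checks_until_abort(report_every:, abort_threshold:, max_checks: 1_000, &stalled_at)
  cycle_number = 0
  stalled_count = 0
  report_number = 0

  (1..max_checks).each do |check|
    if cycle_number == report_every - 1 # report time
      report_number += 1
      if stalled_at.call(report_number)
        stalled_count += 1
        return check if stalled_count == abort_threshold
      else
        stalled_count = 0 # any healthy report resets the streak
      end
    end
    cycle_number = (cycle_number + 1) % report_every
  end
  nil
end

# Defaults from the constants: report every 5 checks, abort after 3 stalled reports.
always     = checks_until_abort(report_every: 5, abort_threshold: 3) { |_n| true }
recovering = checks_until_abort(report_every: 5, abort_threshold: 3) { |n| n != 2 }

puts always
puts recovering
```

With the default values, a shutdown that is stalled at every report reaches the threshold on the 15th check. A single healthy report resets the streak, so in the second case the threshold is reached only on the 25th check. The stalled reports must be consecutive for the forced exit to fire.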