Class: LogStash::ShutdownWatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/shutdown_watcher.rb

Constant Summary collapse

CHECK_EVERY =

second

1
REPORT_EVERY =

checks

5
ABORT_AFTER =

stalled reports

3

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ ShutdownWatcher

Returns a new instance of ShutdownWatcher.



12
13
14
15
16
17
18
# File 'lib/logstash/shutdown_watcher.rb', line 12

def initialize(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
  @pipeline = pipeline
  @cycle_period = cycle_period
  @report_every = report_every
  @abort_threshold = abort_threshold
  @reports = []
end

Instance Attribute Details

#abort_thresholdObject (readonly)

Returns the value of attribute abort_threshold.



10
11
12
# File 'lib/logstash/shutdown_watcher.rb', line 10

def abort_threshold
  @abort_threshold
end

#cycle_periodObject (readonly)

Returns the value of attribute cycle_period.



10
11
12
# File 'lib/logstash/shutdown_watcher.rb', line 10

def cycle_period
  @cycle_period
end

#report_everyObject (readonly)

Returns the value of attribute report_every.



10
11
12
# File 'lib/logstash/shutdown_watcher.rb', line 10

def report_every
  @report_every
end

Class Method Details

.loggerObject



32
33
34
# File 'lib/logstash/shutdown_watcher.rb', line 32

def self.logger
  @logger ||= Cabin::Channel.get(LogStash)
end

.logger=(logger) ⇒ Object



28
29
30
# File 'lib/logstash/shutdown_watcher.rb', line 28

def self.logger=(logger)
  @logger = logger
end

.start(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ Object



36
37
38
39
# File 'lib/logstash/shutdown_watcher.rb', line 36

def self.start(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
  controller = self.new(pipeline, cycle_period, report_every, abort_threshold)
  Thread.new(controller) { |controller| controller.start }
end

.unsafe_shutdown=(boolean) ⇒ Object



20
21
22
# File 'lib/logstash/shutdown_watcher.rb', line 20

def self.unsafe_shutdown=(boolean)
  @unsafe_shutdown = boolean
end

.unsafe_shutdown?Boolean

Returns:

  • (Boolean)


24
25
26
# File 'lib/logstash/shutdown_watcher.rb', line 24

def self.unsafe_shutdown?
  @unsafe_shutdown
end

Instance Method Details

#force_exitObject



97
98
99
# File 'lib/logstash/shutdown_watcher.rb', line 97

def force_exit
  exit(-1)
end

#loggerObject



41
42
43
# File 'lib/logstash/shutdown_watcher.rb', line 41

def logger
  self.class.logger
end

#pipeline_report_snapshotObject



73
74
75
# File 'lib/logstash/shutdown_watcher.rb', line 73

def pipeline_report_snapshot
  @pipeline.reporter.snapshot
end

#shutdown_stalled?Boolean

A pipeline shutdown is stalled if

  • at least REPORT_EVERY reports have been created

  • the inflight event count is in monotonically increasing

  • there are worker threads running which aren’t blocked on SizedQueue pop/push

  • the stalled thread list is constant in the previous REPORT_EVERY reports

Returns:

  • (Boolean)


82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/logstash/shutdown_watcher.rb', line 82

def shutdown_stalled?
  return false unless @reports.size == @report_every #
  # is stalled if inflight count is either constant or increasing
  stalled_event_count = @reports.each_cons(2).all? do |prev_report, next_report|
    prev_report.inflight_count <= next_report.inflight_count
  end
  if stalled_event_count
    @reports.each_cons(2).all? do |prev_report, next_report|
      prev_report.stalling_threads == next_report.stalling_threads
    end
  else
    false
  end
end

#startObject



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/logstash/shutdown_watcher.rb', line 45

def start
  sleep(@cycle_period)
  cycle_number = 0
  stalled_count = 0
  Stud.interval(@cycle_period) do
    break unless @pipeline.thread.alive?
    @reports << pipeline_report_snapshot
    @reports.delete_at(0) if @reports.size > @report_every # expire old report
    if cycle_number == (@report_every - 1) # it's report time!
      logger.warn(@reports.last)

      if shutdown_stalled?
        logger.error("The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.") if stalled_count == 0
        stalled_count += 1

        if self.class.unsafe_shutdown? && @abort_threshold == stalled_count
          logger.fatal("Forcefully quitting logstash..")
          force_exit()
          break
        end
      else
        stalled_count = 0
      end
    end
    cycle_number = (cycle_number + 1) % @report_every
  end
end