Class: LogStash::ShutdownWatcher

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/shutdown_watcher.rb

Constant Summary collapse

CHECK_EVERY =

second

1
REPORT_EVERY =

checks

5
ABORT_AFTER =

stalled reports

3

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Util::Loggable

included, #slow_logger

Constructor Details

#initialize(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ ShutdownWatcher

Returns a new instance of ShutdownWatcher.



13
14
15
16
17
18
19
# File 'lib/logstash/shutdown_watcher.rb', line 13

def initialize(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
  @pipeline = pipeline
  @cycle_period = cycle_period
  @report_every = report_every
  @abort_threshold = abort_threshold
  @reports = []
end

Instance Attribute Details

#abort_thresholdObject (readonly)

Returns the value of attribute abort_threshold.



11
12
13
# File 'lib/logstash/shutdown_watcher.rb', line 11

def abort_threshold
  @abort_threshold
end

#cycle_periodObject (readonly)

Returns the value of attribute cycle_period.



11
12
13
# File 'lib/logstash/shutdown_watcher.rb', line 11

def cycle_period
  @cycle_period
end

#report_everyObject (readonly)

Returns the value of attribute report_every.



11
12
13
# File 'lib/logstash/shutdown_watcher.rb', line 11

def report_every
  @report_every
end

Class Method Details

.start(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ Object



29
30
31
32
# File 'lib/logstash/shutdown_watcher.rb', line 29

def self.start(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
  controller = self.new(pipeline, cycle_period, report_every, abort_threshold)
  Thread.new(controller) { |controller| controller.start }
end

.unsafe_shutdown=(boolean) ⇒ Object



21
22
23
# File 'lib/logstash/shutdown_watcher.rb', line 21

def self.unsafe_shutdown=(boolean)
  @unsafe_shutdown = boolean
end

.unsafe_shutdown?Boolean

Returns:

  • (Boolean)


25
26
27
# File 'lib/logstash/shutdown_watcher.rb', line 25

def self.unsafe_shutdown?
  @unsafe_shutdown
end

Instance Method Details

#force_exitObject



90
91
92
# File 'lib/logstash/shutdown_watcher.rb', line 90

def force_exit
  exit(-1)
end

#loggerObject



34
35
36
# File 'lib/logstash/shutdown_watcher.rb', line 34

def logger
  self.class.logger
end

#pipeline_report_snapshotObject



66
67
68
# File 'lib/logstash/shutdown_watcher.rb', line 66

def pipeline_report_snapshot
  @pipeline.reporter.snapshot
end

#shutdown_stalled?Boolean

A pipeline shutdown is stalled if

  • at least REPORT_EVERY reports have been created

  • the inflight event count is in monotonically increasing

  • there are worker threads running which aren’t blocked on SizedQueue pop/push

  • the stalled thread list is constant in the previous REPORT_EVERY reports

Returns:

  • (Boolean)


75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/logstash/shutdown_watcher.rb', line 75

def shutdown_stalled?
  return false unless @reports.size == @report_every #
  # is stalled if inflight count is either constant or increasing
  stalled_event_count = @reports.each_cons(2).all? do |prev_report, next_report|
    prev_report.inflight_count <= next_report.inflight_count
  end
  if stalled_event_count
    @reports.each_cons(2).all? do |prev_report, next_report|
      prev_report.stalling_threads == next_report.stalling_threads
    end
  else
    false
  end
end

#startObject



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/logstash/shutdown_watcher.rb', line 38

def start
  sleep(@cycle_period)
  cycle_number = 0
  stalled_count = 0
  Stud.interval(@cycle_period) do
    break unless @pipeline.thread.alive?
    @reports << pipeline_report_snapshot
    @reports.delete_at(0) if @reports.size > @report_every # expire old report
    if cycle_number == (@report_every - 1) # it's report time!
      logger.warn(@reports.last.to_s)

      if shutdown_stalled?
        logger.error("The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.") if stalled_count == 0
        stalled_count += 1

        if self.class.unsafe_shutdown? && @abort_threshold == stalled_count
          logger.fatal("Forcefully quitting logstash..")
          force_exit()
          break
        end
      else
        stalled_count = 0
      end
    end
    cycle_number = (cycle_number + 1) % @report_every
  end
end