Class: Sidekiq::Recycler

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/recycler.rb

Constant Summary collapse

@@mutex =

used to avoid race conditions when recycling

Mutex.new
@@recycled =

avoid extra spam after hard limit reached

false

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Recycler

Returns a new instance of Recycler.



11
12
13
14
# File 'lib/sidekiq/recycler.rb', line 11

def initialize(opts={})
  @mem_limit      = opts[:mem_limit] || 300_000 # default is 300mb
  @hard_limit_sec = opts[:hard_limit_sec] || 300 # default to 300 sec
end

Instance Method Details

#call(worker, job, queue) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/sidekiq/recycler.rb', line 16

def call(worker, job, queue)
  begin
    yield

  ensure
    # check mem usage here
    rss = `ps -o rss= -p #{$$}`.to_i
    if rss > @mem_limit then

      # handle race conditions with many jobs/threads completing
      # at the same time
      return if !@@mutex.try_lock or @@recycled
      @@recycled = true

      Sidekiq.logger.warn "Recycler threshold reached: #{rss} > #{@mem_limit}"
      Sidekiq.logger.warn "Attempting to stop gracefully"

      # soft_limit_sec = @soft_limit_sec
      hard_limit_sec = @hard_limit_sec
      launcher = nil

      Thread.new do
        Celluloid::Actor.all.each do |actor|
          # tell sidekiq to exit gracefully
          # stops accepting new work and kills all waiting ("ready") threads
          if actor.kind_of? Sidekiq::Launcher then
            launcher = actor
            Thread.new do
              actor.manager.async.stop
            end
          end
        end
      end

      Thread.new do
        # wait until all threads have exited
        while true do
          sleep 1
          next if launcher.nil?
          if launcher.manager.ready.empty? and launcher.manager.busy.empty? then
            Sidekiq.logger.info "All threads stopped; exiting now!"
            exit
          end
        end
      end

      Thread.new do
        # wait for hard limit sec then kill
        sleep hard_limit_sec
        Sidekiq.logger.warn "Hard limit of #{hard_limit_sec}sec reached; sending TERM signal"
        if !launcher.nil? then
          launcher.stop
        else
          Process.kill("TERM", $$)
        end
      end

    end
  end
end