Class: Resque::Plugins::ResqueSliders::KEWatcher

Inherits:
Object
  • Object
show all
Includes:
Helpers
Defined in:
lib/resque-sliders/kewatcher.rb

Overview

KEWatcher class provides a daemon to run on host that are running resque workers.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Helpers

#add_to_known_hosts, #check_signal, #del_from_known_hosts, #host_config_key, #key_prefix, #known_hosts_key, #pause?, #queue_values, #redis_del_hash, #redis_get_hash, #redis_get_hash_field, #redis_set_hash, #register_setting, #reload?, #set_signal_flag, #stop?, #unregister_setting

Constructor Details

#initialize(options = {}) ⇒ KEWatcher

Initialize daemon with options from command-line.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/resque-sliders/kewatcher.rb', line 20

def initialize(options={})
  @verbosity = (options[:verbosity] || 0).to_i # verbosity level
  @zombie_term_wait = options[:zombie_term_wait] || 20 # time to wait before TERM
  @zombie_kill_wait = ENV['RESQUE_TERM_TIMEOUT'].to_i + @zombie_term_wait unless ENV['RESQUE_TERM_TIMEOUT'].nil?
  @zombie_kill_wait ||= options[:zombie_kill_wait] || 60 # time to wait before -9
  @hostile_takeover = options[:force] # kill running kewatcher?
  @rakefile = File.expand_path(options[:rakefile]) rescue nil
  @rakefile = File.exists?(@rakefile) ? @rakefile : nil if @rakefile
  @pidfile = File.expand_path(options[:pidfile]) rescue nil
  @pidfile = @pidfile =~ /\.pid$/ ? @pidfile : @pidfile + '.pid' if @pidfile
  save_pid!

  @max_children = options[:max_children] || 10
  @hostname = `hostname -s`.chomp.downcase
  @pids = Hash.new # init pids array to track running children
  @need_queues = Array.new # keep track of pids that are needed
  @dead_queues = Array.new # keep track of pids that are dead
  @zombie_pids = Hash.new # keep track of zombie's we kill and dont watch(), with elapsed time we've waited for it to die
  @async = options[:async] || false # sync and wait by default
  @hupped = 0

  Resque.redis = case options[:config]
    when Hash
      [options[:config]['host'], options[:config]['port'], options[:config]['db'] || 0].join(':')
    else
      options[:config]
  end
end

Instance Attribute Details

#max_childrenObject (readonly)

Returns the value of attribute max_children.



17
18
19
# File 'lib/resque-sliders/kewatcher.rb', line 17

def max_children
  @max_children
end

#pidfileObject (readonly)

Returns the value of attribute pidfile.



17
18
19
# File 'lib/resque-sliders/kewatcher.rb', line 17

def pidfile
  @pidfile
end

#verbosityObject

Verbosity level (Integer)



15
16
17
# File 'lib/resque-sliders/kewatcher.rb', line 15

def verbosity
  @verbosity
end

#zombie_kill_waitObject (readonly)

Returns the value of attribute zombie_kill_wait.



17
18
19
# File 'lib/resque-sliders/kewatcher.rb', line 17

def zombie_kill_wait
  @zombie_kill_wait
end

#zombie_term_waitObject (readonly)

Returns the value of attribute zombie_term_wait.



17
18
19
# File 'lib/resque-sliders/kewatcher.rb', line 17

def zombie_term_wait
  @zombie_term_wait
end

Instance Method Details

#run!(interval = 0.1) ⇒ Object

run the daemon



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/resque-sliders/kewatcher.rb', line 50

def run!(interval=0.1)
  interval = Float(interval)
  if running?
    unless @hostile_takeover
      puts "Already running. Restart Not Forced exiting..."
      exit
    end
    restart_running!
  end
  $0 = "KEWatcher: Starting"
  startup

  count = 0
  old = 0 # know when to tell redis we have new different current pids
  loop do
    break if shutdown?
    count += 1
    log! ["watching:", @pids.keys.join(', '), "(#{@pids.keys.length})"].delete_if { |x| x == (nil || '') }.join(' ') if count % (10 / interval) == 1

    tick = count % (20 / interval) == 1
    (log! "checking signals..."; check_signals) if tick
    if not (paused? || shutdown?)
      queue_diff! if tick # do first and also about every 20 seconds so we can throttle calls to redis

      while @pids.keys.length < @max_children && (@need_queues.length > 0 || @dead_queues.length > 0)
        queue = @dead_queues.shift || @need_queues.shift
        exec_string = ""
        exec_string << 'rake'
        exec_string << " -f #{@rakefile}" if @rakefile
        exec_string << ' environment' if ENV['RAILS_ENV']
        exec_string << ' resque:work'
        env_opts = {"QUEUE" => queue}
        if Resque::Version >= '1.22.0' # when API changed for signals
          term_timeout = @zombie_kill_wait - @zombie_term_wait
          term_timeout = term_timeout > 0 ? term_timeout : 1
          env_opts.merge!({
            'TERM_CHILD' => '1',
            'RESQUE_TERM_TIMEOUT' => term_timeout.to_s # use new signal handling
          })
        end
        exec_args = if RUBY_VERSION < '1.9'
          [exec_string, env_opts.map {|k,v| "#{k}=#{v}"}].flatten.join(' ')
        else
          [env_opts, exec_string] # 1.9.x exec
        end
        pid = fork do
          srand # seed
          exec(*exec_args)
        end
        @pids.store(pid, queue) # store pid and queue its running if fork() ?
        procline
      end
    end

    register_setting('current_children', @pids.keys.length) if old != @pids.length
    old = @pids.length

    procline if tick

    sleep(interval) # microsleep
    kill_zombies! unless shutdown? # need to cleanup ones we've killed
    if @hupped > 0
      log "HUP received; purging children..."
      signal_hup
      do_reload!
      @hupped -= 1
    end

    @pids.keys.each do |pid|
      begin
        # check to see if pid is running, by waiting for it, with a timeout
        # Im sure Ruby 1.9 has some better helpers here
        Timeout::timeout(interval / 100) { Process.wait(pid) }
      rescue Timeout::Error
        # Timeout expired, goto next pid
        next
      rescue Errno::ECHILD
        # if no pid exists to wait for, remove it
        log! (paused? || shutdown?) ? "#{pid} (#{@pids[pid]}) child died; no one cares..." : "#{pid} (#{@pids[pid]}) child died; spawning another..."
        remove pid
        break
      end
    end
  end
end

#running?Boolean

Returns PID if already running, false otherwise



137
138
139
140
# File 'lib/resque-sliders/kewatcher.rb', line 137

def running?
  pid = `ps x -o pid,command|grep [K]EWatcher|awk '{print $1}'`.to_i
  pid == 0 ? false : pid
end