Class: Resque::Plugins::ResqueSliders::KEWatcher
- Inherits:
-
Object
- Object
- Resque::Plugins::ResqueSliders::KEWatcher
- Includes:
- Helpers
- Defined in:
- lib/resque-sliders/kewatcher.rb
Overview
KEWatcher class provides a daemon to run on host that are running resque workers.
Instance Attribute Summary collapse
-
#max_children ⇒ Object
readonly
Returns the value of attribute max_children.
-
#pidfile ⇒ Object
readonly
Returns the value of attribute pidfile.
-
#verbosity ⇒ Object
Verbosity level (Integer).
-
#zombie_kill_wait ⇒ Object
readonly
Returns the value of attribute zombie_kill_wait.
-
#zombie_term_wait ⇒ Object
readonly
Returns the value of attribute zombie_term_wait.
Instance Method Summary collapse
-
#initialize(options = {}) ⇒ KEWatcher
constructor
Initialize daemon with options from command-line.
-
#run!(interval = 0.1) ⇒ Object
run the daemon.
-
#running? ⇒ Boolean
Returns PID if already running, false otherwise.
Methods included from Helpers
#add_to_known_hosts, #check_signal, #del_from_known_hosts, #host_config_key, #key_prefix, #known_hosts_key, #pause?, #queue_values, #redis_del_hash, #redis_get_hash, #redis_get_hash_field, #redis_set_hash, #register_setting, #reload?, #set_signal_flag, #stop?, #unregister_setting
Constructor Details
#initialize(options = {}) ⇒ KEWatcher
Initialize daemon with options from command-line.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/resque-sliders/kewatcher.rb', line 20 def initialize(={}) @verbosity = ([:verbosity] || 0).to_i # verbosity level @zombie_term_wait = [:zombie_term_wait] || 20 # time to wait before TERM @zombie_kill_wait = ENV['RESQUE_TERM_TIMEOUT'].to_i + @zombie_term_wait unless ENV['RESQUE_TERM_TIMEOUT'].nil? @zombie_kill_wait ||= [:zombie_kill_wait] || 60 # time to wait before -9 @hostile_takeover = [:force] # kill running kewatcher? @rakefile = File.([:rakefile]) rescue nil @rakefile = File.exists?(@rakefile) ? @rakefile : nil if @rakefile @pidfile = File.([:pidfile]) rescue nil @pidfile = @pidfile =~ /\.pid$/ ? @pidfile : @pidfile + '.pid' if @pidfile save_pid! @max_children = [:max_children] || 10 @hostname = `hostname -s`.chomp.downcase @pids = Hash.new # init pids array to track running children @need_queues = Array.new # keep track of pids that are needed @dead_queues = Array.new # keep track of pids that are dead @zombie_pids = Hash.new # keep track of zombie's we kill and dont watch(), with elapsed time we've waited for it to die @async = [:async] || false # sync and wait by default @hupped = 0 Resque.redis = case [:config] when Hash [[:config]['host'], [:config]['port'], [:config]['db'] || 0].join(':') else [:config] end end |
Instance Attribute Details
#max_children ⇒ Object (readonly)
Returns the value of attribute max_children.
17 18 19 |
# File 'lib/resque-sliders/kewatcher.rb', line 17 def max_children @max_children end |
#pidfile ⇒ Object (readonly)
Returns the value of attribute pidfile.
17 18 19 |
# File 'lib/resque-sliders/kewatcher.rb', line 17 def pidfile @pidfile end |
#verbosity ⇒ Object
Verbosity level (Integer)
15 16 17 |
# File 'lib/resque-sliders/kewatcher.rb', line 15 def verbosity @verbosity end |
#zombie_kill_wait ⇒ Object (readonly)
Returns the value of attribute zombie_kill_wait.
17 18 19 |
# File 'lib/resque-sliders/kewatcher.rb', line 17 def zombie_kill_wait @zombie_kill_wait end |
#zombie_term_wait ⇒ Object (readonly)
Returns the value of attribute zombie_term_wait.
17 18 19 |
# File 'lib/resque-sliders/kewatcher.rb', line 17 def zombie_term_wait @zombie_term_wait end |
Instance Method Details
#run!(interval = 0.1) ⇒ Object
run the daemon
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/resque-sliders/kewatcher.rb', line 50 def run!(interval=0.1) interval = Float(interval) if running? unless @hostile_takeover puts "Already running. Restart Not Forced exiting..." exit end restart_running! end $0 = "KEWatcher: Starting" startup count = 0 old = 0 # know when to tell redis we have new different current pids loop do break if shutdown? count += 1 log! ["watching:", @pids.keys.join(', '), "(#{@pids.keys.length})"].delete_if { |x| x == (nil || '') }.join(' ') if count % (10 / interval) == 1 tick = count % (20 / interval) == 1 (log! "checking signals..."; check_signals) if tick if not (paused? || shutdown?) queue_diff! if tick # do first and also about every 20 seconds so we can throttle calls to redis while @pids.keys.length < @max_children && (@need_queues.length > 0 || @dead_queues.length > 0) queue = @dead_queues.shift || @need_queues.shift exec_string = "" exec_string << 'rake' exec_string << " -f #{@rakefile}" if @rakefile exec_string << ' environment' if ENV['RAILS_ENV'] exec_string << ' resque:work' env_opts = {"QUEUE" => queue} if Resque::Version >= '1.22.0' # when API changed for signals term_timeout = @zombie_kill_wait - @zombie_term_wait term_timeout = term_timeout > 0 ? term_timeout : 1 env_opts.merge!({ 'TERM_CHILD' => '1', 'RESQUE_TERM_TIMEOUT' => term_timeout.to_s # use new signal handling }) end exec_args = if RUBY_VERSION < '1.9' [exec_string, env_opts.map {|k,v| "#{k}=#{v}"}].flatten.join(' ') else [env_opts, exec_string] # 1.9.x exec end pid = fork do srand # seed exec(*exec_args) end @pids.store(pid, queue) # store pid and queue its running if fork() ? procline end end register_setting('current_children', @pids.keys.length) if old != @pids.length old = @pids.length procline if tick sleep(interval) # microsleep kill_zombies! unless shutdown? # need to cleanup ones we've killed if @hupped > 0 log "HUP received; purging children..." signal_hup do_reload! @hupped -= 1 end @pids.keys.each do |pid| begin # check to see if pid is running, by waiting for it, with a timeout # Im sure Ruby 1.9 has some better helpers here Timeout::timeout(interval / 100) { Process.wait(pid) } rescue Timeout::Error # Timeout expired, goto next pid next rescue Errno::ECHILD # if no pid exists to wait for, remove it log! (paused? || shutdown?) ? "#{pid} (#{@pids[pid]}) child died; no one cares..." : "#{pid} (#{@pids[pid]}) child died; spawning another..." remove pid break end end end end |
#running? ⇒ Boolean
Returns PID if already running, false otherwise
137 138 139 140 |
# File 'lib/resque-sliders/kewatcher.rb', line 137 def running? pid = `ps x -o pid,command|grep [K]EWatcher|awk '{print $1}'`.to_i pid == 0 ? false : pid end |