Class: Resque::Plugins::ResqueSliders::KEWatcher

Inherits:
Object
  • Object
show all
Includes:
Helpers
Defined in:
lib/resque-sliders/kewatcher.rb

Overview

KEWatcher class provides a daemon to run on host that are running resque workers.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Helpers

#check_signal, #host_config_key, #key_prefix, #pause?, #queue_values, #redis_del_hash, #redis_get_hash, #redis_get_hash_field, #redis_set_hash, #register_setting, #reload?, #set_signal_flag, #stop?, #unregister_setting

Constructor Details

#initialize(options = {}) ⇒ KEWatcher

Initialize daemon with options from command-line.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/resque-sliders/kewatcher.rb', line 18

def initialize(options={})
  @verbosity = (options[:verbosity] || 0).to_i
  @ttime = options[:ttime] || 2
  @zombie_wait = options[:wait] || 30
  @hostile_takeover = options[:force]
  @rakefile = File.expand_path(options[:rakefile]) rescue nil
  @rakefile = File.exists?(@rakefile) ? @rakefile : nil if @rakefile
  @pidfile = File.expand_path(options[:pidfile]) rescue nil
  @pidfile = @pidfile =~ /\.pid/ ? @pidfile : @pidfile + '.pid' if @pidfile
  save_pid!

  @max_children = (options[:max_children] || 5).to_i
  @hostname = `hostname -s`.chomp.downcase
  @pids = Hash.new # init pids array to track running children
  @need_queues = Array.new # keep track of pids that are needed
  @dead_queues = Array.new # keep track of pids that are dead
  @zombie_pids = Hash.new # keep track of zombie's we kill and dont watch(), with elapsed time we've waited for it to die

  Resque.redis = case options[:config]
    when Hash
      [options[:config]['host'], options[:config]['port'], options[:config]['db'] || 0].join(':')
    else
      options[:config]
  end
end

Instance Attribute Details

#verbosityObject

Verbosity level (Integer)



15
16
17
# File 'lib/resque-sliders/kewatcher.rb', line 15

def verbosity
  @verbosity
end

Instance Method Details

#run!(interval = 0.1) ⇒ Object

run the daemon



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/resque-sliders/kewatcher.rb', line 45

def run!(interval=0.1)
  interval = Float(interval)
  if running?
    (puts "Already running. Restart Not Forced exiting..."; exit) unless @hostile_takeover
    restart_running!
  end
  $0 = "KEWatcher: Starting"
  startup

  count = 0
  old = 0 # know when to tell redis we have new different current pids
  loop do
    break if shutdown?
    count += 1
    log! ["watching:", @pids.keys.join(', '), "(#{@pids.keys.length})"].delete_if { |x| x == (nil || '') }.join(' ') if count % (10 / interval) == 1

    tick = count % (20 / interval) == 1 ? true : false
    (log! "checking signals..."; check_signals) if tick
    if not (paused? || shutdown?)
      queue_diff! if tick # do first and also about every 20 seconds so we can throttle calls to redis

      while @pids.keys.length < @max_children && (@need_queues.length > 0 || @dead_queues.length > 0)
        queue = @dead_queues.shift || @need_queues.shift
        pid = fork do
          exec_string = "rake#{' -f ' + @rakefile if @rakefile}#{' environment' if ENV['RAILS_ENV']} resque:work"
          if RUBY_VERSION < '1.9'
            exec(exec_string + " QUEUE=#{queue}") # 1.8.x exec
          else
            exec({"QUEUE"=>queue}, exec_string) # 1.9.x exec
          end
        end
        @pids.store(pid, queue) # store offset if linux fork() ?
        procline
      end
    end

    register_setting('current_children', @pids.keys.length) if old != @pids.length
    old = @pids.length

    procline if tick

    sleep(interval) # microsleep
    kill_zombies! unless shutdown? # need to cleanup ones we've killed

    @pids.keys.each do |pid|
      begin
        # check to see if pid is running, by waiting for it, with a timeout
        # Im sure Ruby 1.9 has some better helpers here
        Timeout::timeout(interval / 100) { Process.wait(pid) }
      rescue Timeout::Error
        # Timeout expired, goto next pid
        next
      rescue Errno::ECHILD
        # if no pid exists to wait for, remove it
        log! (paused? || shutdown?) ? "#{pid} (#{@pids[pid]}) child died; no one cares..." : "#{pid} (#{@pids[pid]}) child died; spawning another..."
        remove pid
        break
      end
    end
  end
end

#running?Boolean

Returns PID if already running, false otherwise

Returns:

  • (Boolean)


108
109
110
111
# File 'lib/resque-sliders/kewatcher.rb', line 108

def running?
  pid = `ps x -o pid,command|grep [K]EWatcher|awk '{print $1}'`.to_i
  pid == 0 ? false : pid
end