Class: Resque::Plugins::Telework::Manager

Inherits:
Object
  • Object
show all
Includes:
Redis
Defined in:
lib/resque-telework/manager.rb

Instance Method Summary collapse

Methods included from Redis

#acks_key, #acks_pop, #acks_push, #aliases, #aliases_add, #aliases_key, #aliases_rem, #alive_key, #autos_add, #autos_by_id, #autos_key, #autos_rem, #check_redis, #cmds_key, #cmds_pop, #cmds_push, #comments, #comments_add, #comments_key, #comments_rem, #configuration, #daemons_state, #find_revision, #fmt_date, #get_by_id, #hosts, #hosts_add, #hosts_key, #hosts_rem, #i_am_alive, #i_am_dead, #ids_key, #is_alive, #key_prefix, #last_seen, #last_seen_key, #logs_add, #logs_by_id, #logs_key, #nb_keys, #notes_del, #notes_key, #notes_pop, #notes_push, #queue_length, #queue_list, #queue_prefix, #reconcile, #redis_interface_key, #register_revision, #revisions, #revisions_add, #revisions_key, #status_key, #status_push, #statuses, #tasks, #tasks_add, #tasks_by_id, #tasks_key, #tasks_rem, #text_to_html, #unique_id, #workers, #workers_add, #workers_by_id, #workers_delall, #workers_key, #workers_rem

Constructor Details

#initialize(cfg) ⇒ Manager

Returns a new instance of Manager.



8
9
10
11
12
13
14
15
# File 'lib/resque-telework/manager.rb', line 8

# Set up the in-memory state of a manager (daemon) from its configuration.
#
# cfg - configuration hash; the 'hostname' and 'daemon_pooling_interval'
#       entries are read from it.
def initialize(cfg)
  @HOST = cfg['hostname']                  # Host name handed to the Redis helpers
  @SLEEP = cfg['daemon_pooling_interval']  # Value given to sleep at the end of each main-loop pass
  @RUN_DAEMON = true                       # Set to false by the 'stop_daemon' command
  @WORKERS = {}                            # Worker id => info hash of the workers started by this manager
  @STOPPED = []                            # Ids of the workers that were sent QUIT or KILL
  @AUTO = {}                               # Task id => description of the tasks in auto mode
end

Instance Method Details

#check_auto ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/resque-telework/manager.rb', line 147

# Run one auto-scaling pass over every task that is in auto mode.
#
# Each entry of @AUTO is first published with autos_add. Once 'auto_delay'
# seconds have elapsed since 'last_action', the number of workers wanted for
# the current queue length is compared with the number of workers that are not
# 'VOID': missing workers are started, surplus running workers are sent QUIT.
# 'last_action' is refreshed whenever workers are started or stopped so that
# 'auto_delay' spaces out successive actions.
def check_auto
  @AUTO.keys.each do |id|
    auto= @AUTO[id]
    autos_add( @HOST, id, auto )
    next unless auto['last_action']+auto['auto_delay'].to_i <= Time.now
    auto= status_auto( id, auto )  # Refresh worker_status / worker_run / worker_void
    per_worker= auto['auto_max_waiting_job_per_worker'].to_f
    # A zero, negative or non-numeric setting would make the division below
    # yield Infinity/NaN and .ceil raise; fall back to start_auto's default of 1.
    per_worker= 1.0 unless per_worker > 0
    ql= get_queue_length( auto['queue'] )
    ideal= [(ql.to_f / per_worker).ceil, auto['auto_worker_min'].to_i].max
    active= auto['worker_count'].to_i - auto['worker_void']  # Workers that are not VOID
    if ideal > active      # Increase number of workers if possible
      inc= [ideal-auto['worker_run'], auto['worker_void']].min
      if inc>0
        manage_auto( auto, 'VOID', 'START', inc )
        auto['last_action']= Time.now
      end
    elsif ideal < active   # Decrease number of workers if possible
      dec= [auto['worker_run']-ideal, auto['worker_run']].min
      if dec>0
        manage_auto( auto, 'RUN', 'QUIT', dec )
        auto['last_action']= Time.now
      end
    end
  end
end

#check_processes ⇒ Object



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/resque-telework/manager.rb', line 244

# Check every tracked worker process once.
#
# A worker whose process has ended is removed from @WORKERS and from Redis
# (workers_rem) and a status message is sent; a worker that is still running
# gets its log snapshot refreshed and its info re-published (workers_add).
# The worker id is always dropped from @STOPPED once the worker is gone, so a
# later worker reusing the id is not mistaken for one that was asked to stop.
def check_processes
  #workers_delall( @HOST )
  @WORKERS.keys.each do |id|   # keys is a copy, so deleting entries below is safe
    pid= @WORKERS[id]['pid']
    gone= false
    unexpected_death= false
    begin # Zombie hunt..
      gone= true if Process.waitpid(pid, Process::WNOHANG)
    rescue # Not a child.. so the process is already dead (we don't know why, maybe someone did a kill -9)
      unexpected_death= true
      gone= true
    end
    unless gone
      update_log_snapshot(id)
      workers_add( @HOST, id, @WORKERS[id] )
      next
    end
    workers_rem( @HOST, id )
    requested= @STOPPED.include?(id)  # True when QUIT/KILL was sent through manage_worker
    @STOPPED.delete(id)
    if unexpected_death
      send_status( 'Error', "Worker #{id} (PID #{pid}) has unexpectedly ended" )
    elsif requested
      send_status( 'Info', "Worker #{id} (PID #{pid}) has exited" )
    else
      send_status( 'Error', "Worker #{id} (PID #{pid}) has unexpectedly exited" )
    end
    @WORKERS.delete(id)
  end
end

#do_command(cmd) ⇒ Object

Execute a command synchronously



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/resque-telework/manager.rb', line 79

# Execute a command synchronously.
#
# cmd - command hash; cmd['command'] selects the operation and the hash itself
#       is handed to the method that carries it out.
def do_command( cmd )
  name= cmd['command']
  if 'start_worker' == name
    start_worker( cmd, find_revision(cmd['revision']) )
  elsif 'signal_worker' == name
    manage_worker( cmd )
  elsif 'start_auto' == name
    start_auto( cmd, find_revision(cmd['revision']) )
  elsif 'stop_auto' == name
    stop_auto( cmd )
  elsif 'stop_daemon' == name
    @RUN_DAEMON= false   # The main loop notices this after the current pass
  elsif 'kill_daemon' == name
    send_status( 'Error', "A kill request has been received, the daemon on #{@HOST} is now brutally terminating by calling exit()")
    i_am_dead
    exit # Bye
  else
    send_status( 'Error', "Unknown command '#{name}'" )
  end
end

#get_queue_length(qs) ⇒ Object



273
274
275
276
277
# File 'lib/resque-telework/manager.rb', line 273

# Total number of jobs waiting in the given queues.
#
# qs - comma-separated list of queue names; surrounding whitespace and empty
#      entries are ignored, and a "*" entry stands for every queue returned by
#      queue_list. nil is treated as an empty list.
#
# Returns the sum of queue_length over the selected queues.
def get_queue_length( qs )
  names= qs.to_s.split(",").map { |q| q.strip }.reject { |q| q.empty? }
  names= queue_list if names.include?("*")
  names.inject(0) { |total, q| total + queue_length(q) }
end

#get_tail(f, size) ⇒ Object



298
299
300
# File 'lib/resque-telework/manager.rb', line 298

# Return the last lines of a file.
#
# f    - path of the file to read
# size - number of lines to keep
#
# tail is run directly with an argument vector rather than through a shell, so
# a path containing spaces or shell metacharacters is passed through verbatim.
# Returns whatever tail writes to its standard output.
def get_tail( f, size )
  IO.popen( ['tail', '-n', size.to_s, f.to_s] ) { |io| io.read }
end

#health_info ⇒ Object

Health info



60
61
62
63
64
65
66
67
68
# File 'lib/resque-telework/manager.rb', line 60

# Health info.
#
# Returns a hash holding the three values reported by Sys::CPU.load_avg under
# :cpu_load_1mins, :cpu_load_5mins and :cpu_load_15mins, or an empty hash when
# the figures cannot be obtained.
def health_info
  require "sys/cpu"   # Loaded lazily so the daemon also runs when the library is not installed
  averages= Sys::CPU.load_avg
  { :cpu_load_1mins => averages[0],
    :cpu_load_5mins => averages[1],
    :cpu_load_15mins => averages[2] }
rescue LoadError, StandardError
  # LoadError is not a StandardError, so it has to be listed explicitly for a
  # missing library to yield {} instead of an exception.
  {}
end

#manage_auto(auto, status, action, n0) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/resque-telework/manager.rb', line 129

# Start or signal up to n0 workers of an auto task that are in a given status.
#
# auto   - auto task description; 'worker_status' and 'worker_id' are read as
#          parallel arrays, 'rev_info' is used when starting
# status - status a worker must currently have to be selected (e.g. 'VOID', 'RUN')
# action - 'START' to start the worker, anything else is sent as the signal action
# n0     - maximum number of workers to act on; nothing is done when it is not positive
#
# Returns nil.
def manage_auto( auto, status, action, n0 )
  limit= n0.to_i
  return if limit <= 0
  statuses= auto['worker_status']
  selected= statuses.each_index.select { |i| statuses[i]==status }.first(limit)
  selected.each do |i|
    cmd= auto.clone                        # Work on a copy so that the task description is left untouched
    cmd['worker_id']= auto['worker_id'][i]
    if 'START'==action
      start_worker( cmd, cmd['rev_info'], true )
    else
      cmd['action']= action
      manage_worker( cmd )
    end
  end
  nil
end

#manage_worker(cmd) ⇒ Object



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/resque-telework/manager.rb', line 227

# Send a signal to a worker started by this manager and record its new status.
#
# cmd - hash with 'worker_id' and 'action'; the action can be QUIT, KILL, CONT
#       or PAUSE. PAUSE is delivered as USR2, and CONT is recorded as status RUN.
#
# An error status is sent, and nothing else is changed, when the worker is not
# tracked in @WORKERS or when the signal cannot be delivered (for example
# because the process has already gone away).
# Returns the updated worker info, or nil when nothing was done.
def manage_worker( cmd )
  id= cmd['worker_id']
  action= cmd['action'] # Can be QUIT, KILL, CONT, PAUSE
  info= @WORKERS[id]
  unless info
    send_status( 'Error', "Worker #{id} was not found on this host" )
    return
  end
  sig= 'PAUSE'==action ? 'USR2' : action   # Pause a Resque worker using USR2 signal
  status= 'CONT'==action ? 'RUN' : action
  send_status( 'Info', "Signaling worker #{id} (PID #{info['pid']}) using signal #{sig}" )
  begin
    Process.kill( sig, info['pid'] ) # Signaling...
  rescue Errno::ESRCH, Errno::EPERM, ArgumentError => e
    # Without this a vanished process or an unsupported signal name would
    # propagate up to the main loop and stop the daemon.
    send_status( 'Error', "Worker #{id} (PID #{info['pid']}) could not be signaled using #{sig}: #{e.message}" )
    return
  end
  @STOPPED << id if ('QUIT'==sig || 'KILL'==sig) && !@STOPPED.include?(id)
  info['status']= status
  workers_add( @HOST, id, info )
  @WORKERS[id]= info
end

#send_status(severity, message) ⇒ Object

Add a status message on the status queue



71
72
73
74
75
76
# File 'lib/resque-telework/manager.rb', line 71

# Add a status message on the status queue.
#
# severity - label of the message (the callers in this file use 'Info' and 'Error')
# message  - text of the message
#
# The message is echoed on standard output, then pushed with status_push
# together with the host name and the current time.
def send_status( severity, message )
  puts "Telework: #{severity}: #{message}"
  entry= {}
  entry['host']= @HOST
  entry['severity']= severity
  entry['message']= message
  entry['date']= Time.now
  status_push( entry )
end

#start ⇒ Object

The manager (i.e. daemon) main loop



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/resque-telework/manager.rb', line 18

# The manager (i.e. daemon) main loop.
#
# Announces the daemon, then refuses to run when check_redis fails (an error is
# raised) or when is_alive reports a daemon for this host (exit is called).
# Otherwise each pass sends a heartbeat, checks the child processes, executes
# the pending commands, runs the auto-mode logic and sleeps for @SLEEP.
# A stop request only ends the loop once no worker is tracked any more; the
# daemon is then marked dead with i_am_dead.
# Interrupt, SystemExit and any other exception are reported through
# send_status and are not re-raised.
def start
  send_status( 'Info', "Daemon (PID #{Process.pid}) starting on host #{@HOST}" )
  unless check_redis # Check the Redis interface version
    err= "Telework: Error: Redis interface version mismatch, exiting"
    puts err # We can't use send_status() as it relies on Redis so we just show a message
    raise err
  end
  if is_alive(@HOST)  # Only one daemon can be run on a given host at the moment (this may change)
    send_status( 'Error', "There is already a daemon running on #{@HOST}")
    send_status( 'Error', "This daemon (PID #{Process.pid}) cannot be started and will terminate now")
    exit
  end
  loop do                                # The main loop
    while @RUN_DAEMON do                 # If there is no request to stop
      i_am_alive(health_info)            # Notify the system that the daemon is alive
      check_processes                    # Check the status of the child processes (to catch zombies)
      while cmd= cmds_pop( @HOST ) do    # Pop a command in the command queue
        do_command(cmd)                  # Execute it
      end
      check_auto                         # Deal with the task in auto mode
      sleep @SLEEP                       # Sleep
    end
                                         # A stop request has been received
    if @WORKERS.empty?
      send_status( 'Info', "A stop request has been received and the #{@HOST} daemon will now terminate")
      break
    end
    send_status( 'Error', "A stop request has been received by the #{@HOST} daemon but there are still running worker(s) so it will keep running")
    @RUN_DAEMON= true                    # Drop the stop request and resume the loop
  end
  i_am_dead
rescue Interrupt         # Control-C
  if @WORKERS.empty?
    send_status( 'Info', "Interruption for #{@HOST} daemon, exiting gracefully")
  else
    send_status( 'Error', "Interruption for #{@HOST} daemon, exiting, running workers may now unexpectedly terminate")
  end
rescue SystemExit        # Exit has been called
  if @WORKERS.empty?
    send_status( 'Info', "Exit called in #{@HOST} daemon")
  else
    send_status( 'Error', "Exit called in #{@HOST} daemon but workers are still running")
  end
rescue Exception => e    # Other exceptions (last-resort handler of the daemon)
  send_status( 'Error', "Exception #{e.message}")
  puts "Backtrace: #{e.backtrace}"
  send_status( 'Error', "Exception should not be raised in the #{@HOST} daemon, please submit a bug report")
end

#start_auto(cmd0, rev_info) ⇒ Object

Start auto session



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/resque-telework/manager.rb', line 169

# Start auto session.
#
# cmd0     - command hash; should define task_id, worker_count, worker_id,
#            queue, rails_env and exec. Missing auto_* settings get defaults.
# rev_info - revision information stored with the task under 'rev_info'
#
# Returns the stored task description, or nil when the task is already in
# auto mode (an error status is sent in that case).
def start_auto( cmd0, rev_info )
  defaults= { 'auto_max_waiting_job_per_worker' => 1,'auto_worker_min' => 0, 'auto_delay' => 15 }
  auto= defaults.merge( cmd0 )
  task_id= auto['task_id']
  unless @AUTO[task_id]
    send_status( 'Info', "Task #{task_id} is now in auto mode")
    auto['rev_info']= rev_info
    # Back-date the last action so that the first check_auto pass acts immediately
    auto['last_action']= Time.now - auto['auto_delay'].to_i
    return @AUTO[task_id]= auto
  end
  send_status( 'Error', "Task #{task_id} is already running in auto mode")
  nil
end

#start_worker(cmd, rev_info, auto = false) ⇒ Object

Start a task



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/resque-telework/manager.rb', line 186

# Start a task.
#
# cmd      - command hash; 'worker_id', 'queue', 'rails_env' and 'exec' are
#            read, plus the optional 'log_snapshot_period' and 'log_snapshot_lines'
# rev_info - revision hash; 'revision_path' is the working directory of the
#            worker and 'revision_log_path' (default ".") receives the log files
# auto     - true when the worker is started by the auto mode
#
# The worker is spawned with stdout/stderr redirected to per-worker log files,
# recorded in @WORKERS and in Redis (workers_add), and a small helper file
# describing how it was started is written next to the logs.
def start_worker( cmd, rev_info, auto=false )
  # Retrieving args
  path= rev_info['revision_path']
  log_path= rev_info['revision_log_path'] || "."
  id= cmd['worker_id']
  queuel= cmd['queue'].gsub(/,/, '_').gsub(/\*/, 'STAR')   # Queue list made usable in a file name
  # Starting the job
  env= { "QUEUE" => cmd['queue'] }
  # env["COUNT"]= cmd['worker_count'] if cmd['worker_count']
  env["RAILS_ENV"]= cmd['rails_env'] if "(default)" != cmd['rails_env']
  env["BUNDLE_GEMFILE"] = path+"/Gemfile" if ENV["BUNDLE_GEMFILE"]           # To make sure we use the new gems
  opt= { :in => "/dev/null",
         :out => "#{log_path}/telework_#{id}_#{queuel}_stdout.log",
         :err => "#{log_path}/telework_#{id}_#{queuel}_stderr.log",
         :chdir => path,
         :unsetenv_others => false }
  command= cmd['exec']
  pid= spawn( env, command, opt ) # Start it!
  info= { 'pid' => pid, 'status' => 'RUN', 'environment' => env, 'options' => opt, 'revision_info' => rev_info }
  # Log snapshot settings, stored under the keys update_log_snapshot reads
  info['log_snapshot_period']= cmd['log_snapshot_period'].to_i if cmd['log_snapshot_period']
  info['log_snapshot_lines']= cmd['log_snapshot_lines'].to_i if cmd['log_snapshot_lines']
  info['mode']= auto ? 'Auto' : 'Manual'
  @WORKERS[id]= info
  workers_add( @HOST, id, info )
  send_status( 'Info', "Starting worker #{id} (PID #{pid})" )
  # Create an helper file
  lines= ["# Telework: starting worker #{id} on host #{@HOST} at #{Time.now.strftime("%a %b %e %R %Y")}"]
  env.each { |name, value| lines << "# Telework: environment variable '#{name}' set to '#{value}'" }
  lines << "# Telework: command line is: #{command}"
  lines << "# Telework: path is: #{path}"
  lines << "# Telework: log file for stdout is: #{opt[:out]}"
  lines << "# Telework: log file for stderr is: #{opt[:err]}"
  lines << "# Telework: PID is: #{pid}"
  File.open("#{log_path}/telework_#{id}.log", 'w') { |f| f.write(lines.join("\n") + "\n") }
end

#status_auto(id, auto) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/resque-telework/manager.rb', line 112

# Refresh the per-worker status of a task in auto mode.
#
# id   - task id under which the description is stored in @AUTO
# auto - task description; 'worker_id' lists the worker ids of the task
#
# A worker that is not in @WORKERS is reported as 'VOID'. The description gets
# 'worker_status' (one entry per worker id), 'worker_run', 'worker_void' and
# 'worker_unknown' (neither RUN nor VOID), is stored in @AUTO and returned.
def status_auto( id, auto )
  statuses= auto['worker_id'].map do |wid|
    @WORKERS[wid] ? @WORKERS[wid]['status'] : 'VOID'
  end
  running= statuses.count('RUN')
  void= statuses.count('VOID')
  auto['worker_status']= statuses
  auto['worker_run']= running
  auto['worker_void']= void
  auto['worker_unknown']= statuses.size - running - void
  @AUTO[id]= auto
  auto
end

#stop_auto(auto) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
# File 'lib/resque-telework/manager.rb', line 100

# Take a task out of auto mode.
#
# auto - hash with 'task_id' and 'worker_id' (the list of the task's worker ids)
#
# Every listed worker that is tracked in @WORKERS is sent QUIT, then the task
# is removed from @AUTO and from Redis (autos_rem) and a status is sent.
def stop_auto( auto )
  task_id= auto['task_id']
  auto['worker_id'].each do |wid|
    next unless @WORKERS[wid]
    manage_worker( { 'worker_id' => wid, 'action' => 'QUIT'} )
  end
  @AUTO.delete(task_id)
  autos_rem( @HOST, task_id )
  send_status( 'Info', "Task #{task_id} is now in manual mode")
end

#update_log_snapshot(id) ⇒ Object



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/resque-telework/manager.rb', line 279

# Publish a fresh snapshot of a worker's log files when one is due.
#
# id - id of a worker tracked in @WORKERS
#
# Nothing is done unless the worker has a 'log_snapshot_period' and at least
# that many seconds have passed since 'last_log_snapshot'. Otherwise the tails
# of the stderr and stdout log files ('log_snapshot_lines' lines, 20 by
# default) are pushed with logs_add and the snapshot time is recorded.
def update_log_snapshot( id )
  worker= @WORKERS[id]
  period= worker['log_snapshot_period']
  return unless period
  now= Time.now.to_i
  previous= worker['last_log_snapshot'] || 0
  return if now < previous+period
  lines= worker['log_snapshot_lines'] || 20
  # Getting the logs
  logerr= get_tail( worker['options'][:err], lines )
  logout= get_tail( worker['options'][:out], lines )
  # Write back
  logs_add( @HOST, id, { :date => Time.now, :log_stderr => logerr, :log_stdout => logout } )
  worker['last_log_snapshot']= now
end