Class: Delayed::Worker
- Inherits: Object
  - Object
    - Delayed::Worker
- Includes: JobLauncher
- Defined in: lib/delayed/worker.rb
Constant Summary
- SLEEP = 60
- DEFAULT_WORKER_NAME = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}"
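The `rescue` modifier in DEFAULT_WORKER_NAME means the name falls back to the pid alone if the hostname lookup raises. A minimal sketch of that behavior follows; `default_worker_name` and its `hostname_lookup` parameter are hypothetical helpers introduced here so the fallback path can be exercised, and are not part of Delayed::Worker.

```ruby
require 'socket'

# Hypothetical helper (not part of Delayed::Worker): builds a name the same
# way DEFAULT_WORKER_NAME does, but takes the hostname lookup as a callable
# so the fallback path can be triggered on purpose.
def default_worker_name(hostname_lookup = -> { Socket.gethostname })
  # The `rescue` modifier catches a StandardError raised by the expression on
  # its left and evaluates the expression on its right instead.
  "host:#{hostname_lookup.call} pid:#{Process.pid}" rescue "pid:#{Process.pid}"
end

puts default_worker_name
puts default_worker_name(-> { raise SocketError, 'no hostname' })
```

The second call prints only the `pid:` form, because SocketError is a StandardError and is swallowed by the modifier.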
Constants included from JobLauncher
Instance Attribute Summary
- #group_by ⇒ Object
  The jobs will be grouped by this attribute.
- #job_types ⇒ Object
  Constraint for this worker: which kinds of jobs it will execute.
- #max_priority ⇒ Object
  Constraint for this worker: which kinds of jobs it will execute.
- #min_priority ⇒ Object
  Constraint for this worker: which kinds of jobs it will execute.
- #name ⇒ Object
  Every worker has a unique name, which by default is built from the hostname and the pid of the process (so you should run only one worker per process unless you override the name in the constructor).
- #only_for ⇒ Object
  Constraint for this worker: which kinds of jobs it will execute.
- #quiet ⇒ Object
  Whether to suppress output to stdout. When false, messages are printed to stdout in addition to the logger.
- #sleep_time ⇒ Object
  Seconds to sleep between passes of the loop that runs available jobs.
Instance Method Summary
- #initialize(options = {}) ⇒ Worker (constructor)
  A worker runs in a loop, looking in the database for pending jobs and executing them.
- #jobs_to_execute ⇒ Object
- #say(text) ⇒ Object (also: #log)
- #start ⇒ Object
Methods included from JobLauncher
#check_thread_sanity, #initialize_launcher, #jobs_ids_in_execution, #jobs_in_execution, #kill_threads!, #launch, #report_jobs_state
Constructor Details
#initialize(options = {}) ⇒ Worker
A worker runs in a loop, looking in the database for pending jobs and executing them.
# File 'lib/delayed/worker.rb', line 46
def initialize(options = {})
  [:quiet, :name, :min_priority, :max_priority, :job_types, :only_for, :group_by, :sleep_time].each do |attr_name|
    send "#{attr_name}=", options.delete(attr_name)
  end
  # Default values
  self.name = DEFAULT_WORKER_NAME if self.name.nil?
  self.quiet = true if self.quiet.nil?
  self.sleep_time = SLEEP if self.sleep_time.nil?
  = initialize_launcher
end
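The option handling in the constructor can be illustrated with a stand-in class. This is only a sketch: `SketchWorker` is hypothetical, its DEFAULT_WORKER_NAME is simplified to the pid, and unlike the listing it copies the options hash first so the caller's hash is not modified.

```ruby
# Stand-in class (hypothetical, not the real Delayed::Worker) that mirrors the
# option handling shown in the constructor listing.
class SketchWorker
  SLEEP = 60
  DEFAULT_WORKER_NAME = "pid:#{Process.pid}" # simplified for the sketch
  ATTRIBUTES = [:quiet, :name, :min_priority, :max_priority,
                :job_types, :only_for, :group_by, :sleep_time].freeze

  attr_accessor(*ATTRIBUTES)

  def initialize(options = {})
    options = options.dup # assumption: avoid mutating the caller's hash
    ATTRIBUTES.each { |attr_name| send("#{attr_name}=", options.delete(attr_name)) }

    # Defaults apply only to attributes that were left unset (nil).
    self.name = DEFAULT_WORKER_NAME if name.nil?
    self.quiet = true if quiet.nil?
    self.sleep_time = SLEEP if sleep_time.nil?
  end
end

worker = SketchWorker.new(:name => 'Worker 1', :quiet => false, :min_priority => 2)
puts worker.name
puts worker.sleep_time
```

Because the defaults test for `nil`, passing `:quiet => false` is respected and not overwritten by the default of `true`.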
Instance Attribute Details
#group_by ⇒ Object
The jobs will be grouped by this attribute. Each delayed_job that is going to be executed must respond to `:group_by`. Jobs are grouped by that value, and only one job per group can be in execution at a time.
# File 'lib/delayed/worker.rb', line 37
def group_by
  @group_by
end
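One way to read the grouping rule is sketched below. It is an interpretation of the description, not the library's `group_by_loop`: `SketchJob`, `account_id`, and `next_job_per_group` are all hypothetical names chosen for illustration.

```ruby
# Hypothetical job record; `account_id` stands in for whatever attribute the
# worker's group_by option names.
SketchJob = Struct.new(:id, :account_id)

# Pick at most one runnable job per group, skipping groups that already have
# a job in execution.
def next_job_per_group(pending, running, group_attr)
  busy_groups = running.map { |job| job.public_send(group_attr) }
  pending
    .group_by { |job| job.public_send(group_attr) }
    .reject { |group, _jobs| busy_groups.include?(group) }
    .map { |_group, jobs| jobs.first }
end

pending = [SketchJob.new(1, :a), SketchJob.new(2, :a),
           SketchJob.new(3, :b), SketchJob.new(4, :c)]
running = [SketchJob.new(9, :b)]

p next_job_per_group(pending, running, :account_id).map(&:id) # prints [1, 4]
```

Group `:b` is skipped because job 9 is already running in it, and group `:a` contributes only its first pending job.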
#job_types ⇒ Object
Constraint for this worker: which kinds of jobs it will execute.
# File 'lib/delayed/worker.rb', line 32
def job_types
  @job_types
end
#max_priority ⇒ Object
Constraint for this worker: which kinds of jobs it will execute.
# File 'lib/delayed/worker.rb', line 32
def max_priority
  @max_priority
end
#min_priority ⇒ Object
Constraint for this worker: which kinds of jobs it will execute.
# File 'lib/delayed/worker.rb', line 32
def min_priority
  @min_priority
end
#name ⇒ Object
Every worker has a unique name, which by default is built from the hostname and the pid of the process (so you should run only one worker per process unless you override the name in the constructor).
Thread.new { Delayed::Worker.new(:name => "Worker 1").start }
Thread.new { Delayed::Worker.new(:name => "Worker 2").start }
There are some advantages to overriding this with something that survives worker restarts: a worker can safely resume tasks that are locked under its own name, because it assumes it crashed while working on them.
# File 'lib/delayed/worker.rb', line 29
def name
  @name
end
#only_for ⇒ Object
Constraint for this worker: which kinds of jobs it will execute.
# File 'lib/delayed/worker.rb', line 32
def only_for
  @only_for
end
#quiet ⇒ Object
Whether to suppress output to stdout. When false, messages are printed to stdout in addition to the logger.
# File 'lib/delayed/worker.rb', line 40
def quiet
  @quiet
end
#sleep_time ⇒ Object
Seconds to sleep between passes of the loop that runs available jobs.
# File 'lib/delayed/worker.rb', line 43
def sleep_time
  @sleep_time
end
Instance Method Details
#jobs_to_execute ⇒ Object
# File 'lib/delayed/worker.rb', line 86
def jobs_to_execute
  Job.find_available constraints.merge(:unless => jobs_ids_in_execution)
end
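The listing passes the worker's constraints plus an `:unless` entry to `Job.find_available`. The sketch below shows only the `Hash#merge` step; the constraint keys and the ids are made-up values, since the real `constraints` and `jobs_ids_in_execution` are defined elsewhere and are not shown on this page.

```ruby
# Hypothetical stand-ins for values the worker computes elsewhere.
constraints = { :min_priority => 0, :max_priority => 5 }
jobs_ids_in_execution = [12, 15]

# Hash#merge returns a new hash and leaves `constraints` untouched, so the
# exclusion list is rebuilt on every call rather than accumulated.
conditions = constraints.merge(:unless => jobs_ids_in_execution)

p conditions
p constraints.key?(:unless)
```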
#say(text) ⇒ Object Also known as: log
# File 'lib/delayed/worker.rb', line 90
def say(text)
  puts text unless self.quiet
  logger.info text if logger
end
#start ⇒ Object
# File 'lib/delayed/worker.rb', line 61
def start
  say "===> Starting job worker #{name}"
  trap('TERM') { signal_interrupt }
  trap('INT') { signal_interrupt }
  loop do
    if group_by
      group_by_loop
    else
      normal_loop
    end
    if self.exit
      log "Exit loop"
      break
    end
  end
  kill_threads!
rescue Exception => e
  log "ERROR on worker loop: #{e}"
ensure
  Job.clear_locks! name
  say "<=== Finishing job worker #{name}"
end
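The control flow of `#start` (a flag set by a signal handler, a loop that checks the flag after each pass, and cleanup in `ensure`) can be sketched with a stand-in class. `SketchLoop` and its `events` list are hypothetical; the sketch calls `signal_interrupt` directly instead of installing `trap` handlers, and it rescues StandardError where the listing rescues Exception.

```ruby
# Hypothetical stand-in for the control flow in #start.
class SketchLoop
  attr_reader :events

  def initialize
    @exit = false
    @events = []
  end

  # In the listing this is invoked from trap('TERM') and trap('INT').
  def signal_interrupt
    @exit = true
  end

  def start
    @events << :started
    loop do
      yield self     # one pass of work
      break if @exit # leave the loop once an interrupt was requested
    end
  rescue StandardError => e
    @events << "error: #{e.message}"
  ensure
    @events << :locks_cleared # cleanup runs on normal exit and on error
  end
end

passes = 0
runner = SketchLoop.new
runner.start do |r|
  passes += 1
  r.signal_interrupt if passes == 3
end

p passes        # prints 3
p runner.events # prints [:started, :locks_cleared]
```

The interrupt only sets a flag, so the pass that is already running finishes before the loop exits, and the `ensure` step runs whether the loop ended normally or through an error.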