Class: Delayed::Worker

Inherits:
Object
  • Object
show all
Includes:
JobLauncher
Defined in:
lib/delayed/worker.rb

Constant Summary collapse

# Value given to a worker's sleep_time when the constructor receives none.
SLEEP = 60

# Name given to a worker when the constructor receives none: the host name
# plus the process id, or only the process id if building the host part raises.
DEFAULT_WORKER_NAME =
  begin
    "host:#{Socket.gethostname} pid:#{Process.pid}"
  rescue StandardError
    "pid:#{Process.pid}"
  end

Constants included from JobLauncher

JobLauncher::MAX_ACTIVE_JOBS

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from JobLauncher

#check_thread_sanity, #initialize_launcher, #jobs_ids_in_execution, #jobs_in_execution, #kill_threads!, #launch, #report_jobs_state

Constructor Details

#initialize(options = {}) ⇒ Worker

A worker runs in a loop, looking in the database for pending jobs and trying to execute them.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/delayed/worker.rb', line 46

# Builds a worker and configures it from +options+.
#
# Recognised keys are :quiet, :name, :min_priority, :max_priority, :job_types,
# :only_for, :group_by and :sleep_time. Each one is handed to the writer of the
# same name; a key that is absent assigns nil. Afterwards nil values of name,
# quiet and sleep_time are replaced by DEFAULT_WORKER_NAME, true and SLEEP
# (an explicit +false+ for :quiet is therefore kept).
#
# Every key that is not recognised stays in the hash stored in @options.
#
# The hash passed in is copied before keys are removed, so the caller's hash
# is left untouched and can be reused to build further workers. Passing nil
# is treated like passing an empty hash.
def initialize(options={})
  # Work on a private copy: deleting from the caller's hash would make a
  # shared options hash lose its settings after the first worker is built.
  options = options ? options.dup : {}

  [:quiet, :name, :min_priority, :max_priority, :job_types, :only_for, :group_by,
   :sleep_time
  ].each do |attr_name|
    send "#{attr_name}=", options.delete(attr_name)
  end

  # Default values (nil checks, not ||=, so that quiet = false survives)
  self.name       = DEFAULT_WORKER_NAME if self.name.nil?
  self.quiet      = true                if self.quiet.nil?
  self.sleep_time = SLEEP               if self.sleep_time.nil?

  @options = options
  initialize_launcher
end

Instance Attribute Details

#group_byObject

Jobs are grouped by this attribute. Every delayed job that is going to be executed must respond to `:group_by`. Jobs are grouped by that value, and only one job can be in execution at a time.



37
38
39
# File 'lib/delayed/worker.rb', line 37

# Returns the value held in @group_by: the attribute jobs are grouped by.
# The constructor passes options[:group_by] to the group_by= writer, and
# #start runs group_by_loop instead of normal_loop when this value is truthy.
def group_by
  @group_by
end

#job_typesObject

Constraints for this worker: what kind of jobs is it going to execute?



32
33
34
# File 'lib/delayed/worker.rb', line 32

# Returns the value held in @job_types, one of the constraints on which jobs
# this worker executes. The constructor passes options[:job_types] to the
# job_types= writer, so an absent key assigns nil.
def job_types
  @job_types
end

#max_priorityObject

Constraints for this worker: what kind of jobs is it going to execute?



32
33
34
# File 'lib/delayed/worker.rb', line 32

# Returns the value held in @max_priority, one of the constraints on which
# jobs this worker executes. The constructor passes options[:max_priority] to
# the max_priority= writer, so an absent key assigns nil.
def max_priority
  @max_priority
end

#min_priorityObject

Constraints for this worker: what kind of jobs is it going to execute?



32
33
34
# File 'lib/delayed/worker.rb', line 32

# Returns the value held in @min_priority, one of the constraints on which
# jobs this worker executes. The constructor passes options[:min_priority] to
# the min_priority= writer, so an absent key assigns nil.
def min_priority
  @min_priority
end

#nameObject

Every worker has a unique name, which by default is built from the host name and the pid of the process (so you should run only one worker per process unless you override the name in the constructor).

Thread.new { Delayed::Worker.new(:name => "Worker 1").start }
Thread.new { Delayed::Worker.new(:name => "Worker 2").start }

There are some advantages to overriding this with something which survives worker restarts: workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.



29
30
31
# File 'lib/delayed/worker.rb', line 29

# Returns the value held in @name, the worker's name. The constructor passes
# options[:name] to the name= writer and then assigns DEFAULT_WORKER_NAME
# when the name is still nil. #start uses this value in its log messages and
# passes it to Job.clear_locks! when the worker finishes.
def name
  @name
end

#only_forObject

Constraints for this worker: what kind of jobs is it going to execute?



32
33
34
# File 'lib/delayed/worker.rb', line 32

# Returns the value held in @only_for, one of the constraints on which jobs
# this worker executes. The constructor passes options[:only_for] to the
# only_for= writer, so an absent key assigns nil.
def only_for
  @only_for
end

#quietObject

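Whether to suppress the additional output to stdout. When true, messages are not printed to stdout; they are still sent to the logger if one is available.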



40
41
42
# File 'lib/delayed/worker.rb', line 40

# Returns the value held in @quiet. #say prints its text to stdout only when
# this value is falsy. The constructor passes options[:quiet] to the quiet=
# writer and then assigns true when the value is still nil.
def quiet
  @quiet
end

#sleep_timeObject

Seconds to sleep between each loop running available jobs



43
44
45
# File 'lib/delayed/worker.rb', line 43

# Returns the value held in @sleep_time: the seconds to sleep between each
# loop running available jobs. The constructor passes options[:sleep_time] to
# the sleep_time= writer and then assigns SLEEP when the value is still nil.
def sleep_time
  @sleep_time
end

Instance Method Details

#jobs_to_executeObject



86
87
88
# File 'lib/delayed/worker.rb', line 86

# Asks Job.find_available for jobs matching this worker's constraints, with
# an extra :unless entry holding the ids reported by jobs_ids_in_execution.
# Returns whatever Job.find_available returns.
def jobs_to_execute
  base_filters = constraints
  filters = base_filters.merge(:unless => jobs_ids_in_execution)
  Job.find_available(filters)
end

#say(text) ⇒ Object Also known as: log



90
91
92
93
# File 'lib/delayed/worker.rb', line 90

# Reports +text+: prints it to stdout unless the worker is quiet, then sends
# it to logger.info when a logger is available. Returns the result of
# logger.info, or nil when there is no logger.
def say(text)
  unless quiet
    puts(text)
  end

  if logger
    logger.info(text)
  end
end

#startObject



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/delayed/worker.rb', line 61

# Runs the worker until its exit flag is set.
#
# Installs TERM and INT handlers that call signal_interrupt, then repeatedly
# runs group_by_loop (when group_by is set) or normal_loop, checking
# self.exit after every pass. Once the flag is set the loop is left and
# kill_threads! is called.
#
# Any exception raised along the way is logged and swallowed. In every case
# the locks held under this worker's name are cleared through
# Job.clear_locks! and a closing message is reported.
def start
  say "===> Starting job worker #{name}"

  %w[TERM INT].each do |signal_name|
    trap(signal_name) { signal_interrupt }
  end

  loop do
    group_by ? group_by_loop : normal_loop
    next unless self.exit

    log "Exit loop"
    break
  end

  kill_threads!
rescue Exception => error
  # Deliberately broad: nothing raised inside the loop escapes the worker.
  log "ERROR on worker loop: #{error}"
ensure
  Job.clear_locks! name
  say "<=== Finishing job worker #{name}"
end