Class: Delayed::Threaded::Worker
- Inherits:
-
Worker
- Object
- Worker
- Delayed::Threaded::Worker
- Includes:
- SleepCalculator
- Defined in:
- lib/delayed/threaded/worker.rb
Overview
Threaded DJ worker implementation.
-
inspired by Delayed::Command
-
no daemons dependency + thread-safe
Defined Under Namespace
Classes: Config
Class Method Summary collapse
Instance Method Summary collapse
- #exit! ⇒ Object
-
#name ⇒ Object
e.g.
- #start ⇒ Object
- #stop ⇒ Object
- #stop? ⇒ Boolean
- #thread_id ⇒ Object
- #to_s ⇒ Object
Methods included from SleepCalculator
Class Method Details
.lifecycle ⇒ Object
17 |
# File 'lib/delayed/threaded/worker.rb', line 17
# Delegates to Delayed::Worker.lifecycle.
#
# @return [Object] whatever Delayed::Worker.lifecycle returns
def self.lifecycle
  Delayed::Worker.lifecycle
end
.setup_lifecycle ⇒ Object
20 |
# File 'lib/delayed/threaded/worker.rb', line 20
# Delegates to Delayed::Worker.setup_lifecycle.
#
# @return [Object] whatever Delayed::Worker.setup_lifecycle returns
def self.setup_lifecycle
  Delayed::Worker.setup_lifecycle
end
Instance Method Details
#exit! ⇒ Object
131 132 133 134 135 136 137 138 |
# File 'lib/delayed/threaded/worker.rb', line 131
# Flags the worker as exiting and releases the job locks held under its name.
#
# A no-op when the exit flag is already set. Locks are cleared only when
# Delayed::Job is defined and responds to +clear_locks!+.
def exit!
  return if @exit # same check as #stop?

  say "Stoping job worker"
  @exit = true # same effect as #stop
  if Delayed.const_defined?(:Job) && Delayed::Job.respond_to?(:clear_locks!)
    Delayed::Job.clear_locks!(name)
  end
end
#name ⇒ Object
e.g. :
def self.sleep_delay=(value)
(Thread.current[:delayed_threaded_worker_config] ||= Config.new).sleep_delay = value
end
def self.sleep_delay
if (config = Thread.current[:delayed_threaded_worker_config]) && config.key?(:sleep_delay)
config.sleep_delay
else
superclass.sleep_delay # Delayed::Worker.sleep_delay
end
end
93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/delayed/threaded/worker.rb', line 93
# Returns the worker name, built on first use and cached (frozen) in @name.
#
# The name is the superclass' name followed by " thread:<thread_id>". If
# building it raises a StandardError, the name falls back to
# "<@name_prefix>thread:<thread_id>".
#
# @return [String] the frozen, memoized worker name
def name
  if (@name ||= nil).nil?
    # super - [prefix]host:hostname pid:process_pid
    begin
      @name = "#{super} thread:#{thread_id}".freeze
    rescue
      @name = "#{@name_prefix}thread:#{thread_id}".freeze
    end
  end
  @name
end
#start ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/delayed/threaded/worker.rb', line 145
# Runs the work loop until the exit flag (@exit) is set.
#
# Each pass calls +work_off+ and sums its result to get the number of jobs
# handled. When nothing was handled it sleeps for the class-level
# +sleep_delay+; otherwise it logs the throughput and the failure count
# (the last element of the +work_off+ result).
def start
  say "Starting job worker"
  # NOTE(review): a bare +trap+ presumably resolves to a worker-defined
  # override rather than Kernel#trap (which requires arguments) — confirm.
  trap

  loop do
    result = nil

    realtime = Benchmark.realtime do
      result = work_off
    end

    count = result.sum

    # Checked before sleeping/logging so a stop request is honoured promptly.
    break if @exit

    if count.zero?
      sleep(self.class.sleep_delay)
    else
      say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
    end

    # Checked again because the flag may have been set during the sleep.
    break if @exit
  end
end
#stop ⇒ Object
171 |
# File 'lib/delayed/threaded/worker.rb', line 171
# Sets the exit flag (@exit) to true.
#
# @return [true]
def stop
  @exit = true
end
#stop? ⇒ Boolean
170 |
# File 'lib/delayed/threaded/worker.rb', line 170
# Reports whether the exit flag (@exit) is set.
#
# @return [Boolean] @exit coerced to a strict true/false
def stop?
  !!@exit
end
#thread_id ⇒ Object
108 109 110 111 112 113 114 115 116 |
# File 'lib/delayed/threaded/worker.rb', line 108
# Returns an identifier for the current thread.
#
# Uses the Ruby thread name when one is set. Otherwise it asks the Java
# thread for its name; when that name is longer than 100 characters and
# matches the pattern below, only the part before the first ": " is kept.
#
# @return [String] the thread name (possibly shortened)
def thread_id
  if (name = Thread.current.name || '').empty?
    name = java.lang.Thread.currentThread.getName
    # Shorten over-long names of the form "<prefix>: ...<path separator>".
    if name.size > 100 && (match = name.match(/(.*?)\:\s.*?[\/\\]+/))
      name = match[1]
    end
  end
  name
end
#to_s ⇒ Object
105 |
# File 'lib/delayed/threaded/worker.rb', line 105
# String form of the worker: its #name.
#
# @return [Object] whatever #name returns
def to_s
  name
end