Class: PerfectQueue::Worker
- Inherits:
-
Object
- Object
- PerfectQueue::Worker
- Defined in:
- lib/perfectqueue/worker.rb
Instance Method Summary collapse
-
#initialize(engine, conf) ⇒ Worker
constructor
A new instance of Worker.
- #process(token, task) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #submit(token, task) ⇒ Object
Constructor Details
#initialize(engine, conf) ⇒ Worker
Returns a new instance of Worker.
145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/perfectqueue/worker.rb', line 145 def initialize(engine, conf) @engine = engine @log = @engine.log @run_class = conf[:run_class] @monitor = MonitorThread.new(engine, conf) @token = nil @task = nil @mutex = Mutex.new @cond = ConditionVariable.new end |
Instance Method Details
#process(token, task) ⇒ Object
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/perfectqueue/worker.rb', line 188 def process(token, task) @log.info "processing task id=#{task.id}" @monitor.set(token, task.id) success = false begin run = @run_class.new(task) if run.respond_to?(:kill) @monitor.set_kill_proc run.method(:kill) end run.run @log.info "finished id=#{task.id}" success = true rescue @log.info "failed id=#{task.id}: #{$!.class}: #{$!}" $!.backtrace.each {|bt| @log.debug " #{bt}" } ensure @monitor.reset(success) end end |
#run ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/perfectqueue/worker.rb', line 163 def run @monitor.start begin while true @mutex.synchronize { while true return if @engine.finished? break if @token @cond.wait(@mutex) end } begin process(@token, @task) ensure @token = nil @engine.release_worker(self) end end ensure @monitor.stop end rescue @engine.stop($!) end |
#shutdown ⇒ Object
220 221 222 223 |
# File 'lib/perfectqueue/worker.rb', line 220 def shutdown @monitor.shutdown @thread.join end |
#start ⇒ Object
158 159 160 161 |
# File 'lib/perfectqueue/worker.rb', line 158 def start @log.debug "running worker." @thread = Thread.new(&method(:run)) end |
#stop ⇒ Object
216 217 218 |
# File 'lib/perfectqueue/worker.rb', line 216 def stop submit(nil, nil) end |
#submit(token, task) ⇒ Object
225 226 227 228 229 230 231 |
# File 'lib/perfectqueue/worker.rb', line 225 def submit(token, task) @mutex.synchronize { @token = token @task = task @cond.broadcast } end |