Class: PerfectQueue::Worker
- Inherits:
-
Object
- Object
- PerfectQueue::Worker
- Defined in:
- lib/perfectqueue/worker.rb
Instance Method Summary collapse
-
#initialize(engine, conf) ⇒ Worker
constructor
A new instance of Worker.
- #process(token, task) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #submit(token, task) ⇒ Object
Constructor Details
#initialize(engine, conf) ⇒ Worker
Returns a new instance of Worker.
127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/perfectqueue/worker.rb', line 127 def initialize(engine, conf) @engine = engine @log = @engine.log @run_class = conf[:run_class] @monitor = MonitorThread.new(engine, conf) @token = nil @task = nil @mutex = Mutex.new @cond = ConditionVariable.new end |
Instance Method Details
#process(token, task) ⇒ Object
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/perfectqueue/worker.rb', line 165 def process(token, task) @log.info "processing task id=#{task.id}" @monitor.set(token) success = false begin run = @run_class.new(task) if run.respond_to?(:kill) @monitor.set_kill_proc run.method(:kill) end run.run @log.info "finished id=#{task.id}" success = true rescue @log.info "failed id=#{task.id}: #{$!}" ensure @monitor.reset(success) end end |
#run ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/perfectqueue/worker.rb', line 145 def run while true @mutex.synchronize { while true return if @engine.finished? break if @token @cond.wait(@mutex) end } begin process(@token, @task) ensure @token = nil @engine.release_worker(self) end end rescue @engine.stop($!) end |
#shutdown ⇒ Object
195 196 197 198 |
# File 'lib/perfectqueue/worker.rb', line 195 def shutdown @monitor.shutdown @thread.join end |
#start ⇒ Object
140 141 142 143 |
# File 'lib/perfectqueue/worker.rb', line 140 def start @thread = Thread.new(&method(:run)) @monitor.start end |
#stop ⇒ Object
190 191 192 193 |
# File 'lib/perfectqueue/worker.rb', line 190 def stop submit(nil, nil) @monitor.stop end |
#submit(token, task) ⇒ Object
200 201 202 203 204 205 206 |
# File 'lib/perfectqueue/worker.rb', line 200 def submit(token, task) @mutex.synchronize { @token = token @task = task @cond.broadcast } end |