Class: PerfectQueue::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/perfectqueue/worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(engine, conf) ⇒ Worker

Returns a new instance of Worker.



145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/perfectqueue/worker.rb', line 145

# Builds a worker attached to +engine+.
#
# engine - must respond to #log; it is stored and also passed on to
#          MonitorThread.new.
# conf   - indexable configuration. conf[:run_class] is kept as the class
#          that #process instantiates per task; the whole conf is passed
#          on to MonitorThread.new.
def initialize(engine, conf)
  # Lock and condition variable used by #submit and #run to coordinate
  # access to the hand-off slot below.
  @mutex, @cond = Mutex.new, ConditionVariable.new

  # Hand-off slot; empty until #submit fills it.
  @token = @task = nil

  @engine = engine
  @log = engine.log
  @run_class = conf[:run_class]
  @monitor = MonitorThread.new(engine, conf)
end

Instance Method Details

#process(token, task) ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/perfectqueue/worker.rb', line 188

# Executes a single task and reports the outcome to the monitor.
#
# token - opaque value passed to @monitor.set together with the task id.
# task  - must respond to #id; it is handed to @run_class.new.
#
# A StandardError raised while building or running the task object is
# logged (message at info level, backtrace at debug level) and swallowed.
# @monitor.reset is always called with true on success, false otherwise.
def process(token, task)
  @log.info "processing task id=#{task.id}"
  @monitor.set(token, task.id)

  succeeded = false
  begin
    runner = @run_class.new(task)

    # Let the monitor abort the task if the runner knows how to be killed.
    @monitor.set_kill_proc(runner.method(:kill)) if runner.respond_to?(:kill)

    runner.run

    @log.info "finished id=#{task.id}"
    succeeded = true
  rescue => e
    @log.info "failed id=#{task.id}: #{e.class}: #{e}"
    e.backtrace.each do |frame|
      @log.debug "  #{frame}"
    end
  ensure
    @monitor.reset(succeeded)
  end
end

#run ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/perfectqueue/worker.rb', line 163

# Worker main loop (used as the body of the thread created by #start).
#
# Starts the monitor, then repeatedly:
#   1. waits on @cond until either @engine.finished? is true (the method
#      returns) or #submit has stored a token;
#   2. processes the submitted token/task pair;
#   3. clears the token and hands the worker back via
#      @engine.release_worker(self).
#
# The monitor is always stopped on the way out. Any StandardError that
# escapes the loop is forwarded to @engine.stop.
def run
  @monitor.start
  begin
    while true
      token = task = nil
      @mutex.synchronize {
        while true
          return if @engine.finished?
          break if @token
          @cond.wait(@mutex)
        end
        # Snapshot the pair while still holding the lock. #submit (and thus
        # #stop, which submits nil/nil) may overwrite @token/@task from
        # another thread as soon as the lock is released; reading them later
        # could hand nil to #process.
        token = @token
        task = @task
      }
      begin
        process(token, task)
      ensure
        # Clear the slot under the same lock #submit writes it with.
        @mutex.synchronize { @token = nil }
        @engine.release_worker(self)
      end
    end
  ensure
    @monitor.stop
  end
rescue => e
  @engine.stop(e)
end

#shutdown ⇒ Object



220
221
222
223
# File 'lib/perfectqueue/worker.rb', line 220

# Shuts the monitor down and waits for the worker thread to finish.
#
# Safe to call on a worker that was never started: @thread is only
# assigned by #start, so the join is skipped when no thread exists.
def shutdown
  @monitor.shutdown
  @thread.join if @thread
end

#start ⇒ Object



158
159
160
161
# File 'lib/perfectqueue/worker.rb', line 158

# Launches the worker loop (#run) on a new thread.
#
# The thread is stored in @thread and is also the return value.
def start
  @log.debug "running worker."
  @thread = Thread.new { run }
end

#stop ⇒ Object



216
217
218
# File 'lib/perfectqueue/worker.rb', line 216

# Wakes the worker without giving it any work by submitting an empty
# token/task pair. The woken #run loop then re-checks @engine.finished?.
def stop
  token = task = nil
  submit(token, task)
end

#submit(token, task) ⇒ Object



225
226
227
228
229
230
231
# File 'lib/perfectqueue/worker.rb', line 225

# Hands a token/task pair to the worker.
#
# Both values are stored while holding @mutex, then every thread waiting
# on @cond is woken so the #run loop can pick the pair up.
#
# token - stored in @token; a nil token leaves the worker without work.
# task  - stored in @task.
def submit(token, task)
  @mutex.synchronize do
    @token, @task = token, task
    @cond.broadcast
  end
end