Class: PerfectQueue::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/perfectqueue/worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(engine, conf) ⇒ Worker

Returns a new instance of Worker.



127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/perfectqueue/worker.rb', line 127

# Builds a worker bound to +engine+.
#
# The logger is taken from the engine, the runner class is read from
# +conf[:run_class]+, and a MonitorThread is constructed with the same
# engine and configuration.
#
# @param engine [Object] must respond to +log+; stored for later use
# @param conf [Hash] configuration; +:run_class+ is read here and the whole
#   hash is forwarded to MonitorThread.new
def initialize(engine, conf)
  @engine = engine
  @log = engine.log
  @run_class = conf[:run_class]
  @monitor = MonitorThread.new(engine, conf)

  # Nothing is pending until #submit stores a token/task pair.
  @token = @task = nil

  # Lock and condition variable used by #run and #submit.
  @mutex = Mutex.new
  @cond = ConditionVariable.new
end

Instance Method Details

#process(token, task) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/perfectqueue/worker.rb', line 165

# Executes one task with a fresh +@run_class+ instance.
#
# The monitor receives the token before the runner is built and is always
# reset afterwards with a flag telling whether the run finished without
# raising. When the runner responds to +kill+, that bound method is
# registered with the monitor through +set_kill_proc+. A StandardError
# raised while building or running the runner is logged and not re-raised.
#
# @param token [Object] passed to +@monitor.set+
# @param task [#id] passed to +@run_class.new+; its id appears in log lines
# @return [Object] +true+ when the run succeeded, otherwise the result of
#   the failure log call
def process(token, task)
  @log.info "processing task id=#{task.id}"

  @monitor.set(token)
  completed = false
  begin
    runner = @run_class.new(task)
    @monitor.set_kill_proc(runner.method(:kill)) if runner.respond_to?(:kill)

    runner.run

    @log.info "finished id=#{task.id}"
    completed = true
  rescue => e
    @log.info "failed id=#{task.id}: #{e}"
  ensure
    # Runs on both paths so the monitor never keeps a stale token.
    @monitor.reset(completed)
  end
end

#run ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/perfectqueue/worker.rb', line 145

# Worker loop: waits for a submitted token, processes it, then hands the
# worker back to the engine, repeating until the engine reports finished.
#
# Any StandardError escaping the loop is passed to +@engine.stop+.
#
# @return [nil, Object] +nil+ when the engine finished, otherwise the
#   result of +@engine.stop+
def run
  while true
    @mutex.synchronize do
      while true
        # Checked first, so a finished engine ends the loop even when a
        # token is pending.
        return if @engine.finished?
        break if @token
        # Releases the mutex while sleeping; #submit broadcasts on @cond.
        @cond.wait(@mutex)
      end
    end

    begin
      process(@token, @task)
    ensure
      # Clear the slot before releasing so the next wait blocks again.
      @token = nil
      @engine.release_worker(self)
    end
  end
rescue => e
  @engine.stop(e)
end

#shutdown ⇒ Object



195
196
197
198
# File 'lib/perfectqueue/worker.rb', line 195

# Shuts the monitor down and waits for the worker thread to exit.
#
# +@thread+ is only assigned by #start, so the join is skipped when the
# worker was never started instead of failing on +nil+.
#
# @return [Thread, nil] the joined thread, or +nil+ if none was started
def shutdown
  @monitor.shutdown
  @thread.join if @thread
end

#start ⇒ Object



140
141
142
143
# File 'lib/perfectqueue/worker.rb', line 140

# Launches the worker thread running #run, then starts the monitor.
#
# @return [Object] whatever +@monitor.start+ returns
def start
  @thread = Thread.new { run }
  @monitor.start
end

#stop ⇒ Object



190
191
192
193
# File 'lib/perfectqueue/worker.rb', line 190

# Clears any pending token/task via #submit (which also wakes the worker
# waiting in #run) and then stops the monitor.
#
# @return [Object] whatever +@monitor.stop+ returns
def stop
  no_token = no_task = nil
  submit(no_token, no_task)
  @monitor.stop
end

#submit(token, task) ⇒ Object



200
201
202
203
204
205
206
# File 'lib/perfectqueue/worker.rb', line 200

# Stores a token/task pair under the worker's mutex and wakes every thread
# waiting on the condition variable.
#
# @param token [Object, nil] value stored in +@token+
# @param task [Object, nil] value stored in +@task+
# @return [Object] the result of +@cond.broadcast+
def submit(token, task)
  @mutex.synchronize do
    @token, @task = token, task
    @cond.broadcast
  end
end