Class: Que::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/que/worker.rb

Constant Summary collapse

VALID_LOG_LEVELS =
[:debug, :info, :warn, :error, :fatal, :unknown].to_set.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_buffer:, result_queue:, priority: nil, start_callback: nil) ⇒ Worker

Returns a new instance of Worker.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/que/worker.rb', line 21

def initialize(
  job_buffer:,
  result_queue:,
  priority: nil,
  start_callback: nil
)

  @priority     = Que.assert([NilClass, Integer], priority)
  @job_buffer   = Que.assert(JobBuffer, job_buffer)
  @result_queue = Que.assert(ResultQueue, result_queue)

  Que.internal_log(:worker_instantiate, self) do
    {
      priority:     priority,
      job_buffer:   job_buffer.object_id,
      result_queue: result_queue.object_id,
    }
  end

  @thread =
    Thread.new do
      # An error causing this thread to exit is a bug in Que, which we want
      # to know about ASAP, so propagate the error if it happens.
      Thread.current.abort_on_exception = true
      start_callback.call(self) if start_callback.respond_to?(:call)
      work_loop
    end
end

Instance Attribute Details

#priorityObject (readonly)

Returns the value of attribute priority.



10
11
12
# File 'lib/que/worker.rb', line 10

def priority
  @priority
end

#threadObject (readonly)

Returns the value of attribute thread.



10
11
12
# File 'lib/que/worker.rb', line 10

def thread
  @thread
end

Instance Method Details

#wait_until_stoppedObject



50
51
52
# File 'lib/que/worker.rb', line 50

def wait_until_stopped
  @thread.join
end