Class: Que::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/que/worker.rb

Constant Summary collapse

# Frozen set of the log level symbols this class treats as valid.
VALID_LOG_LEVELS = %i[debug info warn error fatal unknown].to_set.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_buffer:, result_queue:, priority: nil, start_callback: nil) ⇒ Worker



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/que/worker.rb', line 21

# Validates and stores the constructor arguments, emits an internal log
# entry, and spawns the thread that runs this worker's work loop.
#
# @param job_buffer [JobBuffer] checked with Que.assert against JobBuffer
# @param result_queue [ResultQueue] checked with Que.assert against ResultQueue
# @param priority [Integer, nil] checked with Que.assert against NilClass/Integer
# @param start_callback [#call, nil] when it responds to :call, it is invoked
#   with this worker from inside the new thread, before work_loop starts
def initialize(job_buffer:, result_queue:, priority: nil, start_callback: nil)
  # Validation runs in this order: priority, then job_buffer, then result_queue.
  @priority     = Que.assert([NilClass, Integer], priority)
  @job_buffer   = Que.assert(JobBuffer, job_buffer)
  @result_queue = Que.assert(ResultQueue, result_queue)

  Que.internal_log(:worker_instantiate, self) do
    { priority: priority, job_buffer: job_buffer.object_id, result_queue: result_queue.object_id }
  end

  @thread = Thread.new do
    # If this thread dies from an error, that is a bug in Que itself and
    # should surface immediately, so let the exception propagate.
    Thread.current.abort_on_exception = true

    if start_callback.respond_to?(:call)
      start_callback.call(self)
    end

    work_loop
  end
end

Instance Attribute Details

#priority ⇒ Object (readonly)

Returns the value of attribute priority.



10
11
12
# File 'lib/que/worker.rb', line 10

# Reader for @priority, which the constructor assigns from
# Que.assert([NilClass, Integer], priority).
# NOTE(review): presumably nil or an Integer — confirm Que.assert returns the
# value it validated.
def priority
  @priority
end

#thread ⇒ Object (readonly)

Returns the value of attribute thread.



10
11
12
# File 'lib/que/worker.rb', line 10

# Reader for @thread, the Thread the constructor creates with Thread.new to
# run the optional start callback followed by work_loop.
def thread
  @thread
end

Instance Method Details

#wait_until_stopped ⇒ Object



50
51
52
# File 'lib/que/worker.rb', line 50

# Blocks the calling thread until this worker's thread (@thread) has finished,
# by joining it with no timeout. Returns whatever Thread#join returns.
def wait_until_stopped
  @thread.join
end