Class: Deliver::QueueWorker

Inherits:
Object
Defined in:
deliver/lib/deliver/queue_worker.rb

Overview

This class dispatches jobs to worker threads so that they run in parallel. It is suitable for I/O-bound work, not for CPU-bound work. Use it when you have all the items you will process in advance: enqueue them, then call `QueueWorker#start`.
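The workflow described above can be sketched as follows. To keep the example runnable without fastlane installed, it uses a stand-in copy of the class based on the source listings on this page; the default concurrency of 10 and the `sleep` call that simulates I/O are assumptions made for illustration.

```ruby
module Deliver
  # Stand-in copy of the class as listed on this page (assumption: the
  # default concurrency is hard-coded here instead of NUMBER_OF_THREADS).
  class QueueWorker
    def initialize(concurrency = 10, &block)
      @concurrency = concurrency
      @block = block
      @queue = Queue.new
    end

    def enqueue(job)
      @queue.push(job)
    end

    def batch_enqueue(jobs)
      raise(ArgumentError, "Enqueue Array instead of #{jobs.class}") unless jobs.kind_of?(Array)
      jobs.each { |job| enqueue(job) }
    end

    def start
      @queue.close
      threads = []
      @concurrency.times do
        threads << Thread.new do
          job = @queue.pop
          while job
            @block.call(job)
            job = @queue.pop
          end
        end
      end
      threads.each(&:join)
    end
  end
end

results = Queue.new # thread-safe collector for the block's output

worker = Deliver::QueueWorker.new(3) do |job|
  sleep(0.01) # stands in for an I/O-bound call such as an upload
  results << job.upcase
end

worker.batch_enqueue(%w[a b c d]) # enqueue everything up front...
worker.enqueue("e")
worker.start                      # ...then block until all jobs are done

processed = []
processed << results.pop until results.empty?
processed.sort! # completion order across threads is not deterministic
```

Because jobs finish in whatever order the threads complete them, the sketch sorts the collected results before using them.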

Constant Summary

NUMBER_OF_THREADS =
Helper.test? ? 1 : [ENV.fetch("DELIVER_NUMBER_OF_THREADS", 10).to_i, 10].min
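To make the effect of this expression concrete, here is a minimal sketch that evaluates the same logic against a plain hash instead of `ENV`. The method name `number_of_threads` and its `test_mode` argument (standing in for `Helper.test?`) are hypothetical and exist only for this illustration.

```ruby
# Hypothetical helper mirroring the constant's expression.
# `env` stands in for ENV, `test_mode` for Helper.test?.
def number_of_threads(env, test_mode = false)
  test_mode ? 1 : [env.fetch("DELIVER_NUMBER_OF_THREADS", 10).to_i, 10].min
end

default_count = number_of_threads({})                                      # variable unset
lowered_count = number_of_threads({ "DELIVER_NUMBER_OF_THREADS" => "4" })  # below the cap
capped_count  = number_of_threads({ "DELIVER_NUMBER_OF_THREADS" => "25" }) # above the cap
test_count    = number_of_threads({ "DELIVER_NUMBER_OF_THREADS" => "25" }, true)
```

Under these assumptions the environment variable can lower the thread count but never raise it above 10. Note that a non-numeric value parses to 0 via `to_i`, which would result in no worker threads being created.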

Instance Method Summary

Constructor Details

#initialize(concurrency = NUMBER_OF_THREADS, &block) ⇒ QueueWorker

Returns a new instance of QueueWorker.

Parameters:

  • concurrency (Numeric) (defaults to: NUMBER_OF_THREADS)
    • The number of worker threads to create

  • block (Proc)
    • The task to run for each enqueued item



# File 'deliver/lib/deliver/queue_worker.rb', line 13

def initialize(concurrency = NUMBER_OF_THREADS, &block)
  @concurrency = concurrency
  @block = block
  @queue = Queue.new
end

Instance Method Details

#batch_enqueue(jobs) ⇒ Object

Parameters:

  • jobs (Array<Object>)
    • An array of arbitrary objects, each holding the parameters for one job

Raises:

  • (ArgumentError)
    • Raised when jobs is not an Array


# File 'deliver/lib/deliver/queue_worker.rb', line 25

def batch_enqueue(jobs)
  raise(ArgumentError, "Enqueue Array instead of #{jobs.class}") unless jobs.kind_of?(Array)
  jobs.each { |job| enqueue(job) }
end
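The type check can be illustrated with a small stand-in. `SketchWorker` and its `size` helper are hypothetical names used only here; the two enqueue methods copy the listings on this page.

```ruby
# Hypothetical stand-in holding only the enqueue logic shown on this page.
class SketchWorker
  def initialize
    @queue = Queue.new
  end

  def enqueue(job)
    @queue.push(job)
  end

  def batch_enqueue(jobs)
    raise(ArgumentError, "Enqueue Array instead of #{jobs.class}") unless jobs.kind_of?(Array)
    jobs.each { |job| enqueue(job) }
  end

  # Illustration-only helper, not part of the documented class.
  def size
    @queue.size
  end
end

worker = SketchWorker.new

error_message = nil
begin
  worker.batch_enqueue(1..3) # a Range is enumerable but not an Array
rescue ArgumentError => e
  error_message = e.message
end

worker.batch_enqueue((1..3).to_a) # converting to an Array first is accepted
queued = worker.size
```

Since the check uses `kind_of?(Array)`, other enumerables such as a Range or a Set need an explicit `to_a` before they are passed in.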

#enqueue(job) ⇒ Object

Parameters:

  • job (Object)
    • An arbitrary object holding the parameters for one job



# File 'deliver/lib/deliver/queue_worker.rb', line 20

def enqueue(job)
  @queue.push(job)
end

#start ⇒ Object

Call this after you have enqueued all the jobs you want to process. This method blocks the current thread until all the enqueued jobs are processed.



# File 'deliver/lib/deliver/queue_worker.rb', line 32

def start
  @queue.close

  threads = []
  @concurrency.times do
    threads << Thread.new do
      job = @queue.pop
      while job
        @block.call(job)
        job = @queue.pop
      end
    end
  end

  threads.each(&:join)
end
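The worker loop above relies on how Ruby's standard `Queue` behaves once it is closed. The following sketch shows that behavior in isolation, using only the core `Queue` class; the variable names are illustrative.

```ruby
queue = Queue.new
queue.push(:a)
queue.push(:b)
queue.close # no more items may be added, but existing ones can still be popped

drained = []
# On a closed queue, pop returns the remaining items and then nil,
# which is what ends each worker thread's `while job` loop.
while (job = queue.pop)
  drained << job
end

late_push_error = nil
begin
  queue.push(:c) # pushing after close is rejected
rescue ClosedQueueError => e
  late_push_error = e.class
end
```

Two consequences follow from the listing: calling `enqueue` after `start` raises `ClosedQueueError`, and because the loop stops on any falsy value, enqueuing `nil` or `false` as a job would end that worker thread early.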