Class: Deliver::QueueWorker

Inherits:
Object
  • Object
show all
Defined in:
deliver/lib/deliver/queue_worker.rb

Overview

This dispatches jobs to worker threads and make it work in parallel. It’s suitable for I/O bounds works and not for CPU bounds works. Use this when you have all the items that you’ll process in advance. Simply enqueue them to this and call ‘QueueWorker#start`.

Instance Method Summary collapse

Constructor Details

#initialize(concurrency, &block) ⇒ QueueWorker

Returns a new instance of QueueWorker.

Parameters:

  • concurrency (Numeric)
    • A number of threads to be created

  • block (Proc)
    • A task you want to execute with enqueued items



11
12
13
14
15
# File 'deliver/lib/deliver/queue_worker.rb', line 11

def initialize(concurrency, &block)
  @concurrency = concurrency
  @block = block
  @queue = Queue.new
end

Instance Method Details

#enqueue(job) ⇒ Object

Parameters:

  • job (Object)
    • An arbitary object that keeps parameters



18
19
20
# File 'deliver/lib/deliver/queue_worker.rb', line 18

def enqueue(job)
  @queue.push(job)
end

#startObject

Call this after you enqueuned all the jobs you want to process This method blocks current thread until all the enqueued jobs are processed



24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'deliver/lib/deliver/queue_worker.rb', line 24

def start
  threads = []
  @concurrency.times do
    threads << Thread.new do
      while running? && !empty?
        job = @queue.pop
        @block.call(job) if job
      end
    end
  end

  wait_for_complete
  threads.each(&:join)
end