Class: GorgonBunny::ConsumerWorkPool

Inherits:
Object
  • Object
show all
Defined in:
lib/gorgon_bunny/lib/gorgon_bunny/consumer_work_pool.rb

Overview

Thread pool that dispatches consumer deliveries. Not supposed to be shared between channels or threads.

Every channel has its own consumer pool.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size = 1) ⇒ ConsumerWorkPool

Returns a new instance of ConsumerWorkPool.



19
20
21
22
# File 'lib/gorgon_bunny/lib/gorgon_bunny/consumer_work_pool.rb', line 19

# Builds a pool configured for +size+ worker threads. No thread is created
# here; that happens in #start.
#
# @param size [Integer] number of worker threads #start will spawn
def initialize(size = 1)
  @size    = size
  @queue   = ::Queue.new
  # Give the lifecycle state a defined value from the beginning so that
  # #running?, #join, #kill, #pause and #resume can be called on a pool
  # that has not been started yet without running into nil.
  @threads = []
  @running = false
end

Instance Attribute Details

#size ⇒ Object (readonly)

Returns the value of attribute size.



17
18
19
# File 'lib/gorgon_bunny/lib/gorgon_bunny/consumer_work_pool.rb', line 17

# Number of worker threads this pool is configured to run.
#
# @return [Integer] the +size+ value handed to the constructor
def size
  @size
end

#threads ⇒ Object (readonly)

API



16
17
18
# File 'lib/gorgon_bunny/lib/gorgon_bunny/consumer_work_pool.rb', line 16

# Worker threads spawned by the most recent call to #start.
#
# @return [Array<Thread>] the pool's threads (only populated once #start has run)
def threads
  @threads
end

Instance Method Details

#join(timeout = nil) ⇒ Object



54
55
56
# File 'lib/gorgon_bunny/lib/gorgon_bunny/consumer_work_pool.rb', line 54

# Waits for the pool's worker threads to finish.
#
# The timeout is passed to each Thread#join call individually, so the
# overall wait can be as long as +timeout+ multiplied by the thread count.
#
# @param timeout [Numeric, nil] seconds to wait per thread; nil waits indefinitely
# @return [Array<Thread>] the threads that were waited on (empty if there are none)
def join(timeout = nil)
  # @threads may still be nil when #start has not run; treat that as "no
  # threads to wait for" instead of raising NoMethodError.
  (@threads || []).each { |t| t.join(timeout) }
end

#kill ⇒ Object



70
71
72
73
74
# File 'lib/gorgon_bunny/lib/gorgon_bunny/consumer_work_pool.rb', line 70

# Marks the pool as not running and forcibly terminates every worker thread.
#
# @return [Array<Thread>] the threads that were killed (empty if there are none)
def kill
  @running = false

  # @threads may still be nil when #start has not run; there is nothing to
  # kill in that case.
  (@threads || []).each(&:kill)
end

#pause ⇒ Object



58
59
60
61
62
# File 'lib/gorgon_bunny/lib/gorgon_bunny/consumer_work_pool.rb', line 58

# Marks the pool as not running and asks every worker thread to stop.
#
# @return [Array<Thread>] the pool's threads (empty if there are none)
def pause
  @running = false

  # @threads may still be nil when #start has not run; there is nothing to
  # pause in that case.
  #
  # FIXME: Ruby only offers Thread.stop, a class method that suspends the
  # *calling* thread. Thread instances have no #stop, so this call raises
  # NoMethodError as soon as the pool actually holds threads. A real fix
  # needs the worker loop to suspend itself (for example by checking a
  # paused flag and calling Thread.stop), which cannot be done from here.
  (@threads || []).each { |t| t.stop }
end

#resume ⇒ Object



64
65
66
67
68
# File 'lib/gorgon_bunny/lib/gorgon_bunny/consumer_work_pool.rb', line 64

# Marks the pool as running and wakes up every worker thread via Thread#run.
#
# @return [Array<Thread>] the threads that were woken (empty if there are none)
def resume
  @running = true

  # @threads may still be nil when #start has not run; there is nothing to
  # wake up in that case.
  (@threads || []).each(&:run)
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


40
41
42
# File 'lib/gorgon_bunny/lib/gorgon_bunny/consumer_work_pool.rb', line 40

# Tells whether the pool is currently flagged as running.
#
# @return [Boolean] true after #start or #resume; false after #pause, #kill
#   or #shutdown, and also when the flag has never been set
def running?
  # Coerce so callers always receive a real Boolean, even when @running is
  # still nil because the pool has never been started.
  @running ? true : false
end

#shutdown ⇒ Object



44
45
46
47
48
49
50
51
52
# File 'lib/gorgon_bunny/lib/gorgon_bunny/consumer_work_pool.rb', line 44

# Flags the pool as stopped and enqueues one terminating job per configured
# worker.
#
# Each enqueued job throws :terminate when it is called.
# NOTE(review): presumably the worker loop catches :terminate and exits;
# that loop is not visible here, so confirm against it.
#
# @return [Integer] the configured pool size
def shutdown
  @running = false

  # The job accepts and ignores any arguments it might be called with.
  terminator = proc { throw :terminate }
  @size.times { submit(&terminator) }
end

#start ⇒ Object



29
30
31
32
33
34
35
36
37
38
# File 'lib/gorgon_bunny/lib/gorgon_bunny/consumer_work_pool.rb', line 29

# Spawns the configured number of worker threads, each executing #run_loop,
# and then flags the pool as running.
#
# Any thread list left over from an earlier call is replaced.
#
# @return [true]
def start
  @threads = []
  worker = method(:run_loop)

  1.upto(@size) { @threads.push(Thread.new(&worker)) }

  @running = true
end

#submit(callable = nil, &block) ⇒ Object



25
26
27
# File 'lib/gorgon_bunny/lib/gorgon_bunny/consumer_work_pool.rb', line 25

# Enqueues a unit of work on the pool's internal queue.
#
# @param callable [#call, nil] job to enqueue; takes precedence over the block
# @param block [Proc] job to enqueue when no callable is given
# @return [Queue] the pool's internal queue
def submit(callable = nil, &block)
  job = callable || block
  @queue << job
end