Class: Bunny::ConsumerWorkPool

Inherits:
Object
  • Object
show all
Defined in:
lib/bunny/consumer_work_pool.rb

Overview

Thread pool that dispatches consumer deliveries. Not supposed to be shared between channels or threads.

Every channel its own consumer pool.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size = 1, abort_on_exception = false, shutdown_timeout = 60) ⇒ ConsumerWorkPool

Returns a new instance of ConsumerWorkPool.



22
23
24
25
26
27
28
29
30
31
# File 'lib/bunny/consumer_work_pool.rb', line 22

def initialize(size = 1, abort_on_exception = false, shutdown_timeout = 60)
  @size  = size
  @abort_on_exception = abort_on_exception
  @shutdown_timeout = shutdown_timeout
  @shutdown_mutex = ::Mutex.new
  @shutdown_conditional = ::ConditionVariable.new
  @queue = ::Queue.new
  @paused = false
  @running = false
end

Instance Attribute Details

#abort_on_exceptionObject (readonly)

Returns the value of attribute abort_on_exception.



20
21
22
# File 'lib/bunny/consumer_work_pool.rb', line 20

def abort_on_exception
  @abort_on_exception
end

#sizeObject (readonly)

Returns the value of attribute size.



19
20
21
# File 'lib/bunny/consumer_work_pool.rb', line 19

def size
  @size
end

#threadsObject (readonly)

API



18
19
20
# File 'lib/bunny/consumer_work_pool.rb', line 18

def threads
  @threads
end

Instance Method Details

#backlogObject



54
55
56
# File 'lib/bunny/consumer_work_pool.rb', line 54

def backlog
  @queue.length
end

#busy?Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/bunny/consumer_work_pool.rb', line 58

def busy?
  !@queue.empty?
end

#join(timeout = nil) ⇒ Object



79
80
81
# File 'lib/bunny/consumer_work_pool.rb', line 79

def join(timeout = nil)
  (@threads || []).each { |t| t.join(timeout) }
end

#killObject



95
96
97
98
99
# File 'lib/bunny/consumer_work_pool.rb', line 95

def kill
  @running = false

  (@threads || []).each { |t| t.kill }
end

#pauseObject



83
84
85
86
# File 'lib/bunny/consumer_work_pool.rb', line 83

def pause
  @running = false
  @paused = true
end

#resumeObject



88
89
90
91
92
93
# File 'lib/bunny/consumer_work_pool.rb', line 88

def resume
  @running = true
  @paused = false

  @threads.each { |t| t.run }
end

#running?Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/bunny/consumer_work_pool.rb', line 50

def running?
  @running
end

#shutdown(wait_for_workers = false) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/bunny/consumer_work_pool.rb', line 62

def shutdown(wait_for_workers = false)
  was_running = running?
  @running = false

  @size.times do
    submit do |*args|
      throw :terminate
    end
  end

  return if !(wait_for_workers && @shutdown_timeout && was_running)

  @shutdown_mutex.synchronize do
    @shutdown_conditional.wait(@shutdown_mutex, @shutdown_timeout) if busy?
  end
end

#startObject



38
39
40
41
42
43
44
45
46
47
48
# File 'lib/bunny/consumer_work_pool.rb', line 38

def start
  @threads = []

  @size.times do
    t = Thread.new(&method(:run_loop))
    t.abort_on_exception = true if abort_on_exception
    @threads << t
  end

  @running = true
end

#submit(callable = nil, &block) ⇒ Object



34
35
36
# File 'lib/bunny/consumer_work_pool.rb', line 34

def submit(callable = nil, &block)
  @queue.push(callable || block)
end