5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# File 'lib/bunny_carrot/consumer.rb', line 5
# Builds a supervision tree (a root supervisor owning a queue supervisor and
# a business-pool supervisor), starts it, and posts +queue_name+ and +worker+
# to a freshly created ConsumerActor.
#
# @param queue_name forwarded as the first argument of the consumer actor's +post+
# @param worker forwarded as the second argument of the consumer actor's +post+
# @param pool_size passed to +BusinessActor.pool+ and to +ConsumerActor.new+
# @param block when truthy, the method never returns: it sleeps in an
#   endless loop after posting to the consumer actor
# @return [nil] when +block+ is falsy
def self.subscribe(queue_name, worker, pool_size: 1, block: false)
  root = Concurrent::Supervisor.new
  queue_sup = Concurrent::Supervisor.new(restart_strategy: :one_for_all)
  pool_sup = Concurrent::Supervisor.new
  [queue_sup, pool_sup].each { |child| root.add_worker(child) }

  # BusinessActor.pool hands back a pair: the object given to the consumer
  # actor, and the collection of individual actors to observe and supervise.
  dispatcher, pool_members = BusinessActor.pool(pool_size)
  pool_members.each do |member|
    member.add_observer(BusinessActorObserver)
    pool_sup.add_worker(member)
  end

  consumer = ConsumerActor.new(dispatcher, pool_size)
  queue_sup.add_worker(consumer)

  root.run!

  # Poll every 100 ms until all three supervisors report running; the check
  # short-circuits on the first one that is not yet running.
  supervisors = [root, queue_sup, pool_sup]
  sleep(0.1) until supervisors.all?(&:running?)

  logger.info 'Start consuming...'
  consumer.post(queue_name, worker)

  return unless block

  # Park the calling thread indefinitely.
  loop { sleep(1) }
end
|