Class: BunnyCarrot::Consumer

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/bunny_carrot/consumer.rb

Class Method Summary collapse

Methods included from Logger

included, #logger

Class Method Details

.subscribe(queue_name, worker, pool_size: 1, block: false) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/bunny_carrot/consumer.rb', line 5

def self.subscribe(queue_name, worker, pool_size: 1, block: false)
  root_supervisor          = Concurrent::Supervisor.new
  queue_supervisor         = Concurrent::Supervisor.new(restart_strategy: :one_for_all)
  business_pool_supervisor = Concurrent::Supervisor.new
  root_supervisor.add_worker(queue_supervisor)
  root_supervisor.add_worker(business_pool_supervisor)

  business_actor, business_pool = BusinessActor.pool(pool_size)
  business_pool.each do |actor|
    actor.add_observer(BusinessActorObserver)
    business_pool_supervisor.add_worker(actor)
  end

  consumer_actor = ConsumerActor.new(business_actor, pool_size)
  queue_supervisor.add_worker(consumer_actor)

  root_supervisor.run!
  until root_supervisor.running? && queue_supervisor.running? && business_pool_supervisor.running?
    sleep(0.1)
  end

  logger.info 'Start consuming...'

  consumer_actor.post(queue_name, worker)
  loop { sleep(1) } if block
end