Class: Emque::Consuming::Adapters::RabbitMq::Manager
- Inherits:
- Object
- Object
- Emque::Consuming::Adapters::RabbitMq::Manager
- Includes:
- Emque::Consuming::Actor
- Defined in:
- lib/emque/consuming/adapters/rabbit_mq/manager.rb
Instance Attribute Summary collapse
- #delayed_message_workers ⇒ Object readonly
- #workers(flatten: false) ⇒ Object readonly
Instance Method Summary collapse
- #actor_died(actor, reason) ⇒ Object
- #retry_errors ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #worker(topic:, command:) ⇒ Object
Methods included from Emque::Consuming::Actor
Instance Attribute Details
#delayed_message_workers ⇒ Object
70 71 72 |
# File 'lib/emque/consuming/adapters/rabbit_mq/manager.rb', line 70

# Reader for the @delayed_message_workers instance variable.
#
# NOTE(review): the extracted listing read "def @delayed_message_workers end"
# with the method name missing; the name is restored from the attribute
# heading "#delayed_message_workers" on this page.
#
# @return [Object] the current value of @delayed_message_workers
def delayed_message_workers
  @delayed_message_workers
end
#workers(flatten: false) ⇒ Object
66 67 68 |
# File 'lib/emque/consuming/adapters/rabbit_mq/manager.rb', line 66

# Returns the workers stored in @workers.
#
# @param flatten [Boolean] when truthy, the values of @workers are collected
#   and flattened into a single Array; when falsy, @workers itself is
#   returned untouched.
# @return [Object] @workers, or a flat Array built from its values
def workers(flatten: false)
  return @workers unless flatten

  @workers.values.flatten
end
Instance Method Details
#actor_died(actor, reason) ⇒ Object
11 12 13 14 15 16 |
# File 'lib/emque/consuming/adapters/rabbit_mq/manager.rb', line 11

# Logs an error describing an actor that died, unless `shutdown` is truthy.
#
# @param actor [Object] the actor that died; its #inspect output is logged
# @param reason [Object] the reason, interpolated into the log message
# @return [Object, nil] nil when `shutdown` is truthy, otherwise whatever
#   logger.error returns
def actor_died(actor, reason)
  # Nothing is logged once `shutdown` is set.
  return if shutdown

  logger.error "RabbitMQ Manager: actor_died - #{actor.inspect} died: #{reason}"
end
#retry_errors ⇒ Object
74 75 76 |
# File 'lib/emque/consuming/adapters/rabbit_mq/manager.rb', line 74

# Builds an ErrorWorker around @connection and delegates to its
# #retry_errors method.
#
# @return [Object] whatever ErrorWorker#retry_errors returns
def retry_errors
  error_worker = ErrorWorker.new(@connection)
  error_worker.retry_errors
end
#start ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/emque/consuming/adapters/rabbit_mq/manager.rb', line 18

# Runs the setup helpers in order (setup_connection, initialize_error_queue,
# initialize_workers), logs the worker count, then calls `async.start` on
# every worker.
#
# NOTE(review): the extracted listing lost several identifiers
# ("initialize_workers if logger.info ..." and "if .each do |worker|").
# The 15-line gutter (18-32) implies one extra statement of the form
# "<call> if <condition>" plus a guarded loop. The names
# `initialize_delayed_message_workers` and `enable_delayed_message` used below
# are a reconstruction; only `delayed_message_workers` is attested on this
# page. Confirm against lib/emque/consuming/adapters/rabbit_mq/manager.rb.
#
# @return [Object, nil] the value of the final guarded loop, or nil when the
#   guard is falsy
def start
  setup_connection
  initialize_error_queue
  initialize_workers
  # Reconstructed statement -- TODO confirm both names.
  initialize_delayed_message_workers if enable_delayed_message

  logger.info "RabbitMQ Manager: starting #{worker_count} workers..."

  workers(:flatten => true).each do |worker|
    worker.async.start
  end

  # Reconstructed guard and receiver -- TODO confirm.
  if enable_delayed_message
    delayed_message_workers.each do |worker|
      worker.async.start
    end
  end
end
#stop ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/emque/consuming/adapters/rabbit_mq/manager.rb', line 34

# Logs the worker count, hands a block to the inherited `stop` that stops
# every worker (logging each one first), then calls `stop` on @connection.
#
# What the inherited `stop` does with the block is not visible on this page;
# presumably it comes from the included Emque::Consuming::Actor -- verify.
#
# NOTE(review): the extracted listing read "if .each_with_index do |worker, i|"
# with the guard condition and the receiver missing, and it carried one more
# `end` than it has block openers. The guard `enable_delayed_message` is a
# reconstruction and the receiver `delayed_message_workers` is taken from the
# attribute listed on this page. The body is rebalanced so that
# `@connection.stop` runs after the `super` block. Confirm against
# lib/emque/consuming/adapters/rabbit_mq/manager.rb.
#
# @return [Object] whatever @connection.stop returns
def stop
  logger.info "RabbitMQ Manager: stopping #{worker_count} workers..."

  super do
    workers(:flatten => true).each do |worker|
      logger.info "RabbitMQ Manager: stopping #{worker.topic} worker..."
      worker.stop
    end

    # Reconstructed guard and receiver -- TODO confirm.
    if enable_delayed_message
      delayed_message_workers.each_with_index do |worker, i|
        logger.info "RabbitMQ Manager: stopping #{worker.class} #{i + 1} worker..."
        worker.stop
      end
    end
  end

  @connection.stop
end
#worker(topic:, command:) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/emque/consuming/adapters/rabbit_mq/manager.rb', line 53

# Adds or removes one worker for a topic that is already present in `workers`.
#
# Topics that are not keys of `workers` are ignored, as is any command other
# than :up or :down.
#
# @param topic [Object] key into the `workers` registry
# @param command [Symbol] :down pops the last worker for the topic and stops
#   it (no-op when the list is empty); :up appends `new_worker(topic)` and
#   calls `async.start` on it
# @return [Object, nil] the result of the stop/start call, or nil when nothing
#   was done
def worker(topic:, command:)
  return unless workers.key?(topic)

  case command
  when :down
    # The local is named `removed` so it does not shadow this method's own name.
    removed = workers[topic].pop
    removed.stop if removed
  when :up
    # Hold the new worker directly rather than re-reading it via `.last`.
    added = new_worker(topic)
    workers[topic] << added
    added.async.start
  end
end