Class: Emque::Consuming::Adapters::RabbitMq::Manager

Inherits:
Object
  • Object
show all
Includes:
Emque::Consuming::Actor
Defined in:
lib/emque/consuming/adapters/rabbit_mq/manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Emque::Consuming::Actor

included

Instance Attribute Details

#delayed_message_workersObject



70
71
72
# File 'lib/emque/consuming/adapters/rabbit_mq/manager.rb', line 70

def delayed_message_workers
  @delayed_message_workers
end

#workers(flatten: false) ⇒ Object



66
67
68
# File 'lib/emque/consuming/adapters/rabbit_mq/manager.rb', line 66

def workers(flatten: false)
  flatten ? @workers.values.flatten : @workers
end

Instance Method Details

#actor_died(actor, reason) ⇒ Object



11
12
13
14
15
16
# File 'lib/emque/consuming/adapters/rabbit_mq/manager.rb', line 11

def actor_died(actor, reason)
  unless shutdown
    logger.error "RabbitMQ Manager: actor_died - #{actor.inspect} " +
                 "died: #{reason}"
  end
end

#retry_errorsObject



74
75
76
# File 'lib/emque/consuming/adapters/rabbit_mq/manager.rb', line 74

def retry_errors
  ErrorWorker.new(@connection).retry_errors
end

#startObject



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/emque/consuming/adapters/rabbit_mq/manager.rb', line 18

def start
  setup_connection
  initialize_error_queue
  initialize_workers
  initialize_delayed_message_workers if enable_delayed_message
  logger.info "RabbitMQ Manager: starting #{worker_count} workers..."
  workers(:flatten => true).each do |worker|
    worker.async.start
  end
  if enable_delayed_message
    delayed_message_workers.each do |worker|
      worker.async.start
    end
  end
end

#stopObject



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/emque/consuming/adapters/rabbit_mq/manager.rb', line 34

def stop
  logger.info "RabbitMQ Manager: stopping #{worker_count} workers..."

  super do
    workers(:flatten => true).each do |worker|
      logger.info "RabbitMQ Manager: stopping #{worker.topic} worker..."
      worker.stop
    end
    if enable_delayed_message
      delayed_message_workers.each_with_index do |worker, i|
        logger.info "RabbitMQ Manager: stopping #{worker.class} #{i + 1} worker..."
        worker.stop
      end
    end
  end

  @connection.stop
end

#worker(topic:, command:) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/emque/consuming/adapters/rabbit_mq/manager.rb', line 53

def worker(topic:, command:)
  if workers.has_key?(topic)
    case command
    when :down
      worker = workers[topic].pop
      worker.stop if worker
    when :up
      workers[topic] << new_worker(topic)
      workers[topic].last.async.start
    end
  end
end