Class: MultipleMan::Consumers::Transitional

Inherits:
Object
  • Object
show all
Defined in:
lib/multiple_man/consumers/transitional.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subscription:, queue:, topic:) ⇒ Transitional

Returns a new instance of Transitional.



9
10
11
12
13
# File 'lib/multiple_man/consumers/transitional.rb', line 9

# Stores the three collaborators on the instance.
#
# @param subscription [Object] kept in @subscription
# @param queue [Object] kept in @queue
# @param topic [Object] kept in @topic
def initialize(subscription:, queue:, topic:)
  @subscription, @topic, @queue = subscription, topic, queue
end

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



7
8
9
# File 'lib/multiple_man/consumers/transitional.rb', line 7

# Reader for the @queue instance variable.
#
# @return [Object, nil] the current value of @queue (nil when unset)
def queue; @queue; end

#subscription ⇒ Object (readonly)

Returns the value of attribute subscription.



7
8
9
# File 'lib/multiple_man/consumers/transitional.rb', line 7

# Reader for the @subscription instance variable.
#
# @return [Object, nil] the current value of @subscription (nil when unset)
def subscription; @subscription; end

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



7
8
9
# File 'lib/multiple_man/consumers/transitional.rb', line 7

# Reader for the @topic instance variable.
#
# @return [Object, nil] the current value of @topic (nil when unset)
def topic; @topic; end

Instance Method Details

#handle_error(ex, delivery_info) ⇒ Object



33
34
35
36
37
38
39
# File 'lib/multiple_man/consumers/transitional.rb', line 33

# Logs and reports an error raised while consuming a message, then
# negatively acknowledges the delivery on the queue's channel.
#
# @param ex [Exception] the error to log and hand to MultipleMan.error
# @param delivery_info [#delivery_tag] metadata of the failed delivery
# @return [Object] whatever the channel's +nack+ call returns
def handle_error(ex, delivery_info)
  # Print one frame per line. Interpolating the backtrace Array directly
  # logs its inspect form on a single line; Array() also covers a nil
  # backtrace (an exception object that was never raised).
  backtrace = Array(ex.backtrace).join("\n")
  MultipleMan.logger.error "   Error - #{ex.message}\n\n#{backtrace}"
  MultipleMan.error(ex, reraise: false)

  # Negatively acknowledge the delivery.
  # NOTE(review): only the delivery tag is passed. If this is a Bunny
  # channel, nack's requeue argument defaults to false, so the message
  # would NOT be requeued as written -- confirm the intended behaviour.
  queue.channel.nack(delivery_info.delivery_tag)
end

#listen ⇒ Object



15
16
17
18
19
20
# File 'lib/multiple_man/consumers/transitional.rb', line 15

# Logs what is being listened for, unbinds the queue from the topic for this
# routing key, then subscribes with manual acknowledgement and forwards each
# delivery to #process_message.
#
# @return [Object] whatever the +subscribe+ call returns
def listen
  announcement = "Listening for #{subscription.listen_to} with routing key #{routing_key}."
  MultipleMan.logger.info announcement

  unbound_queue = queue.unbind(topic, routing_key: routing_key)
  unbound_queue.subscribe(manual_ack: true) do |delivery_info, _properties, payload|
    process_message(delivery_info, payload)
  end
end

#operation(delivery_info, payload) ⇒ Object



41
42
43
# File 'lib/multiple_man/consumers/transitional.rb', line 41

# Picks the operation name for a message: the payload's 'operation' entry
# when it is truthy, otherwise the last dot-separated segment of the
# delivery's routing key.
#
# @param delivery_info [#routing_key] delivery metadata
# @param payload [#[]] parsed message body
# @return [Object, nil] the operation name; nil when neither source yields one
def operation(delivery_info, payload)
  explicit = payload['operation']
  return explicit if explicit

  delivery_info.routing_key.split(".").last
end

#process_message(delivery_info, payload) ⇒ Object



22
23
24
25
26
27
28
29
30
31
# File 'lib/multiple_man/consumers/transitional.rb', line 22

# Parses a JSON message, dispatches it to the subscription under the name
# returned by #operation, and acknowledges the delivery. Any StandardError
# raised along the way is wrapped in MultipleMan::ConsumerError and passed
# to #handle_error instead of propagating.
#
# @param delivery_info [#routing_key, #delivery_tag] delivery metadata
# @param payload [String] raw JSON message body
# @return [Object] the result of the acknowledge call, or of #handle_error
def process_message(delivery_info, payload)
  MultipleMan.logger.debug "Processing message for #{delivery_info.routing_key}."

  payload = JSON.parse(payload).with_indifferent_access
  # The operation name can come from the message body, so dispatch with
  # public_send: a message must not be able to reach private methods.
  subscription.public_send(operation(delivery_info, payload), payload)
  MultipleMan.logger.debug "   Successfully processed!"
  queue.channel.acknowledge(delivery_info.delivery_tag, false)
rescue StandardError
  begin
    # Raising while the original error is being handled makes Ruby record it
    # as the new error's #cause.
    raise MultipleMan::ConsumerError
  rescue StandardError => wrapped
    handle_error(wrapped, delivery_info)
  end
end

#routing_key ⇒ Object



45
46
47
# File 'lib/multiple_man/consumers/transitional.rb', line 45

# Delegates to the subscription for the routing key.
#
# @return [Object] whatever +subscription.routing_key+ returns
def routing_key
  source = subscription
  source.routing_key
end