Class: MultipleMan::Consumers::Transitional
- Inherits:
-
Object
- Object
- MultipleMan::Consumers::Transitional
- Defined in:
- lib/multiple_man/consumers/transitional.rb
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#subscription ⇒ Object
readonly
Returns the value of attribute subscription.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Instance Method Summary collapse
- #handle_error(ex, delivery_info) ⇒ Object
-
#initialize(subscription:, queue:, topic:) ⇒ Transitional
constructor
A new instance of Transitional.
- #listen ⇒ Object
- #operation(delivery_info, payload) ⇒ Object
- #process_message(delivery_info, payload) ⇒ Object
- #routing_key ⇒ Object
Constructor Details
#initialize(subscription:, queue:, topic:) ⇒ Transitional
Returns a new instance of Transitional.
9 10 11 12 13 |
# File 'lib/multiple_man/consumers/transitional.rb', line 9 def initialize(subscription:, queue:, topic:) @subscription = subscription @topic = topic @queue = queue end |
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
7 8 9 |
# File 'lib/multiple_man/consumers/transitional.rb', line 7 def queue @queue end |
#subscription ⇒ Object (readonly)
Returns the value of attribute subscription.
7 8 9 |
# File 'lib/multiple_man/consumers/transitional.rb', line 7 def subscription @subscription end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
7 8 9 |
# File 'lib/multiple_man/consumers/transitional.rb', line 7 def topic @topic end |
Instance Method Details
#handle_error(ex, delivery_info) ⇒ Object
33 34 35 36 37 38 39 |
# File 'lib/multiple_man/consumers/transitional.rb', line 33 def handle_error(ex, delivery_info) MultipleMan.logger.error " Error - #{ex.}\n\n#{ex.backtrace}" MultipleMan.error(ex, reraise: false) # Requeue the message queue.channel.nack(delivery_info.delivery_tag) end |
#listen ⇒ Object
15 16 17 18 19 20 |
# File 'lib/multiple_man/consumers/transitional.rb', line 15 def listen MultipleMan.logger.info "Listening for #{subscription.listen_to} with routing key #{routing_key}." queue.unbind(topic, routing_key: routing_key).subscribe(manual_ack: true) do |delivery_info, _, payload| (delivery_info, payload) end end |
#operation(delivery_info, payload) ⇒ Object
41 42 43 |
# File 'lib/multiple_man/consumers/transitional.rb', line 41 def operation(delivery_info, payload) payload['operation'] || delivery_info.routing_key.split(".").last end |
#process_message(delivery_info, payload) ⇒ Object
22 23 24 25 26 27 28 29 30 31 |
# File 'lib/multiple_man/consumers/transitional.rb', line 22 def (delivery_info, payload) MultipleMan.logger.debug "Processing message for #{delivery_info.routing_key}." payload = JSON.parse(payload).with_indifferent_access subscription.send(operation(delivery_info, payload), payload) MultipleMan.logger.debug " Successfully processed!" queue.channel.acknowledge(delivery_info.delivery_tag, false) rescue => ex raise MultipleMan::ConsumerError rescue handle_error($!, delivery_info) end |
#routing_key ⇒ Object
45 46 47 |
# File 'lib/multiple_man/consumers/transitional.rb', line 45 def routing_key subscription.routing_key end |