Class: Blinkbox::CommonMessaging::Queue
- Inherits:
- Object
- Extended by:
- Forwardable
- Defined in:
- lib/blinkbox/common_messaging/queue.rb
Overview
A proxy class for generating queues and binding them to exchanges using Bunny, in the format expected by blinkbox Books services.
Instance Method Summary
-
#initialize(queue_name, exchange: "amq.headers", dlx: "#{exchange}.DLX", bindings: [], prefetch: 10, exclusive: false, temporary: false) ⇒ Bunny::Queue
constructor
Creates a queue object for subscribing to messages.
-
#on_exception {|exception, channel, delivery| ... } ⇒ Object
Defines a new block for handling exceptions which occur when processing an incoming message.
-
#purge! ⇒ true
Purges all messages from this queue.
-
#subscribe(block: true, accept: nil) {|metadata, payload_object| ... } ⇒ Object
Emits the metadata and objectified payload for every message which appears on the queue.
Constructor Details
#initialize(queue_name, exchange: "amq.headers", dlx: "#{exchange}.DLX", bindings: [], prefetch: 10, exclusive: false, temporary: false) ⇒ Bunny::Queue
Creates a queue object for subscribing to messages.
NB. There is no way to know what bindings have already been made for a queue, so all code subscribing to a queue should cope with receiving messages it’s not expecting.
# File 'lib/blinkbox/common_messaging/queue.rb', line 23

def initialize(queue_name, exchange: "amq.headers", dlx: "#{exchange}.DLX", bindings: [], prefetch: 10, exclusive: false, temporary: false)
  raise ArgumentError, "Prefetch must be a positive integer" unless prefetch.is_a?(Integer) && prefetch > 0
  connection = CommonMessaging.connection
  @logger = CommonMessaging.config[:logger]
  # We create one channel per queue because it means that any issues are isolated
  # and we can start a new channel and resume efforts in a segregated manner.
  @channel = connection.create_channel
  @channel.prefetch(prefetch)
  args = {}
  args["x-dead-letter-exchange"] = dlx unless dlx.nil?
  @queue = @channel.queue(
    queue_name,
    durable: !temporary,
    auto_delete: temporary,
    exclusive: exclusive,
    arguments: args
  )
  @exchange = @channel.headers(
    exchange,
    durable: true,
    auto_delete: false,
    passive: true
  )
  Kernel.warn "No bindings were given, the queue is unlikely to receive any messages" if bindings.empty?
  bindings.each do |binding|
    @queue.bind(@exchange, arguments: binding)
  end
end
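The option handling in the constructor can be tried in isolation, without a RabbitMQ connection. The `queue_options` helper below is hypothetical and not part of the gem. It is a minimal sketch that mirrors how the constructor validates `prefetch`, derives the default dead-letter exchange name from `exchange`, and turns `temporary` into Bunny's `durable` and `auto_delete` flags.

```ruby
# Hypothetical helper, not part of Blinkbox::CommonMessaging: it mirrors the
# option handling of Queue#initialize without opening a connection.
def queue_options(exchange: "amq.headers", dlx: "#{exchange}.DLX", prefetch: 10, exclusive: false, temporary: false)
  raise ArgumentError, "Prefetch must be a positive integer" unless prefetch.is_a?(Integer) && prefetch > 0

  # The dead-letter argument is only set when a DLX name is given.
  args = {}
  args["x-dead-letter-exchange"] = dlx unless dlx.nil?

  {
    durable: !temporary,     # temporary queues do not survive a broker restart
    auto_delete: temporary,  # and are removed once their last consumer goes away
    exclusive: exclusive,
    arguments: args
  }
end

# Defaults: durable queue, dead-lettering to an exchange named after `exchange`.
defaults = queue_options

# A temporary queue with dead-lettering switched off ("Events" is an assumed name).
temp = queue_options(exchange: "Events", dlx: nil, temporary: true)
```

Because the `dlx` default is always derived from `exchange`, passing `dlx: nil` explicitly is the only way to leave out the `x-dead-letter-exchange` argument.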
Instance Method Details
#on_exception {|exception, channel, delivery| ... } ⇒ Object
Defines a new block for handling exceptions which occur when processing an incoming message. Cases where this might occur include:
-
A message which doesn’t have a recognised content-type (i.e. one which has not been ‘init’ed)
-
An invalid JSON message
-
A valid JSON message which doesn’t pass schema validation
# File 'lib/blinkbox/common_messaging/queue.rb', line 72

def on_exception(&block)
  raise ArgumentError, "Please specify a block to call when an exception is raised" unless block_given?
  @on_exception = block
end
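The "store a block, fall back to a default" pattern used by `#on_exception` can be sketched without a broker. `HandlerDemo`, its `process` method and its `default_on_exception` body are all hypothetical stand-ins for illustration. The gem's real default handler is not shown on this page, so the fallback here is an assumption.

```ruby
# Hypothetical stand-in for the handler plumbing in Queue; it needs no broker.
class HandlerDemo
  # Store the block, refusing a call made without one (as Queue#on_exception does).
  def on_exception(&block)
    raise ArgumentError, "Please specify a block to call when an exception is raised" unless block_given?
    @on_exception = block
  end

  # Run some work and route any error to the custom handler, or to the default.
  def process
    yield
  rescue StandardError => e
    (@on_exception || method(:default_on_exception)).call(e)
  end

  private

  # Assumed fallback for this sketch only.
  def default_on_exception(exception)
    "default: #{exception.message}"
  end
end

demo = HandlerDemo.new
before = demo.process { raise "boom" }            # handled by the fallback
demo.on_exception { |e| "custom: #{e.message}" }
after = demo.process { raise "boom" }             # handled by the custom block
```

In the `subscribe` source the handler is called with the exception, the channel, the delivery info, the metadata and the raw payload. A block that declares fewer parameters ignores the rest, because blocks do not enforce arity.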
#purge! ⇒ true
Purges all messages from this queue. Destroys data!
# File 'lib/blinkbox/common_messaging/queue.rb', line 135

def purge!
  @queue.purge
  true
end
#subscribe(block: true, accept: nil) {|metadata, payload_object| ... } ⇒ Object
Emits the metadata and objectified payload for every message which appears on the queue. Any message with a content-type not ‘init’ed will be rejected (without retry) automatically.
-
Returning `true` or `:ack` from the block will acknowledge and remove the message from the queue.
-
Returning `false` or `:reject` from the block will send the message to the DLQ.
-
Returning `:retry` will put the message back on the queue to be tried again later.
# File 'lib/blinkbox/common_messaging/queue.rb', line 98

def subscribe(block: true, accept: nil)
  raise ArgumentError, "Please give a block to run when a message is received" unless block_given?
  @queue.subscribe(
    block: block,
    manual_ack: true
  ) do |delivery_info, metadata, payload|
    begin
      if accept.nil?
        object = payload
      else
        klass = Blinkbox::CommonMessaging.class_from_content_type(metadata[:headers]['content-type'])
        if accept.include?(klass)
          object = klass.new(JSON.parse(payload))
        else
          response = :reject
        end
      end
      response ||= yield(metadata, object)
      case response
      when :ack, true
        @channel.ack(delivery_info[:delivery_tag])
      when :reject, false
        @channel.reject(delivery_info[:delivery_tag], false)
      when :retry
        @channel.reject(delivery_info[:delivery_tag], true)
      else
        fail "Unknown response from subscribe block: #{response}"
      end
    rescue Exception => e
      (@on_exception || method(:default_on_exception)).call(e, @channel, delivery_info, metadata, payload)
    end
  end
end
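The mapping from a block's return value to a broker action can be checked without RabbitMQ. This is a minimal sketch, not the gem's code: `FakeChannel` is a hypothetical test double that records calls, and `settle` is a hypothetical helper that copies the `case response` branch of `#subscribe`.

```ruby
# Hypothetical test double recording what would be sent to the broker.
class FakeChannel
  attr_reader :calls

  def initialize
    @calls = []
  end

  def ack(tag)
    @calls << [:ack, tag]
  end

  def reject(tag, requeue)
    @calls << [:reject, tag, requeue]
  end
end

# Hypothetical helper mirroring the `case response` branch of Queue#subscribe.
def settle(channel, delivery_tag, response)
  case response
  when :ack, true
    channel.ack(delivery_tag)
  when :reject, false
    channel.reject(delivery_tag, false) # not requeued, so the broker can dead-letter it
  when :retry
    channel.reject(delivery_tag, true)  # requeued for another delivery attempt
  else
    raise "Unknown response from subscribe block: #{response}"
  end
end

channel = FakeChannel.new
settle(channel, 1, true)     # acknowledged
settle(channel, 2, :reject)  # rejected without requeue
settle(channel, 3, :retry)   # rejected with requeue
```

A block whose last expression is `nil` (for example one ending in `puts`) falls into the `else` branch and raises, so it is worth ending the block with an explicit `:ack`, `:reject` or `:retry`.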