Class: Blinkbox::CommonMessaging::Queue

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/blinkbox/common_messaging/queue.rb

Overview

A proxy class for generating queues and binding them to exchanges using Bunny. In the format expected from blinkbox Books services.

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, exchange: "amq.headers", dlx: "#{exchange}.DLX", bindings: [], prefetch: 10, exclusive: false, temporary: false) ⇒ Bunny::Queue

Create a queue object for subscribing to messages with.

NB. There is no way to know what bindings have already been made for a queue, so all code subscribing to a queue should cope with receiving messages it’s not expecting.

Parameters:

  • queue_name (String)

    The name of the queue which should be used and (if necessary) created.

  • exchange (String) (defaults to: "amq.headers")

    The name of the Exchange to bind to. The default value should be avoided for production uses.

  • dlx (String, nil) (defaults to: "#{exchange}.DLX")

    The name of the Dead Letter Exchange to send nacked messages to.

  • bindings (Array, Hash) (defaults to: [])

    An array of hashes, each on detailing the parameters for a new binding.

  • prefetch (Integer) (defaults to: 10)

    The number of messages to collect at a time when subscribing.

Raises:

  • (Bunny::NotFound)

    If the exchange does not exist.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/blinkbox/common_messaging/queue.rb', line 23

def initialize(queue_name, exchange: "amq.headers", dlx: "#{exchange}.DLX", bindings: [], prefetch: 10, exclusive: false, temporary: false)
  raise ArgumentError, "Prefetch must be a positive integer" unless prefetch.is_a?(Integer) && prefetch > 0
  connection = CommonMessaging.connection
  @logger = CommonMessaging.config[:logger]
  # We create one channel per queue because it means that any issues are isolated
  # and we can start a new channel and resume efforts in a segregated manner.
  @channel = connection.create_channel
  @channel.prefetch(prefetch)
  args = {}
  args["x-dead-letter-exchange"] = dlx unless dlx.nil?
  @queue = @channel.queue(
    queue_name,
    durable: !temporary,
    auto_delete: temporary,
    exclusive: exclusive,
    arguments: args
  )
  @exchange = @channel.headers(
    exchange,
    durable: true,
    auto_delete: false,
    passive: true
  )
  Kernel.warn "No bindings were given, the queue is unlikely to receive any messages" if bindings.empty?
  bindings.each do |binding|
    @queue.bind(@exchange, arguments: binding)
  end
end

Instance Method Details

#on_exception {|exception, channel, delivery| ... } ⇒ Object

Defines a new block for handling exceptions which occur when processing an incoming message. Cases where this might occur include:

  • A message which doesn’t have a recognised content-type (ie. one which has been ‘init’ed)

  • An invalid JSON message

  • A valid JSON message which doesn’t pass schema validation

Examples:

Sending excepted messages to a log, then nack them

log = Logger.new(STDOUT)
queue = Blinkbox::CommonMessaging::Queue.new("My.Queue")
queue.on_exception do |e, delivery_info, , payload|
  log.error e
  channel.reject(delivery_info[:delivery_tag], false)
end

Yields:

  • (exception, channel, delivery)

    info, metadata, payload] Yields for each exception which occurs.

Yield Parameters:

  • exception (Exception)

    The exception which was raised.

  • exception (Bunny::Connection)

    The channel this exchnage is using (useful for nacking).

  • delivery_info (Hash)

    The RabbitMQ delivery info for the message (useful for nacking).

  • metadata (Hash)

    The metadata delivered from the RabbitMQ server (parameters and headers).

  • payload (String)

    The message that was received

Raises:

  • (ArgumentError)


72
73
74
75
# File 'lib/blinkbox/common_messaging/queue.rb', line 72

def on_exception(&block)
  raise ArgumentError, "Please specify a block to call when an exception is raised" unless block_given?
  @on_exception = block
end

#purge!true

Purges all messages from this queue. Destroys data!

Returns:

  • (true)

    Returns true if the purge occurred correctly (or a RabbitMQ error if it couldn’t)



135
136
137
138
# File 'lib/blinkbox/common_messaging/queue.rb', line 135

def purge!
  @queue.purge
  true
end

#subscribe(block: true, accept: nil) {|metadata, payload_object| ... } ⇒ Object

Emits the metadata and objectified payload for every message which appears on the queue. Any message with a content-type not ‘init’ed will be rejected (without retry) automatically.

  • Returning ‘true` or `:ack` from the block will acknowledge and remove the message from the queue

  • Returning ‘false` or `:reject` from the block will send the message to the DLQ

  • Returning ‘:retry` will put the message back on the queue to be tried again later.

Examples:

Subscribing to messages

queue = Blinkbox::CommonMessaging::Queue.new("catch-all", exchange_name: "Marvin", [{}])
queue.subscribe(block:true) do |metadata, obj|
  puts "Messge received."
  puts "Headers: #{metadata[:headers].to_json}"
  puts "Body: #{obj.to_json}"
end

Parameters:

  • :block (Boolean)

    Should this method block while being executed (true, default) or spawn a new thread? (false)

  • :accept (Array<Blinkbox::CommonMessaging::JsonSchemaPowered>, nil)

    List of schema types to accept (any not on the list will be rejected). ‘nil` will accept all message types and not validate incoming messages.

Yields:

  • (metadata, payload_object)

    A block to execute for each message which is received on this queue.

Yield Parameters:

Yield Returns:

  • (Boolean, :ack, :reject, :retry)

Raises:

  • (ArgumentError)


98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/blinkbox/common_messaging/queue.rb', line 98

def subscribe(block: true, accept: nil)
  raise ArgumentError, "Please give a block to run when a message is received" unless block_given?
  @queue.subscribe(
    block: block,
    manual_ack: true
  ) do |delivery_info, , payload|
    begin
      if accept.nil?
        object = payload
      else
        klass = Blinkbox::CommonMessaging.class_from_content_type([:headers]['content-type'])
        if accept.include?(klass)
          object = klass.new(JSON.parse(payload)) 
        else
          response = :reject 
        end
      end
      response ||= yield(, object)
      case response
      when :ack, true
        @channel.ack(delivery_info[:delivery_tag])
      when :reject, false
        @channel.reject(delivery_info[:delivery_tag], false)
      when :retry
        @channel.reject(delivery_info[:delivery_tag], true)
      else
        fail "Unknown response from subscribe block: #{response}"
      end
    rescue Exception => e
      (@on_exception || method(:default_on_exception)).call(e, @channel, delivery_info, , payload)
    end
  end
end