Module: PipelineToolkit::Amqp::Reader

Includes:
Abstract
Included in:
MessageSubscriber
Defined in:
lib/pipeline_toolkit/amqp/reader.rb

Overview

The Reader provides functionality specific to asynchronous message delivery by subscribing to the AMQP server.

Instance Attribute Summary

Attributes included from Abstract

#options

Instance Method Summary collapse

Methods included from Abstract

#bind_queue, #initialize, #initialize_channel, #initialize_connection, #initialize_exchange, #initialize_queue, #stop_connection

Instance Method Details

#initialize_readerObject

Initialize the AMQP server specific to handling of messages from the server.



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/pipeline_toolkit/amqp/reader.rb', line 17

def initialize_reader
  DefaultLogger.debug("Amqp::Reader#initialize_reader")
  @ack_headers ||= {}
  initialize_connection
  initialize_channel
  # NB: Used to be 1 to prevent deadlocks. Think deadlocks won't happen anymore (because doing asyn
  # reads now), but keep an eye on this parameter.
  @channel.prefetch(25) 
  initialize_exchange
  initialize_queue
  bind_queue()
end

#perform_acknowledgement(ack_id) ⇒ Object

Perform the acknowledgement on the stored header to tell AMQP server we are done with the message.

Parameters:

  • ack_id (String)

    The acknowledgement identifier



61
62
63
64
65
# File 'lib/pipeline_toolkit/amqp/reader.rb', line 61

def perform_acknowledgement(ack_id)
  DefaultLogger.debug("Amqp::Reader#perform_acknowledgement(ack_id)")
  header = @ack_headers.delete(ack_id)
  header.ack
end

#queue_subscribeObject

Subscribe the queue to receive messages



33
34
35
36
37
38
39
40
41
# File 'lib/pipeline_toolkit/amqp/reader.rb', line 33

def queue_subscribe
  DefaultLogger.debug("Amqp::Reader#queue_subscribe")
  unless AMQP.closing?
    @queue.subscribe(:ack => options[:ack]) do |header, body|
      DefaultLogger.debug("Rabbit Message: #{body}")
      process_queue_message(header, body)
    end
  end
end

#store_acknowledgement(message, header) ⇒ Object

Store the acknowledgement header

Parameters:

  • message (Hash)

    The message

  • header (MQ::Header)

    The message queue header for the message



49
50
51
52
53
# File 'lib/pipeline_toolkit/amqp/reader.rb', line 49

def store_acknowledgement(message, header)
  DefaultLogger.debug("Amqp::Reader#store_acknowledgement(message, header)")
  message[:ack_id] = header.delivery_tag.to_s
  @ack_headers[message[:ack_id]] = header
end