Module: Qpid::Messaging

Defined in:
lib/qpid_messaging.rb,
lib/qpid_messaging/sender.rb,
lib/qpid_messaging/address.rb,
lib/qpid_messaging/message.rb,
lib/qpid_messaging/session.rb,
lib/qpid_messaging/duration.rb,
lib/qpid_messaging/encoding.rb,
lib/qpid_messaging/receiver.rb,
lib/qpid_messaging/connection.rb

Overview

The Qpid Messaging framework is an enterprise messaging framework based on the open-source AMQP protocol.

Example Application

Here is a simple example application. It creates a link to a broker located on a system named broker.myqpiddomain.com. It then creates a new messaging queue named “qpid-examples” and publishes a message to it. It then consumes that same message and closes the connection.

require 'rubygems'
gem 'qpid_messaging'
require 'qpid_messaging'

# create a connection, open it and then create a session named "session1"
conn = Qpid::Messaging::Connection.new :name => "broker.myqpiddomain.com"
conn.open
session = conn.create_session "session1"

# create a sender and a receiver
# the sender marks the queue as one that is deleted when trhe sender disconnects
send = session.create_sender "qpid-examples;{create:always,delete:always}"
recv = session.create_receiver "qpid-examples"

# create an outgoing message and send it
outgoing = Qpid::Messaging::Message.new :content => "The time is #{Time.new}"
sender.send outgoing

# set the receiver's capacity to 10 and then check out many messages are pending
recv.capacity = 10
puts "There are #{recv.available} messages waiting." # should report 1 message

# get the nextwaiting  message, which should be in the local queue now,
# and output the contents
incoming = recv.get Qpid::Messaging::Duration::IMMEDIATE
puts "Received the following message: #{incoming.content}"
# the output should be the text that was sent earlier

# acknowledge the message, letting the sender know the message was received
puts "The sender currently has #{send.unsettled} message(s) pending."
# should report 1 unsettled message
session.acknowledge incoming # acknowledge the received message
puts "Now sender currently has #{send.unsettled} message(s) pending."
# should report 0 unsettled messages

# close the connection
conn.close

Defined Under Namespace

Classes: Address, Connection, Duration, Message, Receiver, Sender, Session

Class Method Summary collapse

Class Method Details

.decode(message, content_type = nil) ⇒ Object

Decodes and returns the message’s content.



30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/qpid_messaging/encoding.rb', line 30

def self.decode(message, content_type = nil) # :nodoc:
  content_type = message.content_type if content_type.nil?

  case content_type
    when "amqp/map"
      return Cqpid.decodeMap message.message_impl
    when "amqp/list"
      return Cqpid.decodeList message.message_impl
  end

  message.content
end

.encode(content, message, encoding = nil) ⇒ Object

Encodes the supplied content into the given message.



25
26
27
# File 'lib/qpid_messaging/encoding.rb', line 25

def self.encode content, message, encoding = nil # :nodoc:
  Cqpid::encode content, message.message_impl, encoding
end

.stringify(value) ⇒ Object

Takes as input any type and converts anything that’s a symbol into a string.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/qpid_messaging/encoding.rb', line 45

def self.stringify(value) # :nodoc:
  # set the default value
  result = value

  case value

  when Symbol
    result = value.to_s

  when Hash
    result = {}
    value.each_pair do |key, value|
      result[stringify(key)] = stringify(value)
    end

  when Array
    result = []
    value.each do |element|
      result  << stringify(element)
    end

  end

  return result

end