Class: Qpid::Messaging::Receiver

Inherits:
Object
  • Object
show all
Defined in:
lib/qpid_messaging/receiver.rb

Overview

Receiver is the entity through which messages are received.

An instance of Receiver can only be created using an active (i.e., not previously closed) Session. See Qpid::Messaging::Session.create_receiver for more details.

Example

# create a connection and a session
conn     = Qpid::Messaging::Connection.new :url => "mybroker:5762"
conn.open
session  = conn.create_session

# create a receiver that listens on the "updates" topic of "alerts"
receiver = session.create_receiver "alerts/updates"

# wait for an incoming message and process it
incoming = receiver.get Qpid::Messaging::Duration::FOREVER
process(incoming)

Instance Method Summary collapse

Constructor Details

#initialize(session, receiver_impl) ⇒ Receiver

:nodoc:



46
47
48
49
# File 'lib/qpid_messaging/receiver.rb', line 46

def initialize(session, receiver_impl) # :nodoc:
  @session       = session
  @receiver_impl = receiver_impl
end

Instance Method Details

#availableObject

Returns the number of messages locally held.

The available is always 0 <= available <= capacity.

If the #capacity is set to 0 then available will always be 0.

Examples

# output the number of messages waiting while processing
loop do
  puts "There are #{recv.available} messages pending..."
  # wait forever (the default) for the next message
  msg = recv.get
  # process the message
  dispatch_message msg
end


146
# File 'lib/qpid_messaging/receiver.rb', line 146

def available; @receiver_impl.getAvailable; end

#capacityObject

Returns the capacity.



127
# File 'lib/qpid_messaging/receiver.rb', line 127

def capacity; @receiver_impl.getCapacity; end

#capacity=(capacity) ⇒ Object

Sets the capacity.

The capacity of a Receiver is the number of Messages that can be pre-fetched from the broker and held locally. If capacity is 0 then messages will never be pre-fetched and all messages must instead be retrieved using #fetch.

Options

  • capacity - the capacity

Examples

# create a receiver and give it a capacity of 50
recv = session.create_receiver "alerts/minor"
recv.capacity = 50


124
# File 'lib/qpid_messaging/receiver.rb', line 124

def capacity=(capacity); @receiver_impl.setCapacity capacity; end

#closeObject

Closes this Receiver.

This does not affect the owning Session or Connection.



155
# File 'lib/qpid_messaging/receiver.rb', line 155

def close; @receiver_impl.close; end

#closed?Boolean

Returns whether the Receiver is closed.

Returns:

  • (Boolean)


158
# File 'lib/qpid_messaging/receiver.rb', line 158

def closed?; @receiver_impl.isClosed; end

#fetch(duration = Qpid::Messaging::Duration::FOREVER) ⇒ Object

Retrieves a message from the receiver’s subscription, or waits for up to the duration specified for one to become available.

If no message is fetched within the specified time then a MessagingException is raised.

Options

  • duration - the timeout to wait (def. Duration::FOREVER)

Examples

# retrieves a message, also handles exceptions raised on no messages
begin
  # checks for a message, times out after one second
  msg = recv.fetch Qpid::Messaging::Duration::SECOND
  puts "Fetched this message: #{message.content}"
rescue
  puts "No messages available.
end


102
103
104
105
# File 'lib/qpid_messaging/receiver.rb', line 102

def fetch(duration = Qpid::Messaging::Duration::FOREVER)
  message_impl = @receiver_impl.fetch duration.duration_impl
  create_message_wrapper message_impl unless message_impl.nil?
end

#get(duration = Qpid::Messaging::Duration::FOREVER) ⇒ Object

Retrieves a message from the local queue, or waits for up to the duration specified for one to become available.

If no message is received within the specified time then a MessagingException is raised.

Options

  • duration - the timeout to wait

Examples

# retrieves a message, also handles exceptions raised on no messages
begin
  # checks for a message, returning immediately
  msg = recv.get Qpid::Messaging::Duration::IMMEDIATE
  puts "Received this message: #{message.content}"
rescue
  puts "No messages available.
end


76
77
78
79
# File 'lib/qpid_messaging/receiver.rb', line 76

def get(duration = Qpid::Messaging::Duration::FOREVER)
  message_impl = @receiver_impl.get duration.duration_impl
  create_message_wrapper message_impl unless message_impl.nil?
end

#nameObject

Returns the name of this Receiver.



161
# File 'lib/qpid_messaging/receiver.rb', line 161

def name; @receiver_impl.getName; end

#receiver_implObject

:nodoc:



51
52
53
# File 'lib/qpid_messaging/receiver.rb', line 51

def receiver_impl # :nodoc:
  @receiver_impl
end

#sessionObject

Returns the owning Session for this Receiver.



164
# File 'lib/qpid_messaging/receiver.rb', line 164

def session; @session; end

#unsettledObject

Returns the number of messages that have been received and acknowledged but whose acknowledgements have not been confirmed by the sender.



150
# File 'lib/qpid_messaging/receiver.rb', line 150

def unsettled; @receiver_impl.getUnsettled; end