Class: Qpid::Messaging::Receiver
- Inherits: Object
- Defined in: lib/qpid_messaging/receiver.rb
Overview
Receiver is the entity through which messages are received.
An instance of Receiver can only be created using an active (i.e., not previously closed) Session. See Qpid::Messaging::Session#create_receiver for more details.
Example
# create a connection and a session
conn = Qpid::Messaging::Connection.new :url => "mybroker:5762"
conn.open
session = conn.create_session
# create a receiver that listens on the "updates" topic of "alerts"
receiver = session.create_receiver "alerts/updates"
# wait for an incoming message and process it
incoming = receiver.get Qpid::Messaging::Duration::FOREVER
process(incoming)
Instance Method Summary
- #available ⇒ Object
  Returns the number of messages locally held.
- #capacity ⇒ Object
  Returns the capacity.
- #capacity=(capacity) ⇒ Object
  Sets the capacity.
- #close ⇒ Object
  Closes this Receiver.
- #closed? ⇒ Boolean
  Returns whether the Receiver is closed.
- #fetch(duration = Qpid::Messaging::Duration::FOREVER) ⇒ Object
  Retrieves a message from the receiver’s subscription, or waits for up to the duration specified for one to become available.
- #get(duration = Qpid::Messaging::Duration::FOREVER) ⇒ Object
  Retrieves a message from the local queue, or waits for up to the duration specified for one to become available.
- #initialize(session, receiver_impl) ⇒ Receiver (constructor)
  :nodoc:
- #name ⇒ Object
  Returns the name of this Receiver.
- #receiver_impl ⇒ Object
  :nodoc:
- #session ⇒ Object
  Returns the owning Session for this Receiver.
- #unsettled ⇒ Object
  Returns the number of messages that have been received and acknowledged but whose acknowledgements have not been confirmed by the sender.
Constructor Details
#initialize(session, receiver_impl) ⇒ Receiver
:nodoc:
# File 'lib/qpid_messaging/receiver.rb', line 46
def initialize(session, receiver_impl) # :nodoc:
  @session = session
  @receiver_impl = receiver_impl
end
Instance Method Details
#available ⇒ Object
Returns the number of messages locally held.
The returned value always satisfies 0 <= available <= capacity.
If the #capacity is set to 0 then available will always be 0.
Examples
# output the number of messages waiting while processing
loop do
  puts "There are #{recv.available} messages pending..."
  # wait forever (the default) for the next message
  msg = recv.get
  # process the message
  process(msg)
end
# File 'lib/qpid_messaging/receiver.rb', line 146
def available; @receiver_impl.getAvailable; end
#capacity ⇒ Object
Returns the capacity.
# File 'lib/qpid_messaging/receiver.rb', line 127
def capacity; @receiver_impl.getCapacity; end
#capacity=(capacity) ⇒ Object
Sets the capacity.
The capacity of a Receiver is the number of messages that can be pre-fetched from the broker and held locally. If capacity is 0 then messages will never be pre-fetched and all messages must instead be retrieved using #fetch.
Options
- capacity - the capacity
Examples
# create a receiver and give it a capacity of 50
recv = session.create_receiver "alerts/minor"
recv.capacity = 50
# File 'lib/qpid_messaging/receiver.rb', line 124
def capacity=(capacity); @receiver_impl.setCapacity capacity; end
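The relationship between capacity, #available, #get and #fetch can be pictured with a small in-memory model. This is only a sketch under stated assumptions: ToyReceiver and its "broker queue" array are hypothetical stand-ins written for illustration, not the Qpid implementation, and a real broker prefetches asynchronously.

```ruby
# Toy in-memory model of the prefetch behaviour described above.
# ToyReceiver is hypothetical and is NOT the Qpid implementation.
class ToyReceiver
  attr_reader :capacity

  def initialize(broker_queue, capacity = 0)
    @broker_queue = broker_queue # messages still held by the "broker"
    @local = []                  # messages pre-fetched and held locally
    @capacity = capacity
    prefetch
  end

  def capacity=(capacity)
    @capacity = capacity
    prefetch
  end

  # number of messages currently held locally
  def available
    @local.size
  end

  # take from the local queue only, then top the local queue up again
  def get
    msg = @local.shift
    prefetch
    msg
  end

  # take a locally held message if there is one, else ask the "broker"
  def fetch
    msg = @local.empty? ? @broker_queue.shift : @local.shift
    prefetch
    msg
  end

  private

  # move messages from the "broker" until the local queue reaches capacity
  def prefetch
    @local << @broker_queue.shift while @local.size < @capacity && !@broker_queue.empty?
  end
end

recv = ToyReceiver.new(%w[a b c d e])
puts recv.available # prints 0: capacity is 0, so nothing is held locally
recv.capacity = 3
puts recv.available # prints 3: three messages are now pre-fetched
```

In this model, a capacity of 0 means get never finds a local message, which mirrors the statement above that such receivers must use #fetch.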
#close ⇒ Object
Closes this Receiver.
This does not affect the owning Session or Connection.
# File 'lib/qpid_messaging/receiver.rb', line 155
def close; @receiver_impl.close; end
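One way to make sure a receiver is always closed is to wrap its use in a block with an ensure clause. The sketch below assumes only the #close and #closed? methods documented here. The with_receiver helper and the ClosableStub class are hypothetical names introduced for illustration so the example runs without a broker.

```ruby
# Hypothetical stand-in exposing only #close and #closed?.
class ClosableStub
  def initialize
    @closed = false
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

# Hypothetical helper: yields the receiver and closes it afterwards,
# even if the block raises.
def with_receiver(receiver)
  yield receiver
ensure
  receiver.close unless receiver.closed?
end

recv = ClosableStub.new
with_receiver(recv) { |r| puts "closed inside block? #{r.closed?}" }
puts "closed after block? #{recv.closed?}"
```

With a real Receiver the same helper could be passed the result of session.create_receiver. The owning Session and Connection would still need to be closed separately.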
#closed? ⇒ Boolean
Returns whether the Receiver is closed.
# File 'lib/qpid_messaging/receiver.rb', line 158
def closed?; @receiver_impl.isClosed; end
#fetch(duration = Qpid::Messaging::Duration::FOREVER) ⇒ Object
Retrieves a message from the receiver’s subscription, or waits for up to the duration specified for one to become available.
If no message is fetched within the specified time then a MessagingException is raised.
Options
- duration - the timeout to wait (default: Duration::FOREVER)
Examples
# retrieves a message, also handles exceptions raised on no messages
begin
  # checks for a message, times out after one second
  msg = recv.fetch Qpid::Messaging::Duration::SECOND
  puts "Fetched this message: #{msg.content}"
rescue
  puts "No messages available."
end
# File 'lib/qpid_messaging/receiver.rb', line 102
def fetch(duration = Qpid::Messaging::Duration::FOREVER)
  = @receiver_impl.fetch duration.duration_impl
  unless .nil?
end
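Because #fetch raises when no message arrives in time, callers that prefer a nil result can wrap the call. The following is a minimal sketch, not part of the library: fetch_or_nil and EmptyReceiver are hypothetical names, the stand-in raises a plain RuntimeError rather than the library's own exception class, and the :one_second symbol is a placeholder for a value such as Qpid::Messaging::Duration::SECOND.

```ruby
# Hypothetical stand-in for a receiver whose subscription is empty.
# It raises a plain RuntimeError; the real exception class may differ.
class EmptyReceiver
  def fetch(_duration = nil)
    raise "no message within the timeout"
  end
end

# Hypothetical helper: returns the fetched message, or nil if fetch raised.
def fetch_or_nil(receiver, duration)
  receiver.fetch duration
rescue StandardError => e
  warn "No message fetched: #{e.message}"
  nil
end

# :one_second is a placeholder for a real Duration constant.
msg = fetch_or_nil(EmptyReceiver.new, :one_second)
puts msg.nil? ? "No messages available." : "Fetched this message: #{msg}"
```

Rescuing StandardError matches the bare rescue used in the example above. Narrowing it to the specific messaging exception would avoid masking unrelated errors.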
#get(duration = Qpid::Messaging::Duration::FOREVER) ⇒ Object
Retrieves a message from the local queue, or waits for up to the duration specified for one to become available.
If no message is received within the specified time then a MessagingException is raised.
Options
- duration - the timeout to wait (default: Duration::FOREVER)
Examples
# retrieves a message, also handles exceptions raised on no messages
begin
  # checks for a message, returning immediately
  msg = recv.get Qpid::Messaging::Duration::IMMEDIATE
  puts "Received this message: #{msg.content}"
rescue
  puts "No messages available."
end
# File 'lib/qpid_messaging/receiver.rb', line 76
def get(duration = Qpid::Messaging::Duration::FOREVER)
  = @receiver_impl.get duration.duration_impl
  unless .nil?
end
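Since #get reads from the local queue and #available reports how many messages are held there, the two can be combined to drain whatever has already been pre-fetched. This is a sketch under assumptions: drain_local and LocalQueueStub are hypothetical names used only for illustration, and with a real Receiver the duration argument would be something like Qpid::Messaging::Duration::IMMEDIATE.

```ruby
# Hypothetical stand-in exposing #available and #get over an array.
class LocalQueueStub
  def initialize(messages)
    @local = messages.dup
  end

  def available
    @local.size
  end

  def get(_duration = nil)
    raise "no message available" if @local.empty?
    @local.shift
  end
end

# Hypothetical helper: collects every message currently held locally.
# The condition is checked before each get, so an empty queue is never read.
def drain_local(receiver, duration = nil)
  drained = []
  drained << receiver.get(duration) while receiver.available > 0
  drained
end

recv = LocalQueueStub.new(%w[first second third])
puts drain_local(recv).inspect
puts recv.available
```

Against a live broker the local queue can be refilled between calls, so a loop like this may return more messages than #available reported when it started.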
#name ⇒ Object
Returns the name of this Receiver.
# File 'lib/qpid_messaging/receiver.rb', line 161
def name; @receiver_impl.getName; end
#receiver_impl ⇒ Object
:nodoc:
# File 'lib/qpid_messaging/receiver.rb', line 51
def receiver_impl # :nodoc:
  @receiver_impl
end
#session ⇒ Object
Returns the owning Session for this Receiver.
# File 'lib/qpid_messaging/receiver.rb', line 164
def session; @session; end
#unsettled ⇒ Object
Returns the number of messages that have been received and acknowledged but whose acknowledgements have not been confirmed by the sender.
# File 'lib/qpid_messaging/receiver.rb', line 150
def unsettled; @receiver_impl.getUnsettled; end