Class: Qpid::Messaging::Session
Inherits: Object
Inheritance: Object → Qpid::Messaging::Session
Defined in: lib/qpid_messaging/session.rb
Overview
A Session represents a distinct conversation between end points. Sessions are created from an active (i.e., not closed) Connection.
A Session is used to acknowledge individual or all messages that have passed through it.
Instance Method Summary
-
#acknowledge(options = {}) ⇒ Object
Acknowledges one or more outstanding messages that have been received on this session.
-
#close ⇒ Object
Closes the Session and all associated Sender and Receiver instances.
-
#commit ⇒ Object
Commits any pending transactions for a transactional session.
-
#connection ⇒ Object
Returns the Connection associated with this session.
-
#create_receiver(address) ⇒ Object
Creates a new endpoint for receiving messages.
-
#create_sender(address) ⇒ Object
Creates a new endpoint for sending messages.
-
#errors ⇒ Object
If the Session has been rendered invalid due to some exception, this method will result in that exception being raised.
-
#errors? ⇒ Boolean
Returns true if there were exceptions on this session.
-
#initialize(connection, session) ⇒ Session
constructor
:nodoc:.
-
#next_receiver(timeout = Qpid::Messaging::Duration::FOREVER, &block) ⇒ Object
Fetches the next Receiver with a message pending.
-
#receivable ⇒ Object
Returns the total number of receivable messages, and messages already received, by Receiver instances associated with this Session.
-
#receiver(name) ⇒ Object
Retrieves the Receiver with the specified name, or nil if no such Receiver exists.
-
#reject(message) ⇒ Object
Rejects the specified message.
-
#release(message) ⇒ Object
Releases the message, which allows the broker to attempt to redeliver it.
-
#rollback ⇒ Object
Rolls back any uncommitted transactions on a transactional session.
-
#sender(name) ⇒ Object
Retrieves the Sender with the specified name.
-
#session_impl ⇒ Object
:nodoc:.
-
#sync(args = {}) ⇒ Object
Requests synchronization with the broker.
-
#unsettled_acks ⇒ Object
Returns the number of messages that have been acknowledged by this Session whose acknowledgements have not been confirmed as processed by the broker.
Constructor Details
#initialize(connection, session) ⇒ Session
:nodoc:
# File 'lib/qpid_messaging/session.rb', line 31
def initialize(connection, session) # :nodoc:
  @connection = connection
  @session_impl = session
end
Instance Method Details
#acknowledge(options = {}) ⇒ Object
Acknowledges one or more outstanding messages that have been received on this session.
Arguments
- options - the set of options
Options
- :message - if specified, then only that Message is acknowledged
- :sync - if true, the call will block until processed by the broker
Examples
# acknowledge all received messages
session.acknowledge
# acknowledge a single message
session.acknowledge :message => message
# acknowledge all messages, wait until the call finishes
session.acknowledge :sync => true
# File 'lib/qpid_messaging/session.rb', line 148
def acknowledge(options = {})
  sync = options[:sync] || false
  message = options[:message] if options[:message]

  unless message.nil?
    @session_impl.acknowledge message.message_impl, sync
  else
    @session_impl.acknowledge sync
  end
end
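As a sketch of how the two options can be combined, the helper below acknowledges each message individually once the caller's block has handled it, and passes :sync => true only for the last one. The name handle_and_acknowledge is hypothetical and not part of the library; it relies only on acknowledge as documented above, and messages is assumed to be an array of messages already received on the session.

```ruby
# Hypothetical helper (not part of qpid_messaging): acknowledges each message
# after the caller's block has handled it. Only the final acknowledgement is
# sent with :sync => true, so only that call blocks on the broker.
def handle_and_acknowledge(session, messages)
  messages.each_with_index do |message, index|
    yield message
    last = (index == messages.size - 1)
    session.acknowledge :message => message, :sync => last
  end
  messages.size
end
```

If the block raises, the message that failed and everything after it stays unacknowledged.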
#close ⇒ Object
Closes the Session and all associated Sender and Receiver instances.
NOTE: All Session instances for a Connection are closed when the Connection is closed, but closing a Session does not affect the owning Connection.
# File 'lib/qpid_messaging/session.rb', line 114
def close; @session_impl.close; end
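Because closing a Session leaves the owning Connection open, a session can be scoped to a single unit of work. The following is a minimal sketch of that pattern. with_session is a hypothetical helper, and it assumes the connection object responds to create_session, which is not documented on this page.

```ruby
# Hypothetical helper (not part of qpid_messaging): runs the block with a
# fresh session and always closes that session afterwards. The connection
# itself is left open for further use.
# Assumption: `connection` responds to create_session.
def with_session(connection)
  session = connection.create_session
  begin
    yield session
  ensure
    session.close
  end
end
```

The ensure clause closes the session even when the block raises, which in turn closes any Sender and Receiver instances created inside the block.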
#commit ⇒ Object
Commits any pending transactions for a transactional session.
# File 'lib/qpid_messaging/session.rb', line 117
def commit; @session_impl.commit; end
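A common way to pair commit with rollback is to wrap the transactional work in a block. The sketch below assumes the session passed in is already transactional (how such a session is created is outside this page), and in_transaction is a hypothetical name.

```ruby
# Hypothetical helper (not part of qpid_messaging): commits the session if the
# block completes, and rolls back before re-raising if anything fails.
# Assumption: `session` is a transactional session.
def in_transaction(session)
  result = yield session
  session.commit
  result
rescue StandardError
  session.rollback
  raise
end
```

Note that a failure inside commit itself also triggers the rollback in this sketch.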
#connection ⇒ Object
Returns the Connection associated with this session.
# File 'lib/qpid_messaging/session.rb', line 41
def connection
  @connection
end
#create_receiver(address) ⇒ Object
Creates a new endpoint for receiving messages.
The address can be either an instance of Address or an address string.
Arguments
- address - the end point address
# File 'lib/qpid_messaging/session.rb', line 85
def create_receiver(address)
  result = nil
  receiver_impl = nil

  if address.class == Qpid::Messaging::Address
    address_impl = address.address_impl
    receiver_impl = @session_impl.createReceiver address_impl
  else
    receiver_impl = @session_impl.createReceiver(address)
  end

  Qpid::Messaging::Receiver.new self, receiver_impl
end
#create_sender(address) ⇒ Object
Creates a new endpoint for sending messages.
The address can be either an instance of Address or an address string.
Arguments
- address - the end point address
# File 'lib/qpid_messaging/session.rb', line 53
def create_sender(address)
  _address = address

  if address.class == Qpid::Messaging::Address
    _address = address.address_impl
  end

  sender_impl = @session_impl.createSender(_address)
  sender_name = sender_impl.getName

  Qpid::Messaging::Sender.new(self, sender_impl)
end
#errors ⇒ Object
If the Session has been rendered invalid due to some exception, this method will result in that exception being raised.
If none have occurred, then no exceptions are raised.
Examples
# show any errors that occurred during the Session
if @session.errors?
  begin
    @session.errors
  rescue Exception => error
    puts "An error occurred: #{error}"
  end
end
# File 'lib/qpid_messaging/session.rb', line 257
def errors; @session_impl.checkError; end
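Since errors reports a problem by raising, callers that would rather inspect the failure as a value can wrap the call. The sketch below does that with a hypothetical session_error helper. It assumes the raised error descends from StandardError; if it does not, widen the rescue to Exception as in the example above.

```ruby
# Hypothetical helper (not part of qpid_messaging): returns the pending
# session error as an object, or nil when the session is healthy.
# Assumption: the error raised by `errors` descends from StandardError.
def session_error(session)
  return nil unless session.errors?
  session.errors
  nil
rescue StandardError => error
  error
end
```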
#errors? ⇒ Boolean
Returns true if there were exceptions on this session.
# File 'lib/qpid_messaging/session.rb', line 240
def errors?; @session_impl.hasError; end
#next_receiver(timeout = Qpid::Messaging::Duration::FOREVER, &block) ⇒ Object
Fetches the next Receiver with a message pending. Waits up to the specified duration before timing out.
For a Receiver to be returned, it must have a capacity > 0 and have Messages locally queued.
If no Receiver is found within the timeout period, then a MessageError is raised.
Arguments
- timeout - the duration
Examples
loop do
  begin
    # wait a maximum of one minute for the next receiver to be ready
    recv = session.next_receiver Qpid::Messaging::Duration::MINUTE
    # get and dispatch the message
    msg = recv.get
    dispatch_message msg # dispatch_message stands in for application code
  rescue
    puts "No receivers were returned"
  end
end
# File 'lib/qpid_messaging/session.rb', line 228
def next_receiver(timeout = Qpid::Messaging::Duration::FOREVER, &block)
  receiver_impl = @session_impl.nextReceiver(timeout.duration_impl)

  unless receiver_impl.nil?
    recv = Qpid::Messaging::Receiver.new self, receiver_impl
    block.call recv unless block.nil?
  end

  return recv
end
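The loop in the example above runs forever. A bounded variant is sketched below: it stops after limit messages, or as soon as no receiver becomes ready within the timeout. drain is a hypothetical helper, timeout is expected to be a Duration such as Qpid::Messaging::Duration::MINUTE, and the sketch assumes the timeout error descends from StandardError.

```ruby
# Hypothetical helper (not part of qpid_messaging): passes up to `limit`
# pending messages to the caller's block and returns how many were handled.
# Assumption: a timeout in next_receiver raises a StandardError descendant.
def drain(session, timeout, limit)
  handled = 0
  while handled < limit
    begin
      receiver = session.next_receiver(timeout)
    rescue StandardError
      break # no receiver became ready within the timeout
    end
    break if receiver.nil?
    yield receiver.get
    handled += 1
  end
  handled
end
```

Only the call to next_receiver sits inside the rescue, so errors raised by the caller's block still propagate.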
#receivable ⇒ Object
Returns the total number of receivable messages, and messages already received, by Receiver instances associated with this Session.
# File 'lib/qpid_messaging/session.rb', line 191
def receivable; @session_impl.getReceivable; end
#receiver(name) ⇒ Object
Retrieves the Receiver with the specified name, or nil if no such Receiver exists.
Arguments
- name - the name of the Receiver
# File 'lib/qpid_messaging/session.rb', line 105
def receiver(name)
  Qpid::Messaging::Receiver.new self, @session_impl.getReceiver(name)
end
#reject(message) ⇒ Object
Rejects the specified message. A rejected message will not be redelivered.
NOTE: A message cannot be rejected once it has been acknowledged.
# File 'lib/qpid_messaging/session.rb', line 163
def reject(message); @session_impl.reject message.message_impl; end
#release(message) ⇒ Object
Releases the message, which allows the broker to attempt to redeliver it.
NOTE: A message cannot be released once it has been acknowledged.
# File 'lib/qpid_messaging/session.rb', line 169
def release(message); @session_impl.release message.message_impl; end
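Since neither reject nor release can be applied after a message has been acknowledged, the decision has to be made before acknowledging. One possible arrangement is sketched below. settle and TransientFailure are hypothetical names introduced for this illustration only.

```ruby
# Hypothetical error class, used only in this sketch, marking failures that
# are worth another delivery attempt.
class TransientFailure < StandardError; end

# Hypothetical helper (not part of qpid_messaging): hands the message to the
# caller's block, then settles it exactly once.
#   block succeeds          -> acknowledge
#   TransientFailure raised -> release (the broker may redeliver)
#   any other StandardError -> reject (no redelivery)
def settle(session, message)
  begin
    yield message
  rescue TransientFailure
    session.release message
    return :released
  rescue StandardError
    session.reject message
    return :rejected
  end
  session.acknowledge :message => message
  :acknowledged
end
```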
#rollback ⇒ Object
Rolls back any uncommitted transactions on a transactional session.
# File 'lib/qpid_messaging/session.rb', line 120
def rollback; @session_impl.rollback; end
#sender(name) ⇒ Object
Retrieves the Sender with the specified name.
Raises an exception if no such Sender exists.
Arguments
- name - the name of the Sender
# File 'lib/qpid_messaging/session.rb', line 73
def sender(name)
  Qpid::Messaging::Sender.new self, @session_impl.getSender(name)
end
#session_impl ⇒ Object
:nodoc:
# File 'lib/qpid_messaging/session.rb', line 36
def session_impl # :nodoc:
  @session_impl
end
#sync(args = {}) ⇒ Object
Requests synchronization with the broker.
Arguments
- args - the set of options
Options
- :block - if true, the call blocks until the broker acknowledges it
# File 'lib/qpid_messaging/session.rb', line 184
def sync(args = {})
  block = args[:block] || false
  @session_impl.sync block
end
#unsettled_acks ⇒ Object
Returns the number of messages that have been acknowledged by this Session whose acknowledgements have not been confirmed as processed by the broker.
# File 'lib/qpid_messaging/session.rb', line 196
def unsettled_acks; @session_impl.getUnsettledAcks; end
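To illustrate how unsettled_acks relates to acknowledge and sync, the sketch below acknowledges everything received so far, requests a blocking sync, and then reports how many acknowledgements are still unconfirmed. flush_acknowledgements is a hypothetical helper. This page does not state what the count will be after a blocking sync, so the value is returned for the caller to check.

```ruby
# Hypothetical helper (not part of qpid_messaging): acknowledges all received
# messages, blocks until the broker has responded to a sync request, and
# returns the number of acknowledgements still awaiting confirmation.
def flush_acknowledgements(session)
  session.acknowledge
  session.sync :block => true
  session.unsettled_acks
end
```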