Class: Qpid::Messaging::Session

Inherits:
Object
  • Object
Defined in:
lib/qpid_messaging/session.rb

Overview

A Session represents a distinct conversation between endpoints. Sessions are created from an active (i.e., not closed) Connection.

A Session is used to acknowledge individual or all messages that have passed through it.

Constructor Details

#initialize(connection, session) ⇒ Session

:nodoc:



# File 'lib/qpid_messaging/session.rb', line 31

def initialize(connection, session) # :nodoc:
  @connection   = connection
  @session_impl = session
end

Instance Method Details

#acknowledge(options = {}) ⇒ Object

Acknowledges one or more outstanding messages that have been received on this session.

Arguments

  • options - the set of options

Options

  • :message - if specified, then only that Message is acknowledged

  • :sync - if true, the call will block until processed by the broker

Examples

# acknowledge all received messages
session.acknowledge

# acknowledge a single message
session.acknowledge :message => message

# acknowledge all messages, wait until the call finishes
session.acknowledge :sync => true

TODO: Add an optional block to be used for blocking calls.



# File 'lib/qpid_messaging/session.rb', line 148

def acknowledge(options = {})
  sync    = options[:sync] || false
  message = options[:message]

  if message
    # acknowledge only the given message
    @session_impl.acknowledge message.message_impl, sync
  else
    # acknowledge every message received on this session
    @session_impl.acknowledge sync
  end
end
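
The :sync option lends itself to batching: acknowledge without blocking while work is in progress, then issue one blocking acknowledgement at the end. The following is a minimal sketch of that pattern and is not part of qpid_messaging. ack_every is a hypothetical helper, and RecordingSession is a stand-in that only records the options passed to acknowledge, so the sketch runs without a broker.

```ruby
# Stand-in for Qpid::Messaging::Session (assumption: only #acknowledge is
# modeled, and it just records the options it was called with).
class RecordingSession
  attr_reader :ack_calls

  def initialize
    @ack_calls = []
  end

  def acknowledge(options = {})
    @ack_calls << options
  end
end

# Hypothetical helper: hands each message to the block, acknowledges all
# received messages after every batch_size messages, and finishes with a
# single blocking acknowledgement.
def ack_every(session, messages, batch_size)
  messages.each_with_index do |message, index|
    yield message
    session.acknowledge if ((index + 1) % batch_size).zero?
  end
  session.acknowledge :sync => true
end

session = RecordingSession.new
ack_every(session, %w[a b c d e], 2) { |message| message.upcase }
```

With a real Session, the final call would block until the broker has processed the acknowledgements; the intermediate calls would not.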

#close ⇒ Object

Closes the Session and all associated Sender and Receiver instances.

NOTE: All Session instances for a Connection are closed when the Connection is closed. But closing a Session does not affect the owning Connection.



# File 'lib/qpid_messaging/session.rb', line 114

def close; @session_impl.close; end

#commit ⇒ Object

Commits any pending transactions for a transactional session.



# File 'lib/qpid_messaging/session.rb', line 117

def commit; @session_impl.commit; end

#connection ⇒ Object

Returns the Connection associated with this session.



# File 'lib/qpid_messaging/session.rb', line 41

def connection
  @connection
end

#create_receiver(address) ⇒ Object

Creates a new endpoint for receiving messages.

The address can be either an Address instance or an address string.

Arguments

  • address - the endpoint address.



# File 'lib/qpid_messaging/session.rb', line 85

def create_receiver(address)
  result        = nil
  receiver_impl = nil

  if address.class == Qpid::Messaging::Address
    address_impl = address.address_impl
    receiver_impl = @session_impl.createReceiver address_impl
  else
    receiver_impl = @session_impl.createReceiver(address)
  end

  Qpid::Messaging::Receiver.new self, receiver_impl
end

#create_sender(address) ⇒ Object

Creates a new endpoint for sending messages.

The address can be either an Address instance or an address string.

Arguments

  • address - the endpoint address.



# File 'lib/qpid_messaging/session.rb', line 53

def create_sender(address)
  _address = address

  if address.class == Qpid::Messaging::Address
    _address = address.address_impl
  end

  sender_impl = @session_impl.createSender(_address)
  sender_name = sender_impl.getName

  Qpid::Messaging::Sender.new(self, sender_impl)
end

#errors ⇒ Object

If the Session has been rendered invalid due to some exception, this method will result in that exception being raised.

If no exception has occurred, then nothing is raised.

Examples

# show any errors that occurred during the Session
if @session.errors?
  begin
    @session.errors
  rescue Exception => error
    puts "An error occurred: #{error}"
  end
end


# File 'lib/qpid_messaging/session.rb', line 257

def errors; @session_impl.checkError; end

#errors? ⇒ Boolean

Returns true if there were exceptions on this session.

Returns:

  • (Boolean)


# File 'lib/qpid_messaging/session.rb', line 240

def errors?; @session_impl.hasError; end

#next_receiver(timeout = Qpid::Messaging::Duration::FOREVER, &block) ⇒ Object

Fetches the next Receiver with a message pending, waiting up to the specified duration before timing out. If a block is given, the Receiver is also passed to it.

For a Receiver to be returned, it must have a capacity > 0 and have Messages locally queued.

If no Receiver is found within the timeout period, then a MessageError is raised.

Arguments

  • timeout - the Duration to wait; defaults to Duration::FOREVER

Examples

loop do

  begin
    # wait a maximum of one minute for the next receiver to be ready
    recv = session.next_receiver Qpid::Messaging::Duration::MINUTE

    # get and dispatch the message
    msg = recv.get
    dispatch_message msg

  rescue
    puts "No receivers were returned"
  end

end


# File 'lib/qpid_messaging/session.rb', line 228

def next_receiver(timeout = Qpid::Messaging::Duration::FOREVER, &block)
  receiver_impl = @session_impl.nextReceiver(timeout.duration_impl)

  unless receiver_impl.nil?
    recv = Qpid::Messaging::Receiver.new self, receiver_impl
    block.call recv unless block.nil?
  end

  return recv
end
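
As the source above shows, next_receiver also accepts a block and calls it with the ready Receiver. The sketch below illustrates one way that form might be used to drain whatever is ready; it is not part of qpid_messaging. drain is a hypothetical helper, StubSession and StubReceiver are stand-ins so the code runs without a broker, and NoReceiverReady is an assumed placeholder for the error raised on timeout.

```ruby
# Stand-ins so the sketch runs without a broker. NoReceiverReady plays the
# role of the error raised when the timeout expires (assumption).
class NoReceiverReady < StandardError; end

StubReceiver = Struct.new(:message) do
  def get
    message
  end
end

class StubSession
  def initialize(receivers)
    @receivers = receivers
  end

  # Same shape as Session#next_receiver: passes the ready receiver to the
  # block, if one is given, and returns it.
  def next_receiver(timeout = nil, &block)
    recv = @receivers.shift
    raise NoReceiverReady, 'timed out' if recv.nil?
    block.call recv unless block.nil?
    recv
  end
end

# Hypothetical helper: collects one message per ready receiver until
# next_receiver times out.
def drain(session, timeout)
  messages = []
  begin
    loop do
      session.next_receiver(timeout) { |recv| messages << recv.get }
    end
  rescue NoReceiverReady
    # timed out: no other receiver became ready
  end
  messages
end

session = StubSession.new([StubReceiver.new('a'), StubReceiver.new('b')])
drain(session, :placeholder_timeout)
```

Against a real Session, the timeout argument would be a Duration such as Qpid::Messaging::Duration::MINUTE rather than the placeholder symbol used here.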

#receivable ⇒ Object

Returns the total number of receivable messages, and messages already received, by Receiver instances associated with this Session.



# File 'lib/qpid_messaging/session.rb', line 191

def receivable; @session_impl.getReceivable; end

#receiver(name) ⇒ Object

Retrieves the Receiver with the specified name, or nil if no such Receiver exists.

Arguments

  • name - the name of the Receiver



# File 'lib/qpid_messaging/session.rb', line 105

def receiver(name)
  Qpid::Messaging::Receiver.new self, @session_impl.getReceiver(name)
end

#reject(message) ⇒ Object

Rejects the specified message. A rejected message will not be redelivered.

NOTE: A message cannot be rejected once it has been acknowledged.



# File 'lib/qpid_messaging/session.rb', line 163

def reject(message); @session_impl.reject message.message_impl; end

#release(message) ⇒ Object

Releases the message, which allows the broker to attempt to redeliver it.

NOTE: A message cannot be released once it has been acknowledged.



# File 'lib/qpid_messaging/session.rb', line 169

def release(message); @session_impl.release message.message_impl; end
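
Since a message can no longer be rejected or released once it has been acknowledged, the outcome has to be decided before acknowledging. The sketch below shows one possible way to pick between acknowledge, release and reject; it is an illustration under stated assumptions, not part of qpid_messaging. settle and TransientFailure are hypothetical names, and SettlingSession is a stand-in that records calls so the code runs without a broker.

```ruby
# Hypothetical error class marking failures that are worth a redelivery.
class TransientFailure < StandardError; end

# Stand-in for Qpid::Messaging::Session (assumption: records calls only).
class SettlingSession
  attr_reader :calls

  def initialize
    @calls = []
  end

  def acknowledge(options = {})
    @calls << [:acknowledge, options[:message]]
  end

  def release(message)
    @calls << [:release, message]
  end

  def reject(message)
    @calls << [:reject, message]
  end
end

# Hypothetical helper: runs the block, then settles the message exactly once.
# Transient failures release the message for redelivery, any other
# StandardError rejects it, and success acknowledges it.
def settle(session, message)
  begin
    yield message
  rescue TransientFailure
    session.release message
    return :released
  rescue StandardError
    session.reject message
    return :rejected
  end
  session.acknowledge :message => message
  :acknowledged
end

session = SettlingSession.new
settle(session, 'order-1') { |message| message.upcase }
```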

#rollback ⇒ Object

Rolls back any uncommitted transactions on a transactional session.



# File 'lib/qpid_messaging/session.rb', line 120

def rollback; @session_impl.rollback; end
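
commit and rollback are typically paired: commit when a unit of work succeeds, roll back when it fails. The following minimal sketch wraps that pairing in a block helper. in_transaction is a hypothetical helper, not part of qpid_messaging, and TxSession is a stand-in that only records calls so the code runs without a broker.

```ruby
# Stand-in for a transactional Qpid::Messaging::Session (assumption: only
# #commit and #rollback are modeled, and they just record the call).
class TxSession
  attr_reader :calls

  def initialize
    @calls = []
  end

  def commit
    @calls << :commit
  end

  def rollback
    @calls << :rollback
  end
end

# Hypothetical helper: commits if the block finishes normally; rolls back
# and re-raises if the block (or the commit itself) raises a StandardError.
def in_transaction(session)
  result = yield session
  session.commit
  result
rescue StandardError
  session.rollback
  raise
end

session = TxSession.new
in_transaction(session) { |s| :work_done }
```

Re-raising after the rollback keeps the failure visible to the caller instead of silently discarding it.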

#sender(name) ⇒ Object

Retrieves the Sender with the specified name.

Raises an exception if no such Sender exists.

Arguments

  • name - the name of the Sender



# File 'lib/qpid_messaging/session.rb', line 73

def sender(name)
  Qpid::Messaging::Sender.new self, @session_impl.getSender(name)
end

#session_impl ⇒ Object

:nodoc:



# File 'lib/qpid_messaging/session.rb', line 36

def session_impl # :nodoc:
  @session_impl
end

#sync(args = {}) ⇒ Object

Requests synchronization with the broker.

Arguments

  • args - the set of options

Options

  • :block - if true, the call blocks until the broker acknowledges it

TODO: Add an optional block to be used for blocking calls.



# File 'lib/qpid_messaging/session.rb', line 184

def sync(args = {})
  block = args[:block] || false
  @session_impl.sync block
end

#unsettled_acks ⇒ Object

Returns the number of messages that have been acknowledged by this Session whose acknowledgements have not been confirmed as processed by the broker.



# File 'lib/qpid_messaging/session.rb', line 196

def unsettled_acks; @session_impl.getUnsettledAcks; end
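
unsettled_acks can be combined with sync to wait only when acknowledgements are actually outstanding. The sketch below is one possible illustration and is not part of qpid_messaging. acknowledge_and_settle is a hypothetical helper, and CountingSession is a stand-in whose bookkeeping is an assumption about broker behaviour (a blocking sync settles every outstanding acknowledgement), made so the code runs without a broker.

```ruby
# Stand-in for Qpid::Messaging::Session. Assumed model: a non-blocking
# acknowledge leaves its acknowledgements unsettled until a blocking sync.
class CountingSession
  attr_reader :unsettled_acks

  def initialize(received)
    @received       = received # messages received but not yet acknowledged
    @unsettled_acks = 0
  end

  def acknowledge(options = {})
    @unsettled_acks += @received
    @received = 0
    @unsettled_acks = 0 if options[:sync]
  end

  def sync(args = {})
    @unsettled_acks = 0 if args[:block]
  end
end

# Hypothetical helper: acknowledges everything received, then blocks on the
# broker only if some acknowledgements are still unsettled. Returns how many
# acknowledgements were outstanding before the sync.
def acknowledge_and_settle(session)
  session.acknowledge
  pending = session.unsettled_acks
  session.sync :block => true if pending > 0
  pending
end

session = CountingSession.new(3)
acknowledge_and_settle(session)
```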