Class: Qpid::Proton::Messenger

Inherits:
Object
  • Object
show all
Includes:
ExceptionHandling
Defined in:
lib/qpid_proton/messenger.rb

Overview

A Messenger provides a high-level means for sending and receiving AMQP messages.

Examples

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ExceptionHandling

#check_for_error

Constructor Details

#initialize(name = nil) ⇒ Messenger

Creates a new Messenger.

The name parameter is optional. If one is not provided then a unique name is generated.

Options

  • name - the name (def. nil)



42
43
44
45
# File 'lib/qpid_proton/messenger.rb', line 42

def initialize(name = nil)
  @impl = Cproton.pn_messenger(name)
  ObjectSpace.define_finalizer(self, self.class.finalize!(@impl))
end

Class Method Details

.finalize!(impl) ⇒ Object

:nodoc:



47
48
49
50
51
# File 'lib/qpid_proton/messenger.rb', line 47

def self.finalize!(impl) # :nodoc:
  proc {
    Cproton.pn_messenger_free(impl)
  }
end

Instance Method Details

#accept(tracker = nil) ⇒ Object

Accepts the incoming message identified by the tracker.

Options

  • tracker - the tracker

  • flag - the flag

Raises:

  • (TypeError)


296
297
298
299
300
301
302
303
304
305
# File 'lib/qpid_proton/messenger.rb', line 296

def accept(tracker = nil)
  raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker)
  if tracker.nil? then
    tracker = self.incoming_tracker
    flag = Cproton::PN_CUMULATIVE
  else
    flag = 0
  end
  check_for_error(Cproton.pn_messenger_accept(@impl, tracker.impl, flag))
end

#blockingObject



78
79
80
# File 'lib/qpid_proton/messenger.rb', line 78

def blocking
  Cproton.pn_mesenger_is_blocking(@impl)
end

#blocking=(blocking) ⇒ Object



82
83
84
# File 'lib/qpid_proton/messenger.rb', line 82

def blocking=(blocking)
  Cproton.pn_messenger_set_blocking(@impl, blocking)
end

#certificateObject

Returns the path to a certificate file.



146
147
148
# File 'lib/qpid_proton/messenger.rb', line 146

def certificate
  Cproton.pn_messenger_get_certificate(@impl)
end

#certificate=(certificate) ⇒ Object

Path to a certificate file for the Messenger.

This certificate is used when the Messenger accepts or establishes SSL/TLS connections.

Options

  • certificate - the certificate



140
141
142
# File 'lib/qpid_proton/messenger.rb', line 140

def certificate=(certificate)
  Cproton.pn_messenger_set_certificate(@impl, certificate)
end

#errnoObject

Returns the most recent error number.



94
95
96
# File 'lib/qpid_proton/messenger.rb', line 94

def errno
  Cproton.pn_messenger_errno(@impl)
end

#errorObject

Returns the most recent error message.



100
101
102
# File 'lib/qpid_proton/messenger.rb', line 100

def error
  Cproton.pn_error_text(Cproton.pn_messenger_error(@impl))
end

#error?Boolean

Reports whether an error occurred.

Returns:

  • (Boolean)


88
89
90
# File 'lib/qpid_proton/messenger.rb', line 88

def error?
  !Cproton.pn_messenger_errno(@impl).zero?
end

#get(msg = nil) ⇒ Object

Gets a single message incoming message from the local queue.

If no message is provided in the argument, then one is created. In either case, the one returned will be the fetched message.

Options

  • msg - the (optional) Message instance to be used



221
222
223
224
225
226
227
228
229
230
231
# File 'lib/qpid_proton/messenger.rb', line 221

def get(msg = nil)
  msg_impl = nil
  if msg.nil? then
    msg_impl = nil
  else
    msg_impl = msg.impl
  end
  check_for_error(Cproton.pn_messenger_get(@impl, msg_impl))
  msg.post_decode unless msg.nil?
  return incoming_tracker
end

#incomingObject

Returns the number of messages in the incoming queue that have not been retrieved.



268
269
270
# File 'lib/qpid_proton/messenger.rb', line 268

def incoming
  Cproton.pn_messenger_incoming(@impl)
end

#incoming_trackerObject

Returns a Tracker for the most recently received message.



283
284
285
286
287
# File 'lib/qpid_proton/messenger.rb', line 283

def incoming_tracker
  impl = Cproton.pn_messenger_incoming_tracker(@impl)
  return nil if impl == -1
  Qpid::Proton::Tracker.new(impl)
end

#incoming_windowObject

Returns the incoming window.



370
371
372
# File 'lib/qpid_proton/messenger.rb', line 370

def incoming_window
  Cproton.pn_messenger_get_incoming_window(@impl)
end

#incoming_window=(window) ⇒ Object

Sets the incoming window.

If the incoming window is set to a positive value, then after each call to #accept or #reject, the object will track the status of that many deliveries.

Options

  • window - the window size

Raises:

  • (TypeError)


363
364
365
366
# File 'lib/qpid_proton/messenger.rb', line 363

def incoming_window=(window)
  raise TypeError.new("invalid window: #{window}") unless valid_window?(window)
  check_for_error(Cproton.pn_messenger_set_incoming_window(@impl, window))
end

#nameObject

Returns the name.



55
56
57
# File 'lib/qpid_proton/messenger.rb', line 55

def name
  Cproton.pn_messenger_name(@impl)
end

#outgoingObject

Returns the number messages in the outgoing queue that have not been transmitted.



261
262
263
# File 'lib/qpid_proton/messenger.rb', line 261

def outgoing
  Cproton.pn_messenger_outgoing(@impl)
end

#outgoing_trackerObject

Returns a Tracker for the message most recently sent via the put method.



275
276
277
278
279
# File 'lib/qpid_proton/messenger.rb', line 275

def outgoing_tracker
  impl = Cproton.pn_messenger_outgoing_tracker(@impl)
  return nil if impl == -1
  Qpid::Proton::Tracker.new(impl)
end

#outgoing_windowObject

Returns the outgoing window.



390
391
392
# File 'lib/qpid_proton/messenger.rb', line 390

def outgoing_window
  Cproton.pn_messenger_get_outgoing_window(@impl)
end

#outgoing_window=(window) ⇒ Object

Sets the outgoing window.

If the outgoing window is set to a positive value, then after each call to #send, the object will track the status of that many deliveries.

Options

  • window - the window size

Raises:

  • (TypeError)


383
384
385
386
# File 'lib/qpid_proton/messenger.rb', line 383

def outgoing_window=(window)
  raise TypeError.new("invalid window: #{window}") unless valid_window?(window)
  check_for_error(Cproton.pn_messenger_set_outgoing_window(@impl, window))
end

#private_keyObject

Returns the path to a private key file.



166
167
168
# File 'lib/qpid_proton/messenger.rb', line 166

def private_key
  Cproton.pn_messenger_get_private_key(@impl)
end

#private_key=(key) ⇒ Object

Path to a private key file for the Messenger.

The property must be specified for the Messenger to accept incoming SSL/TLS connections and to establish client authenticated outgoing SSL/TLS connections.

Options

  • key - the key file



160
161
162
# File 'lib/qpid_proton/messenger.rb', line 160

def private_key=(key)
  Cproton.pn_messenger_set_private_key(@impl, key)
end

#put(message) ⇒ Object

Puts a single message into the outgoing queue.

To ensure messages are sent, you should then call ::send.

Options

  • message - the message

Raises:

  • (TypeError)


196
197
198
199
200
201
202
203
# File 'lib/qpid_proton/messenger.rb', line 196

def put(message)
  raise TypeError.new("invalid message: #{message}") if message.nil?
  raise ArgumentError.new("invalid message type: #{message.class}") unless message.kind_of?(Message)
  # encode the message first
  message.pre_encode
  check_for_error(Cproton.pn_messenger_put(@impl, message.impl))
  return outgoing_tracker
end

#receive(limit = -1)) ⇒ Object

Receives up to the specified number of messages, blocking until at least one message is received.

Options ====

  • limit - the maximum number of messages to receive



240
241
242
# File 'lib/qpid_proton/messenger.rb', line 240

def receive(limit = -1)
  check_for_error(Cproton.pn_messenger_recv(@impl, limit))
end

#receivingObject



244
245
246
# File 'lib/qpid_proton/messenger.rb', line 244

def receiving
  Cproton.pn_messenger_receiving(@impl)
end

#reject(tracker) ⇒ Object

Rejects the incoming message identified by the tracker.

Options

  • tracker - the tracker

  • flag - the flag

Raises:

  • (TypeError)


314
315
316
317
318
319
320
321
322
323
# File 'lib/qpid_proton/messenger.rb', line 314

def reject(tracker)
  raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker)
  if tracker.nil? then
    tracker = self.incoming_tracker
    flag = Cproton::PN_CUMULATIVE
  else
    flag = 0
  end
  check_for_error(Cproton.pn_messenger_reject(@impl, tracker.impl, flag))
end

#send(n = -1)) ⇒ Object

Sends all outgoing messages, blocking until the outgoing queue is empty.



208
209
210
# File 'lib/qpid_proton/messenger.rb', line 208

def send(n = -1)
  check_for_error(Cproton.pn_messenger_send(@impl, n))
end

#settle(tracker, flag) ⇒ Object

Settles messages for a tracker.

Options

  • tracker - the tracker

  • flag - the flag

Examples

Raises:

  • (TypeError)


347
348
349
350
351
# File 'lib/qpid_proton/messenger.rb', line 347

def settle(tracker, flag)
  raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker)
  raise TypeError.new("invalid flag: #{flag}") unless Qpid::Proton::Tracker.valid_flag?(flag)
  Cproton.pn_messenger_settle(@impl, tracker.impl, flag)
end

#startObject

Starts the Messenger, allowing it to begin sending and receiving messages.



107
108
109
# File 'lib/qpid_proton/messenger.rb', line 107

def start
  check_for_error(Cproton.pn_messenger_start(@impl))
end

#status(tracker) ⇒ Object

Gets the last known remote state of the delivery associated with the given tracker. See TrackerStatus for details on the values returned.

Options

  • tracker - the tracker

Raises:

  • (TypeError)


333
334
335
336
# File 'lib/qpid_proton/messenger.rb', line 333

def status(tracker)
  raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker)
  Qpid::Proton::TrackerStatus.by_value(Cproton.pn_messenger_status(@impl, tracker.impl))
end

#stopObject

Stops the Messenger, preventing it from sending or receiving any more messages.



114
115
116
# File 'lib/qpid_proton/messenger.rb', line 114

def stop
  check_for_error(Cproton.pn_messenger_stop(@impl))
end

#stoppedObject



118
119
120
# File 'lib/qpid_proton/messenger.rb', line 118

def stopped
  Cproton.pn_messenger_stopped(@impl)
end

#subscribe(address) ⇒ Object

Subscribes the Messenger to a remote address.

Raises:

  • (TypeError)


124
125
126
127
128
129
# File 'lib/qpid_proton/messenger.rb', line 124

def subscribe(address)
  raise TypeError.new("invalid address: #{address}") if address.nil?
  subscription = Cproton.pn_messenger_subscribe(@impl, address)
  raise Qpid::Proton::ProtonError.new("Subscribe failed") if subscription.nil?
  Qpid::Proton::Subscription.new(subscription)
end

#timeoutObject

Returns the timeout period



74
75
76
# File 'lib/qpid_proton/messenger.rb', line 74

def timeout
  Cproton.pn_messenger_get_timeout(@impl)
end

#timeout=(timeout) ⇒ Object

Sets the timeout period, in milliseconds.

A negative timeout period implies an infinite timeout.

Options

  • timeout - the timeout period

Raises:

  • (TypeError)


67
68
69
70
# File 'lib/qpid_proton/messenger.rb', line 67

def timeout=(timeout)
  raise TypeError.new("invalid timeout: #{timeout}") if timeout.nil?
  Cproton.pn_messenger_set_timeout(@impl, timeout)
end

#trusted_certificatesObject

The path to the databse of trusted certificates.



184
185
186
# File 'lib/qpid_proton/messenger.rb', line 184

def trusted_certificates
  Cproton.pn_messenger_get_trusted_certificates(@impl)
end

#trusted_certificates=(certificates) ⇒ Object

A path to a database of trusted certificates for use in verifying the peer on an SSL/TLS connection. If this property is nil, then the peer will not be verified.

Options

  • certificates - the certificates path



178
179
180
# File 'lib/qpid_proton/messenger.rb', line 178

def trusted_certificates=(certificates)
  Cproton.pn_messenger_set_trusted_certificates(@impl,certificates)
end

#work(timeout = -1)) ⇒ Object



248
249
250
251
252
253
254
255
256
# File 'lib/qpid_proton/messenger.rb', line 248

def work(timeout=-1)
  err = Cproton.pn_messenger_work(@impl, timeout)
  if (err == Cproton::PN_TIMEOUT) then
    return false
  else
    check_for_error(err)
    return true
  end
end