Class: Qpid::Proton::Messenger
- Inherits:
-
Object
- Object
- Qpid::Proton::Messenger
- Includes:
- ExceptionHandling
- Defined in:
- lib/qpid_proton/messenger.rb
Overview
A Messenger provides a high-level means for sending and receiving AMQP messages.
Examples
Class Method Summary collapse
-
.finalize!(impl) ⇒ Object
:nodoc:.
Instance Method Summary collapse
-
#accept(tracker = nil) ⇒ Object
Accepts the incoming message identified by the tracker.
- #blocking ⇒ Object
- #blocking=(blocking) ⇒ Object
-
#certificate ⇒ Object
Returns the path to a certificate file.
-
#certificate=(certificate) ⇒ Object
Path to a certificate file for the
Messenger. -
#errno ⇒ Object
Returns the most recent error number.
-
#error ⇒ Object
Returns the most recent error message.
-
#error? ⇒ Boolean
Reports whether an error occurred.
-
#get(msg = nil) ⇒ Object
Gets a single message incoming message from the local queue.
-
#incoming ⇒ Object
Returns the number of messages in the incoming queue that have not been retrieved.
-
#incoming_tracker ⇒ Object
Returns a
Trackerfor the most recently received message. -
#incoming_window ⇒ Object
Returns the incoming window.
-
#incoming_window=(window) ⇒ Object
Sets the incoming window.
-
#initialize(name = nil) ⇒ Messenger
constructor
Creates a new
Messenger. -
#name ⇒ Object
Returns the name.
-
#outgoing ⇒ Object
Returns the number messages in the outgoing queue that have not been transmitted.
-
#outgoing_tracker ⇒ Object
Returns a
Trackerfor the message most recently sent via the put method. -
#outgoing_window ⇒ Object
Returns the outgoing window.
-
#outgoing_window=(window) ⇒ Object
Sets the outgoing window.
-
#private_key ⇒ Object
Returns the path to a private key file.
-
#private_key=(key) ⇒ Object
Path to a private key file for the
Messenger. -
#put(message) ⇒ Object
Puts a single message into the outgoing queue.
-
#receive(limit = -1)) ⇒ Object
Receives up to the specified number of messages, blocking until at least one message is received.
- #receiving ⇒ Object
-
#reject(tracker) ⇒ Object
Rejects the incoming message identified by the tracker.
-
#send(n = -1)) ⇒ Object
Sends all outgoing messages, blocking until the outgoing queue is empty.
-
#settle(tracker, flag) ⇒ Object
Settles messages for a tracker.
-
#start ⇒ Object
Starts the
Messenger, allowing it to begin sending and receiving messages. -
#status(tracker) ⇒ Object
Gets the last known remote state of the delivery associated with the given tracker.
-
#stop ⇒ Object
Stops the
Messenger, preventing it from sending or receiving any more messages. - #stopped ⇒ Object
-
#subscribe(address) ⇒ Object
Subscribes the
Messengerto a remote address. -
#timeout ⇒ Object
Returns the timeout period.
-
#timeout=(timeout) ⇒ Object
Sets the timeout period, in milliseconds.
-
#trusted_certificates ⇒ Object
The path to the databse of trusted certificates.
-
#trusted_certificates=(certificates) ⇒ Object
A path to a database of trusted certificates for use in verifying the peer on an SSL/TLS connection.
- #work(timeout = -1)) ⇒ Object
Methods included from ExceptionHandling
Constructor Details
#initialize(name = nil) ⇒ Messenger
Creates a new Messenger.
The name parameter is optional. If one is not provided then a unique name is generated.
Options
-
name - the name (def. nil)
42 43 44 45 |
# File 'lib/qpid_proton/messenger.rb', line 42 def initialize(name = nil) @impl = Cproton.pn_messenger(name) ObjectSpace.define_finalizer(self, self.class.finalize!(@impl)) end |
Class Method Details
.finalize!(impl) ⇒ Object
:nodoc:
47 48 49 50 51 |
# File 'lib/qpid_proton/messenger.rb', line 47 def self.finalize!(impl) # :nodoc: proc { Cproton.pn_messenger_free(impl) } end |
Instance Method Details
#accept(tracker = nil) ⇒ Object
Accepts the incoming message identified by the tracker.
Options
-
tracker - the tracker
-
flag - the flag
296 297 298 299 300 301 302 303 304 305 |
# File 'lib/qpid_proton/messenger.rb', line 296 def accept(tracker = nil) raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker) if tracker.nil? then tracker = self.incoming_tracker flag = Cproton::PN_CUMULATIVE else flag = 0 end check_for_error(Cproton.pn_messenger_accept(@impl, tracker.impl, flag)) end |
#blocking ⇒ Object
78 79 80 |
# File 'lib/qpid_proton/messenger.rb', line 78 def blocking Cproton.pn_mesenger_is_blocking(@impl) end |
#blocking=(blocking) ⇒ Object
82 83 84 |
# File 'lib/qpid_proton/messenger.rb', line 82 def blocking=(blocking) Cproton.pn_messenger_set_blocking(@impl, blocking) end |
#certificate ⇒ Object
Returns the path to a certificate file.
146 147 148 |
# File 'lib/qpid_proton/messenger.rb', line 146 def certificate Cproton.pn_messenger_get_certificate(@impl) end |
#certificate=(certificate) ⇒ Object
Path to a certificate file for the Messenger.
This certificate is used when the Messenger accepts or establishes SSL/TLS connections.
Options
-
certificate - the certificate
140 141 142 |
# File 'lib/qpid_proton/messenger.rb', line 140 def certificate=(certificate) Cproton.pn_messenger_set_certificate(@impl, certificate) end |
#errno ⇒ Object
Returns the most recent error number.
94 95 96 |
# File 'lib/qpid_proton/messenger.rb', line 94 def errno Cproton.pn_messenger_errno(@impl) end |
#error ⇒ Object
Returns the most recent error message.
100 101 102 |
# File 'lib/qpid_proton/messenger.rb', line 100 def error Cproton.pn_error_text(Cproton.pn_messenger_error(@impl)) end |
#error? ⇒ Boolean
Reports whether an error occurred.
88 89 90 |
# File 'lib/qpid_proton/messenger.rb', line 88 def error? !Cproton.pn_messenger_errno(@impl).zero? end |
#get(msg = nil) ⇒ Object
Gets a single message incoming message from the local queue.
If no message is provided in the argument, then one is created. In either case, the one returned will be the fetched message.
Options
-
msg - the (optional)
Messageinstance to be used
221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/qpid_proton/messenger.rb', line 221 def get(msg = nil) msg_impl = nil if msg.nil? then msg_impl = nil else msg_impl = msg.impl end check_for_error(Cproton.pn_messenger_get(@impl, msg_impl)) msg.post_decode unless msg.nil? return incoming_tracker end |
#incoming ⇒ Object
Returns the number of messages in the incoming queue that have not been retrieved.
268 269 270 |
# File 'lib/qpid_proton/messenger.rb', line 268 def incoming Cproton.pn_messenger_incoming(@impl) end |
#incoming_tracker ⇒ Object
Returns a Tracker for the most recently received message.
283 284 285 286 287 |
# File 'lib/qpid_proton/messenger.rb', line 283 def incoming_tracker impl = Cproton.pn_messenger_incoming_tracker(@impl) return nil if impl == -1 Qpid::Proton::Tracker.new(impl) end |
#incoming_window ⇒ Object
Returns the incoming window.
370 371 372 |
# File 'lib/qpid_proton/messenger.rb', line 370 def incoming_window Cproton.pn_messenger_get_incoming_window(@impl) end |
#incoming_window=(window) ⇒ Object
Sets the incoming window.
If the incoming window is set to a positive value, then after each call to #accept or #reject, the object will track the status of that many deliveries.
Options
-
window - the window size
363 364 365 366 |
# File 'lib/qpid_proton/messenger.rb', line 363 def incoming_window=(window) raise TypeError.new("invalid window: #{window}") unless valid_window?(window) check_for_error(Cproton.pn_messenger_set_incoming_window(@impl, window)) end |
#name ⇒ Object
Returns the name.
55 56 57 |
# File 'lib/qpid_proton/messenger.rb', line 55 def name Cproton.pn_messenger_name(@impl) end |
#outgoing ⇒ Object
Returns the number messages in the outgoing queue that have not been transmitted.
261 262 263 |
# File 'lib/qpid_proton/messenger.rb', line 261 def outgoing Cproton.pn_messenger_outgoing(@impl) end |
#outgoing_tracker ⇒ Object
Returns a Tracker for the message most recently sent via the put method.
275 276 277 278 279 |
# File 'lib/qpid_proton/messenger.rb', line 275 def outgoing_tracker impl = Cproton.pn_messenger_outgoing_tracker(@impl) return nil if impl == -1 Qpid::Proton::Tracker.new(impl) end |
#outgoing_window ⇒ Object
Returns the outgoing window.
390 391 392 |
# File 'lib/qpid_proton/messenger.rb', line 390 def outgoing_window Cproton.pn_messenger_get_outgoing_window(@impl) end |
#outgoing_window=(window) ⇒ Object
Sets the outgoing window.
If the outgoing window is set to a positive value, then after each call to #send, the object will track the status of that many deliveries.
Options
-
window - the window size
383 384 385 386 |
# File 'lib/qpid_proton/messenger.rb', line 383 def outgoing_window=(window) raise TypeError.new("invalid window: #{window}") unless valid_window?(window) check_for_error(Cproton.pn_messenger_set_outgoing_window(@impl, window)) end |
#private_key ⇒ Object
Returns the path to a private key file.
166 167 168 |
# File 'lib/qpid_proton/messenger.rb', line 166 def private_key Cproton.pn_messenger_get_private_key(@impl) end |
#private_key=(key) ⇒ Object
Path to a private key file for the Messenger.
The property must be specified for the Messenger to accept incoming SSL/TLS connections and to establish client authenticated outgoing SSL/TLS connections.
Options
-
key - the key file
160 161 162 |
# File 'lib/qpid_proton/messenger.rb', line 160 def private_key=(key) Cproton.pn_messenger_set_private_key(@impl, key) end |
#put(message) ⇒ Object
Puts a single message into the outgoing queue.
To ensure messages are sent, you should then call ::send.
Options
-
message - the message
196 197 198 199 200 201 202 203 |
# File 'lib/qpid_proton/messenger.rb', line 196 def put() raise TypeError.new("invalid message: #{message}") if .nil? raise ArgumentError.new("invalid message type: #{message.class}") unless .kind_of?(Message) # encode the message first .pre_encode check_for_error(Cproton.pn_messenger_put(@impl, .impl)) return outgoing_tracker end |
#receive(limit = -1)) ⇒ Object
Receives up to the specified number of messages, blocking until at least one message is received.
Options ====
-
limit - the maximum number of messages to receive
240 241 242 |
# File 'lib/qpid_proton/messenger.rb', line 240 def receive(limit = -1) check_for_error(Cproton.pn_messenger_recv(@impl, limit)) end |
#receiving ⇒ Object
244 245 246 |
# File 'lib/qpid_proton/messenger.rb', line 244 def receiving Cproton.pn_messenger_receiving(@impl) end |
#reject(tracker) ⇒ Object
Rejects the incoming message identified by the tracker.
Options
-
tracker - the tracker
-
flag - the flag
314 315 316 317 318 319 320 321 322 323 |
# File 'lib/qpid_proton/messenger.rb', line 314 def reject(tracker) raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker) if tracker.nil? then tracker = self.incoming_tracker flag = Cproton::PN_CUMULATIVE else flag = 0 end check_for_error(Cproton.pn_messenger_reject(@impl, tracker.impl, flag)) end |
#send(n = -1)) ⇒ Object
Sends all outgoing messages, blocking until the outgoing queue is empty.
208 209 210 |
# File 'lib/qpid_proton/messenger.rb', line 208 def send(n = -1) check_for_error(Cproton.pn_messenger_send(@impl, n)) end |
#settle(tracker, flag) ⇒ Object
Settles messages for a tracker.
Options
-
tracker - the tracker
-
flag - the flag
Examples
347 348 349 350 351 |
# File 'lib/qpid_proton/messenger.rb', line 347 def settle(tracker, flag) raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) raise TypeError.new("invalid flag: #{flag}") unless Qpid::Proton::Tracker.valid_flag?(flag) Cproton.pn_messenger_settle(@impl, tracker.impl, flag) end |
#start ⇒ Object
Starts the Messenger, allowing it to begin sending and receiving messages.
107 108 109 |
# File 'lib/qpid_proton/messenger.rb', line 107 def start check_for_error(Cproton.pn_messenger_start(@impl)) end |
#status(tracker) ⇒ Object
Gets the last known remote state of the delivery associated with the given tracker. See TrackerStatus for details on the values returned.
Options
-
tracker - the tracker
333 334 335 336 |
# File 'lib/qpid_proton/messenger.rb', line 333 def status(tracker) raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) Qpid::Proton::TrackerStatus.by_value(Cproton.pn_messenger_status(@impl, tracker.impl)) end |
#stop ⇒ Object
Stops the Messenger, preventing it from sending or receiving any more messages.
114 115 116 |
# File 'lib/qpid_proton/messenger.rb', line 114 def stop check_for_error(Cproton.pn_messenger_stop(@impl)) end |
#stopped ⇒ Object
118 119 120 |
# File 'lib/qpid_proton/messenger.rb', line 118 def stopped Cproton.pn_messenger_stopped(@impl) end |
#subscribe(address) ⇒ Object
Subscribes the Messenger to a remote address.
124 125 126 127 128 129 |
# File 'lib/qpid_proton/messenger.rb', line 124 def subscribe(address) raise TypeError.new("invalid address: #{address}") if address.nil? subscription = Cproton.pn_messenger_subscribe(@impl, address) raise Qpid::Proton::ProtonError.new("Subscribe failed") if subscription.nil? Qpid::Proton::Subscription.new(subscription) end |
#timeout ⇒ Object
Returns the timeout period
74 75 76 |
# File 'lib/qpid_proton/messenger.rb', line 74 def timeout Cproton.pn_messenger_get_timeout(@impl) end |
#timeout=(timeout) ⇒ Object
Sets the timeout period, in milliseconds.
A negative timeout period implies an infinite timeout.
Options
-
timeout - the timeout period
67 68 69 70 |
# File 'lib/qpid_proton/messenger.rb', line 67 def timeout=(timeout) raise TypeError.new("invalid timeout: #{timeout}") if timeout.nil? Cproton.pn_messenger_set_timeout(@impl, timeout) end |
#trusted_certificates ⇒ Object
The path to the databse of trusted certificates.
184 185 186 |
# File 'lib/qpid_proton/messenger.rb', line 184 def trusted_certificates Cproton.pn_messenger_get_trusted_certificates(@impl) end |
#trusted_certificates=(certificates) ⇒ Object
A path to a database of trusted certificates for use in verifying the peer on an SSL/TLS connection. If this property is nil, then the peer will not be verified.
Options
-
certificates - the certificates path
178 179 180 |
# File 'lib/qpid_proton/messenger.rb', line 178 def trusted_certificates=(certificates) Cproton.pn_messenger_set_trusted_certificates(@impl,certificates) end |
#work(timeout = -1)) ⇒ Object
248 249 250 251 252 253 254 255 256 |
# File 'lib/qpid_proton/messenger.rb', line 248 def work(timeout=-1) err = Cproton.pn_messenger_work(@impl, timeout) if (err == Cproton::PN_TIMEOUT) then return false else check_for_error(err) return true end end |