Class: Katello::Qpid::Connection::Sender
- Inherits:
-
Qpid::Proton::MessagingHandler
- Object
- Qpid::Proton::MessagingHandler
- Katello::Qpid::Connection::Sender
- Defined in:
- app/lib/katello/qpid/connection.rb
Instance Method Summary collapse
-
#initialize(url, connection_options, address, messages) ⇒ Sender
constructor
A new instance of Sender.
- #on_container_start(container) ⇒ Object
- #on_message(_delivery, message) ⇒ Object
- #on_sendable(sender) ⇒ Object
- #on_tracker_accept(tracker) ⇒ Object
Constructor Details
#initialize(url, connection_options, address, messages) ⇒ Sender
Returns a new instance of Sender.
7 8 9 10 11 12 13 14 15 |
# File 'app/lib/katello/qpid/connection.rb', line 7 def initialize(url, , address, ) super() @url = url @connection_options = @address = address @messages = @sent = 0 @confirmed = 0 end |
Instance Method Details
#on_container_start(container) ⇒ Object
17 18 19 20 21 |
# File 'app/lib/katello/qpid/connection.rb', line 17 def on_container_start(container) c = container.connect(@url, @connection_options) c.open_sender @receiver = c.open_receiver(@address) if @address end |
#on_message(_delivery, message) ⇒ Object
40 41 42 43 44 45 46 47 48 49 |
# File 'app/lib/katello/qpid/connection.rb', line 40 def (_delivery, ) opcode = .properties['qmf.opcode'] if opcode == '_exception' error_code = .body.dig('_values', 'error_code') if error_code != 7 # not found = .body.dig('_values', 'error_text') fail() end end end |
#on_sendable(sender) ⇒ Object
23 24 25 26 27 28 29 30 31 |
# File 'app/lib/katello/qpid/connection.rb', line 23 def on_sendable(sender) @messages.each do |msg| msg.reply_to = @receiver.remote_source.address if @receiver sender.send(msg) @sent += 1 end ensure sender.close end |
#on_tracker_accept(tracker) ⇒ Object
33 34 35 36 37 38 |
# File 'app/lib/katello/qpid/connection.rb', line 33 def on_tracker_accept(tracker) @confirmed += 1 if @confirmed == @sent tracker.connection.close end end |