Class: Katello::Qpid::Connection::Sender

Inherits:
Qpid::Proton::MessagingHandler
  • Object
show all
Defined in:
app/lib/katello/qpid/connection.rb

Instance Method Summary collapse

Constructor Details

#initialize(url, connection_options, address, messages) ⇒ Sender

Returns a new instance of Sender.



7
8
9
10
11
12
13
14
15
# File 'app/lib/katello/qpid/connection.rb', line 7

def initialize(url, connection_options, address, messages)
  super()
  @url = url
  @connection_options = connection_options
  @address = address
  @messages = messages
  @sent = 0
  @confirmed = 0
end

Instance Method Details

#on_container_start(container) ⇒ Object



17
18
19
20
21
# File 'app/lib/katello/qpid/connection.rb', line 17

def on_container_start(container)
  c = container.connect(@url, @connection_options)
  c.open_sender
  @receiver = c.open_receiver(@address) if @address
end

#on_message(_delivery, message) ⇒ Object



40
41
42
43
44
45
46
47
48
49
# File 'app/lib/katello/qpid/connection.rb', line 40

def on_message(_delivery, message)
  opcode = message.properties['qmf.opcode']
  if opcode == '_exception'
    error_code = message.body.dig('_values', 'error_code')
    if error_code != 7 # not found
      error_message = message.body.dig('_values', 'error_text')
      fail(error_message)
    end
  end
end

#on_sendable(sender) ⇒ Object



23
24
25
26
27
28
29
30
31
# File 'app/lib/katello/qpid/connection.rb', line 23

def on_sendable(sender)
  @messages.each do |msg|
    msg.reply_to = @receiver.remote_source.address if @receiver
    sender.send(msg)
    @sent += 1
  end
ensure
  sender.close
end

#on_tracker_accept(tracker) ⇒ Object



33
34
35
36
37
38
# File 'app/lib/katello/qpid/connection.rb', line 33

def on_tracker_accept(tracker)
  @confirmed += 1
  if @confirmed == @sent
    tracker.connection.close
  end
end