Class: Katello::Qpid::Connection

Inherits:
Object
  • Object
show all
Defined in:
app/lib/katello/qpid/connection.rb

Defined Under Namespace

Classes: Receiver, Sender

Instance Method Summary collapse

Constructor Details

#initialize(url:, ssl_cert_file:, ssl_key_file:, ssl_ca_file:) ⇒ Connection

Returns a new instance of Connection.



74
75
76
77
78
79
80
81
82
83
84
85
# File 'app/lib/katello/qpid/connection.rb', line 74

def initialize(url:, ssl_cert_file:, ssl_key_file:, ssl_ca_file:)
  @url = url
  ssl_domain = ::Qpid::Proton::SSLDomain.new(::Qpid::Proton::SSLDomain::MODE_CLIENT)
  ssl_domain.peer_authentication(::Qpid::Proton::SSLDomain::VERIFY_PEER_NAME)
  ssl_domain.credentials(ssl_cert_file, ssl_key_file, nil) if ssl_cert_file && ssl_key_file
  ssl_domain.trusted_ca_db(ssl_ca_file) if ssl_ca_file
  @connection_options = {
    ssl_domain: ssl_domain,
    sasl_allowed_mechs: 'external',
    virtual_host: URI.parse(url).host
  }
end

Instance Method Details

#closeObject



131
132
133
# File 'app/lib/katello/qpid/connection.rb', line 131

def close
  @container&.stop
end

#delete_queue(queue_name) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'app/lib/katello/qpid/connection.rb', line 87

def delete_queue(queue_name)
  address = "qmf.default.direct"
  message = ::Qpid::Proton::Message.new
  message.subject = 'broker'
  message.address = address
  message.body = {
    '_object_id' => {
      '_object_name' => 'org.apache.qpid.broker:broker:amqp-broker'
    },
    '_method_name' => 'delete',
    '_arguments' => {
      'strict' => true,
      'name' => queue_name,
      'type' => 'queue',
      'properties' => {}
    }
  }

  message.properties = {
    'qmf.opcode' => '_method_request',
    'x-amqp-0-10.app-id' => 'qmf2',
    'method' => 'request'
  }

  sender = Sender.new(@url, @connection_options, address, [message])
  with_connection(sender)
end

#open?Boolean

Returns:

  • (Boolean)


135
136
137
# File 'app/lib/katello/qpid/connection.rb', line 135

def open?
  (@container&.running || 0) > 0
end

#receive_messages(address:, handler:) ⇒ Object



126
127
128
129
# File 'app/lib/katello/qpid/connection.rb', line 126

def receive_messages(address:, handler:)
  receiver = Receiver.new(@url, @connection_options, address, handler)
  with_connection(receiver)
end

#send_messages(messages) ⇒ Object



115
116
117
118
119
120
121
122
123
124
# File 'app/lib/katello/qpid/connection.rb', line 115

def send_messages(messages)
  qpid_messages = messages.map do |message|
    msg = ::Qpid::Proton::Message.new
    msg.body = message.to_s
    msg.address = message.recipient_address
    msg
  end
  sender = Sender.new(@url, @connection_options, nil, qpid_messages)
  with_connection(sender)
end