Class: Katello::Qpid::Connection
- Inherits:
-
Object
- Object
- Katello::Qpid::Connection
show all
- Defined in:
- app/lib/katello/qpid/connection.rb
Defined Under Namespace
Classes: Receiver, Sender
Instance Method Summary
collapse
Constructor Details
#initialize(url:, ssl_cert_file:, ssl_key_file:, ssl_ca_file:) ⇒ Connection
Returns a new instance of Connection.
74
75
76
77
78
79
80
81
82
83
84
85
|
# File 'app/lib/katello/qpid/connection.rb', line 74
# Remembers the broker URL and prepares the options used for later connections.
#
# A client-mode Qpid Proton SSL domain is created with peer-name verification.
# Client credentials are attached only when both the certificate and the key
# are given; the trusted CA database is attached only when a CA file is given.
#
# @param url the broker URL; its host (via URI.parse) becomes the virtual host
# @param ssl_cert_file client certificate, used only together with ssl_key_file
# @param ssl_key_file client key, used only together with ssl_cert_file
# @param ssl_ca_file trusted CA database, skipped when nil
def initialize(url:, ssl_cert_file:, ssl_key_file:, ssl_ca_file:)
  @url = url

  domain_class = ::Qpid::Proton::SSLDomain
  client_domain = domain_class.new(domain_class::MODE_CLIENT)
  client_domain.peer_authentication(domain_class::VERIFY_PEER_NAME)

  if ssl_cert_file && ssl_key_file
    # The third argument is passed as nil, exactly as before.
    client_domain.credentials(ssl_cert_file, ssl_key_file, nil)
  end

  if ssl_ca_file
    client_domain.trusted_ca_db(ssl_ca_file)
  end

  @connection_options = {
    ssl_domain: client_domain,
    sasl_allowed_mechs: 'external',
    virtual_host: URI.parse(url).host
  }
end
|
Instance Method Details
#close ⇒ Object
131
132
133
|
# File 'app/lib/katello/qpid/connection.rb', line 131
# Stops the container if one has been assigned; otherwise does nothing.
#
# @return the result of the container's #stop, or nil when @container is nil
def close
  return if @container.nil?

  @container.stop
end
|
#delete_queue(queue_name) ⇒ Object
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'app/lib/katello/qpid/connection.rb', line 87
# Sends a QMF2 method request asking the broker to delete the named queue.
#
# The request is a single message addressed to "qmf.default.direct" that
# invokes the 'delete' method on the broker object. It is wrapped in a Sender
# and handed to with_connection.
#
# @param queue_name name of the queue, placed in the request arguments
# @return whatever with_connection returns
def delete_queue(queue_name)
  qmf_address = "qmf.default.direct"

  request_body = {
    '_object_id' => {
      '_object_name' => 'org.apache.qpid.broker:broker:amqp-broker'
    },
    '_method_name' => 'delete',
    '_arguments' => {
      'strict' => true,
      'name' => queue_name,
      'type' => 'queue',
      'properties' => {}
    }
  }

  request_properties = {
    'qmf.opcode' => '_method_request',
    'x-amqp-0-10.app-id' => 'qmf2',
    'method' => 'request'
  }

  request = ::Qpid::Proton::Message.new
  request.subject = 'broker'
  request.address = qmf_address
  request.body = request_body
  request.properties = request_properties

  with_connection(Sender.new(@url, @connection_options, qmf_address, [request]))
end
|
#open? ⇒ Boolean
135
136
137
|
# File 'app/lib/katello/qpid/connection.rb', line 135
# Reports whether the container has a positive running count.
#
# A missing container, or a nil/false running value, counts as zero.
#
# @return [Boolean] true only when @container.running is greater than 0
def open?
  running_count = @container.nil? ? nil : @container.running
  return false unless running_count

  running_count > 0
end
|
#receive_messages(address:, handler:) ⇒ Object
126
127
128
129
|
# File 'app/lib/katello/qpid/connection.rb', line 126
# Builds a Receiver for the given address and handler and passes it to
# with_connection.
#
# @param address address handed to the Receiver
# @param handler handler handed to the Receiver
# @return whatever with_connection returns
def receive_messages(address:, handler:)
  with_connection(
    Receiver.new(@url, @connection_options, address, handler)
  )
end
|
#send_messages(messages) ⇒ Object
115
116
117
118
119
120
121
122
123
124
|
# File 'app/lib/katello/qpid/connection.rb', line 115
# Converts each given message into a Qpid Proton message and sends the batch.
#
# Each input's #to_s becomes the body and its #recipient_address becomes the
# address. The Sender is created with a nil address, so each message carries
# its own.
#
# @param messages collection responding to #map; each element must respond to
#   #to_s and #recipient_address
# @return whatever with_connection returns
def send_messages(messages)
  outgoing = messages.map do |source|
    ::Qpid::Proton::Message.new.tap do |wire|
      wire.body = source.to_s
      wire.address = source.recipient_address
    end
  end

  with_connection(Sender.new(@url, @connection_options, nil, outgoing))
end
|