Class: Hutch::Broker
- Inherits:
-
Object
show all
- Includes:
- Logging
- Defined in:
- lib/hutch/broker.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Logging
#logger, logger, logger=, setup_logger
Constructor Details
#initialize(config = nil) ⇒ Broker
Returns a new instance of Broker.
15
16
17
|
# File 'lib/hutch/broker.rb', line 15
def initialize(config = nil)
@config = config || Hutch::Config
end
|
Instance Attribute Details
#api_client ⇒ Object
Returns the value of attribute api_client.
12
13
14
|
# File 'lib/hutch/broker.rb', line 12
def api_client
@api_client
end
|
#channel ⇒ Object
Returns the value of attribute channel.
12
13
14
|
# File 'lib/hutch/broker.rb', line 12
def channel
@channel
end
|
#connection ⇒ Object
Returns the value of attribute connection.
12
13
14
|
# File 'lib/hutch/broker.rb', line 12
def connection
@connection
end
|
#exchange ⇒ Object
Returns the value of attribute exchange.
12
13
14
|
# File 'lib/hutch/broker.rb', line 12
def exchange
@exchange
end
|
Instance Method Details
#ack(delivery_tag) ⇒ Object
222
223
224
|
# File 'lib/hutch/broker.rb', line 222
def ack(delivery_tag)
channel.ack(delivery_tag, false)
end
|
#bind_queue(queue, routing_keys) ⇒ Object
Bind a queue to the broker’s exchange on the routing keys provided. Any existing bindings on the queue that aren’t present in the array of routing keys will be unbound.
191
192
193
194
195
196
197
198
199
|
# File 'lib/hutch/broker.rb', line 191
def bind_queue(queue, routing_keys)
unbind_redundant_bindings(queue, routing_keys)
routing_keys.each do |routing_key|
logger.debug "creating binding #{queue.name} <--> #{routing_key}"
queue.bind(exchange, routing_key: routing_key)
end
end
|
#bindings ⇒ Object
Return a mapping of queue names to the routing keys they’re bound to.
164
165
166
167
168
169
170
171
172
173
|
# File 'lib/hutch/broker.rb', line 164
def bindings
results = Hash.new { |hash, key| hash[key] = [] }
api_client.bindings.each do |binding|
next if binding['destination'] == binding['routing_key']
next unless binding['source'] == @config[:mq_exchange]
next unless binding['vhost'] == @config[:mq_vhost]
results[binding['destination']] << binding['routing_key']
end
results
end
|
#confirm_select(*args) ⇒ Object
234
235
236
|
# File 'lib/hutch/broker.rb', line 234
def confirm_select(*args)
channel.confirm_select(*args)
end
|
#connect(options = {}) ⇒ Object
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/hutch/broker.rb', line 28
def connect(options = {})
@options = options
set_up_amqp_connection
if http_api_use_enabled?
logger.info "HTTP API use is enabled"
set_up_api_connection
else
logger.info "HTTP API use is disabled"
end
if tracing_enabled?
logger.info "tracing is enabled using #{@config[:tracer]}"
else
logger.info "tracing is disabled"
end
if block_given?
begin
yield
ensure
disconnect
end
end
end
|
#declare_exchange(ch = channel) ⇒ Object
105
106
107
108
109
110
111
112
113
|
# File 'lib/hutch/broker.rb', line 105
def declare_exchange(ch = channel)
exchange_name = @config[:mq_exchange]
exchange_options = { durable: true }.merge(@config[:mq_exchange_options])
logger.info "using topic exchange '#{exchange_name}'"
with_bunny_precondition_handler('exchange') do
ch.topic(exchange_name, exchange_options)
end
end
|
#declare_exchange!(*args) ⇒ Object
115
116
117
|
# File 'lib/hutch/broker.rb', line 115
def declare_exchange!(*args)
@exchange = declare_exchange(*args)
end
|
#declare_publisher! ⇒ Object
119
120
121
|
# File 'lib/hutch/broker.rb', line 119
def declare_publisher!
@publisher = Hutch::Publisher.new(connection, channel, exchange, @config)
end
|
#disconnect ⇒ Object
53
54
55
56
57
58
59
60
|
# File 'lib/hutch/broker.rb', line 53
def disconnect
@channel.close if @channel
@connection.close if @connection
@channel = nil
@connection = nil
@exchange = nil
@api_client = nil
end
|
#http_api_use_enabled? ⇒ Boolean
139
140
141
142
143
144
145
146
147
148
|
# File 'lib/hutch/broker.rb', line 139
def http_api_use_enabled?
op = @options.fetch(:enable_http_api_use, true)
cf = if @config[:enable_http_api_use].nil?
true
else
@config[:enable_http_api_use]
end
op && cf
end
|
#nack(delivery_tag) ⇒ Object
226
227
228
|
# File 'lib/hutch/broker.rb', line 226
def nack(delivery_tag)
channel.nack(delivery_tag, false, false)
end
|
#open_channel ⇒ Object
90
91
92
93
94
95
96
97
98
99
|
# File 'lib/hutch/broker.rb', line 90
def open_channel
logger.info "opening rabbitmq channel with pool size #{consumer_pool_size}, abort on exception #{consumer_pool_abort_on_exception}"
connection.create_channel(nil, consumer_pool_size, consumer_pool_abort_on_exception).tap do |ch|
connection.prefetch_channel(ch, @config[:channel_prefetch])
if @config[:publisher_confirms] || @config[:force_publisher_confirms]
logger.info 'enabling publisher confirms'
ch.confirm_select
end
end
end
|
#open_channel! ⇒ Object
101
102
103
|
# File 'lib/hutch/broker.rb', line 101
def open_channel!
@channel = open_channel
end
|
#open_connection ⇒ Object
73
74
75
76
77
78
79
80
81
82
83
84
|
# File 'lib/hutch/broker.rb', line 73
def open_connection
logger.info "connecting to rabbitmq (#{sanitized_uri})"
connection = Hutch::Adapter.new(connection_params)
with_bunny_connection_handler(sanitized_uri) do
connection.start
end
logger.info "connected to RabbitMQ at #{connection_params[:host]} as #{connection_params[:username]}"
connection
end
|
#open_connection! ⇒ Object
86
87
88
|
# File 'lib/hutch/broker.rb', line 86
def open_connection!
@connection = open_connection
end
|
#publish(*args) ⇒ Object
230
231
232
|
# File 'lib/hutch/broker.rb', line 230
def publish(*args)
@publisher.publish(*args)
end
|
#queue(name, arguments = {}) ⇒ Object
Create / get a durable queue and apply namespace if it exists.
155
156
157
158
159
160
161
|
# File 'lib/hutch/broker.rb', line 155
def queue(name, arguments = {})
with_bunny_precondition_handler('queue') do
namespace = @config[:namespace].to_s.downcase.gsub(/[^-_:\.\w]/, "")
name = name.prepend(namespace + ":") if namespace.present?
channel.queue(name, durable: true, arguments: arguments)
end
end
|
#reject(delivery_tag, requeue = false) ⇒ Object
218
219
220
|
# File 'lib/hutch/broker.rb', line 218
def reject(delivery_tag, requeue=false)
channel.reject(delivery_tag, requeue)
end
|
#requeue(delivery_tag) ⇒ Object
214
215
216
|
# File 'lib/hutch/broker.rb', line 214
def requeue(delivery_tag)
channel.reject(delivery_tag, true)
end
|
#set_up_amqp_connection ⇒ Object
Connect to RabbitMQ via AMQP
This sets up the main connection and channel we use for talking to RabbitMQ. It also ensures the existence of the exchange we’ll be using.
66
67
68
69
70
71
|
# File 'lib/hutch/broker.rb', line 66
def set_up_amqp_connection
open_connection!
open_channel!
declare_exchange!
declare_publisher!
end
|
#set_up_api_connection ⇒ Object
Set up the connection to the RabbitMQ management API. Unfortunately, this is necessary to do a few things that are impossible over AMQP. E.g. listing queues and bindings.
126
127
128
129
130
131
132
133
134
135
136
137
|
# File 'lib/hutch/broker.rb', line 126
def set_up_api_connection
logger.info "connecting to rabbitmq HTTP API (#{api_config.sanitized_uri})"
with_authentication_error_handler do
with_connection_error_handler do
@api_client = CarrotTop.new(host: api_config.host, port: api_config.port,
user: api_config.username, password: api_config.password,
ssl: api_config.ssl)
@api_client.exchanges
end
end
end
|
#stop ⇒ Object
201
202
203
204
205
206
207
208
209
210
211
212
|
# File 'lib/hutch/broker.rb', line 201
def stop
if defined?(JRUBY_VERSION)
channel.close
else
channel_work_pool.shutdown
channel_work_pool.join(@config[:graceful_exit_timeout])
channel_work_pool.kill
end
end
|
#tracing_enabled? ⇒ Boolean
150
151
152
|
# File 'lib/hutch/broker.rb', line 150
def tracing_enabled?
@config[:tracer] && @config[:tracer] != Hutch::Tracers::NullTracer
end
|
#unbind_redundant_bindings(queue, routing_keys) ⇒ Object
Find the existing bindings, and unbind any redundant bindings
176
177
178
179
180
181
182
183
184
185
186
|
# File 'lib/hutch/broker.rb', line 176
def unbind_redundant_bindings(queue, routing_keys)
return unless http_api_use_enabled?
bindings.each do |dest, keys|
next unless dest == queue.name
keys.reject { |key| routing_keys.include?(key) }.each do |key|
logger.debug "removing redundant binding #{queue.name} <--> #{key}"
queue.unbind(exchange, routing_key: key)
end
end
end
|
#using_publisher_confirmations? ⇒ Boolean
Returns True if channel is set up to use publisher confirmations.
243
244
245
|
# File 'lib/hutch/broker.rb', line 243
def using_publisher_confirmations?
channel.using_publisher_confirmations?
end
|
#wait_for_confirms ⇒ Object
238
239
240
|
# File 'lib/hutch/broker.rb', line 238
def wait_for_confirms
channel.wait_for_confirms
end
|