Class: Hutch::Broker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/hutch/broker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#logger, logger, logger=, setup_logger

Constructor Details

#initialize(config = nil) ⇒ Broker

Returns a new instance of Broker.

Parameters:

  • config (nil, Hash) (defaults to: nil)

    Configuration override



15
16
17
# File 'lib/hutch/broker.rb', line 15

def initialize(config = nil)
  @config = config || Hutch::Config
end

Instance Attribute Details

#api_clientObject

Returns the value of attribute api_client.



12
13
14
# File 'lib/hutch/broker.rb', line 12

def api_client
  @api_client
end

#channelObject

Returns the value of attribute channel.



12
13
14
# File 'lib/hutch/broker.rb', line 12

def channel
  @channel
end

#connectionObject

Returns the value of attribute connection.



12
13
14
# File 'lib/hutch/broker.rb', line 12

def connection
  @connection
end

#exchangeObject

Returns the value of attribute exchange.



12
13
14
# File 'lib/hutch/broker.rb', line 12

def exchange
  @exchange
end

Instance Method Details

#ack(delivery_tag) ⇒ Object



222
223
224
# File 'lib/hutch/broker.rb', line 222

def ack(delivery_tag)
  channel.ack(delivery_tag, false)
end

#bind_queue(queue, routing_keys) ⇒ Object

Bind a queue to the broker’s exchange on the routing keys provided. Any existing bindings on the queue that aren’t present in the array of routing keys will be unbound.



191
192
193
194
195
196
197
198
199
# File 'lib/hutch/broker.rb', line 191

def bind_queue(queue, routing_keys)
  unbind_redundant_bindings(queue, routing_keys)

  # Ensure all the desired bindings are present
  routing_keys.each do |routing_key|
    logger.debug "creating binding #{queue.name} <--> #{routing_key}"
    queue.bind(exchange, routing_key: routing_key)
  end
end

#bindingsObject

Return a mapping of queue names to the routing keys they’re bound to.



164
165
166
167
168
169
170
171
172
173
# File 'lib/hutch/broker.rb', line 164

def bindings
  results = Hash.new { |hash, key| hash[key] = [] }
  api_client.bindings.each do |binding|
    next if binding['destination'] == binding['routing_key']
    next unless binding['source'] == @config[:mq_exchange]
    next unless binding['vhost'] == @config[:mq_vhost]
    results[binding['destination']] << binding['routing_key']
  end
  results
end

#confirm_select(*args) ⇒ Object



234
235
236
# File 'lib/hutch/broker.rb', line 234

def confirm_select(*args)
  channel.confirm_select(*args)
end

#connect(options = {}) ⇒ Object

Connect to broker

Examples:

Hutch::Broker.new.connect(enable_http_api_use: true) do
  # will disconnect after this block
end

Parameters:

  • options (Hash) (defaults to: {})

    The options to connect with

Options Hash (options):

  • :enable_http_api_use (Boolean)


28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/hutch/broker.rb', line 28

def connect(options = {})
  @options = options
  set_up_amqp_connection
  if http_api_use_enabled?
    logger.info "HTTP API use is enabled"
    set_up_api_connection
  else
    logger.info "HTTP API use is disabled"
  end

  if tracing_enabled?
    logger.info "tracing is enabled using #{@config[:tracer]}"
  else
    logger.info "tracing is disabled"
  end

  if block_given?
    begin
      yield
    ensure
      disconnect
    end
  end
end

#declare_exchange(ch = channel) ⇒ Object



105
106
107
108
109
110
111
112
113
# File 'lib/hutch/broker.rb', line 105

def declare_exchange(ch = channel)
  exchange_name = @config[:mq_exchange]
  exchange_options = { durable: true }.merge(@config[:mq_exchange_options])
  logger.info "using topic exchange '#{exchange_name}'"

  with_bunny_precondition_handler('exchange') do
    ch.topic(exchange_name, exchange_options)
  end
end

#declare_exchange!(*args) ⇒ Object



115
116
117
# File 'lib/hutch/broker.rb', line 115

def declare_exchange!(*args)
  @exchange = declare_exchange(*args)
end

#declare_publisher!Object



119
120
121
# File 'lib/hutch/broker.rb', line 119

def declare_publisher!
  @publisher = Hutch::Publisher.new(connection, channel, exchange, @config)
end

#disconnectObject



53
54
55
56
57
58
59
60
# File 'lib/hutch/broker.rb', line 53

def disconnect
  @channel.close    if @channel
  @connection.close if @connection
  @channel = nil
  @connection = nil
  @exchange = nil
  @api_client = nil
end

#http_api_use_enabled?Boolean

Returns:

  • (Boolean)


139
140
141
142
143
144
145
146
147
148
# File 'lib/hutch/broker.rb', line 139

def http_api_use_enabled?
  op = @options.fetch(:enable_http_api_use, true)
  cf = if @config[:enable_http_api_use].nil?
         true
       else
         @config[:enable_http_api_use]
       end

  op && cf
end

#nack(delivery_tag) ⇒ Object



226
227
228
# File 'lib/hutch/broker.rb', line 226

def nack(delivery_tag)
  channel.nack(delivery_tag, false, false)
end

#open_channelObject



90
91
92
93
94
95
96
97
98
99
# File 'lib/hutch/broker.rb', line 90

def open_channel
  logger.info "opening rabbitmq channel with pool size #{consumer_pool_size}, abort on exception #{consumer_pool_abort_on_exception}"
  connection.create_channel(nil, consumer_pool_size, consumer_pool_abort_on_exception).tap do |ch|
    connection.prefetch_channel(ch, @config[:channel_prefetch])
    if @config[:publisher_confirms] || @config[:force_publisher_confirms]
      logger.info 'enabling publisher confirms'
      ch.confirm_select
    end
  end
end

#open_channel!Object



101
102
103
# File 'lib/hutch/broker.rb', line 101

def open_channel!
  @channel = open_channel
end

#open_connectionObject



73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/hutch/broker.rb', line 73

def open_connection
  logger.info "connecting to rabbitmq (#{sanitized_uri})"

  connection = Hutch::Adapter.new(connection_params)

  with_bunny_connection_handler(sanitized_uri) do
    connection.start
  end

  logger.info "connected to RabbitMQ at #{connection_params[:host]} as #{connection_params[:username]}"
  connection
end

#open_connection!Object



86
87
88
# File 'lib/hutch/broker.rb', line 86

def open_connection!
  @connection = open_connection
end

#publish(*args) ⇒ Object



230
231
232
# File 'lib/hutch/broker.rb', line 230

def publish(*args)
  @publisher.publish(*args)
end

#queue(name, arguments = {}) ⇒ Object

Create / get a durable queue and apply namespace if it exists.



155
156
157
158
159
160
161
# File 'lib/hutch/broker.rb', line 155

def queue(name, arguments = {})
  with_bunny_precondition_handler('queue') do
    namespace = @config[:namespace].to_s.downcase.gsub(/[^-_:\.\w]/, "")
    name = name.prepend(namespace + ":") if namespace.present?
    channel.queue(name, durable: true, arguments: arguments)
  end
end

#reject(delivery_tag, requeue = false) ⇒ Object



218
219
220
# File 'lib/hutch/broker.rb', line 218

def reject(delivery_tag, requeue=false)
  channel.reject(delivery_tag, requeue)
end

#requeue(delivery_tag) ⇒ Object



214
215
216
# File 'lib/hutch/broker.rb', line 214

def requeue(delivery_tag)
  channel.reject(delivery_tag, true)
end

#set_up_amqp_connectionObject

Connect to RabbitMQ via AMQP

This sets up the main connection and channel we use for talking to RabbitMQ. It also ensures the existence of the exchange we’ll be using.



66
67
68
69
70
71
# File 'lib/hutch/broker.rb', line 66

def set_up_amqp_connection
  open_connection!
  open_channel!
  declare_exchange!
  declare_publisher!
end

#set_up_api_connectionObject

Set up the connection to the RabbitMQ management API. Unfortunately, this is necessary to do a few things that are impossible over AMQP. E.g. listing queues and bindings.



126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/hutch/broker.rb', line 126

def set_up_api_connection
  logger.info "connecting to rabbitmq HTTP API (#{api_config.sanitized_uri})"

  with_authentication_error_handler do
    with_connection_error_handler do
      @api_client = CarrotTop.new(host: api_config.host, port: api_config.port,
                                  user: api_config.username, password: api_config.password,
                                  ssl: api_config.ssl)
      @api_client.exchanges
    end
  end
end

#stopObject



201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/hutch/broker.rb', line 201

def stop
  if defined?(JRUBY_VERSION)
    channel.close
  else
    # Enqueue a failing job that kills the consumer loop
    channel_work_pool.shutdown
    # Give `timeout` seconds to jobs that are still being processed
    channel_work_pool.join(@config[:graceful_exit_timeout])
    # If after `timeout` they are still running, they are killed
    channel_work_pool.kill
  end
end

#tracing_enabled?Boolean

Returns:

  • (Boolean)


150
151
152
# File 'lib/hutch/broker.rb', line 150

def tracing_enabled?
  @config[:tracer] && @config[:tracer] != Hutch::Tracers::NullTracer
end

#unbind_redundant_bindings(queue, routing_keys) ⇒ Object

Find the existing bindings, and unbind any redundant bindings



176
177
178
179
180
181
182
183
184
185
186
# File 'lib/hutch/broker.rb', line 176

def unbind_redundant_bindings(queue, routing_keys)
  return unless http_api_use_enabled?

  bindings.each do |dest, keys|
    next unless dest == queue.name
    keys.reject { |key| routing_keys.include?(key) }.each do |key|
      logger.debug "removing redundant binding #{queue.name} <--> #{key}"
      queue.unbind(exchange, routing_key: key)
    end
  end
end

#using_publisher_confirmations?Boolean

Returns True if channel is set up to use publisher confirmations.

Returns:

  • (Boolean)

    True if channel is set up to use publisher confirmations.



243
244
245
# File 'lib/hutch/broker.rb', line 243

def using_publisher_confirmations?
  channel.using_publisher_confirmations?
end

#wait_for_confirmsObject



238
239
240
# File 'lib/hutch/broker.rb', line 238

def wait_for_confirms
  channel.wait_for_confirms
end