Class: Hutch::Broker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/hutch/broker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#logger, logger, logger=, setup_logger

Constructor Details

#initialize(config = nil) ⇒ Broker

Returns a new instance of Broker.



13
14
15
# File 'lib/hutch/broker.rb', line 13

# Build a broker around a configuration object.
#
# config - settings source read via `[]` by the other methods; when it is
#          nil (or false) the Hutch::Config constant is used instead.
def initialize(config = nil)
  @config = if config
              config
            else
              Hutch::Config
            end
end

Instance Attribute Details

#api_client ⇒ Object

Returns the value of attribute api_client.



11
12
13
# File 'lib/hutch/broker.rb', line 11

# Reader for @api_client. The value is assigned by #set_up_api_connection
# and reset to nil by #disconnect.
def api_client
  @api_client
end

#channel ⇒ Object

Returns the value of attribute channel.



11
12
13
# File 'lib/hutch/broker.rb', line 11

# Reader for @channel. The value is assigned by #set_up_amqp_connection
# and reset to nil by #disconnect.
def channel
  @channel
end

#connection ⇒ Object

Returns the value of attribute connection.



11
12
13
# File 'lib/hutch/broker.rb', line 11

# Reader for @connection. The value is assigned by #set_up_amqp_connection
# and reset to nil by #disconnect.
def connection
  @connection
end

#exchange ⇒ Object

Returns the value of attribute exchange.



11
12
13
# File 'lib/hutch/broker.rb', line 11

# Reader for @exchange. The value is assigned by #set_up_amqp_connection
# and reset to nil by #disconnect.
def exchange
  @exchange
end

Instance Method Details

#ack(delivery_tag) ⇒ Object



150
151
152
# File 'lib/hutch/broker.rb', line 150

# Acknowledge the delivery identified by delivery_tag on the broker's channel.
#
# delivery_tag - tag of the delivery to acknowledge.
#
# Returns whatever the channel's #ack call returns.
def ack(delivery_tag)
  # The second positional argument is always false. NOTE(review): in Bunny
  # this position is the `multiple` flag -- confirm for the version in use.
  multiple = false
  @channel.ack(delivery_tag, multiple)
end

#bind_queue(queue, routing_keys) ⇒ Object

Bind a queue to the broker’s exchange on the routing keys provided. Any existing bindings on the queue that aren’t present in the array of routing keys will be unbound.



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/hutch/broker.rb', line 115

# Make the queue's bindings on the broker's exchange match routing_keys.
#
# queue        - object responding to #name, #bind and #unbind.
# routing_keys - collection of routing keys the queue should end up bound to.
#
# Bindings reported by #bindings for this queue that are not listed in
# routing_keys are unbound first; afterwards every key in routing_keys is
# bound (including ones that were already present).
#
# Returns routing_keys (the result of iterating it).
def bind_queue(queue, routing_keys)
  # Keys currently bound to this queue; an unknown queue has none.
  current_keys = bindings.fetch(queue.name, [])
  stale_keys = current_keys.reject { |key| routing_keys.include?(key) }

  stale_keys.each do |stale_key|
    logger.debug "removing redundant binding #{queue.name} <--> #{stale_key}"
    queue.unbind(@exchange, routing_key: stale_key)
  end

  # Declare every requested binding.
  routing_keys.each do |wanted_key|
    logger.debug "creating binding #{queue.name} <--> #{wanted_key}"
    queue.bind(@exchange, routing_key: wanted_key)
  end
end

#bindings ⇒ Object

Return a mapping of queue names to the routing keys they’re bound to.



101
102
103
104
105
106
107
108
109
110
# File 'lib/hutch/broker.rb', line 101

# Collect, from the management API client, the routing keys bound to each
# destination on the configured exchange and vhost.
#
# Returns a Hash mapping destination name to an Array of routing keys. The
# Hash has a default block, so looking up an unknown name stores and returns
# an empty Array.
def bindings
  keys_by_destination = Hash.new { |hash, key| hash[key] = [] }

  relevant = @api_client.bindings.reject do |entry|
    # Entries whose destination equals their routing key are skipped
    # (presumably the automatic default-exchange binding -- verify).
    entry['destination'] == entry['routing_key'] ||
      entry['source'] != @config[:mq_exchange] ||
      entry['vhost'] != @config[:mq_vhost]
  end

  relevant.each do |entry|
    keys_by_destination[entry['destination']] << entry['routing_key']
  end

  keys_by_destination
end

#connect ⇒ Object



17
18
19
20
21
22
23
24
25
# File 'lib/hutch/broker.rb', line 17

# Open the AMQP connection and the management API connection.
#
# When a block is given it is run once both connections are set up, and
# #disconnect is then called even if the block raises, so the channel and
# connection are not leaked on error. Without a block the connections are
# left open and the caller is responsible for calling #disconnect.
#
# Returns nil without a block, otherwise the block's value.
# Raises whatever the set-up methods or the block raise.
def connect
  set_up_amqp_connection
  set_up_api_connection

  return unless block_given?

  begin
    yield
  ensure
    disconnect
  end
end

#disconnect ⇒ Object



27
28
29
30
31
# File 'lib/hutch/broker.rb', line 27

# Close the channel and the connection (when present) and forget every
# handle held by the broker.
#
# The connection is closed and the instance variables are reset even when
# closing the channel raises, so a failed channel close cannot leave the
# connection open or leave stale handles behind. Any error raised while
# closing still propagates to the caller.
#
# Returns nil.
def disconnect
  begin
    @channel.close if @channel
  ensure
    @connection.close if @connection
  end
  nil
ensure
  @channel, @connection, @exchange, @api_client = nil, nil, nil, nil
end

#publish(routing_key, message) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/hutch/broker.rb', line 154

# Serialise message as JSON and publish it to the broker's exchange.
#
# routing_key - routing key to publish under.
# message     - object to serialise with JSON.dump.
#
# The message is serialised before the connection is checked, so a
# serialisation failure raises even when disconnected. When there is no open
# connection the message is not sent; an error is logged instead.
#
# Returns the result of the exchange's #publish call, or the result of the
# logger call when nothing was published.
def publish(routing_key, message)
  payload = JSON.dump(message)
  connected = @connection && @connection.open?

  unless connected
    failure = "Unable to publish : routing key: #{routing_key}, " +
              "message: #{message}"
    return logger.error(failure)
  end

  logger.info "publishing message '#{message.inspect}' to #{routing_key}"
  @exchange.publish(payload,
                    routing_key: routing_key,
                    persistent: true,
                    timestamp: Time.now.to_i,
                    message_id: generate_id)
end

#queue(name) ⇒ Object

Create / get a durable queue.



96
97
98
# File 'lib/hutch/broker.rb', line 96

# Create / get the queue called `name` on the broker's channel, always
# requesting durable: true. Relies on @channel, which is assigned by
# #set_up_amqp_connection.
#
# Returns whatever the channel's #queue call returns.
def queue(name)
  @channel.queue(name, durable: true)
end

#set_up_amqp_connection ⇒ Object

Connect to RabbitMQ via AMQP. This sets up the main connection and channel we use for talking to RabbitMQ. It also ensures the existence of the exchange we’ll be using.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/hutch/broker.rb', line 36

# Connect to RabbitMQ over AMQP using the :mq_* settings in @config, open a
# channel, and declare the durable topic exchange named by :mq_exchange.
#
# Sets @connection, @channel and @exchange.
#
# The password is deliberately left out of every log line; only the user
# name, host, port and vhost are logged.
#
# Raises ConnectionError when the TCP connection fails.
# Raises WorkerSetupError when the exchange declaration is rejected with a
# precondition failure.
def set_up_amqp_connection
  host, port = @config[:mq_host], @config[:mq_port]
  username, password = @config[:mq_username], @config[:mq_password]
  vhost, tls = @config[:mq_vhost], @config[:mq_tls]
  protocol = tls ? "amqps://" : "amqp://"

  # Credentials must never reach the log, so the URI shown omits the password.
  sanitized_uri = "#{protocol}#{username}@#{host}:#{port}/" \
                  "#{vhost.sub(%r{\A/}, '')}"
  logger.info "connecting to rabbitmq (#{sanitized_uri})"

  @connection = Bunny.new(host: host, port: port, vhost: vhost, tls: tls,
                          username: username, password: password,
                          heartbeat: 1, automatically_recover: true,
                          network_recovery_interval: 1)
  @connection.start

  logger.info 'opening rabbitmq channel'
  @channel = @connection.create_channel

  exchange_name = @config[:mq_exchange]
  logger.info "using topic exchange '#{exchange_name}'"
  @exchange = @channel.topic(exchange_name, durable: true)
rescue Bunny::TCPConnectionFailed => ex
  logger.error "amqp connection error: #{ex.message.downcase}"
  raise ConnectionError,
        "couldn't connect to rabbitmq at #{protocol}#{host}:#{port}"
rescue Bunny::PreconditionFailed => ex
  logger.error ex.message
  raise WorkerSetupError,
        'could not create exchange due to a type ' +
        'conflict with an existing exchange, ' +
        'remove the existing exchange and try again'
end

#set_up_api_connection ⇒ Object

Set up the connection to the RabbitMQ management API. Unfortunately, this is necessary to do a few things that are impossible over AMQP, e.g. listing queues and bindings.



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/hutch/broker.rb', line 70

# Create the RabbitMQ management API client from the :mq_api_* settings and
# the :mq_username / :mq_password credentials in @config, then request the
# exchange list so that connection and credential problems surface here.
#
# Sets @api_client.
#
# The password is deliberately kept out of both the log output and the
# ConnectionError message; only the user name, host and port are shown.
#
# Raises ConnectionError when the connection is refused.
# Raises AuthenticationError when the API answers with status 401; any other
# Net::HTTPServerException is re-raised unchanged.
def set_up_api_connection
  host, port = @config[:mq_api_host], @config[:mq_api_port]
  username, password = @config[:mq_username], @config[:mq_password]
  ssl = @config[:mq_api_ssl]

  protocol = ssl ? "https://" : "http://"
  # Credentials must never reach logs or error messages.
  sanitized_uri = "#{protocol}#{username}@#{host}:#{port}/"
  logger.info "connecting to rabbitmq management api (#{sanitized_uri})"

  @api_client = CarrotTop.new(host: host, port: port,
                              user: username, password: password,
                              ssl: ssl)
  @api_client.exchanges
rescue Errno::ECONNREFUSED => ex
  logger.error "api connection error: #{ex.message.downcase}"
  raise ConnectionError, "couldn't connect to api at #{sanitized_uri}"
rescue Net::HTTPServerException => ex
  logger.error "api connection error: #{ex.message.downcase}"
  raise AuthenticationError, 'invalid api credentials' if ex.response.code == '401'

  raise
end

#stop ⇒ Object



146
147
148
# File 'lib/hutch/broker.rb', line 146

# Kill the work pool that belongs to the broker's channel.
#
# Returns whatever the work pool's #kill call returns.
def stop
  work_pool = @channel.work_pool
  work_pool.kill
end

#wait_on_threads(timeout) ⇒ Object

Each subscriber is run in a thread. This calls Thread#join on each of the subscriber threads.



134
135
136
137
138
139
140
141
142
143
144
# File 'lib/hutch/broker.rb', line 134

# Wait for the work pool threads to finish, spending at most roughly
# `timeout` seconds in total.
#
# timeout - overall number of seconds to wait (converted with #to_f).
#
# Returns true when every thread joined in time, false as soon as one join
# times out (remaining threads are then not waited on).
def wait_on_threads(timeout)
  # HACK: work_pool.join doesn't accept a timeout, so the threads are
  #       fetched via work_pool_threads and joined manually, each with an
  #       equal share of the overall timeout.
  workers = work_pool_threads
  share = timeout.to_f / workers.length

  # Thread#join gives back the thread on success and nil on timeout, so the
  # join result itself tells us whether that thread finished.
  workers.all? { |worker| worker.join(share) }
end