Class: Hutch::Broker
Instance Attribute Summary collapse
-
#api_client ⇒ Object
Returns the value of attribute api_client.
-
#channel ⇒ Object
Returns the value of attribute channel.
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#exchange ⇒ Object
Returns the value of attribute exchange.
Instance Method Summary collapse
- #ack(delivery_tag) ⇒ Object
-
#bind_queue(queue, routing_keys) ⇒ Object
Bind a queue to the broker’s exchange on the routing keys provided.
-
#bindings ⇒ Object
Return a mapping of queue names to the routing keys they’re bound to.
- #connect ⇒ Object
- #disconnect ⇒ Object
-
#initialize(config = nil) ⇒ Broker
constructor
A new instance of Broker.
- #publish(routing_key, message) ⇒ Object
-
#queue(name) ⇒ Object
Create / get a durable queue.
-
#set_up_amqp_connection ⇒ Object
Connect to RabbitMQ via AMQP.
-
#set_up_api_connection ⇒ Object
Set up the connection to the RabbitMQ management API.
- #stop ⇒ Object
-
#wait_on_threads(timeout) ⇒ Object
Each subscriber is run in a thread.
Methods included from Logging
#logger, logger, logger=, setup_logger
Constructor Details
#initialize(config = nil) ⇒ Broker
Returns a new instance of Broker.
13 14 15 |
# File 'lib/hutch/broker.rb', line 13
#
# Builds a broker that reads its settings from +config+.
#
# @param config [#[], nil] settings source; when nil (or false) the global
#   Hutch::Config is used instead.
def initialize(config = nil)
  @config = config
  @config ||= Hutch::Config
end
Instance Attribute Details
#api_client ⇒ Object
Returns the value of attribute api_client.
11 12 13 |
# File 'lib/hutch/broker.rb', line 11
#
# Reader for the +@api_client+ instance variable.
#
# @return [Object, nil] the current value of +@api_client+
def api_client
  @api_client
end
#channel ⇒ Object
Returns the value of attribute channel.
11 12 13 |
# File 'lib/hutch/broker.rb', line 11
#
# Reader for the +@channel+ instance variable.
#
# @return [Object, nil] the current value of +@channel+
def channel
  @channel
end
#connection ⇒ Object
Returns the value of attribute connection.
11 12 13 |
# File 'lib/hutch/broker.rb', line 11
#
# Reader for the +@connection+ instance variable.
#
# @return [Object, nil] the current value of +@connection+
def connection
  @connection
end
#exchange ⇒ Object
Returns the value of attribute exchange.
11 12 13 |
# File 'lib/hutch/broker.rb', line 11
#
# Reader for the +@exchange+ instance variable.
#
# @return [Object, nil] the current value of +@exchange+
def exchange
  @exchange
end
Instance Method Details
#ack(delivery_tag) ⇒ Object
150 151 152 |
# File 'lib/hutch/broker.rb', line 150
#
# Acknowledges a delivery on the broker's channel.
#
# @param delivery_tag [Object] tag identifying the delivery to acknowledge
# @return [Object] whatever the channel's +ack+ returns
def ack(delivery_tag)
  # The second positional argument handed to the channel's #ack is always
  # false here; it is named only to make the call site readable.
  second_argument = false
  @channel.ack(delivery_tag, second_argument)
end
#bind_queue(queue, routing_keys) ⇒ Object
Bind a queue to the broker’s exchange on the routing keys provided. Any existing bindings on the queue that aren’t present in the array of routing keys will be unbound.
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/hutch/broker.rb', line 115
#
# Makes the bindings between +queue+ and the broker's exchange match
# +routing_keys+: bindings reported by #bindings for this queue that are not
# in +routing_keys+ are removed, then every key in +routing_keys+ is bound.
#
# @param queue [#name, #bind, #unbind] the queue to (re)bind
# @param routing_keys [#include?, #each] the routing keys that should be bound
# @return [Object] +routing_keys+ (the result of iterating it)
def bind_queue(queue, routing_keys)
  # Pass 1: drop bindings that exist for this queue but are no longer wanted.
  bindings.each do |destination, bound_keys|
    next unless destination == queue.name

    bound_keys.each do |bound_key|
      next if routing_keys.include?(bound_key)

      logger.debug "removing redundant binding #{queue.name} <--> #{bound_key}"
      queue.unbind(@exchange, routing_key: bound_key)
    end
  end

  # Pass 2: (re)create every desired binding.
  routing_keys.each do |wanted_key|
    logger.debug "creating binding #{queue.name} <--> #{wanted_key}"
    queue.bind(@exchange, routing_key: wanted_key)
  end
end
#bindings ⇒ Object
Return a mapping of queue names to the routing keys they’re bound to.
101 102 103 104 105 106 107 108 109 110 |
# File 'lib/hutch/broker.rb', line 101
#
# Builds a mapping of binding destination (queue name) to the routing keys
# bound to it, from the records returned by +@api_client.bindings+.
#
# A record is kept only when its source equals the configured
# +:mq_exchange+, its vhost equals the configured +:mq_vhost+, and its
# destination differs from its routing key.
#
# @return [Hash{Object => Array}] destination => routing keys; looking up an
#   unknown key creates and returns an empty array
def bindings
  keys_by_queue = Hash.new { |hash, queue_name| hash[queue_name] = [] }

  @api_client.bindings.each do |record|
    destination = record['destination']
    routing_key = record['routing_key']

    relevant = destination != routing_key &&
               record['source'] == @config[:mq_exchange] &&
               record['vhost'] == @config[:mq_vhost]

    keys_by_queue[destination] << routing_key if relevant
  end

  keys_by_queue
end
#connect ⇒ Object
17 18 19 20 21 22 23 24 25 |
# File 'lib/hutch/broker.rb', line 17
#
# Sets up the AMQP connection and then the management API connection.
#
# When a block is given it is yielded to once both connections are set up,
# and #disconnect is always called afterwards, including when the block
# raises, so the connection and channel are not leaked on error.
#
# @yield runs with both connections established
# @return [Object, nil] the block's value when a block is given, otherwise nil
def connect
  set_up_amqp_connection
  set_up_api_connection

  return unless block_given?

  begin
    yield
  ensure
    # Runs on normal completion and on exceptions alike.
    disconnect
  end
end
#disconnect ⇒ Object
27 28 29 30 31 |
# File 'lib/hutch/broker.rb', line 27
#
# Closes the channel and then the connection (each only if set), and clears
# the channel, connection, exchange and API client references.
#
# @return [Array<nil>] a four-element array of nils (the assigned values)
def disconnect
  # Order matters: the channel is closed before the connection.
  [@channel, @connection].each do |closable|
    closable.close if closable
  end

  @channel, @connection, @exchange, @api_client = Array.new(4)
end
#publish(routing_key, message) ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/hutch/broker.rb', line 154
#
# Serialises +message+ to JSON and publishes it to the broker's exchange
# under +routing_key+. If there is no open connection, nothing is published
# and an error is logged instead.
#
# @param routing_key [String] routing key to publish under
# @param message [Object] payload; anything JSON.dump can serialise
# @return [Object] the result of the exchange publish or of the error log call
def publish(routing_key, message)
  payload = JSON.dump(message)

  if @connection && @connection.open?
    logger.info "publishing message '#{message.inspect}' to #{routing_key}"
    @exchange.publish(payload, routing_key: routing_key, persistent: true,
                               timestamp: Time.now.to_i,
                               message_id: generate_id)
  else
    logger.error "Unable to publish : routing key: #{routing_key}, " +
                 "message: #{message}"
  end
end
#queue(name) ⇒ Object
Create / get a durable queue.
96 97 98 |
# File 'lib/hutch/broker.rb', line 96
#
# Asks the channel for the queue called +name+, always requesting it with
# +durable: true+.
#
# @param name [String] queue name
# @return [Object] whatever the channel's +queue+ call returns
def queue(name)
  queue_name = name
  @channel.queue(queue_name, durable: true)
end
#set_up_amqp_connection ⇒ Object
Connect to RabbitMQ via AMQP. This sets up the main connection and channel we use for talking to RabbitMQ. It also ensures the existence of the exchange we’ll be using.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/hutch/broker.rb', line 36
#
# Opens the AMQP connection, creates a channel on it and declares the durable
# topic exchange named by the +:mq_exchange+ setting. Results are stored in
# +@connection+, +@channel+ and +@exchange+.
#
# The password is handed to Bunny but is deliberately kept out of log output.
#
# @return [Object] the declared exchange
# @raise [ConnectionError] when the TCP connection to RabbitMQ fails
# @raise [WorkerSetupError] when declaring the exchange fails a precondition
def set_up_amqp_connection
  host, port = @config[:mq_host], @config[:mq_port]
  username, password = @config[:mq_username], @config[:mq_password]
  vhost, tls = @config[:mq_vhost], @config[:mq_tls]
  protocol = tls ? "amqps://" : "amqp://"

  # Log a credential-free URI: user name only, never the password.
  sanitized_uri = "#{protocol}#{username}@#{host}:#{port}/#{vhost.sub(/^\//, '')}"
  logger.info "connecting to rabbitmq (#{sanitized_uri})"

  @connection = Bunny.new(host: host, port: port, vhost: vhost, tls: tls,
                          username: username, password: password,
                          heartbeat: 1, automatically_recover: true,
                          network_recovery_interval: 1)
  @connection.start

  logger.info 'opening rabbitmq channel'
  @channel = @connection.create_channel

  exchange_name = @config[:mq_exchange]
  logger.info "using topic exchange '#{exchange_name}'"
  @exchange = @channel.topic(exchange_name, durable: true)
rescue Bunny::TCPConnectionFailed => ex
  logger.error "amqp connection error: #{ex.message.downcase}"
  uri = "#{protocol}#{host}:#{port}"
  raise ConnectionError, "couldn't connect to rabbitmq at #{uri}"
rescue Bunny::PreconditionFailed => ex
  logger.error ex.message
  raise WorkerSetupError, 'could not create exchange due to a type ' \
                          'conflict with an existing exchange, ' \
                          'remove the existing exchange and try again'
end
#set_up_api_connection ⇒ Object
Set up the connection to the RabbitMQ management API. Unfortunately, this is necessary to do a few things that are impossible over AMQP. E.g. listing queues and bindings.
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/hutch/broker.rb', line 70
#
# Creates the management API client, stores it in +@api_client+, and calls
# +exchanges+ on it once so that connection and credential problems surface
# here rather than later.
#
# The password is handed to the client but is deliberately kept out of both
# log output and exception messages.
#
# @return [Object] the result of the +exchanges+ call
# @raise [ConnectionError] when the API endpoint refuses the connection
# @raise [AuthenticationError] when the API answers with HTTP 401
# @raise [Net::HTTPServerException] re-raised for any other response code
def set_up_api_connection
  host, port = @config[:mq_api_host], @config[:mq_api_port]
  username, password = @config[:mq_username], @config[:mq_password]
  ssl = @config[:mq_api_ssl]

  protocol = ssl ? "https://" : "http://"
  # Credential-free URI for logs and errors: user name only, no password.
  management_uri = "#{protocol}#{username}@#{host}:#{port}/"
  logger.info "connecting to rabbitmq management api (#{management_uri})"

  @api_client = CarrotTop.new(host: host, port: port,
                              user: username, password: password,
                              ssl: ssl)
  @api_client.exchanges
rescue Errno::ECONNREFUSED => ex
  logger.error "api connection error: #{ex.message.downcase}"
  raise ConnectionError, "couldn't connect to api at #{management_uri}"
rescue Net::HTTPServerException => ex
  logger.error "api connection error: #{ex.message.downcase}"
  if ex.response.code == '401'
    raise AuthenticationError, 'invalid api credentials'
  else
    raise
  end
end
#stop ⇒ Object
146 147 148 |
# File 'lib/hutch/broker.rb', line 146
#
# Calls +kill+ on the channel's work pool.
#
# @return [Object] whatever the work pool's +kill+ returns
def stop
  pool = @channel.work_pool
  pool.kill
end
#wait_on_threads(timeout) ⇒ Object
Each subscriber is run in a thread. This calls Thread#join on each of the subscriber threads.
134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/hutch/broker.rb', line 134
#
# Joins each thread returned by +work_pool_threads+, giving every thread an
# equal share of +timeout+.
#
# HACK: work_pool.join doesn't allow a timeout to be passed in, so the
# threads are obtained via +work_pool_threads+ and Thread#join is called
# manually with a timeout.
#
# @param timeout [#to_f] total time budget, split evenly across the threads
# @return [Boolean] true when every join finished within its share, false as
#   soon as one join times out
def wait_on_threads(timeout)
  pool_threads = work_pool_threads
  share = timeout.to_f / pool_threads.length

  # Thread#join returns nil when its timeout is hit and the thread otherwise,
  # so "every join returned something" means all threads finished in time.
  pool_threads.all? { |thread| thread.join(share) }
end