Class: Beetle::Client

Inherits:
Object
Includes:
Logging
Defined in:
lib/beetle/client.rb

Overview

This class provides the interface through which messaging is configured for both message producers and consumers. It keeps references to an instance of a Beetle::Subscriber, a Beetle::Publisher (both of which are instantiated on demand), and a reference to an instance of Beetle::DeduplicationStore.

Configuration of exchanges, queues, messages, and message handlers is done by calls to the corresponding register_ methods. Note that these methods only build up the configuration; they don’t interact with the AMQP servers.

On the publisher side, publishing a message ensures that the exchange it is sent to, and each of the queues bound to that exchange, are created on demand. On the subscriber side, exchanges, queues, bindings and queue subscriptions are created when the application calls the listen_queues method. An application can subscribe to only a subset of the configured queues by passing a list of queue names to listen_queues.

The net effect of this strategy is that producers and consumers can be started in any order, so that no message is lost if message producers are accidentally started before the corresponding consumers.
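The start-in-any-order property can be illustrated with a minimal sketch. FakeBroker is a hypothetical in-memory stand-in, not part of Beetle and not an AMQP client; it only shows why idempotent, on-demand declaration keeps messages that were published before a consumer existed.

```ruby
# Hypothetical in-memory stand-in for an AMQP broker; not part of Beetle.
class FakeBroker
  def initialize
    @queues = {}                               # queue name => pending messages
    @bindings = Hash.new { |h, k| h[k] = [] }  # exchange name => bound queue names
  end

  # Declaring is idempotent: an existing queue keeps its pending messages.
  def declare_queue(name, exchange)
    @queues[name] ||= []
    @bindings[exchange] << name unless @bindings[exchange].include?(name)
  end

  def publish(exchange, body)
    @bindings[exchange].each { |queue| @queues[queue] << body }
  end

  def consume(queue)
    @queues.fetch(queue).shift
  end
end

broker = FakeBroker.new

# The producer starts first and declares the queue bound to its exchange on demand.
broker.declare_queue("orders", "orders")
broker.publish("orders", "order #1")

# The consumer starts later and declares the same queue again; nothing is lost.
broker.declare_queue("orders", "orders")
first_message = broker.consume("orders")
puts first_message
```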

Defined Under Namespace

Classes: Configurator

Methods included from Logging

#logger

Constructor Details

#initialize(config = Beetle.config) ⇒ Client

create a fresh Client instance from a given configuration object



# File 'lib/beetle/client.rb', line 51

def initialize(config = Beetle.config)
  @config  = config
  @exchanges = {}
  @queues = {}
  @messages = {}
  @bindings = {}
  @deduplication_store = DeduplicationStore.new(config)
  @queue_properties = QueueProperties.new(config)
  load_brokers_from_config
  register_exchange(config.beetle_policy_exchange_name)
  # make sure dead lettering is false for the policy update queue
  register_queue(
    config.beetle_policy_updates_queue_name,
    :exchange => config.beetle_policy_exchange_name,
    :key => config.beetle_policy_updates_routing_key,
    :dead_lettering => false,
    :lazy => false,
  )
end

Instance Attribute Details

#additional_subscription_servers ⇒ Object (readonly)

additional AMQP servers available for subscribing. useful for migration scenarios.



# File 'lib/beetle/client.rb', line 30

def additional_subscription_servers
  @additional_subscription_servers
end

#bindings ⇒ Object (readonly)

an options hash for the configured queue bindings



# File 'lib/beetle/client.rb', line 39

def bindings
  @bindings
end

#config ⇒ Object (readonly)

accessor for the beetle configuration



# File 'lib/beetle/client.rb', line 48

def config
  @config
end

#deduplication_store ⇒ Object (readonly)

the deduplication store to use for this client



# File 'lib/beetle/client.rb', line 45

def deduplication_store
  @deduplication_store
end

#exchanges ⇒ Object (readonly)

an options hash for the configured exchanges



# File 'lib/beetle/client.rb', line 33

def exchanges
  @exchanges
end

#messages ⇒ Object (readonly)

an options hash for the configured messages



# File 'lib/beetle/client.rb', line 42

def messages
  @messages
end

#queues ⇒ Object (readonly)

an options hash for the configured queues



# File 'lib/beetle/client.rb', line 36

def queues
  @queues
end

#servers ⇒ Object (readonly)

the AMQP servers available for publishing



# File 'lib/beetle/client.rb', line 27

def servers
  @servers
end

Instance Method Details

#configure(options = {}, &block) ⇒ Object

this is a convenience method to configure exchanges, queues, messages and handlers with a common set of options. allows one to call all register methods without the register_ prefix. returns self. if the passed in block has no parameters, the block will be evaluated in the context of the client configurator.

Example: (block with config argument)

client = Beetle::Client.new.configure :exchange => :foobar do |config|
  config.queue :q1, :key => "foo"
  config.queue :q2, :key => "bar"
  config.message :foo
  config.message :bar
  config.handler(:q1) { puts "got foo" }
  config.handler(:q2) { puts "got bar" }
end

Example: (block without config argument)

client = Beetle::Client.new.configure :exchange => :foobar do
  queue :q1, :key => "foo"
  queue :q2, :key => "bar"
  message :foo
  message :bar
  handler(:q1) { puts "got foo" }
  handler(:q2) { puts "got bar" }
end


# File 'lib/beetle/client.rb', line 194

def configure(options={}, &block)
  configurator = Configurator.new(self, options)
  if block.arity == 1
    yield configurator
  else
    configurator.instance_eval(&block)
  end
  self
end
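The arity check in the listing above decides between yielding the configurator and evaluating the block in its context. The following is a minimal sketch of that dispatch in isolation; SketchConfigurator and sketch_configure are hypothetical names used only for illustration and do not reproduce Beetle::Client::Configurator.

```ruby
# Hypothetical stand-in for the configurator; it only records the calls it receives.
class SketchConfigurator
  attr_reader :calls

  def initialize
    @calls = []
  end

  def queue(name, options = {})
    @calls << [:queue, name, options]
  end
end

def sketch_configure(configurator, &block)
  if block.arity == 1
    yield configurator                  # block declared one parameter: |config|
  else
    configurator.instance_eval(&block)  # no parameter: self becomes the configurator
  end
  configurator
end

with_arg = sketch_configure(SketchConfigurator.new) { |config| config.queue :q1, :key => "foo" }
without_arg = sketch_configure(SketchConfigurator.new) { queue :q1, :key => "foo" }

p with_arg.calls
p without_arg.calls
```

Both block styles record the same call. The price of the parameterless form is that self changes inside the block, so instance variables and private methods of the calling object are not reachable there.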

#listen(_deprecated_messages = nil, &block) ⇒ Object

start listening to all registered queues. calls #listen_queues internally. runs the given block before entering the eventmachine loop.

Raises:

  • (Error)

# File 'lib/beetle/client.rb', line 227

def listen(_deprecated_messages=nil, &block)
  raise Error.new("Beetle::Client#listen no longer works with arguments. Please use #listen_queues(['queue1', 'queue2']) instead") if _deprecated_messages
  listen_queues(&block)
end

#listen_queues(*queues, &block) ⇒ Object

start listening to a list of queues (defaults to all registered queues). runs the given block before entering the eventmachine loop.



# File 'lib/beetle/client.rb', line 234

def listen_queues(*queues, &block)
  queues = determine_queue_names(queues)
  subscriber.listen_queues(queues, &block)
end

#load(glob) ⇒ Object

evaluate the ruby files matching the given glob pattern in the context of the client instance.



# File 'lib/beetle/client.rb', line 313

def load(glob)
  b = binding
  Dir[glob].each do |f|
    eval(File.read(f), b, f)
  end
end
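Because the files are evaluated against the client's own binding, a loaded file can call the register_ methods without naming a receiver. The sketch below shows this with a hypothetical SketchClient that has a single simplified register_queue; the file name and its contents are made up for the example.

```ruby
require "tmpdir"

# Hypothetical stand-in for the client; only the loading mechanism matches the listing above.
class SketchClient
  attr_reader :registered

  def initialize
    @registered = []
  end

  def register_queue(name)
    @registered << name.to_s
  end

  # Evaluate every file matching the glob with self bound to this instance.
  def load(glob)
    b = binding
    Dir[glob].each do |f|
      eval(File.read(f), b, f)
    end
  end
end

client = SketchClient.new
Dir.mktmpdir do |dir|
  # A configuration file calls register_ methods as if it were written inside the client.
  File.write(File.join(dir, "queues.rb"), "register_queue :orders\nregister_queue :invoices\n")
  client.load(File.join(dir, "*.rb"))
end

p client.registered
```

Passing the file name as the third argument to eval makes error messages and backtraces point at the loaded file.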

#pause_listening(*queues) ⇒ Object

pause listening on a list of queues



# File 'lib/beetle/client.rb', line 251

def pause_listening(*queues)
  queues = determine_queue_names(queues)
  subscriber.pause_listening(queues)
end

#publish(message_name, data = nil, opts = {}) ⇒ Object

publishes a message. the given options hash is merged with options given on message registration. WARNING: empty message bodies can lead to problems.



# File 'lib/beetle/client.rb', line 206

def publish(message_name, data=nil, opts={})
  message_name = validated_message_name(message_name)
  publisher.publish(message_name, data, opts)
end

#purge(*queues) ⇒ Object

purges the given queues on all configured servers



# File 'lib/beetle/client.rb', line 220

def purge(*queues)
  queues = determine_queue_names(queues)
  publisher.purge(queues)
end

#register_binding(queue_name, options = {}) ⇒ Object

register an additional binding for an already configured queue name and an options hash:

:exchange

the name of the exchange this queue will be bound to (defaults to the name of the queue)

:key

the binding key (defaults to the name of the queue)

automatically registers the specified exchange if it hasn’t been registered yet



# File 'lib/beetle/client.rb', line 116

def register_binding(queue_name, options={})
  name = queue_name.to_s
  opts = options.symbolize_keys
  exchange = (opts[:exchange] || name).to_s
  key = (opts[:key] || name).to_s
  (bindings[name] ||= []) << {:exchange => exchange, :key => key}
  register_exchange(exchange) unless exchanges.include?(exchange)
  queues = exchanges[exchange][:queues]
  queues << name unless queues.include?(name)
end
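The bookkeeping done by register_binding can be followed with plain hashes. This is a sketch under simplifying assumptions: the lambda below is not the Beetle method, and a bare hash assignment stands in for register_exchange.

```ruby
bindings  = {}   # queue name => list of {:exchange, :key} hashes
exchanges = {}   # exchange name => options, including the list of bound queues

# Mirrors the structure of register_binding from the listing above.
register_binding = lambda do |queue_name, options = {}|
  name     = queue_name.to_s
  exchange = (options[:exchange] || name).to_s
  key      = (options[:key] || name).to_s
  (bindings[name] ||= []) << { :exchange => exchange, :key => key }
  exchanges[exchange] ||= { :queues => [] }   # stands in for register_exchange
  queues = exchanges[exchange][:queues]
  queues << name unless queues.include?(name)
end

register_binding.call(:orders, :exchange => :shop, :key => "orders.created")
register_binding.call(:orders, :exchange => :shop, :key => "orders.updated")

p bindings
p exchanges
```

Each call appends one binding, while the queue is listed only once per exchange.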

#register_exchange(name, options = {}) ⇒ Object

register an exchange with the given name and a set of options:

:type

the type option will be overwritten and always be :topic, beetle does not allow fanout exchanges

:durable

the durable option will be overwritten and always be true. this is done to ensure that exchanges are never deleted

Raises:

  • (ConfigurationError)

# File 'lib/beetle/client.rb', line 77

def register_exchange(name, options={})
  name = name.to_s
  raise ConfigurationError.new("exchange #{name} already configured") if exchanges.include?(name)
  exchanges[name] = options.symbolize_keys.merge(:type => :topic, :durable => true, :queues => [])
end
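The two behaviours described above, forced :type and :durable values and rejection of duplicate names, can be reproduced in a few lines. ExchangeRegistry and SketchConfigurationError are hypothetical names for this sketch, and transform_keys(&:to_sym) stands in for ActiveSupport's symbolize_keys.

```ruby
class SketchConfigurationError < StandardError; end

# Hypothetical registry that copies the logic of register_exchange.
class ExchangeRegistry
  attr_reader :exchanges

  def initialize
    @exchanges = {}
  end

  def register_exchange(name, options = {})
    name = name.to_s
    raise SketchConfigurationError, "exchange #{name} already configured" if exchanges.include?(name)
    # The merge comes last, so caller-supplied :type and :durable are overwritten.
    exchanges[name] = options.transform_keys(&:to_sym).merge(:type => :topic, :durable => true, :queues => [])
  end
end

registry = ExchangeRegistry.new
registry.register_exchange(:shop, "type" => :fanout, :durable => false)

duplicate_rejected =
  begin
    registry.register_exchange("shop")   # same name, given as a string this time
    false
  rescue SketchConfigurationError
    true
  end

p registry.exchanges["shop"]
p duplicate_rejected
```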

#register_handler(queues, *args, &block) ⇒ Object

registers a handler for a list of queues (which must have been registered previously). The handler will be invoked when any messages arrive on the queue.

Examples:

register_handler([:foo, :bar], :timeout => 10.seconds) { |message| puts "received #{message}" }

on_error   = lambda{ puts "something went wrong with baz" }
on_failure = lambda{ puts "baz has finally failed" }

register_handler(:baz, :exceptions => 1, :errback => on_error, :failback => on_failure) { puts "received baz" }

register_handler(:bar, BarHandler)

For details on handler classes see class Beetle::Handler

Raises:

  • (ArgumentError)


# File 'lib/beetle/client.rb', line 161

def register_handler(queues, *args, &block)
  queues = determine_queue_names(Array(queues))
  opts = args.last.is_a?(Hash) ? args.pop : {}
  handler = args.shift
  raise ArgumentError.new("too many arguments for handler registration") unless args.empty?
  subscriber.register_handler(queues, opts, handler, &block)
end
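The three call shapes in the examples (options only, handler class only, handler class plus options) are all served by the same argument parsing. The sketch below isolates that parsing; parse_handler_args is a hypothetical helper and BarHandler is an empty placeholder class.

```ruby
# Same parsing steps as register_handler: a trailing hash is the options,
# the first remaining argument is the handler, anything left over is an error.
def parse_handler_args(*args)
  opts = args.last.is_a?(Hash) ? args.pop : {}
  handler = args.shift
  raise ArgumentError, "too many arguments for handler registration" unless args.empty?
  [opts, handler]
end

class BarHandler; end   # placeholder for a handler class

options_only  = parse_handler_args(:timeout => 10)
class_only    = parse_handler_args(BarHandler)
class_and_opt = parse_handler_args(BarHandler, :exceptions => 1)

too_many_rejected =
  begin
    parse_handler_args(BarHandler, :extra, :timeout => 1)
    false
  rescue ArgumentError
    true
  end

p options_only, class_only, class_and_opt, too_many_rejected
```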

#register_message(message_name, options = {}) ⇒ Object

register a persistent message with a given name and an options hash:

:key

specifies the routing key for message publishing (defaults to the name of the message)

:ttl

specifies the time interval after which the message will be silently dropped (seconds). defaults to Message::DEFAULT_TTL.

:redundant

specifies whether the message should be published redundantly (defaults to false)

Raises:

  • (ConfigurationError)

# File 'lib/beetle/client.rb', line 136

def register_message(message_name, options={})
  name = message_name.to_s
  raise ConfigurationError.new("message #{name} already configured") if messages.include?(name)
  opts = {:exchange => name, :key => name}.merge!(options.symbolize_keys)
  opts.merge! :persistent => true
  exchange = opts[:exchange] = opts[:exchange].to_s
  register_exchange(exchange) unless exchanges.include?(exchange)
  messages[name] = opts
end

#register_queue(name, options = {}) ⇒ Object

register a durable, non passive, non auto_deleted queue with the given name and an options hash:

:exchange

the name of the exchange this queue will be bound to (defaults to the name of the queue)

:key

the binding key (defaults to the name of the queue)

:lazy

whether the queue should use lazy mode (defaults to config.lazy_queues_enabled)

:dead_lettering

whether the queue should use dead lettering (defaults to config.dead_lettering_enabled)

automatically registers the specified exchange if it hasn’t been registered yet

Raises:

  • (ConfigurationError)

# File 'lib/beetle/client.rb', line 94

def register_queue(name, options={})
  name = name.to_s
  raise ConfigurationError.new("queue #{name} already configured") if queues.include?(name)
  opts = {
    :exchange => name, :key => name, :auto_delete => false, :amqp_name => name,
    :lazy => config.lazy_queues_enabled, :dead_lettering => config.dead_lettering_enabled,
    :dead_lettering_msg_ttl => config.dead_lettering_msg_ttl
  }.merge!(options.symbolize_keys)
  opts.merge! :durable => true, :passive => false, :exclusive => false
  exchange = opts.delete(:exchange).to_s
  key = opts.delete(:key)
  queues[name] = opts
  register_binding(name, :exchange => exchange, :key => key)
end
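The options of a queue are built in three layers: defaults taken from the configuration, then the caller's options, then the values Beetle always forces. The sketch below follows the listing above with a hypothetical SketchConfig struct in place of the Beetle configuration and transform_keys(&:to_sym) in place of symbolize_keys; queue_options is an illustrative helper, not a Beetle method.

```ruby
# Hypothetical stand-in for the configuration values read by register_queue.
SketchConfig = Struct.new(:lazy_queues_enabled, :dead_lettering_enabled, :dead_lettering_msg_ttl)

def queue_options(config, name, options = {})
  name = name.to_s
  # Layer 1: defaults. Layer 2: caller options override the defaults.
  opts = {
    :exchange => name, :key => name, :auto_delete => false, :amqp_name => name,
    :lazy => config.lazy_queues_enabled, :dead_lettering => config.dead_lettering_enabled,
    :dead_lettering_msg_ttl => config.dead_lettering_msg_ttl
  }.merge!(options.transform_keys(&:to_sym))
  # Layer 3: forced values override whatever the caller passed.
  opts.merge!(:durable => true, :passive => false, :exclusive => false)
  # Exchange and key are split off; they describe the binding, not the queue.
  binding_opts = { :exchange => opts.delete(:exchange).to_s, :key => opts.delete(:key) }
  [opts, binding_opts]
end

config = SketchConfig.new(true, true, 1000)
queue_opts, binding_opts = queue_options(config, :orders, :exchange => :shop, :lazy => false, :durable => false)

p queue_opts
p binding_opts
```

This layering is also why the constructor can pass :dead_lettering => false and :lazy => false for the policy update queue regardless of the configured defaults.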

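#reset ⇒ Object

stops publishing and listening, reloads the configuration and the broker list, and discards the current publisher and subscriber. errors raised along the way are logged as warnings.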



# File 'lib/beetle/client.rb', line 320

def reset
  stop_publishing
  stop_listening
  config.reload
  load_brokers_from_config
rescue Exception => e
  logger.warn("Error resetting client")
  logger.warn(e)
ensure
  @publisher = nil
  @subscriber = nil
end

#resume_listening(*queues) ⇒ Object

resume listening on a list of queues



# File 'lib/beetle/client.rb', line 257

def resume_listening(*queues)
  queues = determine_queue_names(queues)
  subscriber.resume_listening(queues)
end

#rpc(message_name, data = nil, opts = {}) ⇒ Object

sends the given message to one of the configured servers and returns the result of running the associated handler.

unexpected behavior can ensue if the message gets routed to more than one recipient, so be careful.



# File 'lib/beetle/client.rb', line 214

def rpc(message_name, data=nil, opts={})
  message_name = validated_message_name(message_name)
  publisher.rpc(message_name, data, opts)
end

#setup_queues_and_policies ⇒ Object

set up queues and policies for all configured queues. Otherwise this will happen on first use of an exchange, which can be undesired for latency sensitive endpoints. Only needs to be called once.



# File 'lib/beetle/client.rb', line 286

def setup_queues_and_policies
  publisher.setup_queues_and_policies
end

#stop_listening ⇒ Object

stops the subscriber by closing all channels and connections. note that this is an asynchronous operation due to the underlying eventmachine mechanism.



# File 'lib/beetle/client.rb', line 241

def stop_listening
  @subscriber.stop! if @subscriber
end

#stop_publishing ⇒ Object

disconnects the publisher from all servers it’s currently connected to



# File 'lib/beetle/client.rb', line 246

def stop_publishing
  @publisher.stop if @publisher
end

#throttle(throttling_options) ⇒ Object

throttle publishing of messages based on queue lengths.

client.throttle(:foo => 10_000, :bar => 100_000)

throttles the publisher to 1 message per second as long as queue foo has more than 10000 entries or queue bar has more than 100000 entries across all servers. Queue lengths are determined periodically during publishing. Use this feature only if you plan to publish a huge number of messages to slow consumers, for example from background jobs, so as not to overload the brokers or the redis deduplication store.



# File 'lib/beetle/client.rb', line 274

def throttle(throttling_options)
  publisher.throttle(throttling_options.stringify_keys)
end
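The publisher's throttling logic is not shown on this page, so the following is only a sketch of the rule stated in the description: publishing is throttled while any listed queue is longer than its limit. The sketch_throttled? helper, the queue names and the lengths are all hypothetical; transform_keys(&:to_s) stands in for ActiveSupport's stringify_keys, which the method applies before handing the options to the publisher.

```ruby
# Limits as a caller would pass them, converted to string keys as in the listing above.
limits = { :orders => 100, :invoices => 50 }.transform_keys(&:to_s)

# Assumed rule: throttle while at least one queue exceeds its limit.
# queue_lengths is a hypothetical hash of queue name => total length over all servers.
def sketch_throttled?(limits, queue_lengths)
  limits.any? { |queue, limit| queue_lengths.fetch(queue, 0) > limit }
end

calm = sketch_throttled?(limits, "orders" => 100, "invoices" => 10)
busy = sketch_throttled?(limits, "orders" => 20, "invoices" => 51)

p limits
p calm
p busy
```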

#throttled? ⇒ Boolean

is the publisher currently throttled?

Returns:

  • (Boolean)


# File 'lib/beetle/client.rb', line 279

def throttled?
  publisher.throttled?
end

#trace(queue_names = self.queues.keys, tracer = nil, &block) ⇒ Object

traces queues without consuming them. useful for debugging message flow.



# File 'lib/beetle/client.rb', line 291

def trace(queue_names=self.queues.keys, tracer=nil, &block)
  queues_to_trace = self.queues.slice(*queue_names)
  queues_to_trace.each do |name, opts|
    opts.merge! :durable => false, :auto_delete => true, :amqp_name => queue_name_for_tracing(opts[:amqp_name])
  end
  tracer ||=
    lambda do |msg|
      puts "-----===== new message =====-----"
      puts "SERVER: #{msg.server}"
      puts "HEADER: #{msg.header.attributes[:headers].inspect}"
      puts "EXCHANGE: #{msg.header.method.exchange}"
      puts "KEY: #{msg.header.method.routing_key}"
      puts "MSGID: #{msg.msg_id}"
      puts "DATA: #{msg.data}"
    end
  register_handler(queue_names){|msg| tracer.call msg }
  @subscriber.tracing = true
  listen_queues(queue_names, &block)
  @subscriber.tracing = false
end
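One detail of the listing above is easy to miss: Hash#slice returns a new hash, but its values are the same option hashes, so merge! rewrites the registered queue options in place. The sketch below shows the effect with plain hashes. The naming scheme in sketch_trace_name is an assumption, since queue_name_for_tracing is not shown on this page.

```ruby
queues = {
  "orders"   => { :durable => true, :auto_delete => false, :amqp_name => "orders" },
  "invoices" => { :durable => true, :auto_delete => false, :amqp_name => "invoices" }
}

# Hypothetical naming scheme for the temporary tracing queue.
def sketch_trace_name(amqp_name)
  "trace-#{amqp_name}"
end

# Same steps as trace: pick the queues, then turn them into
# non-durable, auto-deleted queues with a different AMQP name.
queues.slice("orders").each do |_name, opts|
  opts.merge!(:durable => false, :auto_delete => true, :amqp_name => sketch_trace_name(opts[:amqp_name]))
end

p queues["orders"]
p queues["invoices"]
```

Since the original options are overwritten, a client that has been used for tracing is probably best kept separate from one that consumes the real queues.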

#update_queue_properties!(message_payload) ⇒ Object



# File 'lib/beetle/client.rb', line 333

def update_queue_properties!(message_payload)
  @queue_properties.update_queue_properties!(message_payload)
end