Class: Startback::Bus::Bunny::Async

Inherits:
Object
  • Object
Includes:
Support::Robustness
Defined in:
lib/startback/bus/bunny/async.rb

Overview

Asynchronous implementation of the bus abstraction, built on RabbitMQ through the ‘bunny’ gem. You must add the gem to your Gemfile yourself: it is NOT an official startback dependency.

This bus implementation emits events by publishing them to RabbitMQ, using the event type as the exchange name. Listeners may use the `processor` parameter to specify the queue name; otherwise a default “main” queue is used.
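The routing described above can be sketched without a broker. The `FakeChannel` class below is a hypothetical in-memory stand-in (it is not part of startback or Bunny) that only models the fanout rule: every queue bound to an exchange receives its own copy of each message.

```ruby
# In-memory stand-in for a RabbitMQ channel (hypothetical, for illustration only).
class FakeChannel
  attr_reader :queues

  def initialize
    @bindings = Hash.new { |h, k| h[k] = [] } # exchange name => bound queue names
    @queues   = Hash.new { |h, k| h[k] = [] } # queue name => delivered messages
  end

  # Binds a queue to a fanout exchange, as a listener would.
  def bind(exchange, queue = "main")
    @bindings[exchange] << queue unless @bindings[exchange].include?(queue)
  end

  # A fanout exchange copies each message to every queue bound to it.
  def publish(exchange, payload)
    @bindings[exchange].each { |q| @queues[q] << payload }
  end
end

channel = FakeChannel.new
channel.bind("user_created")             # no processor given: the "main" queue
channel.bind("user_created", "mailer")   # processor "mailer": its own queue
channel.publish("user_created", '{"id":1}')
channel.publish("order_paid", '{"id":2}') # no queue bound yet, nothing is delivered
```

Under this model, listeners that pass distinct `processor` names each see every event of the type, since each name maps to a separate queue.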

Examples:

# Connects to RabbitMQ using all default options
#
# Uses the STARTBACK_BUS_BUNNY_ASYNC_URL environment variable for
# connection URL if present.
Startback::Bus::Bunny::Async.new

# Connects to RabbitMQ using a specific URL
Startback::Bus::Bunny::Async.new("amqp://rabbituser:[email protected]")
Startback::Bus::Bunny::Async.new(url: "amqp://rabbituser:[email protected]")

# Connects to RabbitMQ using specific connection options. See Bunny's own
# documentation
Startback::Bus::Bunny::Async.new({
  connection_options: {
    host: "192.168.17.17"
  }
})

Constant Summary

DEFAULT_OPTIONS =
{
  # (optional) The URL to use for connecting to RabbitMQ.
  url: ENV['STARTBACK_BUS_BUNNY_ASYNC_URL'],

  # (optional) The options hash to pass to the ::Bunny constructor
  connection_options: nil,

  # (optional) The options to use for the emitter/listener fanout
  fanout_options: {},

  # (optional) The options to use for the listener queue
  queue_options: {},

  # (optional) Default event factory to use, if any
  event_factory: nil,

  # (optional) A default context to use for general logging
  context: nil
}
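As a rough illustration of how caller-supplied options combine with these defaults, the sketch below mirrors the first two lines of the constructor: a String argument is treated as the URL, then the result is merged over the defaults. `DEMO_DEFAULTS` and `normalize_options` are hypothetical names introduced here, and the `ENV` lookup is replaced by `nil` to keep the example deterministic.

```ruby
# Defaults copied from the constant above, minus the ENV lookup.
DEMO_DEFAULTS = {
  url: nil,
  connection_options: nil,
  fanout_options: {},
  queue_options: {},
  event_factory: nil,
  context: nil
}.freeze

# Hypothetical helper mirroring the first two lines of #initialize.
def normalize_options(options = {})
  options = { url: options } if options.is_a?(String)
  DEMO_DEFAULTS.merge(options)
end

from_string = normalize_options("amqp://localhost")
from_hash   = normalize_options(queue_options: { durable: true })
```

Note that `Hash#merge` is shallow: passing `queue_options` replaces the default hash for that key as a whole rather than merging into it.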

Instance Attribute Summary

Instance Method Summary

Methods included from Support::Robustness

#log, #monitor, #stop_errors, #try_max_times

Constructor Details

#initialize(options = {}) ⇒ Async

Creates a bus instance, using the various options provided to fine-tune behavior.



# File 'lib/startback/bus/bunny/async.rb', line 60

def initialize(options = {})
  options = { url: options } if options.is_a?(String)
  @options = DEFAULT_OPTIONS.merge(options)
  retried = 0
  conn = options[:connection_options] || options[:url]
  try_max_times(10) do
    @bunny = ::Bunny.new(conn)
    @bunny.start
    @channel = @bunny.create_channel
    log(:info, {op: "#{self.class.name}#connect", op_data: conn}, options[:context])
  end
end
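The constructor wraps the connection attempt in `try_max_times(10)`, which comes from `Support::Robustness` and is not shown on this page. The sketch below is one plausible shape for such a helper, under the assumption that it re-runs the block when it raises and re-raises once the attempts are exhausted. `retry_up_to` is a hypothetical name, not the library's implementation.

```ruby
# Hypothetical retry helper; NOT the actual Support::Robustness#try_max_times.
def retry_up_to(max_attempts)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    # Run the block again while attempts remain, otherwise propagate the error.
    retry if attempts < max_attempts
    raise
  end
end

# Simulates a broker that only accepts the third connection attempt.
calls = 0
result = retry_up_to(10) do |attempt|
  calls = attempt
  raise IOError, "broker not ready" if attempt < 3
  :connected
end
```

A real helper would likely also wait between attempts; that detail is omitted here because the source does not show it.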

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



# File 'lib/startback/bus/bunny/async.rb', line 72

def channel
  @channel
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/startback/bus/bunny/async.rb', line 72

def options
  @options
end

Instance Method Details

#emit(event) ⇒ Object



# File 'lib/startback/bus/bunny/async.rb', line 74

def emit(event)
  stop_errors(self, "emit", event.context) do
    fanout = channel.fanout(event.type.to_s, fanout_options)
    fanout.publish(event.to_json)
  end
end
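Judging from the source above, `#emit` needs only three things from an event: `type` (used as the exchange name), `context` (passed to `stop_errors`) and `to_json` (the published payload). The sketch below shows a minimal object with that interface. `DemoEvent` and `RecordingFanout` are hypothetical illustrations, and the JSON layout is an assumption, not startback's actual event format.

```ruby
require "json"

# Hypothetical event type exposing the three methods #emit calls.
DemoEvent = Struct.new(:type, :data, :context) do
  def to_json(*args)
    { type: type.to_s, data: data }.to_json(*args)
  end
end

# Hypothetical recorder standing in for a Bunny fanout exchange.
class RecordingFanout
  attr_reader :name, :published

  def initialize(name)
    @name = name
    @published = []
  end

  def publish(payload)
    @published << payload
  end
end

event  = DemoEvent.new(:user_created, { "id" => 42 }, nil)
fanout = RecordingFanout.new(event.type.to_s) # exchange named after the event type
fanout.publish(event.to_json)
```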

#listen(type, processor = nil, listener = nil, &bl) ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/startback/bus/bunny/async.rb', line 81

def listen(type, processor = nil, listener = nil, &bl)
  raise ArgumentError, "A listener must be provided" unless listener || bl
  fanout = channel.fanout(type.to_s, fanout_options)
  queue = channel.queue((processor || "main").to_s, queue_options)
  queue.bind(fanout)
  queue.subscribe do |delivery_info, properties, body|
    event = stop_errors(self, "listen") do
      factor_event(body)
    end
    stop_errors(self, "listen", event.context) do
      (listener || bl).call(event)
    end
  end
end
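The listener can be given either as a callable object or as a block. The sketch below isolates that part of `#listen`: the `ArgumentError` guard and the `(listener || bl)` choice. `pick_handler` and `AuditListener` are hypothetical names used only for this illustration.

```ruby
# Hypothetical helper mirroring the guard and the `(listener || bl)` choice in #listen.
def pick_handler(listener = nil, &bl)
  raise ArgumentError, "A listener must be provided" unless listener || bl
  listener || bl
end

# Any object responding to #call works as a listener.
class AuditListener
  attr_reader :seen

  def initialize
    @seen = []
  end

  def call(event)
    @seen << event
  end
end

audit = AuditListener.new
pick_handler(audit).call("event-1")

# A block alone is also accepted.
from_block = []
pick_handler { |event| from_block << event }.call("event-2")

# When both are given, the explicit listener wins and the block is ignored.
ignored = []
pick_handler(audit) { |event| ignored << event }.call("event-3")
```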