Class: Startback::Bus::Bunny::Async
- Inherits:
-
Object
- Object
- Startback::Bus::Bunny::Async
- Includes:
- Support::Robustness
- Defined in:
- lib/startback/bus/bunny/async.rb
Overview
Asynchronous implementation of the bus abstraction, on top of RabbitMQ and using the ‘bunny’ gem (you need to include it in your Gemfile yourself: it is NOT a startback official dependency).
This bus implementation emits events by dumping them to RabbitMQ using the event type as exchange name. Listeners may use the ‘processor` parameter to specify the queue name ; otherwise a default “main” queue is used.
Examples:
# Connects to RabbitMQ using all default options
#
# Uses the STARTBACK_BUS_BUNNY_ASYNC_URL environment variable for
# connection URL if present.
Startback::Bus::Bunny::Async.new
# Connects to RabbitMQ using a specific URL
Startback::Bus::Bunny::Async.new("amqp://rabbituser:[email protected]")
Startback::Bus::Bunny::Async.new(url: "amqp://rabbituser:[email protected]")
# Connects to RabbitMQ using specific connection options. See Bunny's own
# documentation
Startback::Bus::Bunny::Async.new({
connection_options: {
host: "192.168.17.17"
}
})
Constant Summary collapse
- DEFAULT_OPTIONS =
{ # (optional) The URL to use for connecting to RabbitMQ. url: ENV['STARTBACK_BUS_BUNNY_ASYNC_URL'], # (optional) The options has to pass to ::Bunny constructor connection_options: nil, # (optional) The options to use for the emitter/listener fanout fanout_options: {}, # (optional) The options to use for the listener queue queue_options: {}, # (optional) Default event factory to use, if any event_factory: nil, # (optional) A default context to use for general logging context: nil }
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
Instance Method Summary collapse
- #emit(event) ⇒ Object
-
#initialize(options = {}) ⇒ Async
constructor
Creates a bus instance, using the various options provided to fine-tune behavior.
- #listen(type, processor = nil, listener = nil, &bl) ⇒ Object
Methods included from Support::Robustness
#log, #monitor, #stop_errors, #try_max_times
Constructor Details
#initialize(options = {}) ⇒ Async
Creates a bus instance, using the various options provided to fine-tune behavior.
60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/startback/bus/bunny/async.rb', line 60 def initialize( = {}) = { url: } if .is_a?(String) @options = DEFAULT_OPTIONS.merge() retried = 0 conn = [:connection_options] || [:url] try_max_times(10) do @bunny = ::Bunny.new(conn) @bunny.start @channel = @bunny.create_channel log(:info, {op: "#{self.class.name}#connect", op_data: conn}, [:context]) end end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
72 73 74 |
# File 'lib/startback/bus/bunny/async.rb', line 72 def channel @channel end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
72 73 74 |
# File 'lib/startback/bus/bunny/async.rb', line 72 def @options end |
Instance Method Details
#emit(event) ⇒ Object
74 75 76 77 78 79 |
# File 'lib/startback/bus/bunny/async.rb', line 74 def emit(event) stop_errors(self, "emit", event.context) do fanout = channel.fanout(event.type.to_s, ) fanout.publish(event.to_json) end end |
#listen(type, processor = nil, listener = nil, &bl) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/startback/bus/bunny/async.rb', line 81 def listen(type, processor = nil, listener = nil, &bl) raise ArgumentError, "A listener must be provided" unless listener || bl fanout = channel.fanout(type.to_s, ) queue = channel.queue((processor || "main").to_s, ) queue.bind(fanout) queue.subscribe do |delivery_info, properties, body| event = stop_errors(self, "listen") do factor_event(body) end stop_errors(self, "listen", event.context) do (listener || bl).call(event) end end end |