Module: Messaging::Routing

Included in:
Adapters::Kafka::Consumer, Adapters::Test::Consumer, Routes
Defined in:
lib/messaging/routing.rb,
lib/messaging/routing/route.rb,
lib/messaging/routing/enqueued_route.rb,
lib/messaging/routing/message_matcher.rb,
lib/messaging/routing/enqueue_message_handler.rb

Defined Under Namespace

Classes: EnqueueMessageHandler, EnqueuedRoute, MessageMatcher, Route

Instance Method Summary collapse

Instance Method Details

#clear_routes!Object

Internal: Used by Rails reloading in development.



53
54
55
# File 'lib/messaging/routing.rb', line 53

def clear_routes!
  @routes = Set.new
end

#handle(message) ⇒ Object

Internal: Handles the message with the matching subscribers



47
48
49
50
# File 'lib/messaging/routing.rb', line 47

def handle(message)
  routes.map { |route| route.call(message) }
  message
end

#on(pattern = /.*/, call: nil, enqueue: nil, &block) ⇒ Object

Public: Sets up routes for the events that matches the given pattern

pattern - Which messages to route. Can be a string, a regexp,

a Message class, a module or anything that responds to call.

call: - Any object that responds to call.

Will be called immediately for matching messages.

enqueue: - A constant that responds to call.

Will be enqueued with Sidekiq for matching messages.
Needs to be a constant that Sidekiq can serialize to a string
and back again to a constant as you can't store procs in Redis.

block - An optional block that will be called with each matching message.

Examples

Messaging.routes.draw do

on 'Events::BidPlaced', call: NotifyOtherBidders

on Events::BidPlaced, enqueue: NotifyOtherBidders

on Events, do |event|
  puts event.inspect
end

on /.*Updated$/, enqueue: AuditChanges

on ->(m) { m.topic == 'my-topic' }, call: DoSometing, enqueue: DoSomethingElseWithSidekiq

end



40
41
42
43
44
# File 'lib/messaging/routing.rb', line 40

def on(pattern = /.*/, call: nil, enqueue: nil, &block)
  routes << Route.new(pattern, call) if call
  routes << Route.new(pattern, block) if block_given?
  routes << EnqueuedRoute.new(pattern, enqueue) if enqueue
end