Class: Vissen::Input::Broker
- Inherits: Object
  - Object
  - Vissen::Input::Broker
- Defined in: lib/vissen/input/broker.rb
Overview
Message broker that consumes a stream of messages and exposes a simple subscription interface.
Usage
This example subscribes to note messages on channel 1 and calls the fictitious method play when a matching message is published and processed.

    broker = Broker.new
    broker.subscribe Message::Note[1], priority: 1 do |message|
      play message.note
    end

    message = Message::Note.create 42, channel: 1
    broker.publish message
    broker.run_once

The next example sets up two different priority listeners, one of which blocks the other for some messages.

    broker = Broker.new
    broker.subscribe Message::Note[1], priority: 1 do |message|
      play message.note
    end

    broker.subscribe Message::Note[1], priority: 2 do |message, ctrl|
      ctrl.stop! if message.note % 2 == 0
    end

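The blocking behaviour in the second example can be sketched without the library. SketchBroker and SketchControl below are hypothetical stand-ins, not the Vissen classes. They assume that subscriptions are visited from highest to lowest priority and that stop! halts every later subscription; the real PropagationControl may be finer grained.

```ruby
# Hypothetical stand-in for PropagationControl (assumption: a simple flag).
class SketchControl
  def initialize
    @stopped = false
  end

  def stop!
    @stopped = true
  end

  # The priority argument is accepted but ignored in this simplification.
  def stop?(_priority = 0)
    @stopped
  end
end

# Hypothetical stand-in for Broker, reduced to subscribe and call.
class SketchBroker
  Entry = Struct.new(:matcher, :handler, :priority)

  def initialize
    @subscriptions = []
  end

  def subscribe(matcher, priority: 0, &block)
    entry = Entry.new(matcher, block, priority)
    @subscriptions << entry
    # Assumption: higher priorities are visited first.
    @subscriptions.sort_by! { |s| -s.priority }
    entry
  end

  def call(message, ctrl)
    @subscriptions.each do |s|
      break if ctrl.stop?(s.priority)

      s.handler.call(message, ctrl) if s.matcher.call(message)
    end
    nil
  end
end

played = []
any_note = ->(_note) { true }

broker = SketchBroker.new
broker.subscribe(any_note, priority: 1) { |note| played << note }
broker.subscribe(any_note, priority: 2) { |note, ctrl| ctrl.stop! if note.even? }

# Each message gets a fresh control object, as in #run_once.
[41, 42, 43].each { |note| broker.call(note, SketchControl.new) }
```

In this sketch the even note never reaches the priority 1 listener, so only the odd notes end up in played.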
Defined Under Namespace
Classes: PropagationControl
Instance Method Summary
- #call(message, ctrl) ⇒ nil
  Processes one message.
- #initialize ⇒ Broker (constructor)
  A new instance of Broker.
- #publish(*message) ⇒ Object
  Insert a new message into the message queue.
- #run_once ⇒ true, false
  Takes one message from the message queue and handles it.
- #subscribe(matcher, handler = nil, priority: 0, &block) ⇒ Subscription
  Register a callback for the broker to run when a message matched by the given matcher is published.
- #unsubscribe(subscription) ⇒ Subscription?
  Removes the given subscription.
Constructor Details
#initialize ⇒ Broker

    # File 'lib/vissen/input/broker.rb', line 34
    def initialize
      @subscriptions = []
      @message_queue = Queue.new
    end

Instance Method Details
#call(message, ctrl) ⇒ nil
Processes one message. Because the broker itself responds to #call, it can be registered as a handler on another broker, which allows multiple brokers to be chained.

    # File 'lib/vissen/input/broker.rb', line 101
    def call(message, ctrl)
      # TODO: Remap the message if needed.
      @subscriptions.each do |subscription|
        break if ctrl.stop?(subscription.priority)
        subscription.match message do |msg|
          subscription.handle msg, ctrl
          message = msg
        end
      end

      nil
    end

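As a rough illustration of chaining, the sketch below passes one broker to another as a handler. ChainBroker is a hypothetical stand-in that leaves out matchers, priorities and propagation control; it only shows that any object responding to call(message, ctrl), including a broker, can sit in the handler position.

```ruby
# Hypothetical stand-in for Broker: every handler receives every message.
class ChainBroker
  def initialize
    @handlers = []
  end

  # Accepts either a callable object or a block.
  def subscribe(handler = nil, &block)
    @handlers << (handler || block)
  end

  # Same shape as Broker#call: takes a message and a control object.
  def call(message, ctrl)
    @handlers.each { |handler| handler.call(message, ctrl) }
    nil
  end
end

seen = []

downstream = ChainBroker.new
downstream.subscribe { |message, _ctrl| seen << message }

upstream = ChainBroker.new
upstream.subscribe(downstream) # the downstream broker is the handler

upstream.call(:note_on, nil)
```

After the last line, the message delivered to upstream has been forwarded to the block registered on downstream.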
#publish(*message) ⇒ Object
Insert a new message into the message queue. The message is handled at a later time in #run_once.

    # File 'lib/vissen/input/broker.rb', line 78
    def publish(*message)
      message.each do |m|
        @message_queue.push m
      end
    end

#run_once ⇒ true, false
Takes one message from the message queue and handles it.

    # File 'lib/vissen/input/broker.rb', line 88
    def run_once
      return false if @message_queue.empty?
      ctrl = PropagationControl.new
      call @message_queue.shift, ctrl
      true
    end

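Since #run_once returns false once the queue is empty, a caller can use the return value to drain everything that has been published. The sketch below uses a hypothetical QueueSketch class that copies only the queueing logic shown above and replaces subscription dispatch with a single block.

```ruby
# Hypothetical stand-in mirroring only #publish and #run_once.
class QueueSketch
  def initialize(&handler)
    @message_queue = Queue.new
    @handler = handler
  end

  def publish(*messages)
    messages.each { |m| @message_queue.push m }
  end

  # Handles at most one message; false means there was nothing to do.
  def run_once
    return false if @message_queue.empty?

    @handler.call @message_queue.shift
    true
  end
end

handled = []
broker = QueueSketch.new { |message| handled << message }

# Publishing only queues the messages; nothing is handled yet.
broker.publish :a, :b, :c

# Drain the queue, counting how many messages were processed.
count = 0
count += 1 while broker.run_once
```

Messages are handled in the order they were published, one per call.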
#subscribe(matcher, handler = nil, priority: 0, &block) ⇒ Subscription
Register a callback for the broker to run when a message matched by the given matcher is published.
By specifying a priority, a subscriber added after another can still be handled earlier. The handler can be given either as an object responding to #call or as a block, but not both.

    # File 'lib/vissen/input/broker.rb', line 54
    def subscribe(matcher, handler = nil, priority: 0, &block)
      if block_given?
        raise ArgumentError if handler
        handler = block
      else
        raise ArgumentError unless handler
      end

      insert_subscription Subscription.new(matcher, handler, priority)
    end

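The handler check in #subscribe can be tried in isolation. resolve_handler below is a hypothetical helper, not part of the library; it repeats the same either/or logic and adds error messages that the original does not have.

```ruby
# Hypothetical helper mirroring the argument check in #subscribe.
def resolve_handler(handler = nil, &block)
  if block
    raise ArgumentError, 'pass a handler or a block, not both' if handler

    block
  else
    raise ArgumentError, 'a handler or a block is required' unless handler

    handler
  end
end

# A block and a lambda are both acceptable handlers.
from_block = resolve_handler { |message| message }
from_lambda = resolve_handler(->(message, _ctrl) { message })

# Supplying neither is rejected.
neither_error =
  begin
    resolve_handler
    nil
  rescue ArgumentError => e
    e.message
  end
```

One practical difference between the two forms: a lambda enforces its arity, so it must accept both the message and the control object, while a block silently ignores arguments it does not name.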
#unsubscribe(subscription) ⇒ Subscription?
Removes the given subscription.

    # File 'lib/vissen/input/broker.rb', line 70
    def unsubscribe(subscription)
      @subscriptions.delete subscription
    end