Class: Vissen::Input::Broker

Inherits:
Object
  • Object
show all
Defined in:
lib/vissen/input/broker.rb

Overview

Message broker that consumes a stream of messages and exposes a simple subscription interface.

== Usage This example subscribes to note messages on channel 1 and calls the fictitious method play when a matching message is published and processed.

broker = Broker.new broker.subscribe Message::Note[1], priority: 1 do |message| play message.note end

message = Message::Note.create 42, channel: 1 broker.publish message broker.run_once

The next example sets up two different priority listeners, one of which blocks the other for some messages.

broker = Broker.new broker.subscribe Message::Note[1], priority: 1 do |message| play message.note end

broker.subscribe Message::Note[1], priority: 2 do |message, ctrl| ctrl.stop! if message.note % 2 == 0 end

Defined Under Namespace

Classes: PropagationControl

Instance Method Summary collapse

Constructor Details

#initializeBroker



34
35
36
37
# File 'lib/vissen/input/broker.rb', line 34

def initialize
  @subscriptions = []
  @message_queue = Queue.new
end

Instance Method Details

#call(message, ctrl) ⇒ nil

Processes one message. By design, implementing this method allows for multiple brokers being chained.



101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/vissen/input/broker.rb', line 101

def call(message, ctrl)
  # TODO: Remap the message if needed.
  @subscriptions.each do |subscription|
    break if ctrl.stop?(subscription.priority)

    subscription.match message do |msg|
      subscription.handle msg, ctrl
      message = msg
    end
  end
  nil
end

#publish(*message) ⇒ Object

Insert a new message into the message queue. The message is handled at a later time in #run_once.



78
79
80
81
82
# File 'lib/vissen/input/broker.rb', line 78

def publish(*message)
  message.each do |m|
    @message_queue.push m
  end
end

#run_oncetrue, false

Takes one message from the message queue and handles it.



88
89
90
91
92
93
# File 'lib/vissen/input/broker.rb', line 88

def run_once
  return false if @message_queue.empty?
  ctrl = PropagationControl.new
  call @message_queue.shift, ctrl
  true
end

#subscribe(matcher, handler = nil, priority: 0, &block) ⇒ Subscription

Register a callback for the broker to run when a message matched by the given matcher is published.

By specifying a priority a subscriber added after another can still be handled at an earlier time. The handler can either be specified as an object responding to #call or as a block.



54
55
56
57
58
59
60
61
62
63
# File 'lib/vissen/input/broker.rb', line 54

def subscribe(matcher, handler = nil, priority: 0, &block)
  if block_given?
    raise ArgumentError if handler
    handler = block
  else
    raise ArgumentError unless handler
  end

  insert_subscription Subscription.new(matcher, handler, priority)
end

#unsubscribe(subscription) ⇒ Subscription?

Removes the given subscription.



70
71
72
# File 'lib/vissen/input/broker.rb', line 70

def unsubscribe(subscription)
  @subscriptions.delete subscription
end