Class: Aggro::EventBus

Inherits:
Object
  • Object
show all
Defined in:
lib/aggro/event_bus.rb

Overview

Public: Publishes events to any subscribed listeners.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ EventBus

Returns a new instance of EventBus.



6
7
8
# File 'lib/aggro/event_bus.rb', line 6

# Public: Set up a new bus whose remote publisher registry starts out as an
# empty Hash.
def initialize
  @remote_publishers = Hash.new
end

Instance Attribute Details

#remote_publishers ⇒ Object (readonly)

Returns the value of attribute remote_publishers.



4
5
6
# File 'lib/aggro/event_bus.rb', line 4

# Public: Read the remote publisher registry.
#
# Returns whatever is currently stored in @remote_publishers.
def remote_publishers
  instance_variable_get(:@remote_publishers)
end

Instance Method Details

#publish(topic, event) ⇒ Object



10
11
12
13
14
15
16
17
18
19
# File 'lib/aggro/event_bus.rb', line 10

# Public: Publish a single event on a topic.
#
# The event is wrapped in a one-element Message::Events and handed to
# Aggro.server first; it is then delivered to every subscription registered
# for the topic on this bus.
#
# topic - The key under which subscriptions are looked up.
# event - The event passed to each subscription's handle_event.
#
# Returns nil when the topic has no subscriptions on this bus, otherwise the
# topic's subscription list.
def publish(topic, event)
  message = Message::Events.new(topic, [event])
  Aggro.server.publish(message)

  if subscriptions.key?(topic)
    subscriptions[topic].each do |listener|
      # Poll in 10 ms steps until the subscription reports it has caught up,
      # and only then hand it the new event.
      sleep(0.01) until listener.caught_up
      listener.handle_event(event)
    end
  end
end

#shutdown ⇒ Object



39
40
41
# File 'lib/aggro/event_bus.rb', line 39

# Public: Stop every publisher held in the remote publisher registry.
#
# Returns the Array of publishers that were sent stop.
def shutdown
  publishers = remote_publishers.values
  publishers.each { |publisher| publisher.stop }
end

#subscribe(topic, subscriber, event_namespace = nil, filters = {}) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/aggro/event_bus.rb', line 21

# Public: Register a subscriber for a topic.
#
# A Subscription is built from the arguments (with 0 as its final argument),
# passed to catchup_subscriber, appended to the topic's subscription list
# (created on first use), and finally subscribe_bus_to_publisher is called
# for the topic.
#
# topic           - The key the subscription is stored under.
# subscriber      - The object the Subscription is built around.
# event_namespace - Forwarded unchanged to Subscription.new (default: nil).
# filters         - Forwarded unchanged to Subscription.new (default: {}).
#
# Returns the newly created Subscription.
def subscribe(topic, subscriber, event_namespace = nil, filters = {})
  Subscription.new(topic, subscriber, event_namespace, filters, 0).tap do |entry|
    # Catch-up is started before the subscription is added to the topic list.
    catchup_subscriber(topic, entry)

    (subscriptions[topic] ||= []).push(entry)

    subscribe_bus_to_publisher(topic)
  end
end

#unsubscribe(topic, subscriber) ⇒ Object



35
36
37
# File 'lib/aggro/event_bus.rb', line 35

# Public: Remove an entry from a topic's subscription list.
#
# topic      - The key whose subscription list is modified.
# subscriber - The object to remove; entries are matched with ==.
#
# Returns the removed entry, or nil when nothing matched or when the topic
# has no subscription list at all.
def unsubscribe(topic, subscriber)
  topic_subscriptions = subscriptions[topic]

  # A topic that was never subscribed to has no list; treat that as
  # "nothing to remove" instead of raising NoMethodError on nil.
  return unless topic_subscriptions

  # NOTE(review): subscribe stores Subscription objects in this list, so this
  # only removes an entry if the argument == a stored Subscription - confirm
  # callers pass the Subscription (or that Subscription#== handles it).
  topic_subscriptions.delete subscriber
end