Class: Concurrent::Actor::Utils::Broadcast

Inherits:
RestartingContext show all
Defined in:
lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb

Overview

Allows to build pub/sub easily.

Examples:

news

news_channel = Concurrent::Actor::Utils::Broadcast.spawn :news

2.times do |i|
  Concurrent::Actor::Utils::AdHoc.spawn "listener-#{i}" do
    news_channel << :subscribe
    -> message { puts message }
  end
end

news_channel << 'Ruby rocks!'
# prints: 'Ruby rocks!' twice

Instance Attribute Summary

Attributes inherited from AbstractContext

#core

Instance Method Summary collapse

Methods inherited from RestartingContext

#behaviour_definition

Methods inherited from AbstractContext

#ask, #behaviour_definition, #dead_letter_routing, #default_executor, #default_reference_class, #envelope, #on_envelope, #on_event, #pass, spawn, spawn!, #tell

Methods included from InternalDelegations

#behaviour, #behaviour!, #children, #context, #dead_letter_routing, #log, #redirect, #terminate!, #terminated?

Methods included from PublicDelegations

#context_class, #executor, #name, #parent, #path, #reference

Methods included from TypeCheck

#Child!, #Child?, #Match!, #Match?, #Type!, #Type?

Constructor Details

#initializeBroadcast

Returns a new instance of Broadcast.



23
24
25
# File 'lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb', line 23

def initialize
  @receivers = Set.new
end

Instance Method Details

#filtered_receiversObject

override to define different behaviour, filtering etc



46
47
48
# File 'lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb', line 46

def filtered_receivers
  @receivers
end

#on_message(message) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb', line 27

def on_message(message)
  case message
  when :subscribe
    if envelope.sender.is_a? Reference
      @receivers.add envelope.sender
      true
    else
      false
    end
  when :unsubscribe
    !!@receivers.delete(envelope.sender)
  when :subscribed?
    @receivers.include? envelope.sender
  else
    filtered_receivers.each { |r| r << message }
  end
end