Class: ActionPubsub::Balancer

Inherits:
Concurrent::Actor::Utils::Balancer
  • Object
show all
Defined in:
lib/action_pubsub/balancer.rb

Direct Known Subclasses

Queue

Instance Method Summary collapse

Constructor Details

#initializeBalancer

Returns a new instance of Balancer.



4
5
6
7
# File 'lib/action_pubsub/balancer.rb', line 4

def initialize
  @receivers = []
  @buffer    = []
end

Instance Method Details

#dead_letter_routingObject



33
34
35
# File 'lib/action_pubsub/balancer.rb', line 33

def dead_letter_routing
  ::ActionPubsub.silent_dead_letter_handler
end

#distributeObject



27
28
29
30
31
# File 'lib/action_pubsub/balancer.rb', line 27

def distribute
  while !@receivers.empty? && !@buffer.empty?
    redirect @receivers.shift, @buffer.shift
  end
end

#on_message(message) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/action_pubsub/balancer.rb', line 9

def on_message(message)
  case message
  when :subscribe
    @receivers << envelope.sender
    distribute
    true
  when :unsubscribe
    @receivers.delete envelope.sender
    true
  when :subscribed?
    @receivers.include? envelope.sender
  else
    @buffer << envelope
    distribute
    ::Concurrent::Actor::Behaviour::MESSAGE_PROCESSED
  end
end