Module: RSMP::Distributor

Includes:
Inspect
Included in:
Proxy
Defined in:
lib/rsmp/collect/distributor.rb

Overview

Module which distributes messages to receivers

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Inspect

#inspector

Instance Attribute Details

#receivers ⇒ Object (readonly)

Returns the value of attribute receivers.



4
5
6
# File 'lib/rsmp/collect/distributor.rb', line 4

# Reader for the @receivers instance variable.
#
# @return [Object] whatever is currently stored in @receivers
def receivers
  @receivers
end

Instance Method Details

#add_receiver(receiver) ⇒ Object

Raises:

  • (ArgumentError)


38
39
40
41
42
# File 'lib/rsmp/collect/distributor.rb', line 38

# Registers a receiver, unless it is already present in @receivers.
#
# @param receiver [Object] the receiver to register; must be truthy
# @return [Array, nil] @receivers when the receiver was appended,
#   nil when it was already registered
# @raise [ArgumentError] if receiver is nil or false
def add_receiver(receiver)
  raise ArgumentError unless receiver
  return if @receivers.include?(receiver)

  @receivers.push(receiver)
end

#clear_deferred_distribution ⇒ Object



18
19
20
# File 'lib/rsmp/collect/distributor.rb', line 18

# Discards any queued messages by pointing @deferred_messages at a new,
# empty array (the previous array object is not modified).
#
# @return [Array] the new empty array
def clear_deferred_distribution
  @deferred_messages = []
end

#distribute(message) ⇒ Object

Raises:

  • (ArgumentError)


50
51
52
53
54
55
56
57
58
# File 'lib/rsmp/collect/distributor.rb', line 50

# Hands a message to the receivers, or queues it while distribution is
# deferred (@defer_distribution is truthy).
#
# @param message [Object] the message to distribute; must be truthy
# @return [Object] @deferred_messages when the message was queued,
#   otherwise the result of #distribute_immediately
# @raise [ArgumentError] if message is nil or false
def distribute(message)
  raise ArgumentError unless message
  return distribute_immediately(message) unless @defer_distribution

  @deferred_messages << message
end

#distribute_error(error, options = {}) ⇒ Object



64
65
66
# File 'lib/rsmp/collect/distributor.rb', line 64

# Passes an error to every registered receiver via #receive_error.
#
# Iterates over a snapshot of @receivers so that a receiver which calls
# #remove_receiver (or #add_receiver) while being notified cannot shift the
# array under the iteration and cause another receiver to be skipped.
# Receivers added during the call are not notified for this error.
#
# @param error [Object] the error to pass on
# @param options [Hash] passed unchanged to each receiver
# @return [Array] @receivers
def distribute_error(error, options = {})
  @receivers.dup.each { |receiver| receiver.receive_error(error, options) }
  @receivers
end

#distribute_immediately(message) ⇒ Object



60
61
62
# File 'lib/rsmp/collect/distributor.rb', line 60

# Delivers a message to every registered receiver via #receive, regardless
# of whether distribution is currently deferred.
#
# Iterates over a snapshot of @receivers so that a receiver which calls
# #remove_receiver (or #add_receiver) while handling the message cannot
# shift the array under the iteration and cause another receiver to be
# skipped. Receivers added during the call do not get this message.
#
# @param message [Object] the message to deliver
# @return [Array] @receivers
def distribute_immediately(message)
  @receivers.dup.each { |receiver| receiver.receive(message) }
  @receivers
end

#distribute_queued ⇒ Object



32
33
34
35
36
# File 'lib/rsmp/collect/distributor.rb', line 32

# Delivers every queued message through #distribute_immediately, then
# replaces the queue with a new empty array. The queue is reset even when
# delivery raises, so messages not yet delivered at that point are dropped.
#
# @return [Array] the array of messages that was queued when the call began
def distribute_queued
  queued = @deferred_messages
  queued.each { |queued_message| distribute_immediately(queued_message) }
ensure
  @deferred_messages = []
end

#initialize_distributor ⇒ Object



12
13
14
15
16
# File 'lib/rsmp/collect/distributor.rb', line 12

# Sets up the distributor state: no receivers, distribution not deferred,
# and an empty queue of deferred messages.
#
# @return [Array] the new, empty @deferred_messages array
def initialize_distributor
  @defer_distribution = false
  @receivers = []
  @deferred_messages = []
end

#inspect ⇒ Object



8
9
10
# File 'lib/rsmp/collect/distributor.rb', line 8

# Builds a short description made of the class name, the object id and
# whatever #inspector returns for @receivers.
#
# @return [String] text of the form "#<ClassName:object_id, ...>"
def inspect
  class_name = self.class.name
  details = inspector(:@receivers)
  "#<#{class_name}:#{object_id}, #{details}>"
end

#remove_receiver(receiver) ⇒ Object

Raises:

  • (ArgumentError)


44
45
46
47
48
# File 'lib/rsmp/collect/distributor.rb', line 44

# Unregisters a receiver by deleting it from @receivers.
#
# @param receiver [Object] the receiver to remove; must be truthy
# @return [Object, nil] the removed receiver, or nil when it was not registered
# @raise [ArgumentError] if receiver is nil or false
def remove_receiver(receiver)
  raise ArgumentError unless receiver

  @receivers.delete(receiver)
end

#with_deferred_distribution ⇒ Object



22
23
24
25
26
27
28
29
30
# File 'lib/rsmp/collect/distributor.rb', line 22

# Runs the given block with distribution deferred, then flushes the queue
# through #distribute_queued. Afterwards, whether or not the block raised,
# @defer_distribution is restored to its previous value and the queue is
# replaced with a new empty array.
#
# @yield the work during which messages should be queued
# @return [Object] the result of #distribute_queued
def with_deferred_distribution
  previous = @defer_distribution
  @defer_distribution = true
  begin
    yield
    distribute_queued
  ensure
    @defer_distribution = previous
    @deferred_messages = []
  end
end