Class: Concurrent::Actor::Behaviour::Buffer

Inherits:
Abstract
  • Object
Defined in:
lib/concurrent/actor/behaviour/buffer.rb

Overview

Any message reaching this behaviour is buffered. Only one message is scheduled at any given time; the others are kept in the buffer until another one can be scheduled. This effectively means that messages handled by behaviours placed before the buffer have higher priority and can be processed before messages arriving in the buffer. This allows internal actor messages (such as `:link` and `:supervise`) to be processed first.
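
The ordering described above can be illustrated with a toy chain. This is a minimal sketch, not the concurrent-ruby implementation: `ToyLinking`, `ToyBuffer`, and `drain` are hypothetical names, plain symbols stand in for envelopes, and draining is triggered by hand rather than by the actor core.

```ruby
# Toy model only: hypothetical classes, not part of concurrent-ruby.

# Stands in for a behaviour placed before the buffer. It consumes
# internal messages itself and forwards everything else.
class ToyLinking
  INTERNAL = [:link, :supervise].freeze

  attr_reader :handled

  def initialize(subsequent)
    @subsequent = subsequent
    @handled    = []
  end

  def on_envelope(message)
    if INTERNAL.include?(message)
      @handled << message # handled immediately, never buffered
    else
      @subsequent.on_envelope(message)
    end
  end
end

# Stands in for the buffer: messages wait here until drained.
class ToyBuffer
  attr_reader :pending, :processed

  def initialize
    @pending   = []
    @processed = []
  end

  def on_envelope(message)
    @pending << message
  end

  # Hand-driven replacement for the scheduling done by the real core.
  def drain
    @processed << @pending.shift until @pending.empty?
  end
end

buffer = ToyBuffer.new
chain  = ToyLinking.new(buffer)

[:work_a, :link, :work_b, :supervise].each { |m| chain.on_envelope(m) }

handled_before_drain = chain.handled.dup   # [:link, :supervise]
waiting_before_drain = buffer.pending.dup  # [:work_a, :work_b]
buffer.drain
```

In this sketch the internal messages are already handled while both work messages are still waiting, which is the priority effect the overview describes.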

Instance Attribute Summary

Attributes inherited from Abstract

#core, #subsequent

Instance Method Summary

Methods inherited from Abstract

#broadcast, #pass, #reject_envelope

Methods included from InternalDelegations

#behaviour, #behaviour!, #children, #context, #dead_letter_routing, #log, #redirect, #terminate!

Methods included from PublicDelegations

#context_class, #executor, #name, #parent, #path, #reference

Methods included from TypeCheck

#Child!, #Child?, #Match!, #Match?, #Type!, #Type?

Constructor Details

#initialize(core, subsequent) ⇒ Buffer

Returns a new instance of Buffer.



# File 'lib/concurrent/actor/behaviour/buffer.rb', line 11

def initialize(core, subsequent)
  super core, subsequent
  @buffer                     = []
  @receive_envelope_scheduled = false
end

Instance Method Details

#on_envelope(envelope) ⇒ Object



# File 'lib/concurrent/actor/behaviour/buffer.rb', line 17

def on_envelope(envelope)
  @buffer.push envelope
  process_envelopes?
  MESSAGE_PROCESSED
end

#on_event(event) ⇒ Object



# File 'lib/concurrent/actor/behaviour/buffer.rb', line 43

def on_event(event)
  case event
  when :terminated, :restarted
    @buffer.each { |envelope| reject_envelope envelope }
    @buffer.clear
  end
  super event
end

#process_envelope ⇒ Object



# File 'lib/concurrent/actor/behaviour/buffer.rb', line 34

def process_envelope
  envelope = @buffer.shift
  return nil unless envelope
  pass envelope
ensure
  @receive_envelope_scheduled = false
  core.schedule_execution { process_envelopes? }
end

#process_envelopes? ⇒ Boolean

Ensures that only one envelope processing is scheduled with #schedule_execution at a time, which allows other scheduled blocks to be executed before the next envelope is processed. Simply put, this ensures that Core stays responsive to internal calls (like add_child) even when the Actor is flooded with messages.

Returns:

  • (Boolean)


# File 'lib/concurrent/actor/behaviour/buffer.rb', line 27

def process_envelopes?
  unless @buffer.empty? || @receive_envelope_scheduled
    @receive_envelope_scheduled = true
    process_envelope
  end
end
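
The interleaving described for this method can be reproduced outside the library with a small single-threaded model. This is a sketch under stated assumptions, not the library's code path: `ToyCore` and `ToyBuffer` are hypothetical stand-ins, the handler block replaces `pass`, and the demo assumes that a second envelope arrives and an internal call is scheduled while the first envelope is still being handled.

```ruby
# Toy model only: ToyCore and ToyBuffer are hypothetical stand-ins.

# Single-threaded task queue playing the role of the core's
# schedule_execution.
class ToyCore
  def initialize
    @tasks = []
  end

  def schedule_execution(&block)
    @tasks << block
  end

  # Runs queued blocks in FIFO order, including blocks queued meanwhile.
  def run
    @tasks.shift.call until @tasks.empty?
  end
end

# Mirrors the flag-and-reschedule logic of the methods shown above.
class ToyBuffer
  def initialize(core, &handler)
    @core      = core
    @handler   = handler
    @buffer    = []
    @scheduled = false
  end

  def on_envelope(envelope)
    @buffer.push envelope
    process_envelopes?
  end

  def process_envelopes?
    return if @buffer.empty? || @scheduled
    @scheduled = true
    process_envelope
  end

  def process_envelope
    envelope = @buffer.shift
    @handler.call(envelope) if envelope
  ensure
    @scheduled = false
    # The next envelope waits behind whatever is already queued.
    @core.schedule_execution { process_envelopes? }
  end
end

log  = []
core = ToyCore.new

buffer = ToyBuffer.new(core) do |envelope|
  log << "handled #{envelope}"
  if envelope == :first
    # Assumption for the demo: while :first is handled, an internal call
    # is scheduled and another envelope arrives.
    core.schedule_execution { log << "internal call" }
    buffer.on_envelope(:second)
  end
end

core.schedule_execution { buffer.on_envelope(:first) }
core.run
# log is now ["handled first", "internal call", "handled second"]
```

Because the follow-up `process_envelopes?` call is queued rather than invoked directly, the internal call that was scheduled earlier runs before `:second` is handled.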