Class: Concurrent::Actor::Behaviour::Buffer
- Defined in:
- lib/concurrent/actor/behaviour/buffer.rb
Overview
Any message reaching this behaviour is buffered. Only one message is scheduled at any given time; the others are kept in the buffer until another one can be scheduled. This effectively means that messages handled by behaviours placed before the buffer have higher priority and can be processed before messages waiting in the buffer. This allows internal actor messages (such as `:link` and `:supervise`) to be processed first.
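The priority effect described above can be illustrated with a small toy model. This is only a sketch and not the library's implementation: `ToyInternalHandler`, `ToyHoldingBuffer`, `drain_one` and `priority_demo` are hypothetical names invented for this example, and the toy buffer is simplified so that it holds every message until `drain_one` is called (standing in for one scheduled envelope processing).

```ruby
# Hypothetical toy behaviour placed before the buffer. It consumes
# internal messages itself, so they never wait in the buffer.
class ToyInternalHandler
  INTERNAL = [:link, :supervise].freeze

  def initialize(subsequent, log)
    @subsequent = subsequent
    @log        = log
  end

  def on_envelope(message)
    if INTERNAL.include?(message)
      @log << [:internal, message]      # handled on arrival
    else
      @subsequent.on_envelope(message)  # everything else goes on to the buffer
    end
  end
end

# Hypothetical toy buffer. Simplification: messages are released only by
# drain_one, which stands in for one scheduled envelope processing.
class ToyHoldingBuffer
  def initialize(log)
    @log    = log
    @buffer = []
  end

  def on_envelope(message)
    @buffer.push(message)
  end

  def drain_one
    message = @buffer.shift
    @log << [:user, message] if message
  end
end

# Two user messages arrive first, then an internal one.
def priority_demo
  log    = []
  buffer = ToyHoldingBuffer.new(log)
  chain  = ToyInternalHandler.new(buffer, log)
  [:work1, :work2, :link].each { |message| chain.on_envelope(message) }
  2.times { buffer.drain_one }
  log
end
```

In this toy, `priority_demo` returns `[[:internal, :link], [:user, :work1], [:user, :work2]]`: the `:link` message arrived last but is handled first, because it is consumed before it can reach the buffer.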
Instance Attribute Summary
Attributes inherited from Abstract
Instance Method Summary
- #initialize(core, subsequent) ⇒ Buffer (constructor): A new instance of Buffer.
- #on_envelope(envelope) ⇒ Object
- #on_event(event) ⇒ Object
- #process_envelope ⇒ Object
- #process_envelopes? ⇒ Boolean: Ensures that only one envelope processing is scheduled with #schedule_execution, which allows other scheduled blocks to be executed before the next envelope is processed.
Methods inherited from Abstract
#broadcast, #pass, #reject_envelope
Methods included from InternalDelegations
#behaviour, #behaviour!, #children, #context, #dead_letter_routing, #log, #redirect, #terminate!
Methods included from PublicDelegations
#context_class, #executor, #name, #parent, #path, #reference
Methods included from TypeCheck
#Child!, #Child?, #Match!, #Match?, #Type!, #Type?
Constructor Details
#initialize(core, subsequent) ⇒ Buffer
Returns a new instance of Buffer.
    # File 'lib/concurrent/actor/behaviour/buffer.rb', line 11
    def initialize(core, subsequent)
      super core, subsequent
      @buffer = []
      @receive_envelope_scheduled = false
    end
Instance Method Details
#on_envelope(envelope) ⇒ Object
    # File 'lib/concurrent/actor/behaviour/buffer.rb', line 17
    def on_envelope(envelope)
      @buffer.push envelope
      process_envelopes?
      MESSAGE_PROCESSED
    end
#on_event(event) ⇒ Object
    # File 'lib/concurrent/actor/behaviour/buffer.rb', line 43
    def on_event(event)
      case event
      when :terminated, :restarted
        @buffer.each { |envelope| reject_envelope envelope }
        @buffer.clear
      end
      super event
    end
#process_envelope ⇒ Object
    # File 'lib/concurrent/actor/behaviour/buffer.rb', line 34
    def process_envelope
      envelope = @buffer.shift
      return nil unless envelope
      pass envelope
    ensure
      @receive_envelope_scheduled = false
      core.schedule_execution { process_envelopes? }
    end
#process_envelopes? ⇒ Boolean
Ensures that only one envelope processing is scheduled with #schedule_execution, which allows other scheduled blocks to be executed before the next envelope is processed. Simply put, this ensures that Core stays responsive to internal calls (like add_child) even when the Actor is flooded with messages.
    # File 'lib/concurrent/actor/behaviour/buffer.rb', line 27
    def process_envelopes?
      unless @buffer.empty? || @receive_envelope_scheduled
        @receive_envelope_scheduled = true
        process_envelope
      end
    end
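How rescheduling after each envelope lets other blocks run in between can be sketched with a single-threaded toy model. The code below is an illustration under stated assumptions, not the library's code: `ToyCore`, `ToyBuffer` and `interleaving_demo` are hypothetical names, `ToyCore#schedule_execution` is a plain FIFO of blocks standing in for the real Core, and the handler block stands in for `pass envelope`. The buffer methods mirror the shape of the source listed above.

```ruby
# Hypothetical stand-in for Core: a FIFO of blocks executed one at a time.
class ToyCore
  def initialize
    @queue = []
  end

  def schedule_execution(&block)
    @queue.push(block)
    nil
  end

  # Runs queued blocks until none are left; blocks may enqueue more blocks.
  def run
    @queue.shift.call until @queue.empty?
  end
end

# Hypothetical buffer mirroring the methods listed above.
class ToyBuffer
  def initialize(core, &handler)
    @core      = core
    @handler   = handler  # stands in for `pass envelope`
    @buffer    = []
    @scheduled = false
  end

  def on_envelope(envelope)
    @buffer.push(envelope)
    process_envelopes?
  end

  def process_envelopes?
    return if @buffer.empty? || @scheduled
    @scheduled = true
    process_envelope
  end

  def process_envelope
    envelope = @buffer.shift
    return nil unless envelope
    @handler.call(envelope)
  ensure
    @scheduled = false
    # The next envelope is processed in a new block at the end of the queue.
    @core.schedule_execution { process_envelopes? }
  end
end

def interleaving_demo
  log  = []
  core = ToyCore.new
  buffer = ToyBuffer.new(core) do |message|
    log << message
    if message == :m1
      # Assumption for the demo: two more messages arrive while :m1 is
      # being handled, so they are only buffered.
      buffer.on_envelope(:m2)
      buffer.on_envelope(:m3)
    end
  end
  buffer.on_envelope(:m1)
  # An internal call is scheduled while :m2 and :m3 are still waiting.
  core.schedule_execution { log << :internal_call }
  core.run
  log
end
```

In this toy, `interleaving_demo` returns `[:m1, :m2, :internal_call, :m3]`. The internal call runs before `:m3` even though `:m3` was buffered earlier, because processing of `:m3` is only scheduled after `:m2` has been handled.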