Module: Concurrent::Actor::Behaviour
- Defined in:
- lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/awaits.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/linking.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/pausing.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/abstract.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/supervising.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/sets_results.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/removes_child.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/executes_context.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/errors_on_unknown_message.rb
Overview
Actors have modular architecture, which is achieved by combining a light core with chain of behaviours. Each message or internal event propagates through the chain allowing the behaviours react based on their responsibility.
-
> Links the actor to other actors and sends actor’s events to them, like: ‘:terminated`, `:paused`, `:resumed`, errors, etc. Linked actor needs to handle those messages.
<span class='rubyid_listener identifier id'>listener</span> <span class='assign token'>=</span> <span class='rubyid_AdHoc constant id'>AdHoc</span><span class='dot token'>.</span><span class='rubyid_spawn identifier id'>spawn</span> <span class='label val'>name:</span> <span class='symbol val'>:listener</span> <span class='rubyid_do do kw'>do</span> <span class='rubyid_lambda identifier id'>lambda</span> <span class='rubyid_do do kw'>do</span> <span class='bitor op'>|</span><span class='rubyid_message identifier id'>message</span><span class='bitor op'>|</span> <span class='rubyid_case case kw'>case</span> <span class='rubyid_message identifier id'></span> <span class='rubyid_when when kw'>when</span> <span class='rubyid_Reference constant id'>Reference</span> <span class='rubyid_if if kw'>if</span> <span class='rubyid_message identifier id'></span><span class='dot token'>.</span><span class='rubyid_ask! fid id'>ask!</span><span class='lparen token'>(</span><span class='symbol val'>:linked?</span><span class='rparen token'>)</span> <span class='rubyid_message identifier id'></span> <span class='lshft op'><<</span> <span class='symbol val'>:unlink</span> <span class='rubyid_else else kw'>else</span> <span class='rubyid_message identifier id'></span> <span class='lshft op'><<</span> <span class='symbol val'>:link</span> <span class='rubyid_end end kw'>end</span> <span class='rubyid_else else kw'>else</span> <span class='rubyid_puts identifier id'>puts</span> <span class='dstring node'>"got event #{message.inspect} from #{envelope.sender}"</span> <span class='rubyid_end end kw'>end</span> <span class='rubyid_end end kw'>end</span> <span class='rubyid_end end kw'>end</span> <span class='rubyid_an_actor identifier id'>an_actor</span> <span class='assign token'>=</span> <span class='rubyid_AdHoc constant id'>AdHoc</span><span class='dot token'>.</span><span class='rubyid_spawn identifier id'>spawn</span> <span class='label val'>name:</span> <span class='symbol val'>:an_actor</span><span class='comma token'>,</span> <span class='label val'>supervise:</span> <span class='rubyid_true true kw'>true</span><span class='comma token'>,</span> <span class='label val'>behaviour_definition:</span> <span class='rubyid_Behaviour constant id'>Behaviour</span><span class='dot token'>.</span><span class='rubyid_restarting_behaviour_definition identifier id'>restarting_behaviour_definition</span> <span class='rubyid_do do kw'>do</span> <span class='rubyid_lambda identifier id'>lambda</span> <span class='lbrace token'>{</span> <span class='bitor op'>|</span><span class='rubyid_message identifier id'>message</span><span class='bitor op'>|</span> <span class='rubyid_raise identifier id'>raise</span> <span class='string val'>'failed'</span><span class='rbrace token'>}</span> <span class='rubyid_end end kw'>end</span> <span class='comment val'># link the actor</span> <span class='rubyid_listener identifier id'>listener</span><span class='dot token'>.</span><span class='rubyid_ask identifier id'>ask</span><span class='lparen token'>(</span><span class='rubyid_an_actor identifier id'>an_actor</span><span class='rparen token'>)</span><span class='dot token'>.</span><span class='rubyid_wait identifier id'>wait</span> <span class='rubyid_an_actor identifier id'>an_actor</span><span class='dot token'>.</span><span class='rubyid_ask identifier id'>ask</span><span class='lparen token'>(</span><span class='symbol val'>:fail</span><span class='rparen token'>)</span><span class='dot token'>.</span><span class='rubyid_wait identifier id'>wait</span> <span class='comment val'># unlink the actor</span> <span class='rubyid_listener identifier id'>listener</span><span class='dot token'>.</span><span class='rubyid_ask identifier id'>ask</span><span class='lparen token'>(</span><span class='rubyid_an_actor identifier id'>an_actor</span><span class='rparen token'>)</span><span class='dot token'>.</span><span class='rubyid_wait identifier id'>wait</span> <span class='rubyid_an_actor identifier id'>an_actor</span><span class='dot token'>.</span><span class='rubyid_ask identifier id'>ask</span><span class='lparen token'>(</span><span class='symbol val'>:fail</span><span class='rparen token'>)</span><span class='dot token'>.</span><span class='rubyid_wait identifier id'>wait</span> <span class='rubyid_an_actor identifier id'>an_actor</span> <span class='lshft op'><<</span> <span class='symbol val'>:terminate!</span>produces only two events, other events happened after unlinking
<span class='rubyid_got identifier id'>got</span> <span class='rubyid_event identifier id'>event</span> <span class='comment val'>#<RuntimeError: failed> from #<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)></span> <span class='rubyid_got identifier id'>got</span> <span class='rubyid_event identifier id'>event</span> <span class='symbol val'>:reset</span> <span class='rubyid_from identifier id'>from</span> <span class='comment val'>#<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)></span> -
> Accepts ‘:await` messages. Which allows to wait on Actor to process all previously send messages.
<span class='rubyid_actor identifier id'>actor</span> <span class='lshft op'><<</span> <span class='symbol val'>:a</span> <span class='lshft op'><<</span> <span class='symbol val'>:b</span> <span class='rubyid_actor identifier id'>actor</span><span class='dot token'>.</span><span class='rubyid_ask identifier id'>ask</span><span class='lparen token'>(</span><span class='symbol val'>:await</span><span class='rparen token'>)</span><span class='dot token'>.</span><span class='rubyid_wait identifier id'>wait</span> <span class='comment val'># blocks until :a and :b are processed</span> -
> Allows to pause actors on errors. When paused all arriving messages are collected and processed after the actor is resumed or reset. Resume will simply continue with next message. Reset also reinitialized context.
-
> Handles supervised actors. Handle configures what to do with failed child: :terminate!, :resume!, :reset!, or :restart!. Strategy sets :one_for_one (restarts just failed actor) or :one_for_all (restarts all child actors).
-
> Handles supervised actors. Handle configures what to do with failed child: :terminate!, :resume!, :reset!, or :restart!. Strategy sets :one_for_one (restarts just failed actor) or :one_for_all (restarts all child actors).
-
> Delegates messages and events to AbstractContext instance.
-
> Simply fails when message arrives here. It’s usually the last behaviour.
-
> Handles actor termination. Waits until all its children are terminated, can be configured on behaviour initialization.
-
> Removes terminated children.
If needed new behaviours can be added, or old one removed to get required behaviour.
-
Context uses Array of behaviours and their construction parameters.
<span class='lbrack token'>[</span><span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_SetResults constant id'>SetResults</span><span class='comma token'>,</span> <span class='symbol val'>:terminate!</span><span class='rbrack token'>]</span><span class='comma token'>,</span> <span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_RemovesChild constant id'>RemovesChild</span><span class='rbrack token'>]</span><span class='comma token'>,</span> <span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_Termination constant id'>Termination</span><span class='rbrack token'>]</span><span class='comma token'>,</span> <span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_Linking constant id'>Linking</span><span class='rbrack token'>]</span><span class='comma token'>,</span> <span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_Awaits constant id'>Awaits</span><span class='rbrack token'>]</span><span class='comma token'>,</span> <span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_ExecutesContext constant id'>ExecutesContext</span><span class='rbrack token'>]</span><span class='comma token'>,</span> <span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_ErrorsOnUnknownMessage constant id'>ErrorsOnUnknownMessage</span><span class='rbrack token'>]</span><span class='rbrack token'>]</span> -
RestartingContext uses Array of behaviours and their construction parameters.
<span class='lbrack token'>[</span><span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_SetResults constant id'>SetResults</span><span class='comma token'>,</span> <span class='symbol val'>:pause!</span><span class='rbrack token'>]</span><span class='comma token'>,</span> <span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_RemovesChild constant id'>RemovesChild</span><span class='rbrack token'>]</span><span class='comma token'>,</span> <span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_Termination constant id'>Termination</span><span class='rbrack token'>]</span><span class='comma token'>,</span> <span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_Linking constant id'>Linking</span><span class='rbrack token'>]</span><span class='comma token'>,</span> <span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_Pausing constant id'>Pausing</span><span class='rbrack token'>]</span><span class='comma token'>,</span> <span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_Supervising constant id'>Supervising</span><span class='comma token'>,</span> <span class='symbol val'>:reset!</span><span class='comma token'>,</span> <span class='symbol val'>:one_for_one</span><span class='rbrack token'>]</span><span class='comma token'>,</span> <span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_Awaits constant id'>Awaits</span><span class='rbrack token'>]</span><span class='comma token'>,</span> <span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_ExecutesContext constant id'>ExecutesContext</span><span class='rbrack token'>]</span><span class='comma token'>,</span> <span class='lbrack token'>[</span><span class='rubyid_Behaviour constant id'>Behaviour</span><span class='colon2 op'>::</span><span class='rubyid_ErrorsOnUnknownMessage constant id'>ErrorsOnUnknownMessage</span><span class='rbrack token'>]</span><span class='rbrack token'>]</span>
Defined Under Namespace
Classes: Abstract, Awaits, Buffer, ErrorsOnUnknownMessage, ExecutesContext, Linking, Pausing, RemovesChild, SetResults, Supervising, Termination
Constant Summary collapse
- MESSAGE_PROCESSED =
::Object.new
Class Method Summary collapse
- .base(on_error) ⇒ Object
-
.basic_behaviour_definition ⇒ Object
Array of behaviours and their construction parameters.
- .linking ⇒ Object
-
.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one) ⇒ Object
Array of behaviours and their construction parameters.
- .supervised ⇒ Object
- .supervising(handle = :reset!, strategy = :one_for_one) ⇒ Object
- .user_messages ⇒ Object
Class Method Details
.base(on_error) ⇒ Object
105 106 107 108 109 110 |
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 105 def self.base(on_error) [[SetResults, on_error], # has to be before Termination to be able to remove children from terminated actor RemovesChild, Termination] end |
.basic_behaviour_definition ⇒ Object
Array of behaviours and their construction parameters.
[[Behaviour::SetResults, :terminate!],
[Behaviour::RemovesChild],
[Behaviour::Termination],
[Behaviour::Linking],
[Behaviour::Awaits],
[Behaviour::ExecutesContext],
[Behaviour::ErrorsOnUnknownMessage]]
77 78 79 80 81 |
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 77 def self.basic_behaviour_definition [*base(:terminate!), *linking, *] end |
.linking ⇒ Object
113 114 115 |
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 113 def self.linking [Linking] end |
.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one) ⇒ Object
Array of behaviours and their construction parameters.
[[Behaviour::SetResults, :pause!],
[Behaviour::RemovesChild],
[Behaviour::Termination],
[Behaviour::Linking],
[Behaviour::Pausing],
[Behaviour::Supervising, :reset!, :one_for_one],
[Behaviour::Awaits],
[Behaviour::ExecutesContext],
[Behaviour::ErrorsOnUnknownMessage]]
96 97 98 99 100 101 102 |
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 96 def self.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one) [*base(:pause!), *linking, *supervised, *supervising(handle, strategy), *] end |
.supervised ⇒ Object
118 119 120 |
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 118 def self.supervised [Pausing] end |
.supervising(handle = :reset!, strategy = :one_for_one) ⇒ Object
123 124 125 |
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 123 def self.supervising(handle = :reset!, strategy = :one_for_one) [[Behaviour::Supervising, handle, strategy]] end |
.user_messages ⇒ Object
128 129 130 131 132 |
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 128 def self. [Awaits, ExecutesContext, ErrorsOnUnknownMessage] end |