Class: Concurrent::Actor::Behaviour::Termination

Inherits:
Abstract
  • Object
show all
Defined in:
lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb

Overview

Note:

Actor rejects envelopes when terminated.

Note:

TODO missing example

Handles actor termination. By default, terminating the actor also terminates its children, and the termination event resolves only once all of them have terminated; this can be configured on behaviour initialization.

Instance Attribute Summary collapse

Attributes inherited from Abstract

#core, #subsequent

Instance Method Summary collapse

Methods inherited from Abstract

#broadcast, #on_event, #pass, #reject_envelope

Methods included from InternalDelegations

#behaviour, #behaviour!, #children, #context, #dead_letter_routing, #log, #redirect

Methods included from PublicDelegations

#context_class, #executor, #name, #parent, #path, #reference

Methods included from TypeCheck

#Child!, #Child?, #Match!, #Match?, #Type!, #Type?

Constructor Details

#initialize(core, subsequent, core_options, trapping = false, terminate_children = true) ⇒ Termination

Returns a new instance of Termination.



17
18
19
20
21
22
23
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb', line 17

# Sets up the termination behaviour for an actor.
#
# @param core forwarded unchanged to the parent constructor
# @param subsequent forwarded unchanged to the parent constructor
# @param core_options forwarded unchanged to the parent constructor
# @param trapping initial trapping flag (stored as given); while truthy,
#   `:terminate!` messages whose reason is not `:kill` are passed on instead
#   of terminating the actor
# @param terminate_children whether `terminate!` also asks every child to
#   terminate
def initialize(core, subsequent, core_options, trapping = false, terminate_children = true)
  super(core, subsequent, core_options)

  @trapping = trapping
  @terminate_children = terminate_children

  # One resolvable future tracks termination internally; the variant returned
  # by #with_hidden_resolvable is what `:termination_event` hands out.
  termination = Concurrent::Promises.resolvable_future
  @terminated = termination
  @public_terminated = termination.with_hidden_resolvable
end

Instance Attribute Details

#terminated ⇒ Object (readonly)



15
16
17
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb', line 15

# Internal termination future of this behaviour. It is created in the
# constructor with Concurrent::Promises.resolvable_future and is resolved by
# #terminate! once the collected terminations are resolved.
#
# @return [Object] the value of the @terminated instance variable
def terminated
  @terminated
end

Instance Method Details

#on_envelope(envelope) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb', line 39

# Handles termination-related messages and guards everything else.
#
# The envelope's message is destructured into a command and an optional
# reason, then dispatched:
#
# * `:terminated?`       - answers with the current termination state
# * `:termination_event` - answers with the public termination future
# * `:terminate!`        - terminates the actor, unless the behaviour is
#   trapping and the reason is not `:kill`, in which case the envelope is
#   passed on untouched
# * anything else        - rejected (returning MESSAGE_PROCESSED) when the
#   actor is already terminated, otherwise passed on
#
# @param envelope the envelope being processed; must respond to #message
# @return [Object] the result of the selected branch
def on_envelope(envelope)
  command, reason = envelope.message

  case command
  when :terminated?       then terminated?
  when :termination_event then @public_terminated
  when :terminate!
    # A trapping actor only gives in to an explicit :kill.
    return pass(envelope) if trapping? && reason != :kill

    terminate!(reason, envelope)
  else
    return pass(envelope) unless terminated?

    reject_envelope(envelope)
    MESSAGE_PROCESSED
  end
end

#terminate!(reason = nil, envelope = nil) ⇒ Object

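Terminates the actor. Any Envelope received after termination is rejected. Unless disabled on initialization, it also asks all of the actor's children to terminate; the call itself does not block waiting for them, but the termination event is resolved only after they have all terminated.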



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb', line 64

# Starts termination of the actor. Does nothing (and returns true) when the
# actor is already terminated.
#
# @param reason optional termination reason; nil means a normal termination
# @param envelope optional envelope that requested the termination; when it
#   carries a future, that future is resolved once termination has finished
# @return [true, Object] true if already terminated, otherwise MESSAGE_PROCESSED
def terminate!(reason = nil, envelope = nil)
  return true if terminated?

  # Already-resolved future describing this actor's own outcome. Arguments are
  # presumably (fulfilled, value, reason): fulfilled with value true when no
  # reason was given, otherwise not fulfilled and carrying the reason.
  # NOTE(review): confirm the argument order against Concurrent::Promises.resolved_future.
  self_termination = Concurrent::Promises.resolved_future(reason.nil?, reason.nil? || nil, reason)
  # When configured to terminate children, ask each child to terminate and
  # combine their answers with our own outcome; otherwise only our own counts.
  all_terminations = if @terminate_children
                       Concurrent::Promises.zip(*children.map { |ch| ch.ask(:terminate!) }, self_termination)
                     else
                       self_termination
                     end

  # Tie the internal termination future to the combined result.
  all_terminations.chain_resolvable(@terminated)
  if envelope && envelope.future
    # Answer the requester: same fulfilled flag and reason as the combined
    # result, but always with true as the value.
    all_terminations.chain { |fulfilled, _, t_reason| envelope.future.resolve fulfilled, true, t_reason }
  end

  broadcast(true, [:terminated, reason]) # TODO do not end up in Dead Letter Router
  # Tell the parent (if there is one) to drop this actor from its children.
  parent << :remove_child if parent

  MESSAGE_PROCESSED
end

#terminated? ⇒ true, false

Note:

Actor rejects envelopes when terminated.

Returns whether the actor is terminated.

Returns:

  • (true, false)

    if actor is terminated



27
28
29
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb', line 27

# Tells whether the actor is terminated, i.e. whether the internal
# termination future has been resolved.
#
# @note Actor rejects envelopes when terminated.
# @return [true, false] result of #resolved? on the termination future
def terminated?
  @terminated.resolved?
end

#trapping=(val) ⇒ Object



35
36
37
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb', line 35

# Switches trapping on or off.
#
# @param val any object; only its truthiness is kept, so the stored flag is
#   always exactly true or false
# @return [true, false] the stored flag
def trapping=(val)
  @trapping = val ? true : false
end

#trapping? ⇒ Boolean

Returns:

  • (Boolean)


31
32
33
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb', line 31

# Current trapping flag. While it is truthy, #on_envelope passes `:terminate!`
# messages on instead of terminating the actor, unless the reason is `:kill`.
#
# The value is returned as stored: the setter coerces to true/false, but the
# value given to the constructor is kept as passed.
#
# @return [Boolean] the trapping flag
def trapping?
  @trapping
end