Class: Concurrent::Actor::Core

Inherits:
Object
  • Object
show all
Includes:
TypeCheck, Logging, Synchronization
Defined in:
lib/concurrent/actor/core.rb

Overview

Note:

Whole class should be considered private. An user should use Contexts and References only.

Note:

devel: core should not block on anything, e.g. it cannot wait on children to terminate that would eat up all threads in task pool and deadlock

Core of the actor

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from TypeCheck

#Child!, #Child?, #Match!, #Match?, #Type!, #Type?

Constructor Details

#initialize(opts = {}, &block) ⇒ Core

Returns a new instance of Core.

Parameters:

  • block (Proc)

    for class instantiation

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • name (String)
  • parent (Reference, nil)

    of an actor spawning this one

  • reference (Class)

    a custom descendant of Reference to use

  • actor_class (Context)

    a class to be instantiated defining Actor’s behaviour

  • args (Array<Object>)

    arguments for actor_class instantiation

  • executor, (Executor)

    default is ‘Concurrent.configuration.global_task_pool`

  • link, (true, false)

    atomically link the actor to its parent

  • supervise, (true, false)

    atomically supervise the actor by its parent

  • behaviour_definition, (Array<Array(Behavior::Abstract, Array<Object>)>)

    array of pairs where each pair is behaviour class and its args, see Behaviour.basic_behaviour_definition

  • initialized, (IVar, nil)

    if present it’ll be set or failed after Concurrent::Actor::Context initialization

  • logger (Proc, nil)

    a proc accepting (level, progname, message = nil, &block) params, can be used to hook actor instance to any logging system



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/concurrent/actor/core.rb', line 45

def initialize(opts = {}, &block)
  synchronize do
    @mailbox              = Array.new
    @serialized_execution = SerializedExecution.new
    @children             = Set.new

    @context_class = Child! opts.fetch(:class), AbstractContext
    allocate_context

    @executor = Type! opts.fetch(:executor, Concurrent.configuration.global_task_pool), Executor
    raise ArgumentError, 'ImmediateExecutor is not supported' if @executor.is_a? ImmediateExecutor

    @reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self
    @name      = (Type! opts.fetch(:name), String, Symbol).to_s

    parent       = opts[:parent]
    @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core)
    if @parent_core.nil? && @name != '/'
      raise 'only root has no parent'
    end

    @path   = @parent_core ? File.join(@parent_core.path, @name) : @name
    @logger = opts[:logger]

    @parent_core.add_child reference if @parent_core

    initialize_behaviours opts

    @args       = opts.fetch(:args, [])
    @block      = block
    initialized = Type! opts[:initialized], IVar, NilClass

    messages = []
    messages << :link if opts[:link]
    messages << :supervise if opts[:supervise]

    schedule_execution do
      begin
        build_context

        messages.each do |message|
          handle_envelope Envelope.new(message, nil, parent, reference)
        end

        initialized.set reference if initialized
      rescue => ex
        log ERROR, ex
        @first_behaviour.terminate!
        initialized.fail ex if initialized
      end
    end
  end
end

Instance Attribute Details

#actor_classContext (readonly)

Returns a class including Concurrent::Actor::Context representing Actor’s behaviour.

Returns:



29
# File 'lib/concurrent/actor/core.rb', line 29

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#behaviour_definitionObject (readonly)

Returns the value of attribute behaviour_definition.



29
30
31
# File 'lib/concurrent/actor/core.rb', line 29

def behaviour_definition
  @behaviour_definition
end

#contextObject (readonly)

Returns the value of attribute context.



29
30
31
# File 'lib/concurrent/actor/core.rb', line 29

def context
  @context
end

#context_classObject (readonly)

Returns the value of attribute context_class.



29
30
31
# File 'lib/concurrent/actor/core.rb', line 29

def context_class
  @context_class
end

#executorExecutor (readonly)

Returns which is used to process messages.

Returns:

  • (Executor)

    which is used to process messages



29
# File 'lib/concurrent/actor/core.rb', line 29

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#nameString (readonly)

Returns the name of this instance, it should be uniq (not enforced right now).

Returns:

  • (String)

    the name of this instance, it should be uniq (not enforced right now)



29
# File 'lib/concurrent/actor/core.rb', line 29

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#pathString (readonly)

Returns a path of this actor. It is used for easier orientation and logging. Path is constructed recursively with: ‘parent.path + self.name` up to a Concurrent::Actor.root, e.g. `/an_actor/its_child`. (It will also probably form a supervision path (failures will be reported up to parents) in future versions.).

Returns:

  • (String)

    a path of this actor. It is used for easier orientation and logging. Path is constructed recursively with: ‘parent.path + self.name` up to a Concurrent::Actor.root, e.g. `/an_actor/its_child`. (It will also probably form a supervision path (failures will be reported up to parents) in future versions.)



29
# File 'lib/concurrent/actor/core.rb', line 29

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#referenceReference (readonly)

Returns reference to this actor which can be safely passed around.

Returns:

  • (Reference)

    reference to this actor which can be safely passed around



29
30
31
# File 'lib/concurrent/actor/core.rb', line 29

def reference
  @reference
end

Instance Method Details

#add_child(child) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



116
117
118
119
120
121
# File 'lib/concurrent/actor/core.rb', line 116

def add_child(child)
  guard!
  Type! child, Reference
  @children.add child
  nil
end

#allocate_contextObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



187
188
189
# File 'lib/concurrent/actor/core.rb', line 187

def allocate_context
  @context = @context_class.allocate
end

#behaviour(behaviour_class) ⇒ Behaviour::Abstract?

Returns based on behaviour_class.

Parameters:

  • behaviour_class (Class)

Returns:



175
176
177
# File 'lib/concurrent/actor/core.rb', line 175

def behaviour(behaviour_class)
  @behaviours[behaviour_class]
end

#behaviour!(behaviour_class) ⇒ Behaviour::Abstract

Returns based on behaviour_class.

Parameters:

  • behaviour_class (Class)

Returns:

Raises:

  • (KeyError)

    when no behaviour



182
183
184
# File 'lib/concurrent/actor/core.rb', line 182

def behaviour!(behaviour_class)
  @behaviours.fetch behaviour_class
end

#broadcast(event) ⇒ Object



169
170
171
# File 'lib/concurrent/actor/core.rb', line 169

def broadcast(event)
  @first_behaviour.on_event(event)
end

#build_contextObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



192
193
194
195
# File 'lib/concurrent/actor/core.rb', line 192

def build_context
  @context.send :initialize_core, self
  @context.send :initialize, *@args, &@block
end

#childrenArray<Reference>

Returns of children actors.

Returns:



110
111
112
113
# File 'lib/concurrent/actor/core.rb', line 110

def children
  guard!
  @children.to_a
end

#dead_letter_routingObject



105
106
107
# File 'lib/concurrent/actor/core.rb', line 105

def dead_letter_routing
  @context.dead_letter_routing
end

#guard!Object

ensures that we are inside of the executor



140
141
142
143
144
# File 'lib/concurrent/actor/core.rb', line 140

def guard!
  unless Actor.current == reference
    raise "can be called only inside actor #{reference} but was #{Actor.current}"
  end
end

#log(level, message = nil, &block) ⇒ Object



146
147
148
# File 'lib/concurrent/actor/core.rb', line 146

def log(level, message = nil, &block)
  super level, @path, message, &block
end

#on_envelope(envelope) ⇒ Object

is executed by Reference scheduling processing of new messages can be called from other alternative Reference implementations

Parameters:



134
135
136
137
# File 'lib/concurrent/actor/core.rb', line 134

def on_envelope(envelope)
  schedule_execution { handle_envelope envelope }
  nil
end

#parentReference?

Returns of parent actor.

Returns:



100
101
102
# File 'lib/concurrent/actor/core.rb', line 100

def parent
  @parent_core && @parent_core.reference
end

#remove_child(child) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



124
125
126
127
128
129
# File 'lib/concurrent/actor/core.rb', line 124

def remove_child(child)
  guard!
  Type! child, Reference
  @children.delete child
  nil
end

#schedule_executionObject

Schedules blocks to be executed on executor sequentially, sets Actress.current



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/concurrent/actor/core.rb', line 152

def schedule_execution
  @serialized_execution.post(@executor) do
    synchronize do
      begin
        Thread.current[:__current_actor__] = reference
        yield
      rescue => e
        log FATAL, e
      ensure
        Thread.current[:__current_actor__] = nil
      end
    end
  end

  nil
end