Class: Concurrent::Actor::Utils::Pool

Inherits:
RestartingContext show all
Defined in:
lib/concurrent/actor/utils/pool.rb

Overview

Allows to create a pool of workers and distribute work between them

Examples:

class Worker < Concurrent::Actor::Utils::AbstractWorker
  def work(message)
    p message * 5
  end
end

pool = Concurrent::Actor::Utils::Pool.spawn! 'pool', 5 do |balancer, index|
  Worker.spawn name: "worker-#{index}", supervise: true, args: [balancer]
end

pool << 'asd' << 2
# prints:
# "asdasdasdasdasd"
# 10

Yields:

  • (balancer, index)

    a block spawning an worker instance. called size times. The worker should be descendant of AbstractWorker and supervised, see example.

Yield Parameters:

  • balancer (Balancer)

    to pass to the worker

  • index (Integer)

    of the worker, usually used in its name

Yield Returns:

  • (Reference)

    the reference of newly created worker

Instance Attribute Summary

Attributes inherited from AbstractContext

#core

Instance Method Summary collapse

Methods inherited from RestartingContext

#behaviour_definition

Methods inherited from AbstractContext

#ask, #behaviour_definition, #dead_letter_routing, #default_reference_class, #envelope, #on_envelope, #on_event, #pass, #tell

Methods included from InternalDelegations

#behaviour, #behaviour!, #children, #context, #dead_letter_routing, #log, #redirect, #terminate!

Methods included from PublicDelegations

#context_class, #executor, #name, #parent, #path, #reference

Methods included from TypeCheck

#Child!, #Child?, #Match!, #Match?, #Type!, #Type?

Constructor Details

#initialize(size, &worker_initializer) ⇒ Pool

Returns a new instance of Pool.



30
31
32
33
34
# File 'lib/concurrent/actor/utils/pool.rb', line 30

def initialize(size, &worker_initializer)
  @balancer = Balancer.spawn name: :balancer, supervise: true
  @workers  = Array.new(size, &worker_initializer.curry[@balancer])
  @workers.each { |w| Type! w, Reference }
end

Instance Method Details

#on_message(message) ⇒ Object



36
37
38
# File 'lib/concurrent/actor/utils/pool.rb', line 36

def on_message(message)
  redirect @balancer
end