Class: BatchReactor::ReactorCluster
- Inherits: Object
- Object
- BatchReactor::ReactorCluster
- Defined in:
- lib/batch_reactor/reactor_cluster.rb
Instance Method Summary collapse
- #define_partitioner(&partitioner_callback) ⇒ Object
- #initialize(count, options, &batch_callback) ⇒ ReactorCluster (constructor): A new instance of ReactorCluster.
- #perform_within_batch(key, &block) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(count, options, &batch_callback) ⇒ ReactorCluster
Returns a new instance of ReactorCluster.
4 5 6 7 8 |
# File 'lib/batch_reactor/reactor_cluster.rb', line 4 def initialize(count, options, &batch_callback) @reactors = count.times.map do |index| Reactor.new(options) { |&block| batch_callback.call(index, &block) } end end |
Instance Method Details
#define_partitioner(&partitioner_callback) ⇒ Object
10 11 12 |
# File 'lib/batch_reactor/reactor_cluster.rb', line 10 def define_partitioner(&partitioner_callback) @partitioner_callback = partitioner_callback end |
#perform_within_batch(key, &block) ⇒ Object
24 25 26 27 |
# File 'lib/batch_reactor/reactor_cluster.rb', line 24 def perform_within_batch(key, &block) partition = @partitioner_callback.call(key) @reactors[partition].perform_within_batch(&block) end |
#start ⇒ Object
14 15 16 17 |
# File 'lib/batch_reactor/reactor_cluster.rb', line 14 def start futures = @reactors.map(&:start) Ione::Future.all(futures) end |
#stop ⇒ Object
19 20 21 22 |
# File 'lib/batch_reactor/reactor_cluster.rb', line 19 def stop futures = @reactors.map(&:stop) Ione::Future.all(futures) end |