Class: BatchReactor::Reactor

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
MonitorMixin
Defined in:
lib/batch_reactor/reactor.rb

Defined Under Namespace

Classes: Work

Instance Method Summary collapse

Constructor Details

#initialize(options, &yield_batch_callback) ⇒ Reactor

Returns a new instance of Reactor.



6
7
8
9
10
11
12
13
14
15
# File 'lib/batch_reactor/reactor.rb', line 6

def initialize(options, &yield_batch_callback)
  @yield_batch_callback = yield_batch_callback
  @front_buffer = []
  @back_buffer = []
  @stopping_promise = Ione::Promise.new
  @stopped_promise = Ione::Promise.new
  @max_batch_size = options.fetch(:max_batch_size) { 100 }
  @no_work_backoff = options.fetch(:no_work_backoff) { 0.1 }
  super()
end

Instance Method Details

#perform_within_batch(&block) ⇒ Object



41
42
43
44
45
46
47
48
49
# File 'lib/batch_reactor/reactor.rb', line 41

def perform_within_batch(&block)
  promise = Ione::Promise.new
  if @stopping_promise.future.resolved?
    promise.fail(StandardError.new('Reactor stopped!'))
  else
    synchronize { @back_buffer << Work.new(block, promise) }
  end
  promise.future
end

#startObject



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/batch_reactor/reactor.rb', line 17

def start
  return @started_promise.future if @started_promise

  @started_promise = Ione::Promise.new
  Thread.start do
    @started_promise.fulfill(self)

    last_batch_future = Ione::Future.resolved(nil)
    until @stopping
      swap_buffers if needs_work?
      next if no_work?
      last_batch_future = process_batch
    end

    last_batch_future.on_complete { |_, _| shutdown }
  end
  @started_promise.future
end

#stopObject



36
37
38
39
# File 'lib/batch_reactor/reactor.rb', line 36

def stop
  @stopping = true
  @stopped_promise.future
end