Class: BatchReactor::Reactor
- Inherits:
-
Object
- Object
- BatchReactor::Reactor
- Extended by:
- Forwardable
- Includes:
- MonitorMixin
- Defined in:
- lib/batch_reactor/reactor.rb
Defined Under Namespace
Classes: Work
Instance Method Summary collapse
-
#initialize(options, &yield_batch_callback) ⇒ Reactor
constructor
A new instance of Reactor.
- #perform_within_batch(&block) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(options, &yield_batch_callback) ⇒ Reactor
Returns a new instance of Reactor.
6 7 8 9 10 11 12 13 14 15 |
# File 'lib/batch_reactor/reactor.rb', line 6 def initialize(, &yield_batch_callback) @yield_batch_callback = yield_batch_callback @front_buffer = [] @back_buffer = [] @stopping_promise = Ione::Promise.new @stopped_promise = Ione::Promise.new @max_batch_size = .fetch(:max_batch_size) { 100 } @no_work_backoff = .fetch(:no_work_backoff) { 0.1 } super() end |
Instance Method Details
#perform_within_batch(&block) ⇒ Object
41 42 43 44 45 46 47 48 49 |
# File 'lib/batch_reactor/reactor.rb', line 41 def perform_within_batch(&block) promise = Ione::Promise.new if @stopping_promise.future.resolved? promise.fail(StandardError.new('Reactor stopped!')) else synchronize { @back_buffer << Work.new(block, promise) } end promise.future end |
#start ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/batch_reactor/reactor.rb', line 17 def start return @started_promise.future if @started_promise @started_promise = Ione::Promise.new Thread.start do @started_promise.fulfill(self) last_batch_future = Ione::Future.resolved(nil) until @stopping swap_buffers if needs_work? next if no_work? last_batch_future = process_batch end last_batch_future.on_complete { |_, _| shutdown } end @started_promise.future end |
#stop ⇒ Object
36 37 38 39 |
# File 'lib/batch_reactor/reactor.rb', line 36 def stop @stopping = true @stopped_promise.future end |