Class: IOPromise::ExecutorContext
- Inherits:
-
Object
- Object
- IOPromise::ExecutorContext
- Defined in:
- lib/iopromise/executor_context.rb
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize ⇒ ExecutorContext
constructor
A new instance of ExecutorContext.
- #register(promise) ⇒ Object
- #wait_for_all_data(end_when_complete: nil) ⇒ Object
Constructor Details
#initialize ⇒ ExecutorContext
Returns a new instance of ExecutorContext.
22 23 24 25 26 27 28 29 30 |
# File 'lib/iopromise/executor_context.rb', line 22 def initialize @pools = Set.new @pool_ready_readers = {} @pool_ready_writers = {} @pool_ready_exceptions = {} @pending_registrations = [] end |
Class Method Details
.current ⇒ Object
13 14 15 |
# File 'lib/iopromise/executor_context.rb', line 13 def current @contexts.last end |
.pop ⇒ Object
17 18 19 |
# File 'lib/iopromise/executor_context.rb', line 17 def pop @contexts.pop end |
.push ⇒ Object
8 9 10 11 |
# File 'lib/iopromise/executor_context.rb', line 8 def push @contexts ||= [] @contexts << ExecutorContext.new end |
Instance Method Details
#register(promise) ⇒ Object
32 33 34 |
# File 'lib/iopromise/executor_context.rb', line 32 def register(promise) @pending_registrations << promise end |
#wait_for_all_data(end_when_complete: nil) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/iopromise/executor_context.rb', line 36 def wait_for_all_data(end_when_complete: nil) loop do complete_pending_registrations readers, writers, exceptions, wait_time = continue_to_read_pools unless end_when_complete.nil? return unless end_when_complete.pending? end break if readers.empty? && writers.empty? && exceptions.empty? && @pending_registrations.empty? # if we have any pending promises to register, we'll not block at all so we immediately continue wait_time = 0 unless @pending_registrations.empty? # we could be clever and decide which ones to "continue" on next ready = IO.select(readers.keys, writers.keys, exceptions.keys, wait_time) ready = [[], [], []] if ready.nil? ready_readers, ready_writers, ready_exceptions = ready # group by the pool object that provided the fd @pool_ready_readers = ready_readers.group_by { |i| readers[i] } @pool_ready_writers = ready_writers.group_by { |i| writers[i] } @pool_ready_exceptions = ready_exceptions.group_by { |i| exceptions[i] } end unless end_when_complete.nil? raise ::IOPromise::Error.new('Internal error: IO loop completed without fulfilling the desired promise') else @pools.each do |pool| pool.wait end end ensure complete_pending_registrations end |