Class: IOPromise::ExecutorContext

Inherits:
Object
  • Object
show all
Defined in:
lib/iopromise/executor_context.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ExecutorContext

Returns a new instance of ExecutorContext.



22
23
24
25
26
27
28
29
30
# File 'lib/iopromise/executor_context.rb', line 22

# Builds a context with an empty pool set, empty per-pool "ready" maps
# (readers, writers, exceptions) and an empty pending-registration list.
def initialize
  @pools = Set.new

  # Three distinct hashes; each is replaced wholesale by wait_for_all_data.
  @pool_ready_readers, @pool_ready_writers, @pool_ready_exceptions = {}, {}, {}

  @pending_registrations = []
end

Class Method Details

.current ⇒ Object



13
14
15
# File 'lib/iopromise/executor_context.rb', line 13

# Returns the most recently pushed context without removing it, or nil
# when the stack is empty.
#
# The stack is created lazily, so it is initialised here as well; this
# keeps a call made before any push from failing on a nil receiver.
def current
  (@contexts ||= []).last
end

.pop ⇒ Object



17
18
19
# File 'lib/iopromise/executor_context.rb', line 17

# Removes and returns the most recently pushed context, or returns nil
# when the stack is empty.
#
# The stack is created lazily, so it is initialised here as well; a pop
# with no prior push then behaves like a pop on an empty stack instead of
# failing on a nil receiver.
def pop
  (@contexts ||= []).pop
end

.push ⇒ Object



8
9
10
11
# File 'lib/iopromise/executor_context.rb', line 8

# Creates a fresh ExecutorContext and places it on top of the context
# stack, creating the stack on first use. Returns the stack array.
def push
  stack = (@contexts ||= [])
  stack.push(ExecutorContext.new)
end

Instance Method Details

#register(promise) ⇒ Object



32
33
34
# File 'lib/iopromise/executor_context.rb', line 32

# Appends +promise+ to the list of pending registrations.
#
# @param promise [Object] the promise to queue
# @return [Array] the pending-registration list, including +promise+
def register(promise)
  @pending_registrations.push(promise)
end

#wait_for_all_data(end_when_complete: nil) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/iopromise/executor_context.rb', line 36

# Runs the select loop for this context.
#
# Each iteration completes pending registrations, asks
# continue_to_read_pools for the IO objects to watch (three hashes mapping
# an IO to the pool that supplied it, plus a timeout), waits on them with
# IO.select, and stores the ready IOs grouped by pool in
# @pool_ready_readers / @pool_ready_writers / @pool_ready_exceptions.
#
# @param end_when_complete [#pending?, nil] when given, the method returns
#   nil as soon as this object is no longer pending
# @return [nil, Object] nil on the early return; otherwise @pools, after
#   calling +wait+ on each pool
# @raise [IOPromise::Error] if the loop runs out of IOs and pending
#   registrations while +end_when_complete+ was given and is still pending
def wait_for_all_data(end_when_complete: nil)
  awaiting_target = !end_when_complete.nil?

  loop do
    complete_pending_registrations

    read_map, write_map, error_map, timeout = continue_to_read_pools

    # Stop as soon as the promise we were asked to wait for has settled.
    return if awaiting_target && !end_when_complete.pending?

    nothing_watched = read_map.empty? && write_map.empty? && error_map.empty?
    break if nothing_watched && @pending_registrations.empty?

    # Registrations are still queued: poll without blocking so the next
    # iteration can process them straight away.
    timeout = @pending_registrations.empty? ? timeout : 0

    # IO.select yields nil on timeout; treat that as "nothing ready".
    selected = IO.select(read_map.keys, write_map.keys, error_map.keys, timeout) || [[], [], []]
    readable, writable, errored = selected

    # Bucket each ready IO under the pool object that provided it.
    @pool_ready_readers = readable.group_by { |io| read_map[io] }
    @pool_ready_writers = writable.group_by { |io| write_map[io] }
    @pool_ready_exceptions = errored.group_by { |io| error_map[io] }
  end

  # Reaching this point with a target promise means it never settled.
  if awaiting_target
    raise ::IOPromise::Error, 'Internal error: IO loop completed without fulfilling the desired promise'
  end

  @pools.each(&:wait)
ensure
  complete_pending_registrations
end