Class: FiberScheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/fiber_scheduler/selector.rb,
lib/fiber_scheduler.rb,
lib/fiber_scheduler/timeout.rb,
lib/fiber_scheduler/version.rb,
lib/fiber_scheduler/timeouts.rb,
lib/fiber_scheduler/compatibility.rb

Overview

The code in this class has been taken from io-event gem. It has been cleaned up and appropriated for use in this gem.

Defined Under Namespace

Modules: Compatibility Classes: Selector, Timeout, Timeouts

Constant Summary collapse

Error =
Class.new(RuntimeError)
VERSION =
"0.10.0".freeze

Instance Method Summary collapse

Constructor Details

#initializeFiberScheduler

Returns a new instance of FiberScheduler.



82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/fiber_scheduler.rb', line 82

def initialize
  @fiber = Fiber.current
  @selector =
    if defined?(IO::Event)
      IO::Event::Selector.new(@fiber)
    else
      Selector.new(@fiber)
    end
  @timeouts = Timeouts.new

  @count = 0
  @nested = []
end

Instance Method Details

#address_resolve(hostname) ⇒ Object



145
146
147
# File 'lib/fiber_scheduler.rb', line 145

def address_resolve(hostname)
  Resolv.getaddresses(hostname)
end

#block(blocker, duration = nil) ⇒ Object



127
128
129
130
131
132
133
# File 'lib/fiber_scheduler.rb', line 127

def block(blocker, duration = nil)
  return @selector.transfer unless duration

  @timeouts.timeout(duration, method: :transfer) do
    @selector.transfer
  end
end

#closeObject

Fiber::SchedulerInterface methods below



116
117
118
119
120
121
122
123
124
125
# File 'lib/fiber_scheduler.rb', line 116

def close
  return unless @selector

  begin
    run
  ensure
    @selector.close
    @selector = nil
  end
end

#fiber(blocking: false, waiting: false, &block) ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/fiber_scheduler.rb', line 173

def fiber(blocking: false, waiting: false, &block)
  current = Fiber.current

  if blocking
    # All fibers wait on a blocking fiber, so 'waiting' option is ignored.
    Fiber.new(blocking: true, &block).tap(&:resume)
  elsif waiting
    finished = false # prevents races
    fiber = Fiber.new(blocking: false) do
      @count += 1
      block.call
    ensure
      @count -= 1
      finished = true
      # Resume waiting parent fiber
      current.transfer
    end
    fiber.transfer

    # Current fiber is waiting until waiting fiber finishes.
    if current == @fiber
      # In a top-level fiber, there's nothing we can transfer to, so run
      # other fibers (or just block) until waiting fiber finishes.
      until finished
        run_once
      end
    elsif !finished
      @selector.transfer
    end

    fiber
  else
    if current != @fiber
      # nested Fiber.schedule
      @nested << current
    end

    fiber = Fiber.new(blocking: false) do
      @count += 1
      block.call
    ensure
      @count -= 1
    end
    fiber.tap(&:transfer)
  end
end

#io_read(io, buffer, length) ⇒ Object



157
158
159
# File 'lib/fiber_scheduler.rb', line 157

def io_read(io, buffer, length)
  @selector.io_read(Fiber.current, io, buffer, length)
end

#io_wait(io, events, duration = nil) ⇒ Object



149
150
151
152
153
154
155
# File 'lib/fiber_scheduler.rb', line 149

def io_wait(io, events, duration = nil)
  return @selector.io_wait(Fiber.current, io, events) unless duration

  @timeouts.timeout(duration, method: :transfer) do
    @selector.io_wait(Fiber.current, io, events)
  end
end

#io_write(io, buffer, length) ⇒ Object



161
162
163
# File 'lib/fiber_scheduler.rb', line 161

def io_write(io, buffer, length)
  @selector.io_write(Fiber.current, io, buffer, length)
end

#kernel_sleep(duration = nil) ⇒ Object



139
140
141
142
143
# File 'lib/fiber_scheduler.rb', line 139

def kernel_sleep(duration = nil)
  return @selector.transfer unless duration

  block(:sleep, duration)
end

#process_wait(pid, flags) ⇒ Object



165
166
167
# File 'lib/fiber_scheduler.rb', line 165

def process_wait(pid, flags)
  @selector.process_wait(Fiber.current, pid, flags)
end

#runObject



96
97
98
99
100
# File 'lib/fiber_scheduler.rb', line 96

def run
  while @count > 0
    run_once
  end
end

#run_onceObject



102
103
104
105
106
107
108
109
110
111
112
# File 'lib/fiber_scheduler.rb', line 102

def run_once
  if @nested.empty?
    @selector.select(@timeouts.interval)
    @timeouts.call
  else
    while @nested.any?
      fiber = @nested.pop
      fiber.transfer
    end
  end
end

#timeout_after(duration, exception = Timeout::Error, message = "timeout", &block) ⇒ Object



169
170
171
# File 'lib/fiber_scheduler.rb', line 169

def timeout_after(duration, exception = Timeout::Error, message = "timeout", &block)
  @timeouts.timeout(duration, exception, message, &block)
end

#unblock(blocker, fiber) ⇒ Object



135
136
137
# File 'lib/fiber_scheduler.rb', line 135

def unblock(blocker, fiber)
  @selector.push(fiber)
end