Class: FiberScheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/fiber_scheduler/selector.rb,
lib/fiber_scheduler.rb,
lib/fiber_scheduler/timeout.rb,
lib/fiber_scheduler/version.rb,
lib/fiber_scheduler/timeouts.rb,
lib/fiber_scheduler/compatibility.rb

Overview

The code in this class has been taken from io-event gem. It has been cleaned up and appropriated for use in this gem.

Defined Under Namespace

Modules: Compatibility Classes: Selector, Timeout, Timeouts

Constant Summary collapse

Error =
Class.new(RuntimeError)
VERSION =
"0.11.0".freeze

Instance Method Summary collapse

Constructor Details

#initializeFiberScheduler

Returns a new instance of FiberScheduler.



87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fiber_scheduler.rb', line 87

def initialize
  @fiber = Fiber.current
  @selector =
    if defined?(IO::Event)
      IO::Event::Selector.new(@fiber)
    else
      Selector.new(@fiber)
    end
  @timeouts = Timeouts.new

  @count = 0
  @nested = []
end

Instance Method Details

#address_resolve(hostname) ⇒ Object



150
151
152
# File 'lib/fiber_scheduler.rb', line 150

def address_resolve(hostname)
  Resolv.getaddresses(hostname)
end

#block(blocker, duration = nil) ⇒ Object



132
133
134
135
136
137
138
# File 'lib/fiber_scheduler.rb', line 132

def block(blocker, duration = nil)
  return @selector.transfer unless duration

  @timeouts.timeout(duration, method: :transfer) do
    @selector.transfer
  end
end

#closeObject

Fiber::SchedulerInterface methods below



121
122
123
124
125
126
127
128
129
130
# File 'lib/fiber_scheduler.rb', line 121

def close
  return unless @selector

  begin
    run
  ensure
    @selector.close
    @selector = nil
  end
end

#fiber(type = nil, &block) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/fiber_scheduler.rb', line 178

def fiber(type = nil, &block)
  current = Fiber.current

  case type
  when :blocking
    Fiber.new(blocking: true, &block).tap(&:resume)
  when :waiting
    finished = false # prevents races
    fiber = Fiber.new(blocking: false) do
      @count += 1
      block.call
    ensure
      @count -= 1
      finished = true
      # Resume waiting parent fiber
      current.transfer
    end
    fiber.transfer

    # Current fiber is waiting until waiting fiber finishes.
    if current == @fiber
      # In a top-level fiber, there's nothing we can transfer to, so run
      # other fibers (or just block) until waiting fiber finishes.
      until finished
        run_once
      end
    elsif !finished
      @selector.transfer
    end

    fiber
  when :fleeting
    if current != @fiber
      # nested Fiber.schedule
      @nested << current
    end

    Fiber.new(blocking: false, &block).tap(&:transfer)
  when nil
    if current != @fiber
      # nested Fiber.schedule
      @nested << current
    end

    fiber = Fiber.new(blocking: false) do
      @count += 1
      block.call
    ensure
      @count -= 1
    end
    fiber.tap(&:transfer)

  else
    raise "Unknown type"
  end
end

#io_read(io, buffer, length) ⇒ Object



162
163
164
# File 'lib/fiber_scheduler.rb', line 162

def io_read(io, buffer, length)
  @selector.io_read(Fiber.current, io, buffer, length)
end

#io_wait(io, events, duration = nil) ⇒ Object



154
155
156
157
158
159
160
# File 'lib/fiber_scheduler.rb', line 154

def io_wait(io, events, duration = nil)
  return @selector.io_wait(Fiber.current, io, events) unless duration

  @timeouts.timeout(duration, method: :transfer) do
    @selector.io_wait(Fiber.current, io, events)
  end
end

#io_write(io, buffer, length) ⇒ Object



166
167
168
# File 'lib/fiber_scheduler.rb', line 166

def io_write(io, buffer, length)
  @selector.io_write(Fiber.current, io, buffer, length)
end

#kernel_sleep(duration = nil) ⇒ Object



144
145
146
147
148
# File 'lib/fiber_scheduler.rb', line 144

def kernel_sleep(duration = nil)
  return @selector.transfer unless duration

  block(:sleep, duration)
end

#process_wait(pid, flags) ⇒ Object



170
171
172
# File 'lib/fiber_scheduler.rb', line 170

def process_wait(pid, flags)
  @selector.process_wait(Fiber.current, pid, flags)
end

#runObject



101
102
103
104
105
# File 'lib/fiber_scheduler.rb', line 101

def run
  while @count > 0
    run_once
  end
end

#run_onceObject



107
108
109
110
111
112
113
114
115
116
117
# File 'lib/fiber_scheduler.rb', line 107

def run_once
  if @nested.empty?
    @selector.select(@timeouts.interval)
    @timeouts.call
  else
    while @nested.any?
      fiber = @nested.pop
      fiber.transfer
    end
  end
end

#timeout_after(duration, exception = Timeout::Error, message = "timeout", &block) ⇒ Object



174
175
176
# File 'lib/fiber_scheduler.rb', line 174

def timeout_after(duration, exception = Timeout::Error, message = "timeout", &block)
  @timeouts.timeout(duration, exception, message, &block)
end

#unblock(blocker, fiber) ⇒ Object



140
141
142
# File 'lib/fiber_scheduler.rb', line 140

def unblock(blocker, fiber)
  @selector.push(fiber)
end