Class: UringMachine::FiberScheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/uringmachine/fiber_scheduler.rb

Overview

UringMachine::FiberScheduler implements the Fiber::Scheduler interface for creating fiber-based concurrent applications in Ruby, in tight integration with the standard Ruby I/O and locking APIs.

Constant Summary collapse

@@blocking_operation_thread_pool =

The blocking operation thread pool is shared by all fiber schedulers.

BlockingOperationThreadPool.new

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(machine = nil) ⇒ void

Instantiates a scheduler with the given UringMachine instance.

machine = UM.new
scheduler = UM::FiberScheduler.new(machine)
Fiber.set_scheduler(scheduler)

Parameters:

  • machine (UringMachine, nil) (defaults to: nil)

    UringMachine instance



95
96
97
98
99
# File 'lib/uringmachine/fiber_scheduler.rb', line 95

def initialize(machine = nil)
  @machine = machine || UM.new
  @fiber_map = ObjectSpace::WeakMap.new
  @thread = Thread.current
end

Instance Attribute Details

#fiber_mapObject (readonly)

WeakMap holding references scheduler fibers as keys.



85
86
87
# File 'lib/uringmachine/fiber_scheduler.rb', line 85

def fiber_map
  @fiber_map
end

#machineObject (readonly)

UringMachine instance associated with scheduler.



82
83
84
# File 'lib/uringmachine/fiber_scheduler.rb', line 82

def machine
  @machine
end

Instance Method Details

#address_resolve(hostname) ⇒ Array<Addrinfo>

Resolves an hostname.

Parameters:

  • hostname (String)

    hostname to resolve

Returns:

  • (Array<Addrinfo>)

    array of resolved addresses



369
370
371
# File 'lib/uringmachine/fiber_scheduler.rb', line 369

def address_resolve(hostname)
	Resolv.getaddresses(hostname)
end

#block(blocker, timeout = nil) ⇒ bool

Blocks the current fiber by yielding to the machine. This hook is called when a synchronization mechanism blocks, e.g. a mutex, a queue, etc.

Parameters:

  • blocker (any)

    blocker object

  • timeout (Number, nil) (defaults to: nil)

    optional timeout

Returns:

  • (bool)

    was the operation successful



157
158
159
160
161
162
163
164
165
166
# File 'lib/uringmachine/fiber_scheduler.rb', line 157

def block(blocker, timeout = nil)
  if timeout
    @machine.timeout(timeout, Timeout::Error) { @machine.yield }
  else
    @machine.yield
  end
  true
rescue Timeout::Error
  false
end

#blocking_operation_wait(op) ⇒ void

This method returns an undefined value.

Runs the given operation in a separate thread, so as not to block other fibers.

Parameters:

  • op (callable)

    blocking operation



147
148
149
# File 'lib/uringmachine/fiber_scheduler.rb', line 147

def blocking_operation_wait(op)
  @@blocking_operation_thread_pool.process(@machine, op)
end

#fiber(&block) ⇒ Object

Creates a new fiber with the given block. The created fiber is added to the fiber map, scheduled on the scheduler machine, and started before this method returns (by calling snooze).

Parameters:

  • block (Proc)

    fiber block @return [Fiber]



111
112
113
114
115
116
117
118
# File 'lib/uringmachine/fiber_scheduler.rb', line 111

def fiber(&block)
  fiber = Fiber.new(blocking: false) { @machine.run(fiber, &block) }

  @fiber_map[fiber] = true
  @machine.schedule(fiber, nil)
  @machine.snooze
  fiber
end

#fiber_interrupt(fiber, exception) ⇒ void

This method returns an undefined value.

Interrupts the given fiber with an exception.

Parameters:

  • fiber (Fiber)

    fiber to interrupt

  • exception (Exception)

    Exception



360
361
362
363
# File 'lib/uringmachine/fiber_scheduler.rb', line 360

def fiber_interrupt(fiber, exception)
  @machine.schedule(fiber, exception)
  @machine.wakeup
end

#instance_variables_to_inspectObject

:nodoc:



102
103
104
# File 'lib/uringmachine/fiber_scheduler.rb', line 102

def instance_variables_to_inspect
  [:@machine]
end

#io_close(fd) ⇒ Integer

Closes the given fd.

Parameters:

  • fd (Integer)

    file descriptor

Returns:

  • (Integer)

    file descriptor



337
338
339
340
# File 'lib/uringmachine/fiber_scheduler.rb', line 337

def io_close(fd)
  # p(io_close: fd)
  @machine.close_async(fd)
end

#io_pread(io, buffer, from, length, offset) ⇒ Integer

Reads from the given IO at the given file offset

Parameters:

  • io (IO)

    IO object

  • buffer (IO::Buffer)

    read buffer

  • from (Integer)

    read offset

  • length (Integer)

    read length

  • offset (Integer)

    buffer offset

Returns:

  • (Integer)

    bytes read



266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/uringmachine/fiber_scheduler.rb', line 266

def io_pread(io, buffer, from, length, offset)
  length = buffer.size if length == 0

  if (timeout = io.timeout)
@machine.timeout(timeout, Timeout::Error) do
      @machine.read(io.fileno, buffer, length, offset, from)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.read(io.fileno, buffer, length, offset, from)
  end
rescue Errno::EINTR
  retry
end

#io_pwrite(io, buffer, from, length, offset) ⇒ Integer

Writes to the given IO at the given file offset.

Parameters:

  • io (IO)

    IO object

  • buffer (IO::Buffer)

    write buffer

  • length (Integer)

    file offset

  • length (Integer)

    write length

  • offset (Integer)

    buffer offset

Returns:

  • (Integer)

    bytes written



315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/uringmachine/fiber_scheduler.rb', line 315

def io_pwrite(io, buffer, from, length, offset)
  # p(io_pwrite: io, from:, length:, offset:, timeout: io.timeout)
  length = buffer.size if length == 0
  buffer = buffer.slice(offset) if offset > 0

  if (timeout = io.timeout)
@machine.timeout(timeout, Timeout::Error) do
      @machine.write(io.fileno, buffer, length, from)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.write(io.fileno, buffer, length, from)
  end
rescue Errno::EINTR
  retry
end

#io_read(io, buffer, length, offset) ⇒ Integer

Reads from the given IO.

Parameters:

  • io (IO)

    IO object

  • buffer (IO::Buffer)

    read buffer

  • length (Integer)

    read length

  • offset (Integer)

    buffer offset

Returns:

  • (Integer)

    bytes read



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/uringmachine/fiber_scheduler.rb', line 242

def io_read(io, buffer, length, offset)
  length = buffer.size if length == 0

  if (timeout = io.timeout)
@machine.timeout(timeout, Timeout::Error) do
      @machine.read(io.fileno, buffer, length, offset)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.read(io.fileno, buffer, length, offset)
  end
rescue Errno::EINTR
  retry
end

#io_select(rios, wios, eios, timeout = nil) ⇒ Object

Selects the first ready IOs from the given sets of IOs.

Parameters:

  • rios (Array<IO>)

    readable IOs

  • wios (Array<IO>)

    writable IOs

  • eios (Array<IO>)

    exceptable IOs

  • timeout (Number, nil) (defaults to: nil)

    optional timeout



218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/uringmachine/fiber_scheduler.rb', line 218

def io_select(rios, wios, eios, timeout = nil)
  map_r = map_fds(rios)
  map_w = map_fds(wios)
  map_e = map_fds(eios)

  r, w, e = nil
  if timeout
    @machine.timeout(timeout, Timeout::Error) {
      r, w, e = @machine.select(map_r.keys, map_w.keys, map_e.keys)
    }
  else
    r, w, e = @machine.select(map_r.keys, map_w.keys, map_e.keys)
  end

  [unmap_fds(r, map_r), unmap_fds(w, map_w), unmap_fds(e, map_e)]
end

#io_wait(io, events, timeout = nil) ⇒ Object

Waits for the given io to become ready.

Parameters:

  • io (IO)

    IO object

  • events (Number)

    readiness bitmask

  • timeout (Number, nil) (defaults to: nil)

    optional timeout

  • return


200
201
202
203
204
205
206
207
208
209
210
# File 'lib/uringmachine/fiber_scheduler.rb', line 200

def io_wait(io, events, timeout = nil)
  # p(io_wait: io, events:)
  timeout ||= io.timeout
  if timeout
    @machine.timeout(timeout, Timeout::Error) {
      @machine.poll(io.fileno, events)
    }
  else
    @machine.poll(io.fileno, events)
  end
end

#io_write(io, buffer, length, offset) ⇒ Integer

Writes to the given IO.

Parameters:

  • io (IO)

    IO object

  • buffer (IO::Buffer)

    write buffer

  • length (Integer)

    write length

  • offset (Integer)

    write offset

Returns:

  • (Integer)

    bytes written



289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/uringmachine/fiber_scheduler.rb', line 289

def io_write(io, buffer, length, offset)
  # p(io_write: io, length:, offset:, timeout: io.timeout)
  length = buffer.size if length == 0
  buffer = buffer.slice(offset) if offset > 0

  if (timeout = io.timeout)
@machine.timeout(timeout, Timeout::Error) do
      @machine.write(io.fileno, buffer, length)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.write(io.fileno, buffer, length)
  end
rescue Errno::EINTR
  retry
end

#join(*fibers) ⇒ void

This method returns an undefined value.

Waits for the given fibers to terminate. If no fibers are given, waits for all fibers to terminate.

Parameters:

  • fibers (Array<Fiber>)

    fibers to terminate



133
134
135
136
137
138
139
140
# File 'lib/uringmachine/fiber_scheduler.rb', line 133

def join(*fibers)
  if fibers.empty?
    fibers = @fiber_map.keys
    @fiber_map = ObjectSpace::WeakMap.new
  end

  @machine.await_fibers(fibers)
end

#kernel_sleep(duration = nil) ⇒ void

This method returns an undefined value.

Sleeps for the given duration.

Parameters:

  • duration (Number, nil) (defaults to: nil)

    sleep duration



184
185
186
# File 'lib/uringmachine/fiber_scheduler.rb', line 184

def kernel_sleep(duration = nil)
  duration ? @machine.sleep(duration) : @machine.yield
end

#process_wait(pid, flags) ⇒ Process::Status

Waits for a process to terminate.

Parameters:

  • pid (Integer)

    process pid (0 for any child process)

  • flags (Integer)

    waitpid flags

Returns:

  • (Process::Status)

    terminated process status



349
350
351
352
# File 'lib/uringmachine/fiber_scheduler.rb', line 349

def process_wait(pid, flags)
  flags = UM::WEXITED if flags == 0
  @machine.waitid_status(UM::P_PID, pid, flags)
end

#scheduler_closevoid

This method returns an undefined value.

Waits for all fiber to terminate. Called upon thread termination or when the thread’s fiber scheduler is changed.



124
125
126
# File 'lib/uringmachine/fiber_scheduler.rb', line 124

def scheduler_close
  join()
end

#timeout_after(duration, exception, message, &block) ⇒ any

Run the given block with a timeout.

Parameters:

  • duration (Number)

    timeout duration

  • exception (Class)

    exception Class

  • message (String)

    exception message

  • block (Proc)

    block to run

Returns:

  • (any)

    block return value



380
381
382
# File 'lib/uringmachine/fiber_scheduler.rb', line 380

def timeout_after(duration, exception, message, &block)
  @machine.timeout(duration, exception, &block)
end

#unblock(blocker, fiber) ⇒ void

This method returns an undefined value.

Unblocks the given fiber by scheduling it. This hook is called when a synchronization mechanism unblocks, e.g. a mutex, a queue, etc.

Parameters:

  • blocker (any)

    blocker object

  • fiber (Fiber)

    fiber to resume



175
176
177
178
# File 'lib/uringmachine/fiber_scheduler.rb', line 175

def unblock(blocker, fiber)
  @machine.schedule(fiber, nil)
  @machine.wakeup if Thread.current != @thread
end

#yieldObject

Yields to the next runnable fiber.



189
190
191
192
# File 'lib/uringmachine/fiber_scheduler.rb', line 189

def yield
  @machine.snooze
  # @machine.yield
end