Class: UringMachine::FiberScheduler

Inherits:
Object
Defined in:
lib/uringmachine/fiber_scheduler.rb

Overview

Implements the Fiber::Scheduler interface for creating fiber-based concurrent applications in Ruby, in tight integration with the standard Ruby I/O and locking APIs.

Constant Summary

DEFAULT_THREAD_POOL = BlockingOperationThreadPool.new

The blocking operation thread pool is shared by all fiber schedulers.

Constructor Details

#initialize(machine = nil, thread_pool = DEFAULT_THREAD_POOL) ⇒ void

Instantiates a scheduler with the given UringMachine instance. If no machine is given, a new one is created.

machine = UM.new
scheduler = UM::FiberScheduler.new(machine)
Fiber.set_scheduler(scheduler)

Parameters:

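  • machine (UringMachine, nil) (defaults to: nil)

    UringMachine instance

  • thread_pool (BlockingOperationThreadPool) (defaults to: DEFAULT_THREAD_POOL)

    thread pool used for blocking operations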



# File 'lib/uringmachine/fiber_scheduler.rb', line 98

def initialize(machine = nil, thread_pool = DEFAULT_THREAD_POOL)
  @machine = machine || UM.new
  @thread_pool = thread_pool
  @fiber_map = ObjectSpace::WeakMap.new
  @thread = Thread.current
end

Instance Attribute Details

#fiber_map ⇒ Object (readonly)

WeakMap holding references to scheduler fibers as keys.



# File 'lib/uringmachine/fiber_scheduler.rb', line 88

def fiber_map
  @fiber_map
end
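
As a rough illustration of how such a map behaves, the sketch below registers a fiber as a key in an ObjectSpace::WeakMap, the same structure the constructor creates. The variable names are illustrative and not part of the library.

```ruby
# Illustrative only: track fibers as keys of a weak map.
fiber_map = ObjectSpace::WeakMap.new

worker = Fiber.new { :done }
fiber_map[worker] = true            # register the fiber

registered = fiber_map.key?(worker) # true while `worker` is still referenced
tracked    = fiber_map.keys         # fibers currently held by the map

# Entries do not keep fibers alive: once a fiber is unreachable, the garbage
# collector may reclaim it, and its entry then disappears from the map.
```

Because the keys are weak references, the scheduler can enumerate live fibers without preventing finished fibers from being collected.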

#machine ⇒ Object (readonly)

UringMachine instance associated with the scheduler.



# File 'lib/uringmachine/fiber_scheduler.rb', line 85

def machine
  @machine
end

Instance Method Details

#address_resolve(hostname) ⇒ Array<Addrinfo>

Resolves a hostname.

Parameters:

  • hostname (String)

    hostname to resolve

Returns:

  • (Array<Addrinfo>)

    array of resolved addresses



# File 'lib/uringmachine/fiber_scheduler.rb', line 379

def address_resolve(hostname)
  Resolv.getaddresses(hostname)
end

#block(blocker, timeout = nil) ⇒ bool

Blocks the current fiber by yielding to the machine. This hook is called when a synchronization mechanism blocks, e.g. a mutex, a queue, etc.

Parameters:

  • blocker (any)

    blocker object

  • timeout (Number, nil) (defaults to: nil)

    optional timeout

Returns:

  • (bool)

    true if the fiber was unblocked, false if the timeout elapsed first



# File 'lib/uringmachine/fiber_scheduler.rb', line 162

def block(blocker, timeout = nil)
  if timeout
    @machine.timeout(timeout, Timeout::Error) { @machine.yield }
  else
    @machine.yield
  end
  true
rescue Timeout::Error
  false
end
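
The true/false contract of this hook can be sketched without UringMachine. The example below uses the standard Timeout module in place of @machine.timeout, and the helper name wait_with_timeout is hypothetical.

```ruby
require "timeout"

# Hypothetical helper mirroring the return contract of #block:
# true if the wait finished, false if the timeout elapsed first.
def wait_with_timeout(timeout = nil)
  if timeout
    Timeout.timeout(timeout, Timeout::Error) { yield }
  else
    yield
  end
  true
rescue Timeout::Error
  false
end

finished  = wait_with_timeout(1)    { :ready }     # completes in time
timed_out = wait_with_timeout(0.01) { sleep 0.5 }  # exceeds the timeout
```

Returning false rather than raising lets the caller (for example a mutex or queue with a timeout) decide how to report the expiry.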

#blocking_operation_wait(op) ⇒ void

This method returns an undefined value.

Runs the given operation in a separate thread, so as not to block other fibers.

Parameters:

  • op (callable)

    blocking operation



# File 'lib/uringmachine/fiber_scheduler.rb', line 152

def blocking_operation_wait(op)
  @thread_pool.process(@machine, op)
end

#fiber(&block) ⇒ Fiber

Creates a new fiber with the given block. The created fiber is added to the fiber map, scheduled on the scheduler machine, and started before this method returns (by calling snooze).

Parameters:

  • block (Proc)

    fiber block

Returns:

  • (Fiber)

    the created fiber



# File 'lib/uringmachine/fiber_scheduler.rb', line 116

def fiber(&block)
  fiber = Fiber.new(blocking: false) { @machine.run(fiber, &block) }

  @fiber_map[fiber] = true
  @machine.schedule(fiber, nil)
  @machine.snooze
  fiber
end

#fiber_interrupt(fiber, exception) ⇒ void

This method returns an undefined value.

Interrupts the given fiber with an exception.

Parameters:

  • fiber (Fiber)

    fiber to interrupt

  • exception (Exception)

    exception to raise in the fiber



# File 'lib/uringmachine/fiber_scheduler.rb', line 370

def fiber_interrupt(fiber, exception)
  @machine.schedule(fiber, exception)
  @machine.wakeup
end

#instance_variables_to_inspect ⇒ Object

Returns the list of instance variables to show when inspecting the scheduler (only @machine).



# File 'lib/uringmachine/fiber_scheduler.rb', line 106

def instance_variables_to_inspect
  [:@machine]
end

#io_close(fd) ⇒ Integer

Closes the given fd.

Parameters:

  • fd (Integer)

    file descriptor

Returns:

  • (Integer)

    file descriptor



# File 'lib/uringmachine/fiber_scheduler.rb', line 346

def io_close(fd)
  @machine.close_async(fd)
rescue SystemCallError => e # Errno is a module, so `rescue Errno` never matches Errno::* errors
  -e.errno
end
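
The hook reports a failure by returning a negated errno instead of raising, as the `-e.errno` line shows. A minimal sketch of that convention follows, assuming a hypothetical helper named errno_or_result and rescuing SystemCallError, the superclass of the Errno::* exceptions.

```ruby
# Hypothetical helper: run an I/O operation and translate a system call
# failure into a negated errno instead of letting the exception propagate.
def errno_or_result
  yield
rescue SystemCallError => e
  -e.errno
end

ok     = errno_or_result { 42 }                  # value passes through
failed = errno_or_result { raise Errno::EBADF }  # negated EBADF code
```

A caller can then tell success from failure by the sign of the returned integer.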

#io_pread(io, buffer, from, length, offset) ⇒ Integer

Reads from the given IO at the given file offset.

Parameters:

  • io (IO)

    IO object

  • buffer (IO::Buffer)

    read buffer

  • from (Integer)

    file offset

  • length (Integer)

    read length

  • offset (Integer)

    buffer offset

Returns:

  • (Integer)

    bytes read



# File 'lib/uringmachine/fiber_scheduler.rb', line 271

def io_pread(io, buffer, from, length, offset)
  length = buffer.size if length == 0

  if (timeout = io.timeout)
    @machine.timeout(timeout, Timeout::Error) do
      @machine.read(io.fileno, buffer, length, offset, from)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.read(io.fileno, buffer, length, offset, from)
  end
rescue Errno::EINTR
  retry
rescue SystemCallError => e # Errno is a module, so `rescue Errno` never matches Errno::* errors
  -e.errno
end
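
For orientation, a positional read from application code looks like the sketch below. It uses only the standard IO#pread and Tempfile. That a call like this reaches #io_pread when a scheduler is installed is an assumption about the Ruby runtime, not something this page states.

```ruby
require "tempfile"

data = nil
position_after = nil

Tempfile.create("pread-demo") do |file|
  file.write("hello world")
  file.flush
  file.rewind

  # Read 5 bytes starting at file offset 6, independent of the file position.
  data = file.pread(5, 6)
  position_after = file.pos # pread leaves the file position untouched
end
```

The file offset here plays the role of the from parameter, while the offset parameter of the hook refers to a position inside the buffer.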

#io_pwrite(io, buffer, from, length, offset) ⇒ Integer

Writes to the given IO at the given file offset.

Parameters:

  • io (IO)

    IO object

  • buffer (IO::Buffer)

    write buffer

  • from (Integer)

    file offset

  • length (Integer)

    write length

  • offset (Integer)

    buffer offset

Returns:

  • (Integer)

    bytes written



# File 'lib/uringmachine/fiber_scheduler.rb', line 323

def io_pwrite(io, buffer, from, length, offset)
  length = buffer.size if length == 0
  buffer = buffer.slice(offset) if offset > 0

  if (timeout = io.timeout)
    @machine.timeout(timeout, Timeout::Error) do
      @machine.write(io.fileno, buffer, length, from)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.write(io.fileno, buffer, length, from)
  end
rescue Errno::EINTR
  retry
rescue SystemCallError => e # Errno is a module, so `rescue Errno` never matches Errno::* errors
  -e.errno
end

#io_read(io, buffer, length, offset) ⇒ Integer

Reads from the given IO.

Parameters:

  • io (IO)

    IO object

  • buffer (IO::Buffer)

    read buffer

  • length (Integer)

    read length

  • offset (Integer)

    buffer offset

Returns:

  • (Integer)

    bytes read



# File 'lib/uringmachine/fiber_scheduler.rb', line 245

def io_read(io, buffer, length, offset)
  length = buffer.size if length == 0

  if (timeout = io.timeout)
    @machine.timeout(timeout, Timeout::Error) do
      @machine.read(io.fileno, buffer, length, offset)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.read(io.fileno, buffer, length, offset)
  end
rescue Errno::EINTR
  retry
rescue SystemCallError => e # Errno is a module, so `rescue Errno` never matches Errno::* errors
  -e.errno
end

#io_select(rios, wios, eios, timeout = nil) ⇒ Object

Selects the first ready IOs from the given sets of IOs.

Parameters:

  • rios (Array<IO>)

    readable IOs

  • wios (Array<IO>)

    writable IOs

  • eios (Array<IO>)

    exceptable IOs

  • timeout (Number, nil) (defaults to: nil)

    optional timeout



# File 'lib/uringmachine/fiber_scheduler.rb', line 221

def io_select(rios, wios, eios, timeout = nil)
  map_r = map_fds(rios)
  map_w = map_fds(wios)
  map_e = map_fds(eios)

  r, w, e = nil
  if timeout
    @machine.timeout(timeout, Timeout::Error) {
      r, w, e = @machine.select(map_r.keys, map_w.keys, map_e.keys)
    }
  else
    r, w, e = @machine.select(map_r.keys, map_w.keys, map_e.keys)
  end

  [unmap_fds(r, map_r), unmap_fds(w, map_w), unmap_fds(e, map_e)]
end
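
The helpers map_fds and unmap_fds are not shown on this page. The sketch below is only a guess at their shape: one builds a lookup table from file descriptor to IO object, the other translates ready descriptors back into IO objects. IO.select stands in for @machine.select.

```ruby
# Assumed shape of the helper: build a { fd => io } lookup table.
def map_fds(ios)
  (ios || []).to_h { |io| [io.fileno, io] }
end

# Assumed shape of the helper: translate ready fds back into IO objects.
def unmap_fds(fds, map)
  (fds || []).map { |fd| map[fd] }
end

reader, writer = IO.pipe
writer.write("x") # make the read end readable

map_r = map_fds([reader])

# Stand-in for @machine.select: ask the OS which IOs are readable.
ready, = IO.select(map_r.values, nil, nil, 1)
ready_fds = (ready || []).map(&:fileno)

ready_ios = unmap_fds(ready_fds, map_r)

reader.close
writer.close
```

Working with descriptors internally and mapping back at the end lets the result contain the same IO objects the caller passed in.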

#io_wait(io, events, timeout = nil) ⇒ void

This method returns an undefined value.

Waits for the given io to become ready.

Parameters:

  • io (IO)

    IO object

  • events (Number)

    readiness bitmask

  • timeout (Number, nil) (defaults to: nil)

    optional timeout



# File 'lib/uringmachine/fiber_scheduler.rb', line 204

def io_wait(io, events, timeout = nil)
  timeout ||= io.timeout
  if timeout
    @machine.timeout(timeout, Timeout::Error) {
      @machine.poll(io.fileno, events)
    }
  else
    @machine.poll(io.fileno, events)
  end
end
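
The events argument is a bitmask. Assuming it uses Ruby's standard IO::READABLE, IO::WRITABLE and IO::PRIORITY constants (this page only calls it a readiness bitmask), a mask can be built and tested as follows.

```ruby
# Readiness events are combined with bitwise OR.
events = IO::READABLE | IO::WRITABLE

# Individual events are tested with bitwise AND.
wants_read     = (events & IO::READABLE) != 0
wants_write    = (events & IO::WRITABLE) != 0
wants_priority = (events & IO::PRIORITY) != 0
```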

#io_write(io, buffer, length, offset) ⇒ Integer

Writes to the given IO.

Parameters:

  • io (IO)

    IO object

  • buffer (IO::Buffer)

    write buffer

  • length (Integer)

    write length

  • offset (Integer)

    buffer offset

Returns:

  • (Integer)

    bytes written



# File 'lib/uringmachine/fiber_scheduler.rb', line 296

def io_write(io, buffer, length, offset)
  length = buffer.size if length == 0
  buffer = buffer.slice(offset) if offset > 0

  if (timeout = io.timeout)
    @machine.timeout(timeout, Timeout::Error) do
      @machine.write(io.fileno, buffer, length)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.write(io.fileno, buffer, length)
  end
rescue Errno::EINTR
  retry
rescue SystemCallError => e # Errno is a module, so `rescue Errno` never matches Errno::* errors
  -e.errno
end

#join(*fibers) ⇒ void

This method returns an undefined value.

Waits for the given fibers to terminate. If no fibers are given, waits for all fibers to terminate.

Parameters:

  • fibers (Array<Fiber>)

    fibers to wait for



# File 'lib/uringmachine/fiber_scheduler.rb', line 138

def join(*fibers)
  if fibers.empty?
    fibers = @fiber_map.keys
    @fiber_map = ObjectSpace::WeakMap.new
  end

  @machine.await_fibers(fibers)
end

#kernel_sleep(duration = nil) ⇒ void

This method returns an undefined value.

Sleeps for the given duration. If no duration is given, yields to the machine.

Parameters:

  • duration (Number, nil) (defaults to: nil)

    sleep duration



# File 'lib/uringmachine/fiber_scheduler.rb', line 189

def kernel_sleep(duration = nil)
  duration ? @machine.sleep(duration) : @machine.yield
end

#process_wait(pid, flags) ⇒ Process::Status

Waits for a process to terminate.

Parameters:

  • pid (Integer)

    process pid (0 for any child process)

  • flags (Integer)

    waitpid flags

Returns:

  • (Process::Status)

    terminated process status



# File 'lib/uringmachine/fiber_scheduler.rb', line 359

def process_wait(pid, flags)
  flags = UM::WEXITED if flags == 0
  @machine.waitid_status(UM::P_PID, pid, flags)
end

#scheduler_close ⇒ void

This method returns an undefined value.

Waits for all fibers to terminate. Called upon thread termination or when the thread’s fiber scheduler is changed.



# File 'lib/uringmachine/fiber_scheduler.rb', line 129

def scheduler_close
  join()
end

#timeout_after(duration, exception, message, &block) ⇒ any

Run the given block with a timeout.

Parameters:

  • duration (Number)

    timeout duration

  • exception (Class)

    exception class

  • message (String)

    exception message

  • block (Proc)

    block to run

Returns:

  • (any)

    block return value



# File 'lib/uringmachine/fiber_scheduler.rb', line 390

def timeout_after(duration, exception, message, &block)
  @machine.timeout(duration, exception, &block)
end
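
From the caller's side, a timeout with a custom exception class looks like the sketch below. It uses the standard Timeout.timeout. That such a call is routed through #timeout_after when a scheduler is installed is an assumption about the Ruby runtime, and SlowOperation is a hypothetical exception class.

```ruby
require "timeout"

# Hypothetical exception class used only for this illustration.
class SlowOperation < StandardError; end

outcome =
  begin
    Timeout.timeout(0.05, SlowOperation, "took too long") { sleep 1 }
    :finished
  rescue SlowOperation => e
    e.class
  end
```

Note that the implementation above passes only the duration and the exception class on to @machine.timeout.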

#unblock(blocker, fiber) ⇒ void

This method returns an undefined value.

Unblocks the given fiber by scheduling it. This hook is called when a synchronization mechanism unblocks, e.g. a mutex, a queue, etc.

Parameters:

  • blocker (any)

    blocker object

  • fiber (Fiber)

    fiber to resume



# File 'lib/uringmachine/fiber_scheduler.rb', line 180

def unblock(blocker, fiber)
  @machine.schedule(fiber, nil)
  @machine.wakeup if Thread.current != @thread
end

#yield ⇒ Object

Yields to the next runnable fiber.



# File 'lib/uringmachine/fiber_scheduler.rb', line 194

def yield
  @machine.snooze
end