Class: UringMachine

Inherits:
Object
  • Object
show all
Defined in:
lib/uringmachine.rb,
lib/uringmachine/actor.rb,
lib/uringmachine/version.rb,
lib/uringmachine/dns_resolver.rb,
lib/uringmachine/fiber_scheduler.rb,
ext/um/um_class.c

Overview

A UringMachine instance provides an interface for performing I/O operations and automatically switching between fibers. A single UringMachine instance should be used for each thread.

Defined Under Namespace

Modules: FiberExtensions, ThreadExtensions Classes: Actor, AsyncOp, BlockingOperationThreadPool, DNSResolver, Error, FiberScheduler, IO, Mutex, Queue, Terminate

Constant Summary collapse

TERMINATE_EXCEPTION =
UM::Terminate.new
VERSION =
'0.32.0'

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size: 4096, sqpoll: false, sidecar: false) ⇒ void

Initializes a new UringMachine instance with the given options.

Parameters:

  • size (Integer) (defaults to: 4096)

    SQ size (default: 4096)

  • sqpoll (bool, Number) (defaults to: false)

    Set SQPOLL mode, SQPOLL timeout

  • sidecar (bool) (defaults to: false)

    Set sidecar mode



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'ext/um/um_class.c', line 110

VALUE UM_initialize(int argc, VALUE *argv, VALUE self) {
  static ID kwargs_ids[3];
  struct um *machine = RTYPEDDATA_DATA(self);
  VALUE opts, kwargs[3] = {Qnil, Qnil, Qnil};

  if (!kwargs_ids[0]) {
    kwargs_ids[0] = rb_intern_const("size");
    kwargs_ids[1] = rb_intern_const("sqpoll");
    kwargs_ids[2] = rb_intern_const("sidecar");
  }
  rb_scan_args(argc, argv, "0:", &opts);
  if (!NIL_P(opts)) {
    rb_get_kwargs(opts, kwargs_ids, 0, 3, kwargs);
  }

  uint entries_i = TYPE(kwargs[0]) == T_FIXNUM ? NUM2UINT(kwargs[0]) : 0;
  uint sqpoll_timeout_msec = get_sqpoll_timeout_msec(kwargs[1]);
  um_setup(self, machine, entries_i, sqpoll_timeout_msec, RTEST(kwargs[2]));

  return self;
}

Class Method Details

.debug(str) ⇒ void

This method returns an undefined value.

Prints the given string to STDERR.

Parameters:

  • str (String)

    debug message



1377
1378
1379
1380
# File 'ext/um/um_class.c', line 1377

VALUE UM_debug(VALUE self, VALUE str) {
  fprintf(stderr, "%s\n", StringValueCStr(str));
  return Qnil;
}

.inotify_add_watch(fd, path, mask) ⇒ Integer

Adds a watch on the given inotify file descriptor.

Parameters:

  • fd (Integer)

    inotify file descriptor

  • path (String)

    file/directory path

  • mask (Integer)

    inotify event mask

Returns:

  • (Integer)

    watch descriptor



1412
1413
1414
1415
1416
1417
1418
1419
# File 'ext/um/um_class.c', line 1412

VALUE UM_inotify_add_watch(VALUE self, VALUE fd, VALUE path, VALUE mask) {
  int ret = inotify_add_watch(NUM2INT(fd), StringValueCStr(path), NUM2UINT(mask));
  if (ret == -1) {
    int e = errno;
    rb_syserr_fail(e, strerror(e));
  }
  return INT2NUM(ret);
}

.inotify_initInteger

Creates an inotify file descriptor.

Returns:

  • (Integer)

    file descriptor



1391
1392
1393
1394
1395
1396
1397
1398
# File 'ext/um/um_class.c', line 1391

VALUE UM_inotify_init(VALUE self) {
  int fd = inotify_init();
  if (fd == -1) {
    int e = errno;
    rb_syserr_fail(e, strerror(e));
  }
  return INT2NUM(fd);
}

.kernel_versionInteger

Returns the kernel version.

Returns:

  • (Integer)

    kernel version



1365
1366
1367
# File 'ext/um/um_class.c', line 1365

VALUE UM_kernel_version(VALUE self) {
  return INT2NUM(UM_KERNEL_VERSION);
}

.pidfd_open(pid) ⇒ Integer

Creates a file descriptor representing the given process pid. The file descriptor can then be used with methods such as #waitid or .pidfd_open.

Parameters:

  • pid (Integer)

    process pid

Returns:

  • (Integer)

    file descriptor



1325
1326
1327
1328
1329
1330
1331
1332
1333
# File 'ext/um/um_class.c', line 1325

VALUE UM_pidfd_open(VALUE self, VALUE pid) {
  int fd = syscall(SYS_pidfd_open, NUM2INT(pid), 0);
  if (fd == -1) {
    int e = errno;
    rb_syserr_fail(e, strerror(e));
  }

  return INT2NUM(fd);
}

.pidfd_send_signal(fd, sig) ⇒ Integer

Parameters:

  • fd (Integer)

    pidfd

  • sig (Integer)

    signal

Returns:

  • (Integer)

    pidfd



1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
# File 'ext/um/um_class.c', line 1346

VALUE UM_pidfd_send_signal(VALUE self, VALUE fd, VALUE sig) {
  int ret = syscall(
    SYS_pidfd_send_signal, NUM2INT(fd), NUM2INT(sig), NULL, 0
  );
  if (ret) {
    int e = errno;
    rb_syserr_fail(e, strerror(e));
  }

  return fd;
}

.pipeArray

Creates a pipe, returning the file descriptors for the read and write ends.

Returns:

  • (Array)

Returns:

  • (Array<Integer>)

    array containing the read and write file descriptors



1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
# File 'ext/um/um_class.c', line 1280

VALUE UM_pipe(VALUE self) {
  int fds[2];
  int ret = pipe(fds);
  if (ret) {
    int e = errno;
    rb_syserr_fail(e, strerror(e));
  }

  return rb_ary_new_from_args(2, INT2NUM(fds[0]), INT2NUM(fds[1]));
}

.pr_set_child_subreaper(vaule) ⇒ bool

Sets/unsets the “child subreaper” attribute of the calling process.

Parameters:

  • value (bool)

    set/unset value

Returns:

  • (bool)

    set/unset value



1472
1473
1474
1475
1476
1477
1478
1479
1480
# File 'ext/um/um_class.c', line 1472

VALUE UM_pr_set_child_subreaper(VALUE self, VALUE set) {
  int ret = prctl(PR_SET_CHILD_SUBREAPER, RTEST(set) ? 1 : 0);
  if (ret) {
    int e = errno;
    rb_syserr_fail(e, strerror(e));
  }

  return set;
}

.socketpair(domain, type, protocol) ⇒ Array

Creates a pair of connected sockets, returning the two file descriptors.

Returns:

  • (Array)

Parameters:

  • domain (Integer)

    domain

  • type (Integer)

    type

  • protocol (Integer)

    protocol

Returns:

  • (Array<Integer>)

    array containing the two file descriptors



1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
# File 'ext/um/um_class.c', line 1303

VALUE UM_socketpair(VALUE self, VALUE domain, VALUE type, VALUE protocol) {
  int fds[2];
  int ret = socketpair(NUM2INT(domain), NUM2INT(type), NUM2INT(protocol), fds);
  if (ret) {
    int e = errno;
    rb_syserr_fail(e, strerror(e));
  }

  return rb_ary_new_from_args(2, INT2NUM(fds[0]), INT2NUM(fds[1]));
}

Instance Method Details

#accept(server_fd) ⇒ Integer

Parameters:

  • server_fd (Integer)

    listening socket file descriptor

Returns:

  • (Integer)

    connection file descriptor



636
637
638
639
# File 'ext/um/um_class.c', line 636

VALUE UM_accept(VALUE self, VALUE server_fd) {
  struct um *machine = um_get_machine(self);
  return um_accept(machine, NUM2INT(server_fd));
}

#accept_each(server_fd) {|connection_fd| ... } ⇒ void

This method returns an undefined value.

Repeatedly accepts incoming TCP connections in a loop, yielding the connection file descriptors to the given block.

Yields:

  • (connection_fd)

Parameters:

  • server_fd (Integer)

    listening socket file descriptor



653
654
655
656
# File 'ext/um/um_class.c', line 653

VALUE UM_accept_each(VALUE self, VALUE server_fd) {
  struct um *machine = um_get_machine(self);
  return um_accept_each(machine, NUM2INT(server_fd));
}

#accept_into_queue(server_fd, queue) ⇒ void

This method returns an undefined value.

Repeatedly accepts incoming TCP connections in a loop, pushing the connection file descriptors into the given queue.

Parameters:

  • server_fd (Integer)

    listening socket file descriptor

  • queue (UM::Queue)

    connection queue



671
672
673
674
# File 'ext/um/um_class.c', line 671

VALUE UM_accept_into_queue(VALUE self, VALUE server_fd, VALUE queue) {
  struct um *machine = um_get_machine(self);
  return um_accept_into_queue(machine, NUM2INT(server_fd), queue);
}

#await(*fibers) ⇒ Integer

Waits for the given fibers to terminate, without collecting their return values.

Parameters:

  • fibers (Fiber, Array<Fiber>)

    fibers to wait for

Returns:

  • (Integer)

    number of fibers



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/uringmachine.rb', line 124

def await(*fibers)
  queue = Fiber.current.mailbox

  if fibers.size == 1
    first = fibers.first
    case first
    when Enumerable
      fibers = first
    when Fiber
      first = proc_spin(first) if first.is_a?(Proc)
      if !first.done?
        first.add_done_listener(queue)
        self.shift(queue)
      end
      return 1
    end
  end

  pending = nil
  fibers.each do |f|
    f = proc_spin(f) if f.is_a?(Proc)
    if !f.done?
      (pending ||= []) << f
      f.add_done_listener(queue)
    end
  end
  if pending
    while !pending.empty?
      f = self.shift(queue)
      pending.delete(f)
    end
  end
  fibers.count
end

#bind(fd, host, port) ⇒ 0

Returns:

  • (0)

Parameters:

  • fd (Integer)

    file descriptor

  • host (String)

    hostname or IP address

  • port (Integer)

    port number

Returns:

  • (0)

    success



923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
# File 'ext/um/um_class.c', line 923

VALUE UM_bind(VALUE self, VALUE fd, VALUE host, VALUE port) {
  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = inet_addr(StringValueCStr(host));
  addr.sin_port = htons(NUM2INT(port));

#ifdef HAVE_IO_URING_PREP_BIND
  struct um *machine = um_get_machine(self);
  return um_bind(machine, NUM2INT(fd), (struct sockaddr *)&addr, sizeof(addr));
#else
  int res = bind(NUM2INT(fd), (struct sockaddr *)&addr, sizeof(addr));
  if (res)
    rb_syserr_fail(errno, strerror(errno));
  return INT2NUM(0);
#endif
}

#close(fd) ⇒ 0

Returns:

  • (0)

Parameters:

  • fd (Integer)

    file descriptor

Returns:

  • (0)

    success



602
603
604
605
# File 'ext/um/um_class.c', line 602

VALUE UM_close(VALUE self, VALUE fd) {
  struct um *machine = um_get_machine(self);
  return um_close(machine, NUM2INT(fd));
}

#close_async(fd) ⇒ Hash

Closes the given file descriptor. This method submits the operation but does not wait for it to complete. This method may be used to improve performance in cases where the application des not care whether it succeeds or not.

Returns:

  • (Hash)

Parameters:

  • fd (Integer)

    file descriptor

Returns:

  • (Integer)

    file descriptor



620
621
622
623
# File 'ext/um/um_class.c', line 620

VALUE UM_close_async(VALUE self, VALUE fd) {
  struct um *machine = um_get_machine(self);
  return um_close_async(machine, NUM2INT(fd));
}

#connect(fd, host, port) ⇒ 0

Returns:

  • (0)

Parameters:

  • fd (Integer)

    file descriptor

  • host (String)

    hostname or IP address

  • port (Integer)

    port number

Returns:

  • (0)

    success



777
778
779
780
781
782
783
784
785
786
787
# File 'ext/um/um_class.c', line 777

VALUE UM_connect(VALUE self, VALUE fd, VALUE host, VALUE port) {
  struct um *machine = um_get_machine(self);

  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = inet_addr(StringValueCStr(host));
  addr.sin_port = htons(NUM2INT(port));

  return um_connect(machine, NUM2INT(fd), (struct sockaddr *)&addr, sizeof(addr));
}

#fiber_setObject

Returns the set of running fibers.

return [Set]



17
18
19
# File 'lib/uringmachine.rb', line 17

def fiber_set
  @fiber_set ||= Set.new
end

#file_watch(root, mask) ⇒ void

This method returns an undefined value.

Watches for filesystem events using inotify in an infinite loop, yielding incoming events to the given block.

Parameters:

  • root (String)

    directory to watch

  • mask (Integer)

    event mask



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/uringmachine.rb', line 175

def file_watch(root, mask)
  fd = UM.inotify_init
  wd_map = {}
  recursive_file_watch(fd, root, wd_map, mask)
  while true
    events = inotify_get_events(fd)
    events.each do |event|
      if event[:mask] | UM::IN_IGNORED == UM::IN_IGNORED
        wd_map.delete(event[:wd])
        next
      end
      transformed_event = transform_file_watch_event(event, wd_map)
      if event[:mask] == UM::IN_CREATE | UM::IN_ISDIR
        recursive_file_watch(fd, transformed_event[:fn], wd_map, mask)
      end
      yield transformed_event
    end
  end
ensure
  close_async(fd)
end

#fsync(fd) ⇒ 0

Returns:

  • (0)

Parameters:

  • fd (Integer)

    fd

Returns:

  • (Integer)

    0 if successful



586
587
588
589
# File 'ext/um/um_class.c', line 586

VALUE UM_fsync(VALUE self, VALUE fd) {
  struct um *machine = um_get_machine(self);
  return um_fsync(machine, NUM2INT(fd));
}

#getsockopt(fd, level, opt) ⇒ Integer

Parameters:

  • fd (Integer)

    file descriptor

  • level (Integer)

    level

  • opt (Integer)

    level

Returns:

  • (Integer)

    option value



989
990
991
992
# File 'ext/um/um_class.c', line 989

VALUE UM_getsockopt(VALUE self, VALUE fd, VALUE level, VALUE opt) {
  struct um *machine = um_get_machine(self);
  return um_getsockopt(machine, NUM2INT(fd), NUM2INT(level), NUM2INT(opt));
}

#inotify_get_events(fd) ⇒ Array<Hash>

Waits for and returns one or more events on the given inotify file descriptor. Each event is returned as a hash containing the watch descriptor, the file name, and the event mask.

Parameters:

  • fd (Integer)

    inotify file descriptor

Returns:

  • (Array<Hash>)

    array of one or more events



1453
1454
1455
1456
1457
1458
1459
1460
# File 'ext/um/um_class.c', line 1453

VALUE UM_inotify_get_events(VALUE self, VALUE fd) {
  struct um *machine = um_get_machine(self);

  char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));

  size_t ret = um_read_raw(machine, NUM2INT(fd), buf, sizeof(buf));
  return inotify_get_events(buf, ret);
}

#io(target, mode = nil) ⇒ UringMachine::IO

call-seq:

machine.io(fd, mode = nil) -> conn
machine.io(fd, mode = nil) { |conn| }

Creates an UM::IO for the given target (fd or SSLSocket). The mode indicates the type of target and how it is read from:

  • :fd - read from the given fd using the buffer pool (default mode)

  • :socket - receive from the given socket fd using the buffer pool

  • :ssl - read from the given SSL socket

If a block is given, the block will be called with the IO instance as argument and the method will return the block’s return value.

Parameters:

  • target (Integer, OpenSSL::SSL::SSLSocket)

    fd or ssl socket

  • mode (Symbol, nil) (defaults to: nil)

    IO mode

Returns:



214
215
216
217
218
219
220
221
# File 'lib/uringmachine.rb', line 214

def io(target, mode = nil)
  conn = UM::IO.new(self, target, mode)
  return conn if !block_given?

  res = yield(conn)
  conn.clear
  res
end

#join(*fibers) ⇒ Object

Waits for the given fibers to terminate, returning the return value for each given fiber. This method also accepts procs instead of fibers. When a proc is given, it is ran in a separate fiber which will be joined.

machine.join(
  -> { machine.sleep(0.01); :f1 },
  -> { machine.sleep(0.02); :f2 },
  -> { machine.sleep(0.03); :f3 }
) #=> [:f1, :f2, :f3]

values of the given fibers

Parameters:

  • *fibers (Array<Fiber, Proc>)

    fibers @return [Array<any>] return



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/uringmachine.rb', line 77

def join(*fibers)
  queue = Fiber.current.mailbox

  if fibers.size == 1
    first = fibers.first
    case first
    when Enumerable
      fibers = first
    when Fiber
      first = proc_spin(first) if first.is_a?(Proc)
      if !first.done?
        first.add_done_listener(queue)
        self.shift(queue)
      end
      return first.result
    end
  end

  results = {}
  pending = nil
  fibers.each do |f|
    f = proc_spin(f) if f.is_a?(Proc)
    if f.done?
      results[f] = f.result
    else
      results[f] = nil
      (pending ||= []) << f
      f.add_done_listener(queue)
    end
  end

  if pending
    while !pending.empty?
      f = self.shift(queue)
      pending.delete(f)
      results[f] = f.result
    end
  end
  values = results.values
  fibers.size == 1 ? values.first : values
end

#listen(fd, backlog) ⇒ 0

Returns:

  • (0)

Parameters:

  • fd (Integer)

    file descriptor

  • backlog (String)

    pending connection queue length

Returns:

  • (0)

    success



953
954
955
956
957
958
959
960
961
962
963
# File 'ext/um/um_class.c', line 953

VALUE UM_listen(VALUE self, VALUE fd, VALUE backlog) {
#ifdef HAVE_IO_URING_PREP_LISTEN
  struct um *machine = um_get_machine(self);
  return um_listen(machine, NUM2INT(fd), NUM2INT(backlog));
#else
  int res = listen(NUM2INT(fd), NUM2INT(backlog));
  if (res)
    rb_syserr_fail(errno, strerror(errno));
  return INT2NUM(0);
#endif
}

#mark(mark) ⇒ Object

:nodoc:



155
156
157
158
159
# File 'ext/um/um_class.c', line 155

VALUE UM_mark_m(VALUE self, VALUE mark) {
  struct um *machine = um_get_machine(self);
  machine->mark = NUM2UINT(mark);
  return self;
}

#metricsHash

Returns a hash with different metrics about the functioning of the UringMachine instance.

Returns:

  • (Hash)

    metrics hash



166
167
168
169
# File 'ext/um/um_class.c', line 166

VALUE UM_metrics(VALUE self) {
  struct um *machine = um_get_machine(self);
  return um_metrics(machine, &machine->metrics);
}

#open(pathname, flags) ⇒ Integer #open(pathname, flags) {|fd| ... } ⇒ Integer

Opens a file using the given pathname and flags. If a block is given, the file descriptor is passed to the block, and the file is automatically closed when the block returns.

Overloads:

  • #open(pathname, flags) {|fd| ... } ⇒ Integer

    Yields:

    • (fd)

Parameters:

  • pathname (String)

    file path

  • flags (Integer)

    flags mask

Returns:

  • (Integer)

    fd



1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
# File 'ext/um/um_class.c', line 1127

VALUE UM_open(VALUE self, VALUE pathname, VALUE flags) {
  struct um *machine = um_get_machine(self);
  // TODO: take optional perm (mode) arg
  VALUE fd = um_open(machine, pathname, NUM2INT(flags), 0666);
  if (rb_block_given_p()) {
    struct um_open_ctx ctx = { self, fd };
    return rb_ensure(rb_yield, fd, UM_open_complete, (VALUE)&ctx);
  }
  else
    return fd;
}

#pending_fibersSet

Returns a set containing all fibers in pending state (i.e. waiting for an operation to complete.)

Returns:

  • (Set)

    set of pending fibers



318
319
320
321
# File 'ext/um/um_class.c', line 318

VALUE UM_pending_fibers(VALUE self) {
  struct um *machine = um_get_machine(self);
  return machine->pending_fibers;
}

#periodically(interval) ⇒ void

This method returns an undefined value.

Runs the given block at regular time intervals in an infinite loop.

Parameters:

  • interval (Number)

    time interval (in seconds) between consecutive invocations



370
371
372
373
# File 'ext/um/um_class.c', line 370

VALUE UM_periodically(VALUE self, VALUE interval) {
  struct um *machine = um_get_machine(self);
  return um_periodically(machine, NUM2DBL(interval));
}

#poll(fd, mask) ⇒ Integer

Waits for readiness of the given file descriptor according to the given event mask.

Parameters:

  • fd (Integer)

    file descriptor

  • mask (Integer)

    events mask

Returns:

  • (Integer)

    fd



1152
1153
1154
1155
# File 'ext/um/um_class.c', line 1152

VALUE UM_poll(VALUE self, VALUE fd, VALUE mask) {
  struct um *machine = um_get_machine(self);
  return um_poll(machine, NUM2INT(fd), NUM2UINT(mask));
}

#pop(queue) ⇒ any

Parameters:

Returns:

  • (any)

    value



1060
1061
1062
1063
1064
# File 'ext/um/um_class.c', line 1060

VALUE UM_queue_pop(VALUE self, VALUE queue) {
  struct um *machine = um_get_machine(self);
  struct um_queue *que = Queue_data(queue);
  return um_queue_pop(machine, que);
}

#prep_timeout(interval) ⇒ UM::AsyncOp

Prepares an asynchronous timeout instance.

Parameters:

  • interval (Integer)

    timeout interval in seconds

Returns:



1218
1219
1220
1221
# File 'ext/um/um_class.c', line 1218

VALUE UM_prep_timeout(VALUE self, VALUE interval) {
  struct um *machine = um_get_machine(self);
  return um_prep_timeout(machine, NUM2DBL(interval));
}

#profile_mode=(value) ⇒ bool

Sets/resets profile mode.

Parameters:

  • value (bool)

    profile mode on/off

Returns:

  • (bool)

    profile mode on/off



194
195
196
197
198
199
200
201
202
# File 'ext/um/um_class.c', line 194

VALUE UM_profile_mode_set(VALUE self, VALUE value) {
  struct um *machine = um_get_machine(self);
  machine->profile_mode = RTEST(value);
  if (machine->profile_mode) {
    machine->metrics.time_total_wait = 0.0;
    machine->metrics.time_last_cpu = machine->metrics.time_first_cpu = um_get_time_cpu();
  }
  return value;
}

#profile_mode?bool

Returns the profile mode state.

Returns:

  • (bool)

    profile mode on/off



175
176
177
178
# File 'ext/um/um_class.c', line 175

VALUE UM_profile_mode_p(VALUE self) {
  struct um *machine = um_get_machine(self);
  return machine->profile_mode ? Qtrue : Qfalse;
}

#push(queue, value) ⇒ UM::Queue

Parameters:

  • queue (UM::Queue)

    queue

  • value (any)

    value

Returns:



1043
1044
1045
1046
1047
# File 'ext/um/um_class.c', line 1043

VALUE UM_queue_push(VALUE self, VALUE queue, VALUE value) {
  struct um *machine = um_get_machine(self);
  struct um_queue *que = Queue_data(queue);
  return um_queue_push(machine, que, value);
}

#read(fd, buffer, maxlen, buffer_offset = nil, file_offset = nil) ⇒ Integer

Reads up to maxlen bytes from the given fd into the given buffer. The optional buffer_offset parameter determines the position in the buffer into which the data will be read. A negative buffer_offset denotes a position relative to the end of the buffer, e.g. a value of -1 means the data will be appended to the buffer.

Parameters:

  • fd (Integer)

    file descriptor

  • buffer (String, IO::Buffer)

    buffer

  • maxlen (Integer)

    maximum number of bytes to read

  • buffer_offset (Integer)

    optional buffer offset to read into

  • file_offset (Integer)

    optional file offset to read from

Returns:

  • (Integer)

    number of bytes read



394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
# File 'ext/um/um_class.c', line 394

VALUE UM_read(int argc, VALUE *argv, VALUE self) {
  struct um *machine = um_get_machine(self);
  VALUE fd;
  VALUE buffer;
  VALUE maxlen;
  VALUE buffer_offset;
  VALUE file_offset;
  rb_scan_args(argc, argv, "32", &fd, &buffer, &maxlen, &buffer_offset, &file_offset);

  ssize_t maxlen_i = NIL_P(maxlen) ? -1 : NUM2INT(maxlen);
  ssize_t buffer_offset_i = NIL_P(buffer_offset) ? 0 : NUM2INT(buffer_offset);
  __u64 file_offset_i = NIL_P(file_offset) ? (__u64)-1 : NUM2UINT(file_offset);

  return um_read(machine, NUM2INT(fd), buffer, maxlen_i, buffer_offset_i, file_offset_i);
}

#read_each(fd, bgid) {|data| ... } ⇒ void

This method returns an undefined value.

Reads repeatedly from the given fd using the given buffer group id. The buffer group should have been previously setup using #setup_buffer_ring. Read data is yielded in an infinite loop to the given block.

Yields:

  • (data)

Parameters:

  • fd (Integer)

    file descriptor

  • bgid (Integer)

    buffer group id



423
424
425
426
# File 'ext/um/um_class.c', line 423

VALUE UM_read_each(VALUE self, VALUE fd, VALUE bgid) {
  struct um *machine = um_get_machine(self);
  return um_read_each(machine, NUM2INT(fd), NUM2INT(bgid));
}

#recv(fd, buffer, maxlen, flags) ⇒ Integer

Parameters:

  • fd (Integer)

    file descriptor

  • buffer (String, IO::Buffer)

    buffer

  • maxlen (Integer)

    maximum number of bytes to receive

  • flags (Integer)

    flags mask

Returns:

  • (Integer)

    number of bytes received



885
886
887
888
# File 'ext/um/um_class.c', line 885

VALUE UM_recv(VALUE self, VALUE fd, VALUE buffer, VALUE maxlen, VALUE flags) {
  struct um *machine = um_get_machine(self);
  return um_recv(machine, NUM2INT(fd), buffer, NUM2INT(maxlen), NUM2INT(flags));
}

#recv_each(fd, bgid, flags) {|data| ... } ⇒ void

This method returns an undefined value.

Repeatedlty receives data from the given socket in an infinite loop using the given buffer group id. The buffer group should have been previously setup using #setup_buffer_ring.

Yields:

  • (data)

Parameters:

  • fd (Integer)

    file descriptor

  • bgid (Integer)

    buffer group id

  • flags (Integer)

    flags mask



905
906
907
908
# File 'ext/um/um_class.c', line 905

VALUE UM_recv_each(VALUE self, VALUE fd, VALUE bgid, VALUE flags) {
  struct um *machine = um_get_machine(self);
  return um_recv_each(machine, NUM2INT(fd), NUM2INT(bgid), NUM2INT(flags));
}

#recv_fd(sock_fd) ⇒ Integer

Parameters:

  • sock_fd (Integer)

    socket file descriptor

Returns:

  • (Integer)

    rececived file descriptor



759
760
761
762
# File 'ext/um/um_class.c', line 759

VALUE UM_recv_fd(VALUE self, VALUE sock_fd) {
  struct um *machine = um_get_machine(self);
  return um_recv_fd(machine, NUM2INT(sock_fd));
}

#resolve(hostname, type = :A) ⇒ String

Resolves a hostname to an IP address by performing a DNS query.

Parameters:

  • hostname (String)

    hostname

  • type (Symbol) (defaults to: :A)

    record type

Returns:

  • (String)

    IP address



164
165
166
167
# File 'lib/uringmachine.rb', line 164

def resolve(hostname, type = :A)
  @resolver ||= DNSResolver.new(self)
  @resolver.resolve(hostname, type)
end

#run(fiber, &block) ⇒ Fiber

Runs the given block in the given fiber. This method is used to run fibers indirectly.

Parameters:

  • fiber (Fiber)

    fiber

  • block (Proc)

    block to run

Returns:



58
59
60
61
62
63
# File 'lib/uringmachine.rb', line 58

def run(fiber, &block)
  run_block_in_fiber(block, fiber, nil)
  self.schedule(fiber, nil)
  fiber_set << fiber
  fiber
end

#schedule(fiber, value) ⇒ UringMachine

Schedules the given fiber by adding it to the runqueue. The fiber will be resumed with the given value.

Parameters:

  • fiber (Fiber)

    fiber to schedule

  • value (any)

    resume value

Returns:



330
331
332
333
334
# File 'ext/um/um_class.c', line 330

VALUE UM_schedule(VALUE self, VALUE fiber, VALUE value) {
  struct um *machine = um_get_machine(self);
  um_schedule(machine, fiber, value);
  return self;
}

#select(read_fds, write_fds, except_fds) ⇒ Array

Waits for readyness of at least one fd for read, write or exception condition from the given fds. This method provides a similar interface to ‘IO#select`.

Returns:

  • (Array)

Parameters:

  • read_fds (Array<Integer>)

    file descriptors for reading

  • write_fds (Array<Integer>)

    file descriptors for writing

  • except_fds (Array<Integer>)

    file descriptors for exception

Returns:

  • (Array<Array<Integer>>)

    read-ready, write-ready, and exception-ready fds



1171
1172
1173
1174
# File 'ext/um/um_class.c', line 1171

VALUE UM_select(VALUE self, VALUE read_fds, VALUE write_fds, VALUE except_fds) {
  struct um *machine = um_get_machine(self);
  return um_select(machine, read_fds, write_fds, except_fds);
}

#send(fd, buffer, len, flags) ⇒ Integer

Sends data on the given socket. This method is not guaranteed to send all of the data in the buffer, unless UM::MSG_WAITALL is specified in the flags mask.

Parameters:

  • fd (Integer)

    file descriptor

  • buffer (String, IO::Buffer)

    buffer

  • len (Integer)

    number of bytes to send

  • flags (Integer)

    flags mask

Returns:

  • (Integer)

    number of bytes sent



805
806
807
808
# File 'ext/um/um_class.c', line 805

VALUE UM_send(VALUE self, VALUE fd, VALUE buffer, VALUE len, VALUE flags) {
  struct um *machine = um_get_machine(self);
  return um_send(machine, NUM2INT(fd), buffer, NUM2INT(len), NUM2INT(flags));
}

#send_bundle(fd, bgid, *buffers) ⇒ Integer #send_bundle(fd, bgid, *buffers) ⇒ Object

Sends data on the given socket from the given buffers using a registered buffer group. The buffer group should have been previously registered using #setup_buffer_ring.

Overloads:

  • #send_bundle(fd, bgid, *buffers) ⇒ Integer

    Returns number of bytes sent.

    Parameters:

    • fd (Integer)

      file descriptor

    • bgid (Integer)

      buffer group id

    • *buffers (Array<String, IO::Buffer>)

      buffers

    Returns:

    • (Integer)

      number of bytes sent



855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
# File 'ext/um/um_class.c', line 855

VALUE UM_send_bundle(int argc, VALUE *argv, VALUE self) {
  struct um *machine = um_get_machine(self);
  VALUE fd;
  VALUE bgid;
  VALUE strings;
  rb_scan_args(argc, argv, "2*", &fd, &bgid, &strings);

  if (RARRAY_LEN(strings) == 1) {
    VALUE first = rb_ary_entry(strings, 0);
    if (TYPE(first) == T_ARRAY)
      strings = first;
  }

  return um_send_bundle(machine, NUM2INT(fd), NUM2INT(bgid), strings);
}

#send_fd(sock_fd, fd) ⇒ Integer

Parameters:

  • sock_fd (Integer)

    socket file descriptor

  • fd (Integer)

    file descriptor to send

Returns:

  • (Integer)

    file descriptor to send



743
744
745
746
# File 'ext/um/um_class.c', line 743

VALUE UM_send_fd(VALUE self, VALUE sock_fd, VALUE fd) {
  struct um *machine = um_get_machine(self);
  return um_send_fd(machine, NUM2INT(sock_fd), NUM2INT(fd));
}

#sendv(fd, *buffers) ⇒ Integer #sendv(fd, *buffers) ⇒ Object

Sends data on the given socket from the given buffers. This method is only available on Linux kernel >= 6.17. This method is guaranteed to send

Overloads:

  • #sendv(fd, *buffers) ⇒ Integer

    Returns number of bytes sent.

    Parameters:

    • fd (Integer)

      file descriptor

    • *buffers (Array<String, IO::Buffer>)

      buffers

    Returns:

    • (Integer)

      number of bytes sent



824
825
826
827
828
829
830
831
832
833
834
835
836
# File 'ext/um/um_class.c', line 824

VALUE UM_sendv(int argc, VALUE *argv, VALUE self) {
  struct um *machine = um_get_machine(self);
  if (argc < 1)
    rb_raise(rb_eArgError, "wrong number of arguments (given 0, expected 1+)");
  int fd = NUM2INT(argv[0]);
  if (argc < 2) return INT2NUM(0);

#ifdef HAVE_IO_URING_SEND_VECTORIZED
  return um_sendv(machine, fd, argc - 1, argv + 1);
#else
  return um_writev(machine, fd, argc - 1, argv + 1);
#endif
}

#setsockopt(fd, level, opt, value) ⇒ 0

Returns:

  • (0)

Parameters:

  • fd (Integer)

    file descriptor

  • level (Integer)

    level

  • opt (Integer)

    level

  • value (Integer)

    option value

Returns:

  • (0)

    success



1008
1009
1010
1011
# File 'ext/um/um_class.c', line 1008

VALUE UM_setsockopt(VALUE self, VALUE fd, VALUE level, VALUE opt, VALUE value) {
  struct um *machine = um_get_machine(self);
  return um_setsockopt(machine, NUM2INT(fd), NUM2INT(level), NUM2INT(opt), numeric_value(value));
}

#setup_buffer_ring(size, count) ⇒ Integer

Creates a buffer group (buffer ring) with the given buffer size and buffer count.

Parameters:

  • size (Integer)

    buffer size in bytes

  • count (Integer)

    number of buffers in group

Returns:

  • (Integer)

    buffer group id



138
139
140
141
142
# File 'ext/um/um_class.c', line 138

VALUE UM_setup_buffer_ring(VALUE self, VALUE size, VALUE count) {
  struct um *machine = um_get_machine(self);
  int bgid = um_setup_buffer_ring(machine, NUM2UINT(size), NUM2UINT(count));
  return INT2NUM(bgid);
}

#pop(queue) ⇒ any

Parameters:

Returns:

  • (any)

    value



1095
1096
1097
1098
1099
# File 'ext/um/um_class.c', line 1095

VALUE UM_queue_shift(VALUE self, VALUE queue) {
  struct um *machine = um_get_machine(self);
  struct um_queue *que = Queue_data(queue);
  return um_queue_shift(machine, que);
}

#shutdown(fd, how) ⇒ 0

Returns:

  • (0)

Parameters:

  • fd (Integer)

    file descriptor

  • how (Integer)

    how the socket should be shutdown

Returns:

  • (0)

    success



707
708
709
710
# File 'ext/um/um_class.c', line 707

VALUE UM_shutdown(VALUE self, VALUE fd, VALUE how) {
  struct um *machine = um_get_machine(self);
  return um_shutdown(machine, NUM2INT(fd), NUM2INT(how));
}

#shutdown_async(fd, how) ⇒ 0

Shuts down a socket for sending and/or receiving. This method may be used to improve performance in situations where the application does not care about whether the operation succeeds or not.

Returns:

  • (0)

Parameters:

  • fd (Integer)

    file descriptor

  • how (Integer)

    how the socket should be shutdown

Returns:

  • (0)

    success



726
727
728
729
# File 'ext/um/um_class.c', line 726

VALUE UM_shutdown_async(VALUE self, VALUE fd, VALUE how) {
  struct um *machine = um_get_machine(self);
  return um_shutdown_async(machine, NUM2INT(fd), NUM2INT(how));
}

#sidecar_mode?bool

Returns the sidecar mode state.

Returns:

  • (bool)

    sidecar mode on/off



219
220
221
222
# File 'ext/um/um_class.c', line 219

VALUE UM_sidecar_mode_p(VALUE self) {
  struct um *machine = um_get_machine(self);
  return machine->sidecar_mode ? Qtrue : Qfalse;
}

#sidecar_startUringMachine

Starts sidecar mode.

Returns:



228
229
230
231
232
# File 'ext/um/um_class.c', line 228

VALUE UM_sidecar_start(VALUE self) {
  struct um *machine = um_get_machine(self);
  um_sidecar_setup(machine);
  return self;
}

#sidecar_stopUringMachine

Stops sidecar mode.

Returns:



238
239
240
241
242
# File 'ext/um/um_class.c', line 238

VALUE UM_sidecar_stop(VALUE self) {
  struct um *machine = um_get_machine(self);
  um_sidecar_teardown(machine);
  return self;
}

#sizeInteger

Returns the SQ (submission queue) size.

Returns:

  • (Integer)

    SQ size



148
149
150
151
# File 'ext/um/um_class.c', line 148

VALUE UM_size(VALUE self) {
  struct um *machine = um_get_machine(self);
  return UINT2NUM(machine->size);
}

#sleep(duration) ⇒ void

This method returns an undefined value.

Puts the current fiber to sleep for the given time duration (in seconds), yielding control to the next fiber in the runqueue.

Parameters:

  • duration (Number)

    sleep duration in seconds



358
359
360
361
# File 'ext/um/um_class.c', line 358

VALUE UM_sleep(VALUE self, VALUE duration) {
  struct um *machine = um_get_machine(self);
  return um_sleep(machine, NUM2DBL(duration));
}

#snoozeUringMachine

Adds the current fiber to the end of the runqueue and yields control to the next fiber in the runqueue. This method is usually used to yield control while performing CPU-intensive work in order not starve other fibers.

Returns:



250
251
252
253
254
255
256
257
258
259
# File 'ext/um/um_class.c', line 250

VALUE UM_snooze(VALUE self) {
  struct um *machine = um_get_machine(self);
  um_schedule(machine, rb_fiber_current(), Qnil);

  // the current fiber is already scheduled, and the runqueue is GC-marked, so
  // we can safely call um_switch, which is faster than calling um_yield.
  VALUE ret = um_switch(machine);
  RAISE_IF_EXCEPTION(ret);
  return ret;
}

#socket(domain, type, protocol, flags) ⇒ Integer

Parameters:

  • domain (Integer)

    socket domain

  • type (Integer)

    socket type

  • protocol (Integer)

    socket protocol

  • flags (Integer)

    flags

Returns:

  • (Integer)

    file descriptor



690
691
692
693
# File 'ext/um/um_class.c', line 690

VALUE UM_socket(VALUE self, VALUE domain, VALUE type, VALUE protocol, VALUE flags) {
  struct um *machine = um_get_machine(self);
  return um_socket(machine, NUM2INT(domain), NUM2INT(type), NUM2INT(protocol), NUM2UINT(flags));
}

#spin(value = nil, klass = Fiber, &block) ⇒ Fiber

Creates a new fiber and schedules it to be ran.

Parameters:

  • value (any) (defaults to: nil)

    Value to be passed to the given block

  • klass (Class) (defaults to: Fiber)

    fiber class

  • block (Proc)

    block to run in fiber

Returns:



31
32
33
34
35
36
37
# File 'lib/uringmachine.rb', line 31

def spin(value = nil, klass = Fiber, &block)
  fiber = klass.new(blocking: false) { |v| run_block_in_fiber(block, fiber, v) }
  self.schedule(fiber, value)

  fiber_set << fiber
  fiber
end

#spin_actor(mod, *a, **k) ⇒ Object



4
5
6
7
8
9
10
11
# File 'lib/uringmachine/actor.rb', line 4

def spin_actor(mod, *a, **k)
  target = Object.new.extend(mod)
  mailbox = UM::Queue.new
  actor = spin(nil, Actor) { actor.run(self, target, mailbox) }
  target.setup(*a, **k)
  snooze
  actor
end

#spin_thread_actor(mod, *a, **k) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/uringmachine/actor.rb', line 13

def spin_thread_actor(mod, *a, **k)
  machine = UM.new
  target = Object.new.extend(mod)
  mailbox = UM::Queue.new
  actor = Actor.new
  Thread.new do
    actor.run(machine, target, mailbox)
  end
  target.setup(*a, **k)
  snooze
  actor
end

#splice(in_fd, out_fd, nbytes) ⇒ Integer

Splices bytes from in_fd to out_fd. At least one of the given fds must be a pipe.

Parameters:

  • in_fd (Integer)

    fd to splice from

  • out_fd (Integer)

    fd to splice to

  • nbytes (Integer)

    number of bytes to splice

Returns:

  • (Integer)

    number of bytes spliced



551
552
553
554
# File 'ext/um/um_class.c', line 551

VALUE UM_splice(VALUE self, VALUE in_fd, VALUE out_fd, VALUE nbytes) {
  struct um *machine = um_get_machine(self);
  return um_splice(machine, NUM2INT(in_fd), NUM2INT(out_fd), NUM2UINT(nbytes));
}

#sqpoll_mode?bool

Returns the SQPOLL mode state.

Returns:

  • (bool)

    SQPOLL mode on/off



184
185
186
187
# File 'ext/um/um_class.c', line 184

VALUE UM_sqpoll_mode_p(VALUE self) {
  struct um *machine = um_get_machine(self);
  return machine->sqpoll_mode ? Qtrue : Qfalse;
}

#ssl_read(ssl, buf, maxlen) ⇒ Integer

Reads from the given SSL socket. This method should be used after first having called #ssl_set_bio.

Parameters:

  • ssl (OpenSSL::SSL::SSLSocket)

    ssl socket

  • buf (String, IO::Buffer)

    buffer

  • maxlen (Integer)

    maximum number of bytes to read

Returns:

  • (Integer)

    number of bytes read



1248
1249
1250
1251
1252
# File 'ext/um/um_class.c', line 1248

VALUE UM_ssl_read(VALUE self, VALUE ssl, VALUE buf, VALUE maxlen) {
  struct um *machine = um_get_machine(self);
  int ret = um_ssl_read(machine, ssl, buf, NUM2INT(maxlen));
  return INT2NUM(ret);
}

#ssl_set_bio(ssl) ⇒ UringMachine

Sets up the given ssl socket to use the machine for sending and receiving.

Parameters:

  • ssl (OpenSSL::SSL::SSLSocket)

    ssl socket

Returns:



1231
1232
1233
1234
1235
# File 'ext/um/um_class.c', line 1231

VALUE UM_ssl_set_bio(VALUE self, VALUE ssl) {
  struct um *machine = um_get_machine(self);
  um_ssl_set_bio(machine, ssl);
  return self;
}

#ssl_write(ssl, buf, len) ⇒ Integer

Writes to the given SSL socket. This method should be used after first having called #ssl_set_bio.

Parameters:

  • ssl (OpenSSL::SSL::SSLSocket)

    ssl socket

  • buf (String, IO::Buffer)

    buffer

  • len (Integer)

    number of bytes to write

Returns:

  • (Integer)

    number of bytes written



1265
1266
1267
1268
1269
# File 'ext/um/um_class.c', line 1265

VALUE UM_ssl_write(VALUE self, VALUE ssl, VALUE buf, VALUE len) {
  struct um *machine = um_get_machine(self);
  int ret = um_ssl_write(machine, ssl, buf, NUM2INT(len));
  return INT2NUM(ret);
}

#statx(fd, nil, flags, mask) ⇒ Hash #statx(dirfd, path, flags, mask) ⇒ Hash #statx(UM: :AT_FDCWD, path, flags, mask) ⇒ Hash

Overloads:

  • #statx(fd, nil, flags, mask) ⇒ Hash

    Returns:

    • (Hash)
  • #statx(dirfd, path, flags, mask) ⇒ Hash

    Returns:

    • (Hash)
  • #statx(UM: :AT_FDCWD, path, flags, mask) ⇒ Hash

    Returns:

    • (Hash)

Parameters:

  • dirfd (Integer)

    file or directory descriptor

  • path (String, nil)

    file path

  • flags (Integer)

    flags

  • mask (Integer)

    mask of information to return

Returns:

  • (hash)

    hash containing file information



532
533
534
535
# File 'ext/um/um_class.c', line 532

VALUE UM_statx(VALUE self, VALUE dirfd, VALUE path, VALUE flags, VALUE mask) {
  struct um *machine = um_get_machine(self);
  return um_statx(machine, NUM2INT(dirfd), path, NUM2INT(flags), NUM2UINT(mask));
}

#submitInteger

Submits any pending I/O operations that are not yet submitted.

Returns:

  • (Integer)

    number of I/O operations submitted



307
308
309
310
311
# File 'ext/um/um_class.c', line 307

VALUE UM_submit(VALUE self) {
  struct um *machine = um_get_machine(self);
  uint ret = um_submit(machine);
  return UINT2NUM(ret);
}

#switchany

Yields control to the next fiber in the runqueue. The current fiber will not be resumed unless it is scheduled again by some other fiber. The call to #yield will return the value with which the current fiber will be eventually scheduled. If resumed with an exception, that exception will be raised when the fiber is resumed.

Returns:

  • (any)

    resume value



284
285
286
287
288
289
290
# File 'ext/um/um_class.c', line 284

VALUE UM_switch(VALUE self) {
  struct um *machine = um_get_machine(self);

  VALUE ret = um_switch(machine);
  RAISE_IF_EXCEPTION(ret);
  return ret;
}

#synchronize(mutex) { ... } ⇒ any

Synchronizes access to the given mutex. The mutex is locked, the given block is executed and finally the mutex is unlocked.

Yields:

Parameters:

Returns:

  • (any)

    block return value



1025
1026
1027
1028
1029
# File 'ext/um/um_class.c', line 1025

VALUE UM_mutex_synchronize(VALUE self, VALUE mutex) {
  struct um *machine = um_get_machine(self);
  struct um_mutex *mutex_data = Mutex_data(mutex);
  return um_mutex_synchronize(machine, mutex_data);
}

#tcp_connect(host, port) ⇒ Integer

Creates and connects a TCP socket to the given host and port.

Parameters:

  • host (String)

    host IP address

  • port (Integer)

    TCP port

Returns:

  • (Integer)

    socket fd



241
242
243
244
245
# File 'lib/uringmachine.rb', line 241

def tcp_connect(host, port)
  fd = socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
  connect(fd, host, port)
  fd
end

#tcp_listen(host, port) ⇒ Integer

Creates, binds and sets up a TCP socket for listening on the given host and port.

Parameters:

  • host (String)

    host IP address

  • port (Integer)

    TCP port

Returns:

  • (Integer)

    socket fd



229
230
231
232
233
234
# File 'lib/uringmachine.rb', line 229

def tcp_listen(host, port)
  fd = socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
  bind(fd, host, port)
  listen(fd, UM::SOMAXCONN)
  fd
end

#tee(in_fd, out_fd, nbytes) ⇒ Integer

Duplicates bytes from in_fd to out_fd. At least one of the given fds must be a pipe.

Parameters:

  • in_fd (Integer)

    fd to copy from

  • out_fd (Integer)

    fd to copy to

  • nbytes (Integer)

    number of bytes to duplicate

Returns:

  • (Integer)

    number of bytes duplicated



570
571
572
573
# File 'ext/um/um_class.c', line 570

VALUE UM_tee(VALUE self, VALUE in_fd, VALUE out_fd, VALUE nbytes) {
  struct um *machine = um_get_machine(self);
  return um_tee(machine, NUM2INT(in_fd), NUM2INT(out_fd), NUM2UINT(nbytes));
}

#terminate(*fibers) ⇒ void

This method returns an undefined value.

Terminates the given fibers by scheduling them with a UM::Terminate exception. This method does not wait for the fibers to be done.

Parameters:

  • *fibers (Array<Fiber>)

    fibers to terminate



46
47
48
49
50
# File 'lib/uringmachine.rb', line 46

def terminate(*fibers)
  fibers = fibers.first if fibers.size == 1 && fibers.first.is_a?(Enumerable)

  fibers.each { schedule(it, TERMINATE_EXCEPTION) unless it.done? }
end

#test_mode=(value) ⇒ bool

Sets/resets test mode.

Parameters:

  • value (bool)

    test mode on/off

Returns:

  • (bool)

    test mode on/off



209
210
211
212
213
# File 'ext/um/um_class.c', line 209

VALUE UM_test_mode_set(VALUE self, VALUE value) {
  struct um *machine = um_get_machine(self);
  machine->test_mode = RTEST(value);
  return value;
}

#timeout(interval, exception) ⇒ any

Runs the given block, interrupting its execution if its runtime exceeds the given timeout interval (in seconds), raising the specified exception.

Parameters:

  • interval (Number)

    timeout interval in seconds

  • exception (any)

    timeout exception class or instance

Returns:

  • (any)

    block’s return value



345
346
347
348
# File 'ext/um/um_class.c', line 345

VALUE UM_timeout(VALUE self, VALUE interval, VALUE exception) {
  struct um *machine = um_get_machine(self);
  return um_timeout(machine, interval, exception);
}

#unshift(queue, value) ⇒ UM::Queue

Parameters:

  • queue (UM::Queue)

    queue

  • value (any)

    value

Returns:



1078
1079
1080
1081
1082
# File 'ext/um/um_class.c', line 1078

VALUE UM_queue_unshift(VALUE self, VALUE queue, VALUE value) {
  struct um *machine = um_get_machine(self);
  struct um_queue *que = Queue_data(queue);
  return um_queue_unshift(machine, que, value);
}

#waitid(idtype, id, options) ⇒ Array

Waits for a process to change state. The process to wait for can be specified as a pid or as a pidfd, according to the given idtype and id.

Returns:

  • (Array)

Parameters:

  • idtype (Integer)

    id type

  • id (Integer)

    id

  • options (Integer)

    options

Returns:

  • (Array<Integer>)

    pid status



1190
1191
1192
1193
# File 'ext/um/um_class.c', line 1190

VALUE UM_waitid(VALUE self, VALUE idtype, VALUE id, VALUE options) {
  struct um *machine = um_get_machine(self);
  return um_waitid(machine, NUM2INT(idtype), NUM2INT(id), NUM2INT(options));
}

#waitid_status(idtype, id, options) ⇒ Object

:nodoc:

This method depends on the availability of rb_process_status_new. See: github.com/ruby/ruby/pull/15213



1203
1204
1205
1206
# File 'ext/um/um_class.c', line 1203

VALUE UM_waitid_status(VALUE self, VALUE idtype, VALUE id, VALUE options) {
  struct um *machine = um_get_machine(self);
  return um_waitid_status(machine, NUM2INT(idtype), NUM2INT(id), NUM2INT(options));
}

#wakeupvoid

This method returns an undefined value.

Wakes up the machine in order to start processing its runqueue again. This method is normally called from another thread in order to resume processing of the runqueue when a machine is waiting for I/O completions.



298
299
300
301
# File 'ext/um/um_class.c', line 298

VALUE UM_wakeup(VALUE self) {
  struct um *machine = um_get_machine(self);
  return um_wakeup(machine);
}

#write(fd, buffer, len = nil, file_offset = nil) ⇒ Integer

Writes up to len bytes from the given buffer to the given fd. If len, is not given, the entire buffer length is used.

Parameters:

  • fd (Integer)

    file descriptor

  • buffer (String, IO::Buffer)

    buffer

  • len (Integer)

    maximum number of bytes to write

  • file_offset (Integer)

    optional file offset to write to

Returns:

  • (Integer)

    number of bytes written



443
444
445
446
447
448
449
450
451
452
453
454
455
# File 'ext/um/um_class.c', line 443

VALUE UM_write(int argc, VALUE *argv, VALUE self) {
  struct um *machine = um_get_machine(self);
  VALUE fd;
  VALUE buffer;
  VALUE len;
  VALUE file_offset;
  rb_scan_args(argc, argv, "22", &fd, &buffer, &len, &file_offset);

  size_t len_i = NIL_P(len) ? (size_t)-1 : NUM2UINT(len);
  __u64 file_offset_i = NIL_P(file_offset) ? (__u64)-1 : NUM2UINT(file_offset);

  return um_write(machine, NUM2INT(fd), buffer, len_i, file_offset_i);
}

#write_async(fd, buffer, len = nil, file_offset = nil) ⇒ String, IO::Buffer

Writes up to len bytes from the given buffer to the given fd. If len, is not given, the entire buffer length is used. This method submits the operation but does not wait for it to complete. This method may be used to improve performance in situations where the application does not care about whether the I/O operation succeeds or not.

Parameters:

  • fd (Integer)

    file descriptor

  • buffer (String, IO::Buffer)

    buffer

  • len (Integer)

    maximum number of bytes to write

  • file_offset (Integer)

    optional file offset to write to

Returns:

  • (String, IO::Buffer)

    buffer



502
503
504
505
506
507
508
509
510
511
512
513
514
# File 'ext/um/um_class.c', line 502

VALUE UM_write_async(int argc, VALUE *argv, VALUE self) {
  struct um *machine = um_get_machine(self);
  VALUE fd;
  VALUE buffer;
  VALUE len;
  VALUE file_offset;
  rb_scan_args(argc, argv, "22", &fd, &buffer, &len, &file_offset);

  size_t len_i = NIL_P(len) ? (size_t)-1 : NUM2UINT(len);
  __u64 file_offset_i = NIL_P(file_offset) ? (__u64)-1 : NUM2UINT(file_offset);

  return um_write_async(machine, NUM2INT(fd), buffer, len_i, file_offset_i);
}

#writev(fd, *buffers, file_offset = nil) ⇒ Integer

Writes from the given buffers into the given fd. This method does not guarantee that all data will be written. The application code should check the return value which indicates the number of bytes written and potentially repeat the operation after adjusting the buffers accordingly. See also #sendv.

Parameters:

  • fd (Integer)

    file descriptor

  • *buffers (Array<String, IO::Buffer>)

    data buffers

  • file_offset (Integer)

    optional file offset to write to

Returns:

  • (Integer)

    number of bytes written



474
475
476
477
478
479
480
481
482
# File 'ext/um/um_class.c', line 474

VALUE UM_writev(int argc, VALUE *argv, VALUE self) {
  struct um *machine = um_get_machine(self);
  if (argc < 1)
    rb_raise(rb_eArgError, "wrong number of arguments (given 0, expected 1+)");
  int fd = NUM2INT(argv[0]);
  if (argc < 2) return INT2NUM(0);

  return um_writev(machine, fd, argc - 1, argv + 1);
}

#yieldany

Yields control to the next fiber in the runqueue. The current fiber will not be resumed unless it is scheduled again by some other fiber. The call to #yield will return the value with which the current fiber will be eventually scheduled.

Returns:

  • (any)

    resume value



268
269
270
271
272
273
274
# File 'ext/um/um_class.c', line 268

VALUE UM_yield(VALUE self) {
  struct um *machine = um_get_machine(self);

  VALUE ret = um_yield(machine);
  RAISE_IF_EXCEPTION(ret);
  return ret;
}