Class: UringMachine
- Inherits:
-
Object
- Object
- UringMachine
- Defined in:
- lib/uringmachine.rb,
lib/uringmachine/actor.rb,
lib/uringmachine/version.rb,
lib/uringmachine/dns_resolver.rb,
lib/uringmachine/fiber_scheduler.rb,
ext/um/um_class.c
Overview
A UringMachine instance provides an interface for performing I/O operations and automatically switching between fibers. A single UringMachine instance should be used for each thread.
Defined Under Namespace
Modules: FiberExtensions, ThreadExtensions
Classes: Actor, AsyncOp, BlockingOperationThreadPool, DNSResolver, Error, FiberScheduler, IO, Mutex, Queue, Terminate
Constant Summary
- TERMINATE_EXCEPTION =
UM::Terminate.new
- VERSION =
'0.32.0'
Class Method Summary
-
.debug(str) ⇒ void
Prints the given string to STDERR.
-
.inotify_add_watch(fd, path, mask) ⇒ Integer
Adds a watch on the given inotify file descriptor.
-
.inotify_init ⇒ Integer
Creates an inotify file descriptor.
-
.kernel_version ⇒ Integer
Returns the kernel version.
-
.pidfd_open(pid) ⇒ Integer
Creates a file descriptor representing the given process pid.
-
.pidfd_send_signal(fd, sig) ⇒ Integer
Sends a signal to a pidfd.
-
.pipe ⇒ Array
Creates a pipe, returning the file descriptors for the read and write ends.
-
.pr_set_child_subreaper(value) ⇒ bool
Sets/unsets the “child subreaper” attribute of the calling process.
-
.socketpair(domain, type, protocol) ⇒ Array
Creates a pair of connected sockets, returning the two file descriptors.
Instance Method Summary
-
#accept(server_fd) ⇒ Integer
Accepts an incoming TCP connection.
-
#accept_each(server_fd) {|connection_fd| ... } ⇒ void
Repeatedly accepts incoming TCP connections in a loop, yielding the connection file descriptors to the given block.
-
#accept_into_queue(server_fd, queue) ⇒ void
Repeatedly accepts incoming TCP connections in a loop, pushing the connection file descriptors into the given queue.
-
#await(*fibers) ⇒ Integer
Waits for the given fibers to terminate, without collecting their return values.
-
#bind(fd, host, port) ⇒ 0
Binds the given socket to the given host and port.
-
#close(fd) ⇒ 0
Closes the given file descriptor.
-
#close_async(fd) ⇒ Hash
Closes the given file descriptor.
-
#connect(fd, host, port) ⇒ 0
Connects the given socket to the given host and port.
-
#fiber_set ⇒ Object
Returns the set of running fibers.
-
#file_watch(root, mask) ⇒ void
Watches for filesystem events using inotify in an infinite loop, yielding incoming events to the given block.
-
#fsync(fd) ⇒ 0
Flushes all modified file data to the storage device.
-
#getsockopt(fd, level, opt) ⇒ Integer
Returns the value of a socket option.
-
#initialize(size: 4096, sqpoll: false, sidecar: false) ⇒ void
constructor
Initializes a new UringMachine instance with the given options.
-
#inotify_get_events(fd) ⇒ Array<Hash>
Waits for and returns one or more events on the given inotify file descriptor.
-
#io(target, mode = nil) ⇒ UringMachine::IO
Creates a UM::IO for the given target (fd or SSLSocket).
-
#join(*fibers) ⇒ Object
Waits for the given fibers to terminate, returning the return value for each given fiber.
-
#listen(fd, backlog) ⇒ 0
Starts listening for incoming connections on the given socket.
-
#mark(mark) ⇒ Object
:nodoc:.
-
#metrics ⇒ Hash
Returns a hash with different metrics about the functioning of the UringMachine instance.
-
#open(pathname, flags) ⇒ Integer
Opens a file using the given pathname and flags.
-
#pending_fibers ⇒ Set
Returns a set containing all fibers in pending state (i.e. waiting for an operation to complete).
-
#periodically(interval) ⇒ void
Runs the given block at regular time intervals in an infinite loop.
-
#poll(fd, mask) ⇒ Integer
Waits for readiness of the given file descriptor according to the given event mask.
-
#pop(queue) ⇒ any
Removes a value from the tail of the given queue.
-
#prep_timeout(interval) ⇒ UM::AsyncOp
Prepares an asynchronous timeout instance.
-
#profile_mode=(value) ⇒ bool
Sets/resets profile mode.
-
#profile_mode? ⇒ bool
Returns the profile mode state.
-
#push(queue, value) ⇒ UM::Queue
Pushes a value to the tail of the given queue.
-
#read(fd, buffer, maxlen, buffer_offset = nil, file_offset = nil) ⇒ Integer
Reads up to maxlen bytes from the given fd into the given buffer.
-
#read_each(fd, bgid) {|data| ... } ⇒ void
Reads repeatedly from the given fd using the given buffer group id.
-
#recv(fd, buffer, maxlen, flags) ⇒ Integer
Receives data from the given socket.
-
#recv_each(fd, bgid, flags) {|data| ... } ⇒ void
Repeatedly receives data from the given socket in an infinite loop using the given buffer group id.
-
#recv_fd(sock_fd) ⇒ Integer
Receives a file descriptor over the given socket.
-
#resolve(hostname, type = :A) ⇒ String
Resolves a hostname to an IP address by performing a DNS query.
-
#run(fiber, &block) ⇒ Fiber
Runs the given block in the given fiber.
-
#schedule(fiber, value) ⇒ UringMachine
Schedules the given fiber by adding it to the runqueue.
-
#select(read_fds, write_fds, except_fds) ⇒ Array
Waits for readiness of at least one fd for read, write or exception condition from the given fds.
-
#send(fd, buffer, len, flags) ⇒ Integer
Sends data on the given socket.
-
#send_bundle(*args) ⇒ Object
Sends data on the given socket from the given buffers using a registered buffer group.
-
#send_fd(sock_fd, fd) ⇒ Integer
Sends the given file descriptor over the given socket.
-
#sendv(*args) ⇒ Object
Sends data on the given socket from the given buffers.
-
#setsockopt(fd, level, opt, value) ⇒ 0
Sets the value of a socket option.
-
#setup_buffer_ring(size, count) ⇒ Integer
Creates a buffer group (buffer ring) with the given buffer size and buffer count.
-
#shift(queue) ⇒ any
Removes a value from the head of the given queue.
-
#shutdown(fd, how) ⇒ 0
Shuts down a socket for sending and/or receiving.
-
#shutdown_async(fd, how) ⇒ 0
Shuts down a socket for sending and/or receiving.
-
#sidecar_mode? ⇒ bool
Returns the sidecar mode state.
-
#sidecar_start ⇒ UringMachine
Starts sidecar mode.
-
#sidecar_stop ⇒ UringMachine
Stops sidecar mode.
-
#size ⇒ Integer
Returns the SQ (submission queue) size.
-
#sleep(duration) ⇒ void
Puts the current fiber to sleep for the given time duration (in seconds), yielding control to the next fiber in the runqueue.
-
#snooze ⇒ UringMachine
Adds the current fiber to the end of the runqueue and yields control to the next fiber in the runqueue.
-
#socket(domain, type, protocol, flags) ⇒ Integer
Creates a socket.
-
#spin(value = nil, klass = Fiber, &block) ⇒ Fiber
Creates a new fiber and schedules it to be run.
- #spin_actor(mod, *a, **k) ⇒ Object
- #spin_thread_actor(mod, *a, **k) ⇒ Object
-
#splice(in_fd, out_fd, nbytes) ⇒ Integer
Splices bytes from in_fd to out_fd.
-
#sqpoll_mode? ⇒ bool
Returns the SQPOLL mode state.
-
#ssl_read(ssl, buf, maxlen) ⇒ Integer
Reads from the given SSL socket.
-
#ssl_set_bio(ssl) ⇒ UringMachine
Sets up the given ssl socket to use the machine for sending and receiving.
-
#ssl_write(ssl, buf, len) ⇒ Integer
Writes to the given SSL socket.
-
#statx(dirfd, path, flags, mask) ⇒ hash
Returns information about a file.
-
#submit ⇒ Integer
Submits any pending I/O operations that are not yet submitted.
-
#switch ⇒ any
Yields control to the next fiber in the runqueue.
-
#synchronize(mutex) { ... } ⇒ any
Synchronizes access to the given mutex.
-
#tcp_connect(host, port) ⇒ Integer
Creates and connects a TCP socket to the given host and port.
-
#tcp_listen(host, port) ⇒ Integer
Creates, binds and sets up a TCP socket for listening on the given host and port.
-
#tee(in_fd, out_fd, nbytes) ⇒ Integer
Duplicates bytes from in_fd to out_fd.
-
#terminate(*fibers) ⇒ void
Terminates the given fibers by scheduling them with a UM::Terminate exception.
-
#test_mode=(value) ⇒ bool
Sets/resets test mode.
-
#timeout(interval, exception) ⇒ any
Runs the given block, interrupting its execution if its runtime exceeds the given timeout interval (in seconds), raising the specified exception.
-
#unshift(queue, value) ⇒ UM::Queue
Pushes a value to the head of the given queue.
-
#waitid(idtype, id, options) ⇒ Array
Waits for a process to change state.
-
#waitid_status(idtype, id, options) ⇒ Object
:nodoc:.
-
#wakeup ⇒ void
Wakes up the machine in order to start processing its runqueue again.
-
#write(fd, buffer, len = nil, file_offset = nil) ⇒ Integer
Writes up to len bytes from the given buffer to the given fd.
-
#write_async(fd, buffer, len = nil, file_offset = nil) ⇒ String, IO::Buffer
Writes up to len bytes from the given buffer to the given fd.
-
#writev(fd, *buffers, file_offset = nil) ⇒ Integer
Writes from the given buffers into the given fd.
-
#yield ⇒ any
Yields control to the next fiber in the runqueue.
Constructor Details
#initialize(size: 4096, sqpoll: false, sidecar: false) ⇒ void
Initializes a new UringMachine instance with the given options.
# File 'ext/um/um_class.c', line 110

VALUE UM_initialize(int argc, VALUE *argv, VALUE self) {
  static ID kwargs_ids[3];
  struct um *machine = RTYPEDDATA_DATA(self);
  VALUE opts, kwargs[3] = {Qnil, Qnil, Qnil};

  if (!kwargs_ids[0]) {
    kwargs_ids[0] = rb_intern_const("size");
    kwargs_ids[1] = rb_intern_const("sqpoll");
    kwargs_ids[2] = rb_intern_const("sidecar");
  }
  rb_scan_args(argc, argv, "0:", &opts);
  if (!NIL_P(opts)) {
    rb_get_kwargs(opts, kwargs_ids, 0, 3, kwargs);
  }

  uint entries_i = TYPE(kwargs[0]) == T_FIXNUM ? NUM2UINT(kwargs[0]) : 0;
  uint sqpoll_timeout_msec = get_sqpoll_timeout_msec(kwargs[1]);
  um_setup(self, machine, entries_i, sqpoll_timeout_msec, RTEST(kwargs[2]));

  return self;
}
Class Method Details
.debug(str) ⇒ void
This method returns an undefined value.
Prints the given string to STDERR.
# File 'ext/um/um_class.c', line 1377

VALUE UM_debug(VALUE self, VALUE str) {
  fprintf(stderr, "%s\n", StringValueCStr(str));
  return Qnil;
}
.inotify_add_watch(fd, path, mask) ⇒ Integer
Adds a watch on the given inotify file descriptor.
# File 'ext/um/um_class.c', line 1412

VALUE UM_inotify_add_watch(VALUE self, VALUE fd, VALUE path, VALUE mask) {
  int ret = inotify_add_watch(NUM2INT(fd), StringValueCStr(path), NUM2UINT(mask));
  if (ret == -1) {
    int e = errno;
    rb_syserr_fail(e, strerror(e));
  }
  return INT2NUM(ret);
}
.inotify_init ⇒ Integer
Creates an inotify file descriptor.
# File 'ext/um/um_class.c', line 1391

VALUE UM_inotify_init(VALUE self) {
  int fd = inotify_init();
  if (fd == -1) {
    int e = errno;
    rb_syserr_fail(e, strerror(e));
  }
  return INT2NUM(fd);
}
.kernel_version ⇒ Integer
Returns the kernel version.
# File 'ext/um/um_class.c', line 1365

VALUE UM_kernel_version(VALUE self) {
  return INT2NUM(UM_KERNEL_VERSION);
}
.pidfd_open(pid) ⇒ Integer
Creates a file descriptor representing the given process pid. The file descriptor can then be used with methods such as #waitid or .pidfd_send_signal.
# File 'ext/um/um_class.c', line 1325

VALUE UM_pidfd_open(VALUE self, VALUE pid) {
  int fd = syscall(SYS_pidfd_open, NUM2INT(pid), 0);
  if (fd == -1) {
    int e = errno;
    rb_syserr_fail(e, strerror(e));
  }

  return INT2NUM(fd);
}
.pidfd_send_signal(fd, sig) ⇒ Integer
Sends a signal to a pidfd.
# File 'ext/um/um_class.c', line 1346

VALUE UM_pidfd_send_signal(VALUE self, VALUE fd, VALUE sig) {
  int ret = syscall(
    SYS_pidfd_send_signal, NUM2INT(fd), NUM2INT(sig), NULL, 0
  );
  if (ret) {
    int e = errno;
    rb_syserr_fail(e, strerror(e));
  }

  return fd;
}
.pipe ⇒ Array
Creates a pipe, returning the file descriptors for the read and write ends.
# File 'ext/um/um_class.c', line 1280

VALUE UM_pipe(VALUE self) {
  int fds[2];
  int ret = pipe(fds);
  if (ret) {
    int e = errno;
    rb_syserr_fail(e, strerror(e));
  }

  return rb_ary_new_from_args(2, INT2NUM(fds[0]), INT2NUM(fds[1]));
}
.pr_set_child_subreaper(value) ⇒ bool
Sets/unsets the “child subreaper” attribute of the calling process.
# File 'ext/um/um_class.c', line 1472

VALUE UM_pr_set_child_subreaper(VALUE self, VALUE set) {
  int ret = prctl(PR_SET_CHILD_SUBREAPER, RTEST(set) ? 1 : 0);
  if (ret) {
    int e = errno;
    rb_syserr_fail(e, strerror(e));
  }

  return set;
}
.socketpair(domain, type, protocol) ⇒ Array
Creates a pair of connected sockets, returning the two file descriptors.
# File 'ext/um/um_class.c', line 1303

VALUE UM_socketpair(VALUE self, VALUE domain, VALUE type, VALUE protocol) {
  int fds[2];
  int ret = socketpair(NUM2INT(domain), NUM2INT(type), NUM2INT(protocol), fds);
  if (ret) {
    int e = errno;
    rb_syserr_fail(e, strerror(e));
  }

  return rb_ary_new_from_args(2, INT2NUM(fds[0]), INT2NUM(fds[1]));
}
Instance Method Details
#accept(server_fd) ⇒ Integer
Accepts an incoming TCP connection.
# File 'ext/um/um_class.c', line 636

VALUE UM_accept(VALUE self, VALUE server_fd) {
  struct um *machine = um_get_machine(self);
  return um_accept(machine, NUM2INT(server_fd));
}
#accept_each(server_fd) {|connection_fd| ... } ⇒ void
This method returns an undefined value.
Repeatedly accepts incoming TCP connections in a loop, yielding the connection file descriptors to the given block.
# File 'ext/um/um_class.c', line 653

VALUE UM_accept_each(VALUE self, VALUE server_fd) {
  struct um *machine = um_get_machine(self);
  return um_accept_each(machine, NUM2INT(server_fd));
}
#accept_into_queue(server_fd, queue) ⇒ void
This method returns an undefined value.
Repeatedly accepts incoming TCP connections in a loop, pushing the connection file descriptors into the given queue.
# File 'ext/um/um_class.c', line 671

VALUE UM_accept_into_queue(VALUE self, VALUE server_fd, VALUE queue) {
  struct um *machine = um_get_machine(self);
  return um_accept_into_queue(machine, NUM2INT(server_fd), queue);
}
#await(*fibers) ⇒ Integer
Waits for the given fibers to terminate, without collecting their return values.
# File 'lib/uringmachine.rb', line 124

def await(*fibers)
  queue = Fiber.current.mailbox

  if fibers.size == 1
    first = fibers.first
    case first
    when Enumerable
      fibers = first
    when Fiber
      first = proc_spin(first) if first.is_a?(Proc)
      if !first.done?
        first.add_done_listener(queue)
        self.shift(queue)
      end
      return 1
    end
  end

  pending = nil
  fibers.each do |f|
    f = proc_spin(f) if f.is_a?(Proc)
    if !f.done?
      (pending ||= []) << f
      f.add_done_listener(queue)
    end
  end
  if pending
    while !pending.empty?
      f = self.shift(queue)
      pending.delete(f)
    end
  end
  fibers.count
end
#bind(fd, host, port) ⇒ 0
Binds the given socket to the given host and port.
# File 'ext/um/um_class.c', line 923

VALUE UM_bind(VALUE self, VALUE fd, VALUE host, VALUE port) {
  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = inet_addr(StringValueCStr(host));
  addr.sin_port = htons(NUM2INT(port));

#ifdef HAVE_IO_URING_PREP_BIND
  struct um *machine = um_get_machine(self);
  return um_bind(machine, NUM2INT(fd), (struct sockaddr *)&addr, sizeof(addr));
#else
  int res = bind(NUM2INT(fd), (struct sockaddr *)&addr, sizeof(addr));
  if (res)
    rb_syserr_fail(errno, strerror(errno));
  return INT2NUM(0);
#endif
}
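The listing for #bind passes host directly to inet_addr, which parses numeric IPv4 notation only, so a hostname would presumably have to be resolved (for example with #resolve) before it is handed to #bind or #connect. The following is a minimal pure-Ruby sketch of such a pre-check using the standard ipaddr library. The helper name ipv4_literal? is hypothetical and not part of UringMachine, and IPAddr may be stricter than inet_addr about shorthand forms.

```ruby
require 'ipaddr'

# Hypothetical helper (not part of UringMachine): returns true when +host+
# is a numeric IPv4 literal, false for hostnames and IPv6 addresses.
def ipv4_literal?(host)
  IPAddr.new(host).ipv4?
rescue IPAddr::Error
  # IPAddr raises a subclass of IPAddr::Error for strings it cannot parse.
  false
end
```

A caller could use this check to decide whether to resolve the name first or pass the string through unchanged.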
#close(fd) ⇒ 0
Closes the given file descriptor.
# File 'ext/um/um_class.c', line 602

VALUE UM_close(VALUE self, VALUE fd) {
  struct um *machine = um_get_machine(self);
  return um_close(machine, NUM2INT(fd));
}
#close_async(fd) ⇒ Hash
Closes the given file descriptor. This method submits the operation but does not wait for it to complete. It may be used to improve performance in cases where the application does not care whether the operation succeeds.
# File 'ext/um/um_class.c', line 620

VALUE UM_close_async(VALUE self, VALUE fd) {
  struct um *machine = um_get_machine(self);
  return um_close_async(machine, NUM2INT(fd));
}
#connect(fd, host, port) ⇒ 0
Connects the given socket to the given host and port.
# File 'ext/um/um_class.c', line 777

VALUE UM_connect(VALUE self, VALUE fd, VALUE host, VALUE port) {
  struct um *machine = um_get_machine(self);

  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = inet_addr(StringValueCStr(host));
  addr.sin_port = htons(NUM2INT(port));

  return um_connect(machine, NUM2INT(fd), (struct sockaddr *)&addr, sizeof(addr));
}
#fiber_set ⇒ Object
Returns the set of running fibers (a Set).
# File 'lib/uringmachine.rb', line 17

def fiber_set
  @fiber_set ||= Set.new
end
#file_watch(root, mask) ⇒ void
This method returns an undefined value.
Watches for filesystem events using inotify in an infinite loop, yielding incoming events to the given block.
# File 'lib/uringmachine.rb', line 175

def file_watch(root, mask)
  fd = UM.inotify_init
  wd_map = {}
  recursive_file_watch(fd, root, wd_map, mask)
  while true
    events = inotify_get_events(fd)
    events.each do |event|
      if event[:mask] | UM::IN_IGNORED == UM::IN_IGNORED
        wd_map.delete(event[:wd])
        next
      end
      transformed_event = transform_file_watch_event(event, wd_map)
      if event[:mask] == UM::IN_CREATE | UM::IN_ISDIR
        recursive_file_watch(fd, transformed_event[:fn], wd_map, mask)
      end
      yield transformed_event
    end
  end
ensure
  close_async(fd)
end
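Since #file_watch dispatches on inotify event masks, a short standalone sketch of mask testing may help when handling the yielded events. It assumes the UM:: constants carry the same numeric values as the Linux sys/inotify.h header; the constants are redefined locally here and the helper names are hypothetical. In Ruby, | and & bind more tightly than ==, so an expression such as mask | FLAG == FLAG evaluates as (mask | FLAG) == FLAG; the sketch uses a bitwise AND with explicit parentheses for the "flag is set" test.

```ruby
# Values from the Linux <sys/inotify.h> header; assumed to match UM::IN_*.
IN_CREATE  = 0x0000_0100
IN_IGNORED = 0x0000_8000
IN_ISDIR   = 0x4000_0000

# Hypothetical helper: true when every bit of +flag+ is set in +mask+.
def flag_set?(mask, flag)
  (mask & flag) == flag
end

# Hypothetical helper: true when the event is exactly "directory created".
def directory_created?(mask)
  mask == (IN_CREATE | IN_ISDIR)
end
```

For example, flag_set?(event[:mask], IN_ISDIR) would tell a block whether the event concerns a directory, regardless of which other bits are set.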
#fsync(fd) ⇒ 0
Flushes all modified file data to the storage device.
# File 'ext/um/um_class.c', line 586

VALUE UM_fsync(VALUE self, VALUE fd) {
  struct um *machine = um_get_machine(self);
  return um_fsync(machine, NUM2INT(fd));
}
#getsockopt(fd, level, opt) ⇒ Integer
Returns the value of a socket option.
# File 'ext/um/um_class.c', line 989

VALUE UM_getsockopt(VALUE self, VALUE fd, VALUE level, VALUE opt) {
  struct um *machine = um_get_machine(self);
  return um_getsockopt(machine, NUM2INT(fd), NUM2INT(level), NUM2INT(opt));
}
#inotify_get_events(fd) ⇒ Array<Hash>
Waits for and returns one or more events on the given inotify file descriptor. Each event is returned as a hash containing the watch descriptor, the file name, and the event mask.
# File 'ext/um/um_class.c', line 1453

VALUE UM_inotify_get_events(VALUE self, VALUE fd) {
  struct um *machine = um_get_machine(self);

  char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));

  size_t ret = um_read_raw(machine, NUM2INT(fd), buf, sizeof(buf));
  return inotify_get_events(buf, ret);
}
#io(target, mode = nil) ⇒ UringMachine::IO
call-seq:
machine.io(fd, mode = nil) -> conn
machine.io(fd, mode = nil) { |conn| }
Creates a UM::IO for the given target (fd or SSLSocket). The mode indicates the type of target and how it is read from:
-
:fd - read from the given fd using the buffer pool (default mode)
-
:socket - receive from the given socket fd using the buffer pool
-
:ssl - read from the given SSL socket
If a block is given, the block will be called with the IO instance as argument and the method will return the block’s return value.
# File 'lib/uringmachine.rb', line 214

def io(target, mode = nil)
  conn = UM::IO.new(self, target, mode)
  return conn if !block_given?

  res = yield(conn)
  conn.clear
  res
end
#join(*fibers) ⇒ Object
Waits for the given fibers to terminate, returning the return value for each given fiber. This method also accepts procs instead of fibers. When a proc is given, it is run in a separate fiber which will be joined.
machine.join(
-> { machine.sleep(0.01); :f1 },
-> { machine.sleep(0.02); :f2 },
-> { machine.sleep(0.03); :f3 }
) #=> [:f1, :f2, :f3]
values of the given fibers
# File 'lib/uringmachine.rb', line 77

def join(*fibers)
  queue = Fiber.current.mailbox

  if fibers.size == 1
    first = fibers.first
    case first
    when Enumerable
      fibers = first
    when Fiber
      first = proc_spin(first) if first.is_a?(Proc)
      if !first.done?
        first.add_done_listener(queue)
        self.shift(queue)
      end
      return first.result
    end
  end

  results = {}
  pending = nil
  fibers.each do |f|
    f = proc_spin(f) if f.is_a?(Proc)
    if f.done?
      results[f] = f.result
    else
      results[f] = nil
      (pending ||= []) << f
      f.add_done_listener(queue)
    end
  end
  if pending
    while !pending.empty?
      f = self.shift(queue)
      pending.delete(f)
      results[f] = f.result
    end
  end
  values = results.values
  fibers.size == 1 ? values.first : values
end
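The #join listing collects results in a hash keyed by fiber, seeding a placeholder for each pending fiber before waiting. Because a Ruby Hash keeps insertion order even when a value is later overwritten, the returned values appear to follow argument order rather than completion order. The sketch below models that bookkeeping in plain Ruby, with symbols standing in for fibers; the method name and parameters are hypothetical and the sketch does not use UringMachine.

```ruby
# Hypothetical model of the result bookkeeping in #join.
# names:            fiber stand-ins, in the order they were passed
# completion_order: the order in which they finish
# values:           a hash mapping each stand-in to its return value
def collect_in_argument_order(names, completion_order, values)
  results = {}
  # Seed placeholders in argument order, as the listing does for pending fibers.
  names.each { |name| results[name] = nil }
  # Fill in each value as the corresponding stand-in "finishes".
  completion_order.each { |name| results[name] = values[name] }
  results.values
end
```

Calling it with names [:f1, :f2, :f3] and a completion order of [:f3, :f1, :f2] still yields the values for :f1, :f2, :f3 in that order.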
#listen(fd, backlog) ⇒ 0
Starts listening for incoming connections on the given socket.
# File 'ext/um/um_class.c', line 953

VALUE UM_listen(VALUE self, VALUE fd, VALUE backlog) {
#ifdef HAVE_IO_URING_PREP_LISTEN
  struct um *machine = um_get_machine(self);
  return um_listen(machine, NUM2INT(fd), NUM2INT(backlog));
#else
  int res = listen(NUM2INT(fd), NUM2INT(backlog));
  if (res)
    rb_syserr_fail(errno, strerror(errno));
  return INT2NUM(0);
#endif
}
#mark(mark) ⇒ Object
:nodoc:
# File 'ext/um/um_class.c', line 155

VALUE UM_mark_m(VALUE self, VALUE mark) {
  struct um *machine = um_get_machine(self);
  machine->mark = NUM2UINT(mark);
  return self;
}
#metrics ⇒ Hash
Returns a hash with different metrics about the functioning of the UringMachine instance.
# File 'ext/um/um_class.c', line 166

VALUE UM_metrics(VALUE self) {
  struct um *machine = um_get_machine(self);
  return um_metrics(machine, &machine->metrics);
}
#open(pathname, flags) ⇒ Integer
#open(pathname, flags) {|fd| ... } ⇒ Integer
Opens a file using the given pathname and flags. If a block is given, the file descriptor is passed to the block, and the file is automatically closed when the block returns.
# File 'ext/um/um_class.c', line 1127

VALUE UM_open(VALUE self, VALUE pathname, VALUE flags) {
  struct um *machine = um_get_machine(self);
  // TODO: take optional perm (mode) arg
  VALUE fd = um_open(machine, pathname, NUM2INT(flags), 0666);
  if (rb_block_given_p()) {
    struct um_open_ctx ctx = { self, fd };
    return rb_ensure(rb_yield, fd, UM_open_complete, (VALUE)&ctx);
  }
  else
    return fd;
}
#pending_fibers ⇒ Set
Returns a set containing all fibers in pending state (i.e. waiting for an operation to complete).
# File 'ext/um/um_class.c', line 318

VALUE UM_pending_fibers(VALUE self) {
  struct um *machine = um_get_machine(self);
  return machine->pending_fibers;
}
#periodically(interval) ⇒ void
This method returns an undefined value.
Runs the given block at regular time intervals in an infinite loop.
# File 'ext/um/um_class.c', line 370

VALUE UM_periodically(VALUE self, VALUE interval) {
  struct um *machine = um_get_machine(self);
  return um_periodically(machine, NUM2DBL(interval));
}
#poll(fd, mask) ⇒ Integer
Waits for readiness of the given file descriptor according to the given event mask.
# File 'ext/um/um_class.c', line 1152

VALUE UM_poll(VALUE self, VALUE fd, VALUE mask) {
  struct um *machine = um_get_machine(self);
  return um_poll(machine, NUM2INT(fd), NUM2UINT(mask));
}
#pop(queue) ⇒ any
Removes a value from the tail of the given queue.
# File 'ext/um/um_class.c', line 1060

VALUE UM_queue_pop(VALUE self, VALUE queue) {
  struct um *machine = um_get_machine(self);
  struct um_queue *que = Queue_data(queue);
  return um_queue_pop(machine, que);
}
#prep_timeout(interval) ⇒ UM::AsyncOp
Prepares an asynchronous timeout instance.
# File 'ext/um/um_class.c', line 1218

VALUE UM_prep_timeout(VALUE self, VALUE interval) {
  struct um *machine = um_get_machine(self);
  return um_prep_timeout(machine, NUM2DBL(interval));
}
#profile_mode=(value) ⇒ bool
Sets/resets profile mode.
# File 'ext/um/um_class.c', line 194

VALUE UM_profile_mode_set(VALUE self, VALUE value) {
  struct um *machine = um_get_machine(self);
  machine->profile_mode = RTEST(value);
  if (machine->profile_mode) {
    machine->metrics.time_total_wait = 0.0;
    machine->metrics.time_last_cpu = machine->metrics.time_first_cpu = um_get_time_cpu();
  }
  return value;
}
#profile_mode? ⇒ bool
Returns the profile mode state.
# File 'ext/um/um_class.c', line 175

VALUE UM_profile_mode_p(VALUE self) {
  struct um *machine = um_get_machine(self);
  return machine->profile_mode ? Qtrue : Qfalse;
}
#push(queue, value) ⇒ UM::Queue
Pushes a value to the tail of the given queue.
# File 'ext/um/um_class.c', line 1043

VALUE UM_queue_push(VALUE self, VALUE queue, VALUE value) {
  struct um *machine = um_get_machine(self);
  struct um_queue *que = Queue_data(queue);
  return um_queue_push(machine, que, value);
}
#read(fd, buffer, maxlen, buffer_offset = nil, file_offset = nil) ⇒ Integer
Reads up to maxlen bytes from the given fd into the given buffer. The optional buffer_offset parameter determines the position in the buffer into which the data will be read. A negative buffer_offset denotes a position relative to the end of the buffer, e.g. a value of -1 means the data will be appended to the buffer.
# File 'ext/um/um_class.c', line 394

VALUE UM_read(int argc, VALUE *argv, VALUE self) {
  struct um *machine = um_get_machine(self);
  VALUE fd;
  VALUE buffer;
  VALUE maxlen;
  VALUE buffer_offset;
  VALUE file_offset;
  rb_scan_args(argc, argv, "32", &fd, &buffer, &maxlen, &buffer_offset, &file_offset);

  ssize_t maxlen_i = NIL_P(maxlen) ? -1 : NUM2INT(maxlen);
  ssize_t buffer_offset_i = NIL_P(buffer_offset) ? 0 : NUM2INT(buffer_offset);
  __u64 file_offset_i = NIL_P(file_offset) ? (__u64)-1 : NUM2UINT(file_offset);

  return um_read(machine, NUM2INT(fd), buffer, maxlen_i, buffer_offset_i, file_offset_i);
}
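To make the buffer_offset rule for #read concrete, here is a small pure-Ruby sketch of how an offset might map to a write position. Only two points come from the documentation and listing: a missing offset defaults to 0, and -1 means "append". Treating every other negative value as end-relative by the same formula is an assumption, and the helper name resolve_buffer_offset is hypothetical.

```ruby
# Hypothetical helper: maps a buffer_offset argument to the byte position
# at which newly read data would start in +buffer+.
def resolve_buffer_offset(buffer, offset)
  offset = 0 if offset.nil?            # the listing treats nil as 0
  return offset if offset >= 0         # non-negative: absolute position
  # Negative: relative to the end, so -1 lands just past the last byte.
  # Applying this to values below -1 is an assumption, not documented.
  buffer.bytesize + offset + 1
end
```

With a three-byte buffer, an offset of -1 resolves to position 3, so the read data would be appended.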
#read_each(fd, bgid) {|data| ... } ⇒ void
This method returns an undefined value.
Reads repeatedly from the given fd using the given buffer group id. The buffer group should have been previously setup using #setup_buffer_ring. Read data is yielded in an infinite loop to the given block.
# File 'ext/um/um_class.c', line 423

VALUE UM_read_each(VALUE self, VALUE fd, VALUE bgid) {
  struct um *machine = um_get_machine(self);
  return um_read_each(machine, NUM2INT(fd), NUM2INT(bgid));
}
#recv(fd, buffer, maxlen, flags) ⇒ Integer
Receives data from the given socket.
# File 'ext/um/um_class.c', line 885

VALUE UM_recv(VALUE self, VALUE fd, VALUE buffer, VALUE maxlen, VALUE flags) {
  struct um *machine = um_get_machine(self);
  return um_recv(machine, NUM2INT(fd), buffer, NUM2INT(maxlen), NUM2INT(flags));
}
#recv_each(fd, bgid, flags) {|data| ... } ⇒ void
This method returns an undefined value.
Repeatedly receives data from the given socket in an infinite loop using the given buffer group id. The buffer group should have been previously set up using #setup_buffer_ring.
# File 'ext/um/um_class.c', line 905

VALUE UM_recv_each(VALUE self, VALUE fd, VALUE bgid, VALUE flags) {
  struct um *machine = um_get_machine(self);
  return um_recv_each(machine, NUM2INT(fd), NUM2INT(bgid), NUM2INT(flags));
}
#recv_fd(sock_fd) ⇒ Integer
Receives a file descriptor over the given socket.
# File 'ext/um/um_class.c', line 759

VALUE UM_recv_fd(VALUE self, VALUE sock_fd) {
  struct um *machine = um_get_machine(self);
  return um_recv_fd(machine, NUM2INT(sock_fd));
}
#resolve(hostname, type = :A) ⇒ String
Resolves a hostname to an IP address by performing a DNS query.
# File 'lib/uringmachine.rb', line 164

def resolve(hostname, type = :A)
  @resolver ||= DNSResolver.new(self)
  @resolver.resolve(hostname, type)
end
#run(fiber, &block) ⇒ Fiber
Runs the given block in the given fiber. This method is used to run fibers indirectly.
# File 'lib/uringmachine.rb', line 58

def run(fiber, &block)
  run_block_in_fiber(block, fiber, nil)
  self.schedule(fiber, nil)
  fiber_set << fiber
  fiber
end
#schedule(fiber, value) ⇒ UringMachine
Schedules the given fiber by adding it to the runqueue. The fiber will be resumed with the given value.
# File 'ext/um/um_class.c', line 330

VALUE UM_schedule(VALUE self, VALUE fiber, VALUE value) {
  struct um *machine = um_get_machine(self);
  um_schedule(machine, fiber, value);
  return self;
}
#select(read_fds, write_fds, except_fds) ⇒ Array
Waits for readiness of at least one fd for read, write or exception condition from the given fds. This method provides a similar interface to IO.select.
# File 'ext/um/um_class.c', line 1171

VALUE UM_select(VALUE self, VALUE read_fds, VALUE write_fds, VALUE except_fds) {
  struct um *machine = um_get_machine(self);
  return um_select(machine, read_fds, write_fds, except_fds);
}
#send(fd, buffer, len, flags) ⇒ Integer
Sends data on the given socket. This method is not guaranteed to send all of the data in the buffer, unless UM::MSG_WAITALL is specified in the flags mask.
# File 'ext/um/um_class.c', line 805

VALUE UM_send(VALUE self, VALUE fd, VALUE buffer, VALUE len, VALUE flags) {
  struct um *machine = um_get_machine(self);
  return um_send(machine, NUM2INT(fd), buffer, NUM2INT(len), NUM2INT(flags));
}
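Because #send may transmit only part of the buffer, callers that need the whole payload delivered typically loop until every byte is accepted. The sketch below shows that loop in plain Ruby. The helper send_all is hypothetical, and the block stands in for the actual send call; with a machine and socket fd it would presumably look like machine.send(fd, chunk, chunk.bytesize, 0), assuming the Integer return value is the number of bytes sent.

```ruby
# Hypothetical helper: repeatedly hands the unsent remainder of +data+ to the
# block, which must return the number of bytes it accepted.
def send_all(data)
  sent = 0
  while sent < data.bytesize
    chunk = data.byteslice(sent, data.bytesize - sent)
    n = yield chunk
    # Guard against a sender that reports no progress, to avoid spinning.
    raise IOError, "send made no progress (returned #{n.inspect})" unless n.is_a?(Integer) && n > 0
    sent += n
  end
  sent
end

# Stand-in sender for demonstration: accepts at most 4 bytes per call.
received = +''
total = send_all("hello world") do |chunk|
  part = chunk.byteslice(0, 4)
  received << part
  part.bytesize
end
```

Here the stand-in sender is called three times before the whole string has been accepted.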
#send_bundle(fd, bgid, *buffers) ⇒ Integer
Sends data on the given socket from the given buffers using a registered buffer group. The buffer group should have been previously registered using #setup_buffer_ring.
# File 'ext/um/um_class.c', line 855

VALUE UM_send_bundle(int argc, VALUE *argv, VALUE self) {
  struct um *machine = um_get_machine(self);
  VALUE fd;
  VALUE bgid;
  VALUE strings;
  rb_scan_args(argc, argv, "2*", &fd, &bgid, &strings);

  if (RARRAY_LEN(strings) == 1) {
    VALUE first = rb_ary_entry(strings, 0);
    if (TYPE(first) == T_ARRAY)
      strings = first;
  }

  return um_send_bundle(machine, NUM2INT(fd), NUM2INT(bgid), strings);
}
#send_fd(sock_fd, fd) ⇒ Integer
Sends the given file descriptor over the given socket.
# File 'ext/um/um_class.c', line 743

VALUE UM_send_fd(VALUE self, VALUE sock_fd, VALUE fd) {
  struct um *machine = um_get_machine(self);
  return um_send_fd(machine, NUM2INT(sock_fd), NUM2INT(fd));
}
#sendv(fd, *buffers) ⇒ Integer
Sends data on the given socket from the given buffers. This method is only available on Linux kernel >= 6.17. This method is guaranteed to send
# File 'ext/um/um_class.c', line 824

VALUE UM_sendv(int argc, VALUE *argv, VALUE self) {
  struct um *machine = um_get_machine(self);
  if (argc < 1)
    rb_raise(rb_eArgError, "wrong number of arguments (given 0, expected 1+)");
  int fd = NUM2INT(argv[0]);
  if (argc < 2) return INT2NUM(0);

#ifdef HAVE_IO_URING_SEND_VECTORIZED
  return um_sendv(machine, fd, argc - 1, argv + 1);
#else
  return um_writev(machine, fd, argc - 1, argv + 1);
#endif
}
#setsockopt(fd, level, opt, value) ⇒ 0
Sets the value of a socket option.
# File 'ext/um/um_class.c', line 1008

VALUE UM_setsockopt(VALUE self, VALUE fd, VALUE level, VALUE opt, VALUE value) {
  struct um *machine = um_get_machine(self);
  return um_setsockopt(machine, NUM2INT(fd), NUM2INT(level), NUM2INT(opt), numeric_value(value));
}
#setup_buffer_ring(size, count) ⇒ Integer
Creates a buffer group (buffer ring) with the given buffer size and buffer count.
# File 'ext/um/um_class.c', line 138

VALUE UM_setup_buffer_ring(VALUE self, VALUE size, VALUE count) {
  struct um *machine = um_get_machine(self);
  int bgid = um_setup_buffer_ring(machine, NUM2UINT(size), NUM2UINT(count));
  return INT2NUM(bgid);
}
#shift(queue) ⇒ any
Removes a value from the head of the given queue.
    # File 'ext/um/um_class.c', line 1095
    VALUE UM_queue_shift(VALUE self, VALUE queue) {
      struct um *machine = um_get_machine(self);
      struct um_queue *que = Queue_data(queue);
      return um_queue_shift(machine, que);
    }
#shutdown(fd, how) ⇒ 0
Shuts down a socket for sending and/or receiving.
    # File 'ext/um/um_class.c', line 707
    VALUE UM_shutdown(VALUE self, VALUE fd, VALUE how) {
      struct um *machine = um_get_machine(self);
      return um_shutdown(machine, NUM2INT(fd), NUM2INT(how));
    }
#shutdown_async(fd, how) ⇒ 0
Shuts down a socket for sending and/or receiving. This method may be used to improve performance in situations where the application does not care about whether the operation succeeds or not.
    # File 'ext/um/um_class.c', line 726
    VALUE UM_shutdown_async(VALUE self, VALUE fd, VALUE how) {
      struct um *machine = um_get_machine(self);
      return um_shutdown_async(machine, NUM2INT(fd), NUM2INT(how));
    }
#sidecar_mode? ⇒ bool
Returns the sidecar mode state.
    # File 'ext/um/um_class.c', line 219
    VALUE UM_sidecar_mode_p(VALUE self) {
      struct um *machine = um_get_machine(self);
      return machine->sidecar_mode ? Qtrue : Qfalse;
    }
#sidecar_start ⇒ UringMachine
Starts sidecar mode.
    # File 'ext/um/um_class.c', line 228
    VALUE UM_sidecar_start(VALUE self) {
      struct um *machine = um_get_machine(self);
      um_sidecar_setup(machine);
      return self;
    }
#sidecar_stop ⇒ UringMachine
Stops sidecar mode.
    # File 'ext/um/um_class.c', line 238
    VALUE UM_sidecar_stop(VALUE self) {
      struct um *machine = um_get_machine(self);
      um_sidecar_teardown(machine);
      return self;
    }
#size ⇒ Integer
Returns the SQ (submission queue) size.
    # File 'ext/um/um_class.c', line 148
    VALUE UM_size(VALUE self) {
      struct um *machine = um_get_machine(self);
      return UINT2NUM(machine->size);
    }
#sleep(duration) ⇒ void
This method returns an undefined value.
Puts the current fiber to sleep for the given time duration (in seconds), yielding control to the next fiber in the runqueue.
    # File 'ext/um/um_class.c', line 358
    VALUE UM_sleep(VALUE self, VALUE duration) {
      struct um *machine = um_get_machine(self);
      return um_sleep(machine, NUM2DBL(duration));
    }
#snooze ⇒ UringMachine
Adds the current fiber to the end of the runqueue and yields control to the next fiber in the runqueue. This method is usually used to yield control while performing CPU-intensive work, in order not to starve other fibers.
    # File 'ext/um/um_class.c', line 250
    VALUE UM_snooze(VALUE self) {
      struct um *machine = um_get_machine(self);
      um_schedule(machine, rb_fiber_current(), Qnil);

      // the current fiber is already scheduled, and the runqueue is GC-marked, so
      // we can safely call um_switch, which is faster than calling um_yield.
      VALUE ret = um_switch(machine);
      RAISE_IF_EXCEPTION(ret);
      return ret;
    }
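The round-robin behavior described for #snooze can be illustrated with a toy model built on plain Ruby fibers. This is only a sketch of the idea (re-queue the current fiber at the end, then hand control to the next one); the `ToyRunqueue` class and its methods are hypothetical and are not how UringMachine is implemented.

```ruby
require 'fiber' # needed for Fiber.current on older Rubies; harmless otherwise

# Toy model of a cooperative runqueue (hypothetical, not UringMachine itself).
class ToyRunqueue
  def initialize
    @queue = []
  end

  # Create a fiber and put it at the end of the runqueue.
  def spin(&block)
    fiber = Fiber.new(&block)
    @queue << fiber
    fiber
  end

  # Re-queue the current fiber at the end and hand control back to the loop.
  def snooze
    @queue << Fiber.current
    Fiber.yield
  end

  # Resume fibers in FIFO order until the runqueue is empty.
  def run
    while (fiber = @queue.shift)
      fiber.resume
    end
  end
end

log = []
rq = ToyRunqueue.new
rq.spin { 3.times { |i| log << "a#{i}"; rq.snooze } }
rq.spin { 3.times { |i| log << "b#{i}"; rq.snooze } }
rq.run
# log is now interleaved: a0, b0, a1, b1, a2, b2
```

Because each fiber snoozes after every step, neither can monopolize the loop, which is the property the method description relies on for CPU-intensive work.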
#socket(domain, type, protocol, flags) ⇒ Integer
Creates a socket and returns its file descriptor.
    # File 'ext/um/um_class.c', line 690
    VALUE UM_socket(VALUE self, VALUE domain, VALUE type, VALUE protocol, VALUE flags) {
      struct um *machine = um_get_machine(self);
      return um_socket(machine, NUM2INT(domain), NUM2INT(type), NUM2INT(protocol), NUM2UINT(flags));
    }
#spin(value = nil, klass = Fiber, &block) ⇒ Fiber
Creates a new fiber and schedules it to be run.
    # File 'lib/uringmachine.rb', line 31
    def spin(value = nil, klass = Fiber, &block)
      fiber = klass.new(blocking: false) { |v| run_block_in_fiber(block, fiber, v) }
      self.schedule(fiber, value)

      fiber_set << fiber
      fiber
    end
#spin_actor(mod, *a, **k) ⇒ Object
    # File 'lib/uringmachine/actor.rb', line 4
    def spin_actor(mod, *a, **k)
      target = Object.new.extend(mod)
      mailbox = UM::Queue.new
      actor = spin(nil, Actor) { actor.run(self, target, mailbox) }
      target.setup(*a, **k)
      snooze
      actor
    end
#spin_thread_actor(mod, *a, **k) ⇒ Object
    # File 'lib/uringmachine/actor.rb', line 13
    def spin_thread_actor(mod, *a, **k)
      machine = UM.new
      target = Object.new.extend(mod)
      mailbox = UM::Queue.new
      actor = Actor.new
      Thread.new do
        actor.run(machine, target, mailbox)
      end
      target.setup(*a, **k)
      snooze
      actor
    end
#splice(in_fd, out_fd, nbytes) ⇒ Integer
Splices bytes from in_fd to out_fd. At least one of the given fds must be a pipe.
    # File 'ext/um/um_class.c', line 551
    VALUE UM_splice(VALUE self, VALUE in_fd, VALUE out_fd, VALUE nbytes) {
      struct um *machine = um_get_machine(self);
      return um_splice(machine, NUM2INT(in_fd), NUM2INT(out_fd), NUM2UINT(nbytes));
    }
#sqpoll_mode? ⇒ bool
Returns the SQPOLL mode state.
    # File 'ext/um/um_class.c', line 184
    VALUE UM_sqpoll_mode_p(VALUE self) {
      struct um *machine = um_get_machine(self);
      return machine->sqpoll_mode ? Qtrue : Qfalse;
    }
#ssl_read(ssl, buf, maxlen) ⇒ Integer
Reads from the given SSL socket. This method should be used after first having called #ssl_set_bio.
    # File 'ext/um/um_class.c', line 1248
    VALUE UM_ssl_read(VALUE self, VALUE ssl, VALUE buf, VALUE maxlen) {
      struct um *machine = um_get_machine(self);
      int ret = um_ssl_read(machine, ssl, buf, NUM2INT(maxlen));
      return INT2NUM(ret);
    }
#ssl_set_bio(ssl) ⇒ UringMachine
Sets up the given ssl socket to use the machine for sending and receiving.
    # File 'ext/um/um_class.c', line 1231
    VALUE UM_ssl_set_bio(VALUE self, VALUE ssl) {
      struct um *machine = um_get_machine(self);
      um_ssl_set_bio(machine, ssl);
      return self;
    }
#ssl_write(ssl, buf, len) ⇒ Integer
Writes to the given SSL socket. This method should be used after first having called #ssl_set_bio.
    # File 'ext/um/um_class.c', line 1265
    VALUE UM_ssl_write(VALUE self, VALUE ssl, VALUE buf, VALUE len) {
      struct um *machine = um_get_machine(self);
      int ret = um_ssl_write(machine, ssl, buf, NUM2INT(len));
      return INT2NUM(ret);
    }
#statx(fd, nil, flags, mask) ⇒ Hash
#statx(dirfd, path, flags, mask) ⇒ Hash
#statx(UM::AT_FDCWD, path, flags, mask) ⇒ Hash
Returns information about a file.
    # File 'ext/um/um_class.c', line 532
    VALUE UM_statx(VALUE self, VALUE dirfd, VALUE path, VALUE flags, VALUE mask) {
      struct um *machine = um_get_machine(self);
      return um_statx(machine, NUM2INT(dirfd), path, NUM2INT(flags), NUM2UINT(mask));
    }
#submit ⇒ Integer
Submits any pending I/O operations that are not yet submitted.
    # File 'ext/um/um_class.c', line 307
    VALUE UM_submit(VALUE self) {
      struct um *machine = um_get_machine(self);
      uint ret = um_submit(machine);
      return UINT2NUM(ret);
    }
#switch ⇒ any
Yields control to the next fiber in the runqueue. The current fiber will not be resumed unless it is scheduled again by some other fiber. The call to #switch returns the value with which the current fiber is eventually scheduled. If the fiber is scheduled with an exception, that exception is raised when the fiber is resumed.
    # File 'ext/um/um_class.c', line 284
    VALUE UM_switch(VALUE self) {
      struct um *machine = um_get_machine(self);

      VALUE ret = um_switch(machine);
      RAISE_IF_EXCEPTION(ret);
      return ret;
    }
#synchronize(mutex) { ... } ⇒ any
Synchronizes access to the given mutex. The mutex is locked, the given block is executed and finally the mutex is unlocked.
    # File 'ext/um/um_class.c', line 1025
    VALUE UM_mutex_synchronize(VALUE self, VALUE mutex) {
      struct um *machine = um_get_machine(self);
      struct um_mutex *mutex_data = Mutex_data(mutex);
      return um_mutex_synchronize(machine, mutex_data);
    }
#tcp_connect(host, port) ⇒ Integer
Creates and connects a TCP socket to the given host and port.
    # File 'lib/uringmachine.rb', line 241
    def tcp_connect(host, port)
      fd = socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
      connect(fd, host, port)
      fd
    end
#tcp_listen(host, port) ⇒ Integer
Creates, binds and sets up a TCP socket for listening on the given host and port.
    # File 'lib/uringmachine.rb', line 229
    def tcp_listen(host, port)
      fd = socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
      bind(fd, host, port)
      listen(fd, UM::SOMAXCONN)
      fd
    end
#tee(in_fd, out_fd, nbytes) ⇒ Integer
Duplicates bytes from in_fd to out_fd. Both of the given fds must be pipes.
    # File 'ext/um/um_class.c', line 570
    VALUE UM_tee(VALUE self, VALUE in_fd, VALUE out_fd, VALUE nbytes) {
      struct um *machine = um_get_machine(self);
      return um_tee(machine, NUM2INT(in_fd), NUM2INT(out_fd), NUM2UINT(nbytes));
    }
#terminate(*fibers) ⇒ void
This method returns an undefined value.
Terminates the given fibers by scheduling them with a UM::Terminate exception. This method does not wait for the fibers to be done.
    # File 'lib/uringmachine.rb', line 46
    def terminate(*fibers)
      fibers = fibers.first if fibers.size == 1 && fibers.first.is_a?(Enumerable)

      fibers.each { schedule(it, TERMINATE_EXCEPTION) unless it.done? }
    end
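The first line of the method body lets callers pass the fibers either as separate arguments or as a single collection. A minimal pure-Ruby sketch of that normalization follows; the helper name `normalize_fibers` is hypothetical, and plain symbols stand in for fibers.

```ruby
# Hypothetical helper, not part of UringMachine: mirrors the argument
# normalization at the top of #terminate.
def normalize_fibers(*fibers)
  # A single Enumerable argument is unwrapped; anything else is used as given.
  fibers = fibers.first if fibers.size == 1 && fibers.first.is_a?(Enumerable)
  fibers.to_a
end

separate = normalize_fibers(:f1, :f2)   # two separate arguments
wrapped  = normalize_fibers([:f1, :f2]) # one array holding the same two items
# Both calls yield the same list of two items.
```

With this convention, a call such as `machine.terminate(a, b)` and a call such as `machine.terminate([a, b])` would be expected to act on the same fibers.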
#test_mode=(value) ⇒ bool
Sets/resets test mode.
    # File 'ext/um/um_class.c', line 209
    VALUE UM_test_mode_set(VALUE self, VALUE value) {
      struct um *machine = um_get_machine(self);
      machine->test_mode = RTEST(value);
      return value;
    }
#timeout(interval, exception) ⇒ any
Runs the given block, interrupting its execution if its runtime exceeds the given timeout interval (in seconds), raising the specified exception.
    # File 'ext/um/um_class.c', line 345
    VALUE UM_timeout(VALUE self, VALUE interval, VALUE exception) {
      struct um *machine = um_get_machine(self);
      return um_timeout(machine, interval, exception);
    }
#unshift(queue, value) ⇒ UM::Queue
Pushes a value to the head of the given queue.
    # File 'ext/um/um_class.c', line 1078
    VALUE UM_queue_unshift(VALUE self, VALUE queue, VALUE value) {
      struct um *machine = um_get_machine(self);
      struct um_queue *que = Queue_data(queue);
      return um_queue_unshift(machine, que, value);
    }
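Since #unshift adds at the head, a value pushed this way is the next one to be removed from the head. Ruby's own Array uses the same vocabulary, so it can serve as a stand-in to show the ordering. This is an analogy only, assuming UM::Queue follows the head semantics stated in the descriptions; it does not exercise UM::Queue itself.

```ruby
# Plain Array used as a stand-in for a queue (an analogy, not UM::Queue).
queue = [:b, :c]
queue.unshift(:a)    # add :a at the head
first = queue.shift  # remove from the head, which is the value just added
# `first` holds :a and `queue` is back to its two original items.
```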
#waitid(idtype, id, options) ⇒ Array
Waits for a process to change state. The process to wait for can be specified as a pid or as a pidfd, according to the given idtype and id.
    # File 'ext/um/um_class.c', line 1190
    VALUE UM_waitid(VALUE self, VALUE idtype, VALUE id, VALUE options) {
      struct um *machine = um_get_machine(self);
      return um_waitid(machine, NUM2INT(idtype), NUM2INT(id), NUM2INT(options));
    }
#waitid_status(idtype, id, options) ⇒ Object
This method depends on the availability of rb_process_status_new. See: github.com/ruby/ruby/pull/15213
    # File 'ext/um/um_class.c', line 1203
    VALUE UM_waitid_status(VALUE self, VALUE idtype, VALUE id, VALUE options) {
      struct um *machine = um_get_machine(self);
      return um_waitid_status(machine, NUM2INT(idtype), NUM2INT(id), NUM2INT(options));
    }
#wakeup ⇒ void
This method returns an undefined value.
Wakes up the machine in order to start processing its runqueue again. This method is normally called from another thread in order to resume processing of the runqueue when a machine is waiting for I/O completions.
    # File 'ext/um/um_class.c', line 298
    VALUE UM_wakeup(VALUE self) {
      struct um *machine = um_get_machine(self);
      return um_wakeup(machine);
    }
#write(fd, buffer, len = nil, file_offset = nil) ⇒ Integer
Writes up to len bytes from the given buffer to the given fd. If len is not given, the entire buffer length is used.
    # File 'ext/um/um_class.c', line 443
    VALUE UM_write(int argc, VALUE *argv, VALUE self) {
      struct um *machine = um_get_machine(self);
      VALUE fd;
      VALUE buffer;
      VALUE len;
      VALUE file_offset;
      rb_scan_args(argc, argv, "22", &fd, &buffer, &len, &file_offset);

      size_t len_i = NIL_P(len) ? (size_t)-1 : NUM2UINT(len);
      __u64 file_offset_i = NIL_P(file_offset) ? (__u64)-1 : NUM2UINT(file_offset);

      return um_write(machine, NUM2INT(fd), buffer, len_i, file_offset_i);
    }
#write_async(fd, buffer, len = nil, file_offset = nil) ⇒ String, IO::Buffer
Writes up to len bytes from the given buffer to the given fd. If len is not given, the entire buffer length is used. This method submits the operation but does not wait for it to complete. It may be used to improve performance in situations where the application does not care whether the I/O operation succeeds.
    # File 'ext/um/um_class.c', line 502
    VALUE UM_write_async(int argc, VALUE *argv, VALUE self) {
      struct um *machine = um_get_machine(self);
      VALUE fd;
      VALUE buffer;
      VALUE len;
      VALUE file_offset;
      rb_scan_args(argc, argv, "22", &fd, &buffer, &len, &file_offset);

      size_t len_i = NIL_P(len) ? (size_t)-1 : NUM2UINT(len);
      __u64 file_offset_i = NIL_P(file_offset) ? (__u64)-1 : NUM2UINT(file_offset);

      return um_write_async(machine, NUM2INT(fd), buffer, len_i, file_offset_i);
    }
#writev(fd, *buffers, file_offset = nil) ⇒ Integer
Writes from the given buffers into the given fd. This method does not guarantee that all data will be written. The application code should check the return value which indicates the number of bytes written and potentially repeat the operation after adjusting the buffers accordingly. See also #sendv.
    # File 'ext/um/um_class.c', line 474
    VALUE UM_writev(int argc, VALUE *argv, VALUE self) {
      struct um *machine = um_get_machine(self);
      if (argc < 1)
        rb_raise(rb_eArgError, "wrong number of arguments (given 0, expected 1+)");
      int fd = NUM2INT(argv[0]);
      if (argc < 2) return INT2NUM(0);

      return um_writev(machine, fd, argc - 1, argv + 1);
    }
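The "adjusting the buffers accordingly" step after a partial write can be sketched in pure Ruby. The helper below is hypothetical (it is not part of UringMachine) and assumes the buffers are Strings: given the buffers that were passed and the byte count that was returned, it computes what is still left to write.

```ruby
# Hypothetical helper, not part of UringMachine: drop the bytes that were
# already written and return the buffers that remain to be written.
def remaining_buffers(buffers, written)
  rest = []
  buffers.each do |buf|
    if written >= buf.bytesize
      written -= buf.bytesize           # fully written, drop this buffer
    elsif written > 0
      rest << buf.byteslice(written..)  # partially written, keep the tail
      written = 0
    else
      rest << buf                       # not written at all, keep as is
    end
  end
  rest
end

# Usage sketch, assuming `machine` is a UringMachine and `fd` is writable:
#   until buffers.empty?
#     n = machine.writev(fd, *buffers)
#     buffers = remaining_buffers(buffers, n)
#   end
```

The helper works in bytes rather than characters, since the return value of a write counts bytes.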
#yield ⇒ any
Yields control to the next fiber in the runqueue. The current fiber will not be resumed unless it is scheduled again by some other fiber. The call to #yield will return the value with which the current fiber will be eventually scheduled.
    # File 'ext/um/um_class.c', line 268
    VALUE UM_yield(VALUE self) {
      struct um *machine = um_get_machine(self);

      VALUE ret = um_yield(machine);
      RAISE_IF_EXCEPTION(ret);
      return ret;
    }