Class: Event::Backend::URing

Inherits:
Object
  • Object
show all
Defined in:
ext/event/backend/uring.c

Instance Method Summary collapse

Constructor Details

#initialize(loop) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'ext/event/backend/uring.c', line 90

// Bind this backend to `loop` and create its io_uring instance with
// URING_ENTRIES entries. Returns `self`.
// Raises a SystemCallError (through rb_syserr_fail) when io_uring_queue_init
// reports a negative status.
VALUE Event_Backend_URing_initialize(VALUE self, VALUE loop) {
	struct Event_Backend_URing *backend = NULL;
	TypedData_Get_Struct(self, struct Event_Backend_URing, &Event_Backend_URing_Type, backend);
	
	backend->loop = loop;
	
	// A negative status is negated before being handed to rb_syserr_fail as the error number.
	const int status = io_uring_queue_init(URING_ENTRIES, &backend->ring, 0);
	
	if (status < 0) {
		rb_syserr_fail(-status, "io_uring_queue_init");
	}
	
	// Report the ring's file descriptor to Ruby.
	rb_update_max_fd(backend->ring.ring_fd);
	
	return self;
}

Instance Method Details

#close ⇒ Object



107
108
109
110
111
112
113
114
# File 'ext/event/backend/uring.c', line 107

// Close the backend by delegating to close_internal on the wrapped struct.
// Always returns nil.
VALUE Event_Backend_URing_close(VALUE self) {
	struct Event_Backend_URing *backend = NULL;
	TypedData_Get_Struct(self, struct Event_Backend_URing, &Event_Backend_URing_Type, backend);
	
	close_internal(backend);
	return Qnil;
}

#io_read(fiber, io, buffer, offset, length) ⇒ Object



286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'ext/event/backend/uring.c', line 286

// Queue a readv of up to `length` bytes from `io` into `buffer` at byte
// `offset`, submit it, and transfer to the event loop until a result is
// delivered back to this fiber.
//
// Returns the non-negative result as an Integer. A negative result is treated
// as a negated errno and raised through rb_syserr_fail.
VALUE Event_Backend_URing_io_read(VALUE self, VALUE fiber, VALUE io, VALUE buffer, VALUE offset, VALUE length) {
	struct Event_Backend_URing *data = NULL;
	TypedData_Get_Struct(self, struct Event_Backend_URing, &Event_Backend_URing_Type, data);
	
	// Resolve the descriptor before touching the buffer: rb_funcall runs Ruby
	// code, and nothing that can run Ruby code should sit between sizing the
	// buffer and capturing its pointer below.
	int descriptor = NUM2INT(rb_funcall(io, id_fileno, 0));
	
	// Convert each argument exactly once so the values used to size the buffer
	// are the same values used to build the iovec.
	size_t start = NUM2SIZET(offset);
	size_t count = NUM2SIZET(length);
	
	// NOTE(review): presumably ensures the string can hold start + count bytes — confirm against resize_to_capacity.
	resize_to_capacity(buffer, start, count);
	
	struct io_uring_sqe *sqe = io_get_sqe(data);
	assert(sqe);
	
	struct iovec iovecs[1];
	iovecs[0].iov_base = RSTRING_PTR(buffer) + start;
	iovecs[0].iov_len = count;
	
	// The trailing 0 is the offset argument given to io_uring_prep_readv.
	io_uring_prep_readv(sqe, descriptor, iovecs, 1, 0);
	io_uring_sqe_set_data(sqe, (void*)fiber);
	io_uring_submit(&data->ring);
	
	// Control returns here with the value passed back through the loop transfer.
	int result = NUM2INT(Event_Backend_transfer(data->loop));
	
	if (result < 0) {
		rb_syserr_fail(-result, strerror(-result));
	}
	
	// NOTE(review): presumably trims the string to start + result bytes — confirm against resize_to_fit.
	resize_to_fit(buffer, start, (size_t)result);
	
	return INT2NUM(result);
}

#io_wait(fiber, io, events) ⇒ Object



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'ext/event/backend/uring.c', line 237

// Prepare a poll request for `io` tagged with `fiber`, then run
// io_wait_transfer under rb_rescue with io_wait_rescue as the handler.
// Returns whatever rb_rescue returns.
VALUE Event_Backend_URing_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE events) {
	struct Event_Backend_URing *backend = NULL;
	TypedData_Get_Struct(self, struct Event_Backend_URing, &Event_Backend_URing_Type, backend);
	
	// Ask the IO object for its file descriptor.
	VALUE fileno = rb_funcall(io, id_fileno, 0);
	int fd = NUM2INT(fileno);
	
	struct io_uring_sqe *entry = io_get_sqe(backend);
	assert(entry);
	
	// Translate the requested events into the mask given to io_uring_prep_poll_add.
	short poll_mask = poll_flags_from_events(NUM2INT(events));
	
	// The entry is only prepared here; this function does not call io_uring_submit itself.
	io_uring_prep_poll_add(entry, fd, poll_mask);
	io_uring_sqe_set_data(entry, (void*)fiber);
	
	// Shared by the transfer and rescue callbacks; lives on this stack frame for the duration of rb_rescue.
	struct io_wait_arguments arguments = {
		.data = backend,
		.fiber = fiber,
		.flags = poll_mask
	};
	
	return rb_rescue(io_wait_transfer, (VALUE)&arguments, io_wait_rescue, (VALUE)&arguments);
}

#io_write(fiber, io, buffer, offset, length) ⇒ Object



317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# File 'ext/event/backend/uring.c', line 317

// Queue a writev of `length` bytes from `buffer` starting at byte `offset` to
// `io`, submit it, and transfer to the event loop until a result is delivered
// back to this fiber.
//
// Raises RuntimeError when offset/length fall outside the buffer.
// Returns the non-negative result as an Integer. A negative result is treated
// as a negated errno and raised through rb_syserr_fail.
VALUE Event_Backend_URing_io_write(VALUE self, VALUE fiber, VALUE io, VALUE buffer, VALUE offset, VALUE length) {
	struct Event_Backend_URing *data = NULL;
	TypedData_Get_Struct(self, struct Event_Backend_URing, &Event_Backend_URing_Type, data);
	
	// Resolve the descriptor first: rb_funcall runs Ruby code, so the bounds
	// check must happen after it for the check to still hold when the buffer
	// pointer is captured.
	int descriptor = NUM2INT(rb_funcall(io, id_fileno, 0));
	
	// Convert each argument exactly once so the validated values are the ones used.
	size_t start = NUM2SIZET(offset);
	size_t count = NUM2SIZET(length);
	size_t available = (size_t)RSTRING_LEN(buffer);
	
	// Written as a subtraction so that start + count cannot wrap around size_t
	// and slip past the check.
	if (start > available || count > available - start) {
		rb_raise(rb_eRuntimeError, "invalid offset/length exceeds bounds of buffer");
	}
	
	struct io_uring_sqe *sqe = io_get_sqe(data);
	assert(sqe);
	
	struct iovec iovecs[1];
	iovecs[0].iov_base = RSTRING_PTR(buffer) + start;
	iovecs[0].iov_len = count;
	
	// The trailing 0 is the offset argument given to io_uring_prep_writev.
	io_uring_prep_writev(sqe, descriptor, iovecs, 1, 0);
	io_uring_sqe_set_data(sqe, (void*)fiber);
	io_uring_submit(&data->ring);
	
	// Control returns here with the value passed back through the loop transfer.
	int result = NUM2INT(Event_Backend_transfer(data->loop));
	
	if (result < 0) {
		rb_syserr_fail(-result, strerror(-result));
	}
	
	return INT2NUM(result);
}

#process_wait(fiber, pid, flags) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'ext/event/backend/uring.c', line 154

// Wait for the process `pid` by opening a pidfd for it, preparing a poll
// request on that descriptor tagged with `fiber`, and running
// process_wait_transfer under rb_ensure so process_wait_ensure always runs.
//
// Raises a SystemCallError when pidfd_open fails.
// Returns whatever rb_ensure returns.
VALUE Event_Backend_URing_process_wait(VALUE self, VALUE fiber, VALUE pid, VALUE flags) {
	struct Event_Backend_URing *data = NULL;
	TypedData_Get_Struct(self, struct Event_Backend_URing, &Event_Backend_URing_Type, data);
	
	struct process_wait_arguments process_wait_arguments = {
		.data = data,
		.pid = NUM2PIDT(pid),
		.flags = NUM2INT(flags),
	};
	
	process_wait_arguments.descriptor = pidfd_open(process_wait_arguments.pid, 0);
	
	// Fail before the invalid descriptor reaches rb_update_max_fd or the poll request.
	// NOTE(review): assumes pidfd_open follows the syscall convention (negative return, errno set) — confirm.
	if (process_wait_arguments.descriptor < 0) {
		rb_sys_fail("pidfd_open");
	}
	
	rb_update_max_fd(process_wait_arguments.descriptor);
	
	struct io_uring_sqe *sqe = io_get_sqe(data);
	assert(sqe);
	
	// The entry is only prepared here; this function does not call io_uring_submit itself.
	io_uring_prep_poll_add(sqe, process_wait_arguments.descriptor, POLLIN|POLLHUP|POLLERR);
	io_uring_sqe_set_data(sqe, (void*)fiber);
	
	return rb_ensure(process_wait_transfer, (VALUE)&process_wait_arguments, process_wait_ensure, (VALUE)&process_wait_arguments);
}

#select(duration) ⇒ Object



444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
# File 'ext/event/backend/uring.c', line 444

// Run one iteration of the selector: process completions that are already
// available and, if there were none, either wait (bounded by `duration`) or
// submit pending entries, then process completions again.
//
// Returns the sum of the values reported by both select_process_completions
// passes as an Integer. A negative value from either pass is treated as a
// negated errno and raised through rb_syserr_fail.
VALUE Event_Backend_URing_select(VALUE self, VALUE duration) {
	struct Event_Backend_URing *data = NULL;
	TypedData_Get_Struct(self, struct Event_Backend_URing, &Event_Backend_URing_Type, data);
	
	// First pass: handle whatever is already sitting in the ring.
	int completed = select_process_completions(&data->ring);
	
	if (completed < 0) {
		rb_syserr_fail(-completed, strerror(-completed));
	} else if (completed == 0) {
		// We might need to wait for events:
		struct select_arguments arguments = {
			.data = data,
			.timeout = NULL,
		};
		
		arguments.timeout = make_timeout(duration, &arguments.storage);
		
		if (!timeout_nonblocking(arguments.timeout)) {
			// The return value is not needed here: the second pass below
			// reports what actually arrived.
			(void)select_internal_without_gvl(&arguments);
		} else {
			io_uring_submit(&data->ring);
		}
	}
	
	// Second pass: pick up anything that arrived while waiting or submitting.
	int arrived = select_process_completions(&data->ring);
	
	if (arrived < 0) {
		rb_syserr_fail(-arrived, strerror(-arrived));
	}
	
	// Report both passes; returning only the second would drop the work done in the first.
	return INT2NUM(completed + arrived);
}