Class: IO::Event::Selector::KQueue

Inherits: Object
Defined in:
ext/io/event/selector/kqueue.c

Instance Method Summary

Constructor Details

#initialize(loop) ⇒ Object

# File 'ext/io/event/selector/kqueue.c', line 97

VALUE IO_Event_Selector_KQueue_initialize(VALUE self, VALUE loop) {
	struct IO_Event_Selector_KQueue *data = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, data);
	
	IO_Event_Selector_initialize(&data->backend, loop);
	int result = kqueue();
	
	if (result == -1) {
		rb_sys_fail("IO_Event_Selector_KQueue_initialize:kqueue");
	} else {
		ioctl(result, FIOCLEX);
		data->descriptor = result;
		
		rb_update_max_fd(data->descriptor);
	}
	
	return self;
}
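
A minimal construction sketch in Ruby (assuming the io-event gem is installed and running on a platform where this selector is compiled, such as macOS or BSD); the fiber passed to the constructor is the event loop fiber that suspended fibers transfer back to:

require "io/event"

# The calling fiber acts as the event loop for this selector.
selector = IO::Event::Selector::KQueue.new(Fiber.current)

selector.loop == Fiber.current # => true
selector.close

As shown in the C source above, the constructor also marks the kqueue(2) descriptor close-on-exec via ioctl(FIOCLEX) and registers it with the interpreter via rb_update_max_fd.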

Instance Method Details

#close ⇒ Object

# File 'ext/io/event/selector/kqueue.c', line 123

VALUE IO_Event_Selector_KQueue_close(VALUE self) {
	struct IO_Event_Selector_KQueue *data = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, data);
	
	close_internal(data);
	
	return Qnil;
}

#io_read(*args) ⇒ Object

# File 'ext/io/event/selector/kqueue.c', line 470

static VALUE IO_Event_Selector_KQueue_io_read_compatible(int argc, VALUE *argv, VALUE self)
{
	rb_check_arity(argc, 4, 5);
	
	VALUE _offset = SIZET2NUM(0);
	
	if (argc == 5) {
		_offset = argv[4];
	}
	
	return IO_Event_Selector_KQueue_io_read(self, argv[0], argv[1], argv[2], argv[3], _offset);
}
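
This wrapper accepts four or five arguments, defaulting the buffer offset to 0 before delegating to the full #io_read implementation. A hedged usage sketch in Ruby, assuming the (fiber, io, buffer, length, offset) argument order used by the gem's selector interface and an IO::Buffer destination:

require "io/event"

selector = IO::Event::Selector::KQueue.new(Fiber.current)
input, output = IO.pipe
output.write("hello world")

buffer = IO::Buffer.new(64)

Fiber.new do
  # Offset defaults to 0 when omitted:
  selector.io_read(Fiber.current, input, buffer, 5)
  # Explicit offset: continue filling the buffer from byte 5.
  selector.io_read(Fiber.current, input, buffer, 6, 5)
end.transfer

If the descriptor is not immediately readable, the call suspends the fiber and it is resumed from #select once data arrives.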

#io_wait(fiber, io, events) ⇒ Object

# File 'ext/io/event/selector/kqueue.c', line 366

VALUE IO_Event_Selector_KQueue_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE events) {
	struct IO_Event_Selector_KQueue *data = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, data);
	
	int descriptor = IO_Event_Selector_io_descriptor(io);
	
	struct io_wait_arguments io_wait_arguments = {
		.events = io_add_filters(data->descriptor, descriptor, RB_NUM2INT(events), fiber),
		.data = data,
		.descriptor = descriptor,
	};
	
	if (DEBUG_IO_WAIT) fprintf(stderr, "IO_Event_Selector_KQueue_io_wait descriptor=%d\n", descriptor);
	
	return rb_rescue(io_wait_transfer, (VALUE)&io_wait_arguments, io_wait_rescue, (VALUE)&io_wait_arguments);
}
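
#io_wait adds kqueue filters for the requested events and transfers control to the event loop fiber; the waiting fiber is resumed from #select when one of the filters fires. A minimal end-to-end sketch in Ruby (the IO::Event::READABLE mask constant and the Fiber#transfer-based scheduling reflect how the io-event gem is normally driven, and are assumptions beyond what this page shows):

require "io/event"

selector = IO::Event::Selector::KQueue.new(Fiber.current)
input, output = IO.pipe

waiting = Fiber.new do
  # Parks this fiber until the pipe becomes readable (READABLE is assumed to be the readable mask).
  result = selector.io_wait(Fiber.current, input, IO::Event::READABLE)
  puts "io_wait returned: #{result.inspect}"
end

waiting.transfer     # Start the fiber; it suspends inside io_wait.
output.write("ping") # Make the read end of the pipe readable.
selector.select(1)   # Dispatches the kqueue event and resumes the waiting fiber.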

#io_write(*args) ⇒ Object

# File 'ext/io/event/selector/kqueue.c', line 572

static VALUE IO_Event_Selector_KQueue_io_write_compatible(int argc, VALUE *argv, VALUE self)
{
	rb_check_arity(argc, 4, 5);
	
	VALUE _offset = SIZET2NUM(0);
	
	if (argc == 5) {
		_offset = argv[4];
	}
	
	return IO_Event_Selector_KQueue_io_write(self, argv[0], argv[1], argv[2], argv[3], _offset);
}
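
The wrapper mirrors #io_read: four or five arguments, with the buffer offset defaulting to 0. A brief sketch under the same assumptions about argument order:

require "io/event"

selector = IO::Event::Selector::KQueue.new(Fiber.current)
input, output = IO.pipe
buffer = IO::Buffer.for("hello world")

Fiber.new do
  # Write the first 5 bytes, then the remaining 6 starting at buffer offset 5.
  selector.io_write(Fiber.current, output, buffer, 5)
  selector.io_write(Fiber.current, output, buffer, 6, 5)
end.transfer

input.read(11) # => "hello world"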

#loop ⇒ Object

# File 'ext/io/event/selector/kqueue.c', line 116

VALUE IO_Event_Selector_KQueue_loop(VALUE self) {
	struct IO_Event_Selector_KQueue *data = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, data);
	
	return data->backend.loop;
}

#process_wait(fiber, pid, flags) ⇒ Object

# File 'ext/io/event/selector/kqueue.c', line 242

VALUE IO_Event_Selector_KQueue_process_wait(VALUE self, VALUE fiber, VALUE pid, VALUE flags) {
	struct IO_Event_Selector_KQueue *data = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, data);
	
	struct process_wait_arguments process_wait_arguments = {
		.data = data,
		.pid = NUM2PIDT(pid),
		.flags = RB_NUM2INT(flags),
	};
	
	VALUE result = Qnil;
	
	// This loop should not be needed, but I have seen a race condition between NOTE_EXIT and `waitpid`, which caused the result to be (unexpectedly) nil. So we retry in a loop in case that race condition shows up:
	while (NIL_P(result)) {
		int waiting = process_add_filters(data->descriptor, process_wait_arguments.pid, fiber);
	
		if (waiting) {
			result = rb_rescue(process_wait_transfer, (VALUE)&process_wait_arguments, process_wait_rescue, (VALUE)&process_wait_arguments);
		} else {
			result = IO_Event_Selector_process_status_wait(process_wait_arguments.pid);
		}
	}
	
	return result;
}
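
A hedged usage sketch in Ruby: the child is waited on via an EVFILT_PROC/NOTE_EXIT filter when possible, and via waitpid otherwise. The flags value of 0 and the sleep command are illustrative:

require "io/event"

selector = IO::Event::Selector::KQueue.new(Fiber.current)

waiter = Fiber.new do
  pid = Process.spawn("sleep", "0.1")
  # Suspends until the child exits, then returns its exit status.
  status = selector.process_wait(Fiber.current, pid, 0)
  puts "exited: #{status.inspect}"
end

waiter.transfer    # The fiber parks inside process_wait (unless the child already exited).
selector.select(1) # Dispatches NOTE_EXIT and resumes the waiting fiber.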

#push(fiber) ⇒ Object

# File 'ext/io/event/selector/kqueue.c', line 156

VALUE IO_Event_Selector_KQueue_push(VALUE self, VALUE fiber)
{
	struct IO_Event_Selector_KQueue *data = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, data);
	
	IO_Event_Selector_queue_push(&data->backend, fiber);
	
	return Qnil;
}
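
#push enqueues a fiber on the selector's internal ready queue; it is not resumed immediately, but on the next call to #select, which flushes that queue before polling. A small sketch (transfer-started fibers, as elsewhere on this page, are an assumption about typical usage):

require "io/event"

selector = IO::Event::Selector::KQueue.new(Fiber.current)

task = Fiber.new { puts "resumed from the ready queue" }

selector.push(task) # Schedule the fiber; nothing runs yet.
selector.select(0)  # Flushing the ready queue runs it, even with a zero timeout.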

#raise(*args) ⇒ Object

# File 'ext/io/event/selector/kqueue.c', line 166

VALUE IO_Event_Selector_KQueue_raise(int argc, VALUE *argv, VALUE self)
{
	struct IO_Event_Selector_KQueue *data = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, data);
	
	return IO_Event_Selector_raise(&data->backend, argc, argv);
}

#ready? ⇒ Boolean

Returns:

  • (Boolean)

# File 'ext/io/event/selector/kqueue.c', line 174

VALUE IO_Event_Selector_KQueue_ready_p(VALUE self) {
	struct IO_Event_Selector_KQueue *data = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, data);
	
	return data->backend.ready ? Qtrue : Qfalse;
}

#resume(*args) ⇒ Object

# File 'ext/io/event/selector/kqueue.c', line 140

VALUE IO_Event_Selector_KQueue_resume(int argc, VALUE *argv, VALUE self)
{
	struct IO_Event_Selector_KQueue *data = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, data);
	
	return IO_Event_Selector_resume(&data->backend, argc, argv);
}

#select(duration) ⇒ Object

# File 'ext/io/event/selector/kqueue.c', line 666

VALUE IO_Event_Selector_KQueue_select(VALUE self, VALUE duration) {
	struct IO_Event_Selector_KQueue *data = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, data);
	
	int ready = IO_Event_Selector_queue_flush(&data->backend);
	
	struct select_arguments arguments = {
		.data = data,
		.count = KQUEUE_MAX_EVENTS,
		.storage = {
			.tv_sec = 0,
			.tv_nsec = 0
		}
	};
	
	arguments.timeout = &arguments.storage;
	
	// We break this implementation into two parts.
	// (1) count = kevent(..., timeout = 0)
	// (2) without gvl: kevent(..., timeout = duration) if count == 0 and timeout != 0
	// This allows us to avoid releasing and reacquiring the GVL.
	// Non-comprehensive testing shows this gives a 1.5x speedup.
	
	// First do the syscall with no timeout to get any immediately available events:
	if (DEBUG) fprintf(stderr, "\r\nselect_internal_with_gvl timeout=" PRINTF_TIMESPEC "\r\n", PRINTF_TIMESPEC_ARGS(arguments.storage));
	select_internal_with_gvl(&arguments);
	if (DEBUG) fprintf(stderr, "\r\nselect_internal_with_gvl done\r\n");
	
	// If we:
	// 1. Didn't process any ready fibers, and
	// 2. Didn't process any events from non-blocking select (above), and
	// 3. There are no items in the ready list,
	// then we can perform a blocking select.
	if (!ready && !arguments.count && !data->backend.ready) {
		arguments.timeout = make_timeout(duration, &arguments.storage);
		
		if (!timeout_nonblocking(arguments.timeout)) {
			arguments.count = KQUEUE_MAX_EVENTS;
			
			if (DEBUG) fprintf(stderr, "IO_Event_Selector_KQueue_select timeout=" PRINTF_TIMESPEC "\n", PRINTF_TIMESPEC_ARGS(arguments.storage));
			select_internal_without_gvl(&arguments);
		}
	}
	
	for (int i = 0; i < arguments.count; i += 1) {
		if (arguments.events[i].udata) {
			VALUE fiber = (VALUE)arguments.events[i].udata;
			VALUE result = INT2NUM(arguments.events[i].filter);
			
			IO_Event_Selector_fiber_transfer(fiber, 1, &result);
		}
	}
	
	return INT2NUM(arguments.count);
}
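
A typical driver calls #select repeatedly from the event loop fiber: each call first flushes the ready queue, polls kqueue without blocking, and only blocks (releasing the GVL) when there was nothing to do immediately. A hedged sketch of such a loop; the iteration count and the one-second timeout are purely illustrative:

require "io/event"

selector = IO::Event::Selector::KQueue.new(Fiber.current)

# ... fibers registered via io_wait/push transfer control back to this fiber ...

3.times do
  # Returns the number of kqueue events processed; fibers scheduled with
  # #push are also resumed inside this call when the ready queue is flushed.
  count = selector.select(1.0)
  puts "processed #{count} events"
end

selector.close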

#transfer ⇒ Object

# File 'ext/io/event/selector/kqueue.c', line 132

VALUE IO_Event_Selector_KQueue_transfer(VALUE self)
{
	struct IO_Event_Selector_KQueue *data = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, data);
	
	return IO_Event_Selector_fiber_transfer(data->backend.loop, 0, NULL);
}

#wakeup ⇒ Object

# File 'ext/io/event/selector/kqueue.c', line 722

VALUE IO_Event_Selector_KQueue_wakeup(VALUE self) {
	struct IO_Event_Selector_KQueue *data = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, data);
	
	if (data->blocked) {
		struct kevent trigger = {0};
		
		trigger.filter = EVFILT_USER;
		trigger.flags = EV_ADD | EV_CLEAR | EV_UDATA_SPECIFIC;
		trigger.fflags = NOTE_TRIGGER;
		
		int result = kevent(data->descriptor, &trigger, 1, NULL, 0, NULL);
		
		if (result == -1) {
			rb_sys_fail("IO_Event_Selector_KQueue_wakeup:kevent");
		}
		
		return Qtrue;
	}
	
	return Qfalse;
}
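
#wakeup is intended for cross-thread use: it posts an EVFILT_USER event so that a #select currently blocking without the GVL returns early, and it returns false when the selector is not blocked. A hedged sketch; the sleep is only there to make it likely that #select is already blocking when #wakeup is called:

require "io/event"

selector = IO::Event::Selector::KQueue.new(Fiber.current)

waker = Thread.new do
  sleep 0.1
  selector.wakeup # => true if it interrupted a blocking #select, false otherwise.
end

# Without the wakeup, this would block for the full 10 seconds:
selector.select(10)

waker.join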

#yield ⇒ Object

# File 'ext/io/event/selector/kqueue.c', line 148

VALUE IO_Event_Selector_KQueue_yield(VALUE self)
{
	struct IO_Event_Selector_KQueue *data = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_KQueue, &IO_Event_Selector_KQueue_Type, data);
	
	return IO_Event_Selector_yield(&data->backend);
}
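
#yield defers the current fiber back to the event loop; in a typical flow it is rescheduled and continues on a subsequent #select. A small sketch under the same transfer-based assumptions as the examples above:

require "io/event"

selector = IO::Event::Selector::KQueue.new(Fiber.current)

worker = Fiber.new do
  puts "step 1"
  selector.yield # Defer this fiber and transfer back to the event loop fiber.
  puts "step 2"
end

worker.transfer    # Prints "step 1", then control returns here.
selector.select(0) # Flushing the ready queue resumes the fiber: prints "step 2".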