Class: POSIX_MQ

Inherits:
Object
  • Object
show all
Defined in:
lib/posix_mq.rb,
ext/posix_mq/posix_mq.c

Defined Under Namespace

Classes: Attr

Constant Summary collapse

VERSION =

version of POSIX_MQ, currently 0.2.0

'0.2.0'
OPEN_MAX =

The maximum number of open message descriptors supported by the system. This may be -1, in which case it is dynamically set at runtime. Consult your operating system documentation for system-specific information about this.

LONG2NUM(sysconf(_SC_MQ_OPEN_MAX))
PRIO_MAX =

The maximum priority that may be specified for POSIX_MQ#send. On POSIX-compliant systems, this is at least 31, but some systems allow higher limits. The minimum priority is always zero.

LONG2NUM(sysconf(_SC_MQ_PRIO_MAX))

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Object

POSIX_MQ.new(name [, flags [, mode [, mq_attr]]]) => mq

Opens a POSIX message queue given by name. name should start with a slash (“/”) for portable applications.

If a Symbol is given in place of integer flags, then:

  • :r is equivalent to IO::RDONLY

  • :w is equivalent to IO::CREAT|IO::WRONLY

  • :rw is equivalent to IO::CREAT|IO::RDWR

mode is an integer and only used when IO::CREAT is used. mq_attr is a POSIX_MQ::Attr and only used if IO::CREAT is used. If mq_attr is not specified when creating a queue, then the system defaults will be used.

See the manpage for mq_open(3) for more details on this function.



287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'ext/posix_mq/posix_mq.c', line 287

/*
 * Implements POSIX_MQ.new(name [, flags [, mode [, mq_attr]]]).
 *
 * Arguments are parsed with rb_scan_args "13" (one required, up to three
 * optional):
 *   name   - must be a String
 *   flags  - nil (treated as O_RDONLY), one of the symbols :r, :w, :rw,
 *            or an Integer holding the open flags
 *   mode   - nil or a Fixnum; when nil and O_CREAT is set, 0666 is used
 *   attr   - nil or a Struct whose fields are read by attr_from_struct()
 *
 * On success the descriptor is stored in mq->des, a duplicate of name is
 * stored in mq->name, and self is returned.
 *
 * Raises ArgumentError for arguments of the wrong type and calls
 * rb_sys_fail("mq_open") when the open call reports MQD_INVALID.
 */
static VALUE init(int argc, VALUE *argv, VALUE self)
{
	struct posix_mq *mq = get(self, 0);
	struct open_args x;
	VALUE name, oflags, mode, attr;

	rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);

	if (TYPE(name) != T_STRING)
		rb_raise(rb_eArgError, "name must be a string");

	switch (TYPE(oflags)) {
	case T_NIL:
		x.oflags = O_RDONLY;
		break;
	case T_SYMBOL:
		if (oflags == sym_r)
			x.oflags = O_RDONLY;
		else if (oflags == sym_w)
			x.oflags = O_CREAT|O_WRONLY;
		else if (oflags == sym_rw)
			x.oflags = O_CREAT|O_RDWR;
		else
			rb_raise(rb_eArgError,
			         "symbol must be :r, :w, or :rw: %s",
				 RSTRING_PTR(rb_inspect(oflags)));
		break;
	case T_BIGNUM:
	case T_FIXNUM:
		x.oflags = NUM2INT(oflags);
		break;
	default:
		/* the accepted symbol is :rw (see sym_rw above), not :wr */
		rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :rw");
	}

	x.name = RSTRING_PTR(name);
	/* x.argc counts how many of name/oflags/mode/attr are meaningful */
	x.argc = 2;

	switch (TYPE(mode)) {
	case T_FIXNUM:
		x.argc = 3;
		x.mode = NUM2INT(mode);
		break;
	case T_NIL:
		/* a mode is only supplied by default when creating */
		if (x.oflags & O_CREAT) {
			x.argc = 3;
			x.mode = 0666;
		}
		break;
	default:
		rb_raise(rb_eArgError, "mode not an integer");
	}

	switch (TYPE(attr)) {
	case T_STRUCT:
		x.argc = 4;
		attr_from_struct(&x.attr, attr, 1);

		/* principle of least surprise */
		if (x.attr.mq_flags & O_NONBLOCK)
			x.oflags |= O_NONBLOCK;
		break;
	case T_NIL:
		break;
	default:
		rb_raise(rb_eArgError, "attr must be a POSIX_MQ::Attr: %s",
		         RSTRING_PTR(rb_inspect(attr)));
	}

	/* xopen is run through rb_thread_blocking_region with the packed args */
	mq->des = (mqd_t)rb_thread_blocking_region(xopen, &x, RUBY_UBF_IO, 0);
	if (mq->des == MQD_INVALID)
		rb_sys_fail("mq_open");

	mq->name = rb_str_dup(name);

	return self;
}

Class Method Details

.open(*args) ⇒ Object

Opens a POSIX message queue and performs operations on the given block, closing the message queue at exit. All arguments are passed to POSIX_MQ.new.



20
21
22
23
24
25
26
27
28
# File 'lib/posix_mq.rb', line 20

# Builds a queue by forwarding every argument to +new+.
#
# Without a block the new queue is returned as-is.  With a block the
# queue is yielded, the block's value is returned, and the queue is
# closed afterwards (even if the block raises) unless the block already
# closed it.
def open(*args)
  mq = new(*args)
  return mq unless block_given?

  begin
    yield mq
  ensure
    mq.closed? or mq.close
  end
end

POSIX_MQ.unlink(name) => 1

Unlinks the message queue given by name. The queue will be destroyed when the last process with the queue open closes its queue descriptors.



372
373
374
375
376
377
378
379
380
381
382
383
384
385
# File 'ext/posix_mq/posix_mq.c', line 372

/*
 * Implements POSIX_MQ.unlink(name) => 1.
 *
 * name must be a String, otherwise ArgumentError is raised.  The String
 * VALUE itself is handed to xunlink through rb_thread_blocking_region;
 * a result equal to MQD_INVALID is reported with rb_sys_fail("mq_unlink").
 * Returns the Integer 1 on success.
 */
static VALUE s_unlink(VALUE self, VALUE name)
{
	mqd_t status;
	void *arg;

	if (TYPE(name) != T_STRING)
		rb_raise(rb_eArgError, "argument must be a string");

	arg = (void *)name;
	status = (mqd_t)rb_thread_blocking_region(xunlink, arg, RUBY_UBF_IO, 0);
	if (status == MQD_INVALID)
		rb_sys_fail("mq_unlink");

	return INT2NUM(1);
}

Instance Method Details

#<<(buffer) ⇒ Object

mq << string => mq

Inserts the given string into the message queue with a default priority of 0 and no timeout.



458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
# File 'ext/posix_mq/posix_mq.c', line 458

/*
 * Implements mq << string => mq.
 *
 * Sends buffer with priority 0 and no timeout by running xsend through
 * rb_thread_blocking_region.  A result equal to MQD_INVALID is reported
 * with rb_sys_fail("mq_send").  Returns self so calls can be chained.
 *
 * NOTE(review): get(self, 1) presumably rejects a closed queue -- confirm
 * against the definition of get().
 */
static VALUE send0(VALUE self, VALUE buffer)
{
	struct posix_mq *queue = get(self, 1);
	struct rw_args args;
	mqd_t status;

	setup_send_buffer(&args, buffer);
	args.msg_prio = 0;
	args.timeout = NULL;
	args.des = queue->des;

	status = (mqd_t)rb_thread_blocking_region(xsend, &args, RUBY_UBF_IO, 0);
	if (status == MQD_INVALID)
		rb_sys_fail("mq_send");

	return self;
}

#attrObject

mq.attr => mq_attr

Returns a POSIX_MQ::Attr struct containing the attributes of the message queue. See the mq_getattr(3) manpage for more details.



565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
# File 'ext/posix_mq/posix_mq.c', line 565

/*
 * Implements mq.attr => mq_attr.
 *
 * Queries the queue with mq_getattr(3) and returns a new cAttr struct
 * whose four slots hold, in order: mq_flags, mq_maxmsg, mq_msgsize and
 * mq_curmsgs.  Calls rb_sys_fail("mq_getattr") if the query fails.
 */
static VALUE getattr(VALUE self)
{
	struct posix_mq *mq = get(self, 1);
	struct mq_attr attr;
	VALUE astruct;
	VALUE *ptr;

	/*
	 * mq_getattr(3) returns an int (0 or -1), so test the int directly
	 * instead of comparing it with the mqd_t sentinel MQD_INVALID.
	 */
	if (mq_getattr(mq->des, &attr) < 0)
		rb_sys_fail("mq_getattr");

	astruct = rb_struct_alloc_noinit(cAttr);
	ptr = RSTRUCT_PTR(astruct);
	ptr[0] = LONG2NUM(attr.mq_flags);
	ptr[1] = LONG2NUM(attr.mq_maxmsg);
	ptr[2] = LONG2NUM(attr.mq_msgsize);
	ptr[3] = LONG2NUM(attr.mq_curmsgs);

	return astruct;
}

#attr=(astruct) ⇒ Object

mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr

Only the IO::NONBLOCK flag may be set or unset (zero) in this manner. See the mq_setattr(3) manpage for more details.

Consider using the POSIX_MQ#nonblock= method as it is easier and more natural to use.



595
596
597
598
599
600
601
602
603
604
605
606
# File 'ext/posix_mq/posix_mq.c', line 595

/*
 * Implements mq.attr = astruct => astruct.
 *
 * Converts astruct into a struct mq_attr with attr_from_struct() and
 * applies it with mq_setattr(3); the previous attributes are not
 * requested (NULL).  Calls rb_sys_fail("mq_setattr") on failure and
 * returns the astruct argument unchanged on success.
 */
static VALUE setattr(VALUE self, VALUE astruct)
{
	struct posix_mq *mq = get(self, 1);
	struct mq_attr newattr;

	attr_from_struct(&newattr, astruct, 0);

	/*
	 * mq_setattr(3) returns an int (0 or -1), so test the int directly
	 * instead of comparing it with the mqd_t sentinel MQD_INVALID.
	 */
	if (mq_setattr(mq->des, &newattr, NULL) < 0)
		rb_sys_fail("mq_setattr");

	return astruct;
}

#closeObject

mq.close => nil

Closes the underlying message queue descriptor. If this descriptor had a registered notification request, the request will be removed so another descriptor or process may register a notification request. Message queue descriptors are automatically closed by garbage collection.



618
619
620
621
622
623
624
625
626
627
628
629
# File 'ext/posix_mq/posix_mq.c', line 618

/*
 * Implements mq.close => nil.
 *
 * Closes the descriptor with mq_close(3), then marks the queue as closed
 * by storing MQD_INVALID in mq->des and resetting the cached IO object
 * via MQ_IO_SET(mq, Qnil).  Calls rb_sys_fail("mq_close") on failure, in
 * which case the stored descriptor is left untouched.
 */
static VALUE _close(VALUE self)
{
	struct posix_mq *mq = get(self, 1);

	/*
	 * mq_close(3) returns an int (0 or -1), so test the int directly
	 * instead of comparing it with the mqd_t sentinel MQD_INVALID.
	 */
	if (mq_close(mq->des) < 0)
		rb_sys_fail("mq_close");

	mq->des = MQD_INVALID;
	MQ_IO_SET(mq, Qnil);

	return Qnil;
}

#closed?Boolean

mq.closed? => true or false

Returns true if the message queue descriptor is closed and therefore unusable, otherwise false.

Returns:

  • (Boolean)


638
639
640
641
642
643
# File 'ext/posix_mq/posix_mq.c', line 638

/*
 * Implements mq.closed? => true or false.
 *
 * Reports Qtrue when the stored descriptor equals MQD_INVALID (the value
 * written by the close method), Qfalse otherwise.
 */
static VALUE closed(VALUE self)
{
	struct posix_mq *queue = get(self, 0);

	if (queue->des == MQD_INVALID)
		return Qtrue;

	return Qfalse;
}

#nameObject

mq.name => string

Returns the string name of the message queue associated with mq.



651
652
653
654
655
656
# File 'ext/posix_mq/posix_mq.c', line 651

/*
 * Implements mq.name => string.
 *
 * Returns the VALUE stored in the name field of the queue struct
 * wrapped by self.
 */
static VALUE name(VALUE self)
{
	struct posix_mq *queue;

	queue = get(self, 0);

	return queue->name;
}

#nonblock=(nb) ⇒ Object

mq.nonblock = boolean => boolean

Enables or disables non-blocking operation for the message queue descriptor. Errno::EAGAIN will be raised in situations where the queue would block. This is not compatible with timeout arguments to POSIX_MQ#send and POSIX_MQ#receive.



767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
# File 'ext/posix_mq/posix_mq.c', line 767

/*
 * Implements mq.nonblock = boolean => boolean.
 *
 * nb must be exactly true or false, otherwise ArgumentError is raised.
 * Only the mq_flags field of the new attributes is filled in (O_NONBLOCK
 * or 0) before calling mq_setattr(3).  The old attributes returned by
 * that call are used to refresh the cached mq->msgsize.  Calls
 * rb_sys_fail("mq_setattr") on failure; returns nb on success.
 */
static VALUE setnonblock(VALUE self, VALUE nb)
{
	struct mq_attr newattr, oldattr;
	struct posix_mq *mq = get(self, 1);

	if (nb == Qtrue)
		newattr.mq_flags = O_NONBLOCK;
	else if (nb == Qfalse)
		newattr.mq_flags = 0;
	else
		rb_raise(rb_eArgError, "must be true or false");

	/*
	 * mq_setattr(3) returns an int (0 or -1), so test the int directly
	 * instead of comparing it with the mqd_t sentinel MQD_INVALID.
	 */
	if (mq_setattr(mq->des, &newattr, &oldattr) < 0)
		rb_sys_fail("mq_setattr");

	mq->msgsize = oldattr.mq_msgsize; /* optimization */

	return nb;
}

#nonblock?Boolean

mq.nonblock? => true or false

Returns the current non-blocking state of the message queue descriptor.

Returns:

  • (Boolean)


745
746
747
748
749
750
751
752
753
754
755
756
# File 'ext/posix_mq/posix_mq.c', line 745

/*
 * Implements mq.nonblock? => true or false.
 *
 * Reads the queue attributes with mq_getattr(3), refreshes the cached
 * mq->msgsize from them, and returns Qtrue when O_NONBLOCK is set in
 * mq_flags, Qfalse otherwise.  Calls rb_sys_fail("mq_getattr") on failure.
 */
static VALUE getnonblock(VALUE self)
{
	struct mq_attr attr;
	struct posix_mq *mq = get(self, 1);

	/*
	 * mq_getattr(3) returns an int (0 or -1), so test the int directly
	 * instead of comparing it with the mqd_t sentinel MQD_INVALID.
	 */
	if (mq_getattr(mq->des, &attr) < 0)
		rb_sys_fail("mq_getattr");

	mq->msgsize = attr.mq_msgsize; /* optimization */

	return attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
}

#notify=(arg) ⇒ Object

mq.notify = signal => signal

Registers the notification request to deliver a given signal to the current process when a message is received. If signal is nil, it will unregister and disable the notification request to allow other processes to register a request. If signal is false, it will register a no-op notification request which will prevent other processes from registering a notification. Only one process may have a notification request for a queue at a time; Errno::EBUSY will be raised if there is already a notification request registration for the queue.

For readers of the mq_notify(3) manpage, passing false is equivalent to SIGEV_NONE, and passing nil is equivalent to passing a NULL notification pointer to mq_notify(3).



704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
# File 'ext/posix_mq/posix_mq.c', line 704

/*
 * Implements mq.notify = signal => signal.
 *
 * Accepted arguments:
 *   false          - register with SIGEV_NONE
 *   nil            - pass a NULL notification to mq_notify(3)
 *   Fixnum         - register SIGEV_SIGNAL with that signal number
 *   Symbol/String  - signal number obtained from lookup_sig(arg); the
 *                    resolved number (not the original argument) is
 *                    returned in this case
 * Any other type raises ArgumentError.  Calls rb_sys_fail("mq_notify")
 * when mq_notify(3) fails.
 */
static VALUE setnotify(VALUE self, VALUE arg)
{
	struct posix_mq *mq = get(self, 1);
	/* named "sev" rather than "not", which <iso646.h> defines as a macro */
	struct sigevent sev;
	struct sigevent *notification = &sev;
	VALUE rv = arg;

	sev.sigev_notify = SIGEV_SIGNAL;

	switch (TYPE(arg)) {
	case T_FALSE:
		sev.sigev_notify = SIGEV_NONE;
		break;
	case T_NIL:
		notification = NULL;
		break;
	case T_FIXNUM:
		sev.sigev_signo = NUM2INT(arg);
		break;
	case T_SYMBOL:
	case T_STRING:
		sev.sigev_signo = lookup_sig(arg);
		rv = INT2NUM(sev.sigev_signo);
		break;
	default:
		/* maybe support Proc+thread via sigev_notify_function.. */
		rb_raise(rb_eArgError, "must be a signal or nil");
	}

	/*
	 * mq_notify(3) returns an int (0 or -1), so test the int directly
	 * instead of comparing it with the mqd_t sentinel MQD_INVALID.
	 */
	if (mq_notify(mq->des, notification) < 0)
		rb_sys_fail("mq_notify");

	return rv;
}

#receive(*args) ⇒ Object

mq.receive([buffer, [timeout]]) => [ message, priority ]

Takes the highest priority message off the queue and returns an array containing the message as a String and the Integer priority of the message.

If the optional buffer is present, then it must be a String which will receive the data.

If the optional timeout is present, then it may be a Float or Integer specifying the timeout in seconds. Errno::ETIMEDOUT will be raised if timeout has elapsed and there are no messages in the queue.



522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
# File 'ext/posix_mq/posix_mq.c', line 522

/*
 * Implements mq.receive([buffer, [timeout]]) => [ message, priority ].
 *
 * If the cached message size is negative it is refreshed with
 * get_msgsize() first.  The destination is either a new String of
 * mq->msgsize bytes or the caller's buffer, made modifiable and resized
 * to mq->msgsize.  The buffer is tainted, xrecv is run through
 * rb_thread_blocking_region, and on success the String is trimmed to
 * the number of bytes reported.  A negative result is reported with
 * rb_sys_fail("mq_receive").
 *
 * Returns a two-element Array: the buffer and the priority as an Integer.
 */
static VALUE receive(int argc, VALUE *argv, VALUE self)
{
	struct posix_mq *queue = get(self, 1);
	struct timespec deadline;
	struct rw_args args;
	VALUE buffer, timeout;
	ssize_t nread;

	if (queue->msgsize < 0)
		get_msgsize(queue);

	rb_scan_args(argc, argv, "02", &buffer, &timeout);
	args.timeout = convert_timeout(&deadline, timeout);

	if (!NIL_P(buffer)) {
		/* reuse the caller's String as the destination */
		StringValue(buffer);
		rb_str_modify(buffer);
		rb_str_resize(buffer, queue->msgsize);
	} else {
		buffer = rb_str_new(0, queue->msgsize);
	}
	OBJ_TAINT(buffer);

	args.des = queue->des;
	args.msg_len = (size_t)queue->msgsize;
	args.msg_ptr = RSTRING_PTR(buffer);

	nread = (ssize_t)rb_thread_blocking_region(xrecv, &args, RUBY_UBF_IO, 0);
	if (nread < 0)
		rb_sys_fail("mq_receive");

	/* shrink the String to the bytes actually received */
	rb_str_set_len(buffer, nread);

	return rb_ary_new3(2, buffer, UINT2NUM(args.msg_prio));
}

#send(*args) ⇒ Object

mq.send(string [,priority[, timeout]]) => nil

Inserts the given string into the message queue with an optional, unsigned integer priority. If the optional timeout is specified, then Errno::ETIMEDOUT will be raised if the operation cannot complete before timeout seconds has elapsed. Without timeout, this method may block until the queue is writable.



429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
# File 'ext/posix_mq/posix_mq.c', line 429

/*
 * Implements mq.send(string [,priority[, timeout]]) => nil.
 *
 * Parses one required and two optional arguments, prepares the outgoing
 * buffer with setup_send_buffer(), converts the timeout with
 * convert_timeout(), and uses priority 0 when none is given.  xsend is
 * run through rb_thread_blocking_region; a result equal to MQD_INVALID
 * is reported with rb_sys_fail("mq_send").  Returns nil.
 */
static VALUE _send(int argc, VALUE *argv, VALUE self)
{
	struct posix_mq *queue = get(self, 1);
	struct timespec deadline;
	struct rw_args args;
	VALUE buffer, prio, timeout;
	mqd_t status;

	rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);

	setup_send_buffer(&args, buffer);
	args.des = queue->des;
	args.timeout = convert_timeout(&deadline, timeout);
	if (NIL_P(prio))
		args.msg_prio = 0;
	else
		args.msg_prio = NUM2UINT(prio);

	status = (mqd_t)rb_thread_blocking_region(xsend, &args, RUBY_UBF_IO, 0);
	if (status == MQD_INVALID)
		rb_sys_fail("mq_send");

	return Qnil;
}

#to_ioObject

mq.to_io => IO

Returns an IO.select-able IO object. This method is only available under Linux and is not intended to be portable.



484
485
486
487
488
489
490
491
492
493
# File 'ext/posix_mq/posix_mq.c', line 484

/*
 * Implements mq.to_io => IO.
 *
 * Converts the queue descriptor to a file descriptor with MQD_TO_FD and
 * wraps it with IO.new the first time it is requested; the resulting
 * object is cached in mq->io and returned on later calls.
 */
static VALUE to_io(VALUE self)
{
	struct posix_mq *queue = get(self, 1);
	int fd = MQD_TO_FD(queue->des);

	if (!NIL_P(queue->io))
		return queue->io;

	queue->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd));

	return queue->io;
}

mq.unlink => mq

Unlinks the message queue to prevent other processes from accessing it. All existing queue descriptors to this queue including those opened by other processes are unaffected. The queue will only be destroyed when the last process with open descriptors to this queue closes the descriptors.



397
398
399
400
401
402
403
404
405
406
407
408
409
410
# File 'ext/posix_mq/posix_mq.c', line 397

/*
 * Implements mq.unlink => mq.
 *
 * Hands the stored queue name (asserted to be a String) to xunlink
 * through rb_thread_blocking_region.  A result equal to MQD_INVALID is
 * reported with rb_sys_fail("mq_unlink").  Returns self.
 */
static VALUE _unlink(VALUE self)
{
	struct posix_mq *queue = get(self, 0);
	void *arg = (void *)queue->name;
	mqd_t status;

	assert(TYPE(queue->name) == T_STRING && "mq->name is not a string");

	status = (mqd_t)rb_thread_blocking_region(xunlink, arg, RUBY_UBF_IO, 0);
	if (status == MQD_INVALID)
		rb_sys_fail("mq_unlink");

	return self;
}