Class: POSIX_MQ
- Inherits:
-
Object
- Object
- POSIX_MQ
- Defined in:
- lib/posix_mq.rb,
ext/posix_mq/posix_mq.c
Defined Under Namespace
Classes: Attr
Constant Summary collapse
- VERSION =
version of POSIX_MQ, currently 0.2.0
'0.2.0'
- OPEN_MAX =
The maximum number of open message descriptors supported by the system. This may be -1, in which case it is dynamically set at runtime. Consult your operating system documentation for system-specific information about this.
LONG2NUM(sysconf(_SC_MQ_OPEN_MAX))
- PRIO_MAX =
The maximum priority that may be specified for POSIX_MQ#send. On POSIX-compliant systems, this is at least 31, but some systems allow higher limits. The minimum priority is always zero.
LONG2NUM(sysconf(_SC_MQ_PRIO_MAX))
Class Method Summary collapse
-
.open(*args) ⇒ Object
Opens a POSIX message queue and performs operations on the given block, closing the message queue at exit.
-
.unlink(name) ⇒ Object
POSIX_MQ.unlink(name) => 1.
Instance Method Summary collapse
-
#<<(buffer) ⇒ Object
mq << string => mq.
-
#attr ⇒ Object
mq.attr => mq_attr.
-
#attr=(astruct) ⇒ Object
mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr.
-
#close ⇒ Object
mq.close => nil.
-
#closed? ⇒ Boolean
mq.closed? => true or false.
-
#initialize(*args) ⇒ Object
constructor
POSIX_MQ.new(name [, flags [, mode [, mq_attr]]]) => mq.
-
#name ⇒ Object
mq.name => string.
-
#nonblock=(nb) ⇒ Object
mq.nonblock = boolean => boolean.
-
#nonblock? ⇒ Boolean
mq.nonblock? => true or false.
-
#notify=(arg) ⇒ Object
mq.notify = signal => signal.
-
#receive(*args) ⇒ Object
mq.receive([buffer, [timeout]]) => [ message, priority ].
-
#send(*args) ⇒ Object
mq.send(string [,priority[, timeout]]) => nil.
-
#to_io ⇒ Object
mq.to_io => IO.
-
#unlink ⇒ Object
mq.unlink => mq.
Constructor Details
#initialize(*args) ⇒ Object
POSIX_MQ.new(name [, flags [, mode [, mq_attr]]]) => mq
Opens a POSIX message queue given by name
. name
should start with a slash (“/”) for portable applications.
If a Symbol is given in place of integer flags
, then:
-
:r
is equivalent to IO::RDONLY -
:w
is equivalent to IO::CREAT|IO::WRONLY -
:rw
is equivalent to IO::CREAT|IO::RDWR
mode
is an integer and only used when IO::CREAT is used. mq_attr
is a POSIX_MQ::Attr and only used if IO::CREAT is used. If mq_attr
is not specified when creating a queue, then the system defaults will be used.
See the manpage for mq_open(3) for more details on this function.
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 |
# File 'ext/posix_mq/posix_mq.c', line 287
/*
 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]]) => mq
 *
 * Parses the constructor arguments into a struct open_args, opens the
 * queue through xopen and stores the resulting descriptor and a copy of
 * the name on the receiver.
 *
 * name    - must be a String without embedded NUL bytes.
 * flags   - nil (read-only), an Integer of open flags, or one of the
 *           Symbols :r, :w, :rw.
 * mode    - nil or a Fixnum; defaults to 0666 when O_CREAT is requested
 *           and no mode is given.
 * mq_attr - nil or a Struct (expected to be a POSIX_MQ::Attr).
 *
 * Raises ArgumentError for arguments of the wrong type and a system call
 * error (via rb_sys_fail) when the open fails.  Returns self.
 */
static VALUE init(int argc, VALUE *argv, VALUE self)
{
	/* NOTE(review): second argument of get() presumably means "descriptor
	 * need not be open yet" -- confirm against get()'s definition */
	struct posix_mq *mq = get(self, 0);
	struct open_args x;
	VALUE name, oflags, mode, attr;

	rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);

	if (TYPE(name) != T_STRING)
		rb_raise(rb_eArgError, "name must be a string");

	switch (TYPE(oflags)) {
	case T_NIL:
		x.oflags = O_RDONLY;
		break;
	case T_SYMBOL:
		if (oflags == sym_r)
			x.oflags = O_RDONLY;
		else if (oflags == sym_w)
			x.oflags = O_CREAT|O_WRONLY;
		else if (oflags == sym_rw)
			x.oflags = O_CREAT|O_RDWR;
		else
			rb_raise(rb_eArgError,
				 "symbol must be :r, :w, or :rw: %s",
				 RSTRING_PTR(rb_inspect(oflags)));
		break;
	case T_BIGNUM:
	case T_FIXNUM:
		x.oflags = NUM2INT(oflags);
		break;
	default:
		/* the accepted symbol is :rw (see the T_SYMBOL branch above) */
		rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :rw");
	}

	/*
	 * The name is consumed as a C string, so reject embedded NUL bytes
	 * instead of silently opening a truncated queue name.
	 */
	x.name = StringValueCStr(name);

	/* argc counts how many of the open_args fields are meaningful */
	x.argc = 2;

	switch (TYPE(mode)) {
	case T_FIXNUM:
		x.argc = 3;
		x.mode = NUM2INT(mode);
		break;
	case T_NIL:
		/* a mode is only supplied implicitly when creating */
		if (x.oflags & O_CREAT) {
			x.argc = 3;
			x.mode = 0666;
		}
		break;
	default:
		rb_raise(rb_eArgError, "mode not an integer");
	}

	switch (TYPE(attr)) {
	case T_STRUCT:
		x.argc = 4;
		attr_from_struct(&x.attr, attr, 1);

		/* principle of least surprise */
		if (x.attr.mq_flags & O_NONBLOCK)
			x.oflags |= O_NONBLOCK;
		break;
	case T_NIL:
		break;
	default:
		rb_raise(rb_eArgError, "attr must be a POSIX_MQ::Attr: %s",
			 RSTRING_PTR(rb_inspect(attr)));
	}

	mq->des = (mqd_t)rb_thread_blocking_region(xopen, &x, RUBY_UBF_IO, 0);
	if (mq->des == MQD_INVALID)
		rb_sys_fail("mq_open");

	/* keep a private copy so later changes to the caller's string
	 * do not affect mq.name */
	mq->name = rb_str_dup(name);

	return self;
}
|
Class Method Details
.open(*args) ⇒ Object
Opens a POSIX message queue and yields it to the given block, closing the message queue when the block exits. If no block is given, the opened queue is returned instead. All arguments are passed to POSIX_MQ.new.
20 21 22 23 24 25 26 27 28 |
# File 'lib/posix_mq.rb', line 20
# Creates a queue with +new+, forwarding every argument.  Without a block
# the queue itself is returned; with a block the queue is yielded and
# closed afterwards unless the block already closed it.
def open(*args)
  mq = new(*args)
  return mq unless block_given?

  begin
    yield mq
  ensure
    mq.close unless mq.closed?
  end
end
.unlink(name) ⇒ Object
POSIX_MQ.unlink(name) => 1
Unlinks the message queue given by name
. The queue will be destroyed when the last process with the queue open closes its queue descriptors.
372 373 374 375 376 377 378 379 380 381 382 383 384 385 |
# File 'ext/posix_mq/posix_mq.c', line 372
/*
 * POSIX_MQ.unlink(name) => 1
 *
 * Class-level unlink: validates that name is a String, hands the Ruby
 * string object to xunlink and raises a system call error if that reports
 * MQD_INVALID.  Returns the Integer 1 on success.
 */
static VALUE s_unlink(VALUE self, VALUE name)
{
	mqd_t status;

	if (TYPE(name) != T_STRING)
		rb_raise(rb_eArgError, "argument must be a string");

	/* xunlink receives the VALUE itself, smuggled through a void pointer */
	status = (mqd_t)rb_thread_blocking_region(xunlink, (void *)name,
						  RUBY_UBF_IO, 0);
	if (status == MQD_INVALID)
		rb_sys_fail("mq_unlink");

	return INT2NUM(1);
}
|
Instance Method Details
#<<(buffer) ⇒ Object
mq << string => mq
Inserts the given string
into the message queue with a default priority of 0 and no timeout.
458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 |
# File 'ext/posix_mq/posix_mq.c', line 458
/*
 * mq << string => mq
 *
 * Sends buffer with priority 0 and no timeout, then returns self so that
 * calls can be chained.  Raises a system call error when xsend reports
 * MQD_INVALID.
 */
static VALUE send0(VALUE self, VALUE buffer)
{
	struct posix_mq *mq = get(self, 1);
	struct rw_args args;
	mqd_t status;

	/* setup_send_buffer fills in the message fields of args from buffer */
	setup_send_buffer(&args, buffer);
	args.des = mq->des;
	args.timeout = NULL;
	args.msg_prio = 0;

	status = (mqd_t)rb_thread_blocking_region(xsend, &args, RUBY_UBF_IO, 0);
	if (status == MQD_INVALID)
		rb_sys_fail("mq_send");

	return self;
}
|
#attr ⇒ Object
mq.attr => mq_attr
Returns a POSIX_MQ::Attr struct containing the attributes of the message queue. See the mq_getattr(3) manpage for more details.
565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 |
# File 'ext/posix_mq/posix_mq.c', line 565
/*
 * mq.attr => mq_attr
 *
 * Queries the queue attributes with mq_getattr and returns them in a new
 * cAttr struct whose members are, in order: mq_flags, mq_maxmsg,
 * mq_msgsize and mq_curmsgs.  Raises a system call error on failure.
 */
static VALUE getattr(VALUE self)
{
	struct posix_mq *mq = get(self, 1);
	struct mq_attr current;
	VALUE result;
	VALUE *members;

	if (mq_getattr(mq->des, &current) == MQD_INVALID)
		rb_sys_fail("mq_getattr");

	/* allocate the struct uninitialized and fill each slot directly */
	result = rb_struct_alloc_noinit(cAttr);
	members = RSTRUCT_PTR(result);
	members[0] = LONG2NUM(current.mq_flags);
	members[1] = LONG2NUM(current.mq_maxmsg);
	members[2] = LONG2NUM(current.mq_msgsize);
	members[3] = LONG2NUM(current.mq_curmsgs);

	return result;
}
|
#attr=(astruct) ⇒ Object
mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
Only the IO::NONBLOCK flag may be set or unset (zero) in this manner. See the mq_setattr(3) manpage for more details.
Consider using the POSIX_MQ#nonblock= method as it is easier and more natural to use.
595 596 597 598 599 600 601 602 603 604 605 606 |
# File 'ext/posix_mq/posix_mq.c', line 595
/*
 * mq.attr = mq_attr => mq_attr
 *
 * Converts astruct into a struct mq_attr with attr_from_struct and applies
 * it with mq_setattr, discarding the previous attributes.  Raises a system
 * call error on failure and returns astruct unchanged.
 */
static VALUE setattr(VALUE self, VALUE astruct)
{
	struct posix_mq *mq = get(self, 1);
	struct mq_attr wanted;

	attr_from_struct(&wanted, astruct, 0);

	/* NULL: the old attributes are not needed here */
	if (mq_setattr(mq->des, &wanted, NULL) == MQD_INVALID)
		rb_sys_fail("mq_setattr");

	return astruct;
}
|
#close ⇒ Object
mq.close => nil
Closes the underlying message queue descriptor. If this descriptor had a registered notification request, the request will be removed so another descriptor or process may register a notification request. Message queue descriptors are automatically closed by garbage collection.
618 619 620 621 622 623 624 625 626 627 628 629 |
# File 'ext/posix_mq/posix_mq.c', line 618
/*
 * mq.close => nil
 *
 * Closes the descriptor with mq_close, raising a system call error on
 * failure.  On success the descriptor is marked MQD_INVALID and the
 * cached IO slot is reset to nil through MQ_IO_SET.
 */
static VALUE _close(VALUE self)
{
	struct posix_mq *mq = get(self, 1);
	mqd_t des = mq->des;

	if (mq_close(des) == MQD_INVALID)
		rb_sys_fail("mq_close");

	/* only forget the descriptor once the close has succeeded */
	mq->des = MQD_INVALID;
	MQ_IO_SET(mq, Qnil);

	return Qnil;
}
|
#closed? ⇒ Boolean
mq.closed? => true or false
Returns true
if the message queue descriptor is closed and therefore unusable, otherwise false
638 639 640 641 642 643 |
# File 'ext/posix_mq/posix_mq.c', line 638
/*
 * mq.closed? => true or false
 *
 * Reports whether the stored descriptor equals MQD_INVALID.
 */
static VALUE closed(VALUE self)
{
	struct posix_mq *mq = get(self, 0);

	if (mq->des == MQD_INVALID)
		return Qtrue;

	return Qfalse;
}
|
#name ⇒ Object
mq.name => string
Returns the string name of message queue associated with mq
651 652 653 654 655 656 |
# File 'ext/posix_mq/posix_mq.c', line 651
/*
 * mq.name => string
 *
 * Returns the name object stored on the queue structure.
 */
static VALUE name(VALUE self)
{
	struct posix_mq *queue = get(self, 0);
	VALUE stored_name = queue->name;

	return stored_name;
}
|
#nonblock=(nb) ⇒ Object
mq.nonblock = boolean => boolean
Enables or disables non-blocking operation for the message queue descriptor. Errno::EAGAIN will be raised in situations where the queue would block. This is not compatible with timeout
arguments to POSIX_MQ#send and POSIX_MQ#receive.
767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 |
# File 'ext/posix_mq/posix_mq.c', line 767
/*
 * mq.nonblock = boolean => boolean
 *
 * Sets mq_flags to O_NONBLOCK (true) or 0 (false) with mq_setattr.
 * Anything other than true or false raises ArgumentError; a failing
 * mq_setattr raises a system call error.  Returns nb.
 */
static VALUE setnonblock(VALUE self, VALUE nb)
{
	struct mq_attr requested, previous;
	struct posix_mq *mq = get(self, 1);

	if (nb != Qtrue && nb != Qfalse)
		rb_raise(rb_eArgError, "must be true or false");

	requested.mq_flags = (nb == Qtrue) ? O_NONBLOCK : 0;

	if (mq_setattr(mq->des, &requested, &previous) == MQD_INVALID)
		rb_sys_fail("mq_setattr");

	/* optimization: remember the message size reported alongside the
	 * old attributes */
	mq->msgsize = previous.mq_msgsize;

	return nb;
}
|
#nonblock? ⇒ Boolean
mq.nonblock? => true or false
Returns the current non-blocking state of the message queue descriptor.
745 746 747 748 749 750 751 752 753 754 755 756 |
# File 'ext/posix_mq/posix_mq.c', line 745
/*
 * mq.nonblock? => true or false
 *
 * Reads the attributes with mq_getattr and reports whether O_NONBLOCK is
 * set in mq_flags.  Raises a system call error if mq_getattr fails.
 */
static VALUE getnonblock(VALUE self)
{
	struct mq_attr current;
	struct posix_mq *mq = get(self, 1);

	if (mq_getattr(mq->des, &current) == MQD_INVALID)
		rb_sys_fail("mq_getattr");

	/* optimization: cache the message size while we have it */
	mq->msgsize = current.mq_msgsize;

	if (current.mq_flags & O_NONBLOCK)
		return Qtrue;

	return Qfalse;
}
|
#notify=(arg) ⇒ Object
mq.notify = signal => signal
Registers the notification request to deliver a given signal
to the current process when a message is received. If signal
is nil
, it will unregister and disable the notification request to allow other processes to register a request. If signal
is false
, it will register a no-op notification request which will prevent other processes from registering a notification. Only one process may have a notification request for a queue at a time, Errno::EBUSY will be raised if there is already a notification request registration for the queue.
For readers of the mq_notify(3) manpage, passing false
is equivalent to SIGEV_NONE, and passing nil
is equivalent to passing a NULL notification pointer to mq_notify(3).
704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 |
# File 'ext/posix_mq/posix_mq.c', line 704
/*
 * mq.notify = signal => signal
 *
 * Registers, replaces or removes the notification request on the queue
 * through mq_notify.
 *
 * arg may be:
 *   false            - register a SIGEV_NONE request
 *   nil              - pass a NULL sigevent pointer to mq_notify
 *   Fixnum           - signal number delivered with SIGEV_SIGNAL
 *   Symbol or String - signal name, resolved with lookup_sig
 *
 * Any other type raises ArgumentError; a failing mq_notify raises a system
 * call error.  Returns arg, except for Symbol/String input where the
 * resolved signal number is returned.
 */
static VALUE setnotify(VALUE self, VALUE arg)
{
	struct posix_mq *mq = get(self, 1);
	struct sigevent sev;
	/* points at sev unless the request is being removed (nil) */
	struct sigevent *notification = &sev;
	VALUE rv = arg;

	/* default to signal delivery; the false branch overrides this */
	sev.sigev_notify = SIGEV_SIGNAL;

	switch (TYPE(arg)) {
	case T_FALSE:
		sev.sigev_notify = SIGEV_NONE;
		break;
	case T_NIL:
		notification = NULL;
		break;
	case T_FIXNUM:
		sev.sigev_signo = NUM2INT(arg);
		break;
	case T_SYMBOL:
	case T_STRING:
		sev.sigev_signo = lookup_sig(arg);
		rv = INT2NUM(sev.sigev_signo);
		break;
	default:
		/* maybe support Proc+thread via sigev_notify_function.. */
		rb_raise(rb_eArgError, "must be a signal or nil");
	}

	if (mq_notify(mq->des, notification) == MQD_INVALID)
		rb_sys_fail("mq_notify");

	return rv;
}
|
#receive(*args) ⇒ Object
mq.receive([buffer, [timeout]]) => [ message, priority ]
Takes the highest priority message off the queue and returns an array containing the message as a String and the Integer priority of the message.
If the optional buffer
is present, then it must be a String which will receive the data.
If the optional timeout
is present, then it may be a Float or Integer specifying the timeout in seconds. Errno::ETIMEDOUT will be raised if timeout
has elapsed and there are no messages in the queue.
522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 |
# File 'ext/posix_mq/posix_mq.c', line 522
/*
 * mq.receive([buffer, [timeout]]) => [ message, priority ]
 *
 * Receives one message through xrecv into buffer (or a freshly allocated
 * String when buffer is nil), sized to the queue's message size.  The
 * String is trimmed to the number of bytes received and returned together
 * with the message priority as a two-element Array.
 *
 * Raises a system call error when xrecv returns a negative value.
 */
static VALUE receive(int argc, VALUE *argv, VALUE self)
{
	struct posix_mq *mq = get(self, 1);
	struct rw_args args;
	struct timespec deadline;
	VALUE buffer, timeout;
	ssize_t nread;

	/* a negative msgsize means it has not been looked up yet */
	if (mq->msgsize < 0)
		get_msgsize(mq);

	rb_scan_args(argc, argv, "02", &buffer, &timeout);
	args.timeout = convert_timeout(&deadline, timeout);

	if (!NIL_P(buffer)) {
		/* reuse the caller's String, resized to hold a full message */
		StringValue(buffer);
		rb_str_modify(buffer);
		rb_str_resize(buffer, mq->msgsize);
	} else {
		buffer = rb_str_new(0, mq->msgsize);
	}
	OBJ_TAINT(buffer);

	args.msg_ptr = RSTRING_PTR(buffer);
	args.msg_len = (size_t)mq->msgsize;
	args.des = mq->des;

	nread = (ssize_t)rb_thread_blocking_region(xrecv, &args, RUBY_UBF_IO, 0);
	if (nread < 0)
		rb_sys_fail("mq_receive");

	/* shrink the String to the bytes actually received */
	rb_str_set_len(buffer, nread);

	return rb_ary_new3(2, buffer, UINT2NUM(args.msg_prio));
}
|
#send(*args) ⇒ Object
mq.send(string [,priority[, timeout]]) => nil
Inserts the given string
into the message queue with an optional, unsigned integer priority
. If the optional timeout
is specified, then Errno::ETIMEDOUT will be raised if the operation cannot complete before timeout
seconds have elapsed. Without timeout
, this method may block until the queue is writable.
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 |
# File 'ext/posix_mq/posix_mq.c', line 429
/*
 * mq.send(string [,priority[, timeout]]) => nil
 *
 * Sends buffer through xsend with the given priority (0 when omitted) and
 * the timeout produced by convert_timeout.  Raises a system call error
 * when xsend reports MQD_INVALID.  Returns nil.
 */
static VALUE _send(int argc, VALUE *argv, VALUE self)
{
	struct posix_mq *mq = get(self, 1);
	struct rw_args args;
	struct timespec deadline;
	VALUE buffer, prio, timeout;
	mqd_t status;

	rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);

	setup_send_buffer(&args, buffer);
	args.des = mq->des;
	args.timeout = convert_timeout(&deadline, timeout);

	if (NIL_P(prio))
		args.msg_prio = 0;
	else
		args.msg_prio = NUM2UINT(prio);

	status = (mqd_t)rb_thread_blocking_region(xsend, &args, RUBY_UBF_IO, 0);
	if (status == MQD_INVALID)
		rb_sys_fail("mq_send");

	return Qnil;
}
|
#to_io ⇒ Object
mq.to_io => IO
Returns an IO.select-able IO
object. This method is only available under Linux and is not intended to be portable.
484 485 486 487 488 489 490 491 492 493 |
# File 'ext/posix_mq/posix_mq.c', line 484
/*
 * mq.to_io => IO
 *
 * Returns the IO object cached on the queue, creating it on first use by
 * calling IO.new with the file descriptor obtained from MQD_TO_FD.
 */
static VALUE to_io(VALUE self)
{
	struct posix_mq *mq = get(self, 1);
	int fd = MQD_TO_FD(mq->des);

	/* already wrapped: hand back the same IO every time */
	if (!NIL_P(mq->io))
		return mq->io;

	mq->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd));

	return mq->io;
}
|
#unlink ⇒ Object
mq.unlink => mq
Unlinks the message queue to prevent other processes from accessing it. All existing queue descriptors to this queue including those opened by other processes are unaffected. The queue will only be destroyed when the last process with open descriptors to this queue closes the descriptors.
397 398 399 400 401 402 403 404 405 406 407 408 409 410 |
# File 'ext/posix_mq/posix_mq.c', line 397
/*
 * mq.unlink => mq
 *
 * Instance-level unlink: passes the stored name object to xunlink and
 * raises a system call error if that reports MQD_INVALID.  Returns self.
 */
static VALUE _unlink(VALUE self)
{
	struct posix_mq *mq = get(self, 0);
	mqd_t status;

	assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");

	/* xunlink receives the name VALUE itself through a void pointer */
	status = (mqd_t)rb_thread_blocking_region(xunlink, (void *)mq->name,
						  RUBY_UBF_IO, 0);
	if (status == MQD_INVALID)
		rb_sys_fail("mq_unlink");

	return self;
}
|