Class: POSIX_MQ
- Inherits:
-
Object
- Object
- POSIX_MQ
- Defined in:
- lib/posix_mq.rb,
ext/posix_mq/posix_mq.c
Overview
This class represents a POSIX message queue descriptor (mqd_t) object. It matches the C API for POSIX message queues closely.
See the README for examples on how to use it.
Defined Under Namespace
Classes: Attr
Constant Summary collapse
- OPEN_MAX =
The maximum number of open message descriptors supported by the system. This may be -1, in which case it is dynamically set at runtime. Consult your operating system documentation for system-specific information about this.
LONG2NUM(sysconf(_SC_MQ_OPEN_MAX))
- PRIO_MAX =
The maximum priority that may be specified for POSIX_MQ#send. On POSIX-compliant systems, this is at least 31, but some systems allow higher limits. The minimum priority is always zero.
LONG2NUM(sysconf(_SC_MQ_PRIO_MAX))
Class Method Summary collapse
-
.for_fd(socket) ⇒ Object
POSIX_MQ.for_fd(socket) => mq.
-
.open(*args) ⇒ Object
Opens a POSIX message queue and performs operations on the given block, closing the message queue at exit.
-
.unlink(name) ⇒ Object
POSIX_MQ.unlink(name) => 1.
Instance Method Summary collapse
-
#<<(buffer) ⇒ Object
mq << string => mq.
-
#attr ⇒ Object
mq.attr => mq_attr.
-
#attr=(astruct) ⇒ Object
mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr.
-
#autoclose=(autoclose) ⇒ Object
mq.autoclose = boolean => boolean.
-
#autoclose? ⇒ Boolean
mq.autoclose? => boolean.
-
#close ⇒ Object
mq.close => nil.
-
#closed? ⇒ Boolean
mq.closed? => true or false.
-
#dup ⇒ Object
(also: #clone)
There’s no point in ever duping a POSIX_MQ object.
-
#name ⇒ Object
mq.name => string.
-
#nonblock=(nb) ⇒ Object
mq.nonblock = boolean => boolean.
-
#nonblock? ⇒ Boolean
mq.nonblock? => true or false.
-
#notify(&block) ⇒ Object
Executes the given block upon reception of the next message in an empty queue.
-
#notify=(arg) ⇒ Object
mq.notify = signal => signal.
-
#receive(*args) ⇒ Object
mq.receive([buffer, [timeout]]) => [ message, priority ].
-
#send(*args) ⇒ Object
mq.send(string [,priority[, timeout]]) => true.
-
#shift(*args) ⇒ Object
mq.shift([buffer, [timeout]]) => message.
-
#to_io ⇒ Object
mq.to_io => IO.
-
#tryreceive(*args) ⇒ Object
mq.tryreceive([buffer [, timeout]]) => [ message, priority ] or nil.
-
#trysend(*args) ⇒ Object
mq.trysend(string [,priority[, timeout]]) => true or false.
-
#tryshift(*args) ⇒ Object
mq.tryshift([buffer [, timeout]]) => message or nil.
-
#unlink ⇒ Object
mq.unlink => mq.
Class Method Details
.for_fd(socket) ⇒ Object
POSIX_MQ.for_fd(socket) => mq
Adopts a socket as a POSIX message queue. Argument will be checked to ensure it is a POSIX message queue socket.
This is useful for adopting systemd sockets passed via the ListenMessageQueue directive. Returns a POSIX_MQ instance. This method is only available under Linux and FreeBSD and is not intended to be portable.
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 |
# File 'ext/posix_mq/posix_mq.c', line 406 static VALUE for_fd(VALUE klass, VALUE socket) { VALUE mqv = alloc(klass); struct posix_mq *mq = get(mqv, 0); mqd_t mqd; mq->name = Qnil; mqd = FD_TO_MQD(NUM2INT(socket)); if (mq_getattr(mqd, &mq->attr) < 0) rb_sys_fail("provided file descriptor is not a POSIX MQ"); mq->des = mqd; return mqv; } |
.open(*args) ⇒ Object
Opens a POSIX message queue and performs operations on the given block, closing the message queue at exit. All arguments are passed to POSIX_MQ.new.
20 21 22 23 24 25 26 27 28 |
# File 'lib/posix_mq.rb', line 20 def self.open(*args) mq = new(*args) block_given? or return mq begin yield mq ensure mq.close unless mq.closed? end end |
.unlink(name) ⇒ Object
POSIX_MQ.unlink(name) => 1
Unlinks the message queue given by name. The queue will be destroyed when the last process with the queue open closes its queue descriptors.
540 541 542 543 544 545 546 547 548 |
# File 'ext/posix_mq/posix_mq.c', line 540 static VALUE s_unlink(VALUE self, VALUE name) { int rv = mq_unlink(StringValueCStr(name)); if (rv < 0) rb_sys_fail("mq_unlink"); return INT2NUM(1); } |
Instance Method Details
#<<(buffer) ⇒ Object
mq << string => mq
Inserts the given string into the message queue with a default priority of 0 and no timeout.
Returns itself so its calls may be chained. This use is recommended only for users who expect blocking behavior from the queue.
643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 |
# File 'ext/posix_mq/posix_mq.c', line 643 static VALUE send0(VALUE self, VALUE buffer) { struct posix_mq *mq = get(self, 1); struct rw_args x; setup_send_buffer(&x, buffer); x.des = mq->des; x.timeout = NULL; x.msg_prio = 0; retry: WITHOUT_GVL(xsend, &x, RUBY_UBF_IO, 0); if (x.retval < 0) { if (errno == EINTR) goto retry; rb_sys_fail("mq_send"); } return self; } |
#attr ⇒ Object
mq.attr => mq_attr
Returns a POSIX_MQ::Attr struct containing the attributes of the message queue. See the mq_getattr(3) manpage for more details.
789 790 791 792 793 794 795 796 797 798 799 800 801 |
# File 'ext/posix_mq/posix_mq.c', line 789 static VALUE getattr(VALUE self) { struct posix_mq *mq = get(self, 1); if (mq_getattr(mq->des, &mq->attr) < 0) rb_sys_fail("mq_getattr"); return rb_funcall(cAttr, id_new, 4, LONG2NUM(mq->attr.mq_flags), LONG2NUM(mq->attr.mq_maxmsg), LONG2NUM(mq->attr.mq_msgsize), LONG2NUM(mq->attr.mq_curmsgs)); } |
#attr=(astruct) ⇒ Object
mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
Only the IO::NONBLOCK flag may be set or unset (zero) in this manner. See the mq_setattr(3) manpage for more details.
Consider using the POSIX_MQ#nonblock= method as it is easier and more natural to use.
813 814 815 816 817 818 819 820 821 822 823 824 |
# File 'ext/posix_mq/posix_mq.c', line 813 static VALUE setattr(VALUE self, VALUE astruct) { struct posix_mq *mq = get(self, 1); struct mq_attr newattr; rstruct2mqattr(&newattr, astruct, 0); if (mq_setattr(mq->des, &newattr, NULL) < 0) rb_sys_fail("mq_setattr"); return astruct; } |
#autoclose=(autoclose) ⇒ Object
mq.autoclose = boolean => boolean
Determines whether or not the mq will be closed automatically at finalization.
1111 1112 1113 1114 1115 1116 1117 1118 |
# File 'ext/posix_mq/posix_mq.c', line 1111 static VALUE setautoclose(VALUE self, VALUE autoclose) { struct posix_mq *mq = get(self, 1); MQ_IO_SET_AUTOCLOSE(mq, autoclose); mq->autoclose = RTEST(autoclose) ? 1 : 0; return autoclose; } |
#autoclose? ⇒ Boolean
mq.autoclose? => boolean
Returns whether or not the mq will be closed automatically at finalization.
1127 1128 1129 1130 1131 1132 |
# File 'ext/posix_mq/posix_mq.c', line 1127 static VALUE autoclose_p(VALUE self) { struct posix_mq *mq = get(self, 1); return mq->autoclose ? Qtrue : Qfalse; } |
#close ⇒ Object
mq.close => nil
Closes the underlying message queue descriptor. If this descriptor had a registered notification request, the request will be removed so another descriptor or process may register a notification request. Message queue descriptors are automatically closed by garbage collection.
836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 |
# File 'ext/posix_mq/posix_mq.c', line 836 static VALUE _close(VALUE self) { struct posix_mq *mq; if (IDEMPOTENT_IO_CLOSE) { /* defined in extconf.rb */ mq = get(self, 0); if (!mq || (mq->des == MQD_INVALID)) return Qnil; } else { mq = get(self, 1); } if (! MQ_IO_CLOSE(mq)) { if (mq_close(mq->des) < 0) rb_sys_fail("mq_close"); } mq->des = MQD_INVALID; return Qnil; } |
#closed? ⇒ Boolean
mq.closed? => true or false
Returns true if the message queue descriptor is closed and therefore unusable, otherwise false.
864 865 866 867 868 869 |
# File 'ext/posix_mq/posix_mq.c', line 864 static VALUE closed(VALUE self) { struct posix_mq *mq = get(self, 0); return mq->des == MQD_INVALID ? Qtrue : Qfalse; } |
#dup ⇒ Object Also known as: clone
There’s no point in ever duping a POSIX_MQ object. All send/receive operations are atomic and only one native thread may be notified at a time.
74 75 76 |
# File 'lib/posix_mq.rb', line 74 def dup self end |
#name ⇒ Object
mq.name => string
Returns the string name of the message queue associated with mq.
877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 |
# File 'ext/posix_mq/posix_mq.c', line 877 static VALUE name(VALUE self) { struct posix_mq *mq = get(self, 0); if (NIL_P(mq->name)) { /* * We could use readlink(2) on /proc/self/fd/N, but lots of * care required. * http://stackoverflow.com/questions/1188757/ */ rb_raise(rb_eArgError, "can not get name of an adopted socket"); } /* XXX compatibility: in retrospect, we could return a frozen string */ return rb_str_dup(mq->name); } |
#nonblock=(nb) ⇒ Object
mq.nonblock = boolean => boolean
Enables or disables non-blocking operation for the message queue descriptor. Errno::EAGAIN will be raised in situations where the queue would block. This is not compatible with timeout arguments to POSIX_MQ#send and POSIX_MQ#receive.
1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 |
# File 'ext/posix_mq/posix_mq.c', line 1084 static VALUE setnonblock(VALUE self, VALUE nb) { struct mq_attr newattr; struct posix_mq *mq = get(self, 1); if (nb == Qtrue) newattr.mq_flags = O_NONBLOCK; else if (nb == Qfalse) newattr.mq_flags = 0; else rb_raise(rb_eArgError, "must be true or false"); if (mq_setattr(mq->des, &newattr, &mq->attr) < 0) rb_sys_fail("mq_setattr"); mq->attr.mq_flags = newattr.mq_flags; return nb; } |
#nonblock? ⇒ Boolean
mq.nonblock? => true or false
Returns the current non-blocking state of the message queue descriptor.
1066 1067 1068 1069 1070 1071 1072 1073 |
# File 'ext/posix_mq/posix_mq.c', line 1066 static VALUE nonblock_p(VALUE self) { struct posix_mq *mq = get(self, 1); if (mq_getattr(mq->des, &mq->attr) < 0) rb_sys_fail("mq_getattr"); return mq->attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse; } |
#notify(&block) ⇒ Object
Executes the given block upon reception of the next message in an empty queue. If the message queue is not empty, then this block will only be fired after the queue is emptied and repopulated with one message.
This block will only be executed upon the arrival of the first message and must be reset/reenabled for subsequent notifications. This block will execute in a separate Ruby Thread (and thus will safely have the GVL by default).
This method is only supported on platforms that implement SIGEV_THREAD functionality in mq_notify(3). So far we only know of glibc + Linux supporting this. Please let us know if your platform can support this functionality and you are willing to test for us <[email protected]>.
As far as we can tell, this method is not very useful nor efficient. You would be better served using signals or just blocking. On Linux and FreeBSD, you can use POSIX_MQ with I/O multiplexing (IO.select, EventMachine), too.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/posix_mq.rb', line 50 def notify(&block) block.arity == 1 or raise ArgumentError, "arity of notify block must be 1" r, w = IO.pipe notify_exec(w, Thread.new(block) do |blk| begin begin r.read(1) or raise Errno::EINTR rescue Errno::EINTR, Errno::EAGAIN retry end blk.call(self) ensure notify_cleanup r.close rescue nil w.close rescue nil end end) nil end |
#notify=(arg) ⇒ Object
mq.notify = signal => signal
Registers the notification request to deliver a given signal to the current process when a message is received. If signal is nil, it will unregister and disable the notification request to allow other processes to register a request. If signal is false, it will register a no-op notification request which will prevent other processes from registering a notification. If signal is an IO object, it will spawn a thread upon the arrival of the next message and write one “\0” byte to the file descriptor belonging to that IO object. Only one process may have a notification request for a queue at a time; Errno::EBUSY will be raised if there is already a notification request registration for the queue.
Notifications are only fired once and processes must reregister for subsequent notifications.
For readers of the mq_notify(3) manpage, passing false is equivalent to SIGEV_NONE, and passing nil is equivalent to passing a NULL notification pointer to mq_notify(3).
1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 |
# File 'ext/posix_mq/posix_mq.c', line 1026 static VALUE setnotify(VALUE self, VALUE arg) { struct posix_mq *mq = get(self, 1); struct sigevent not; struct sigevent * notification = &not; VALUE rv = arg; notify_cleanup(self); not.sigev_notify = SIGEV_SIGNAL; switch (TYPE(arg)) { case T_FALSE: not.sigev_notify = SIGEV_NONE; break; case T_NIL: notification = NULL; break; case T_FIXNUM: not.sigev_signo = NUM2INT(arg); break; case T_SYMBOL: case T_STRING: not.sigev_signo = lookup_sig(arg); rv = INT2NUM(not.sigev_signo); break; default: rb_raise(rb_eArgError, "must be a signal or nil"); } my_mq_notify(mq->des, notification); return rv; } |
#receive(*args) ⇒ Object
mq.receive([buffer, [timeout]]) => [ message, priority ]
Takes the highest priority message off the queue and returns an array containing the message as a String and the Integer priority of the message.
If the optional buffer is present, then it must be a String which will receive the data.
If the optional timeout is present, then it may be a Float or Integer specifying the timeout in seconds. Errno::ETIMEDOUT will be raised if timeout has elapsed and there are no messages in the queue.
On some older systems, the timeout argument is not currently supported and may raise NotImplementedError if timeout is used.
709 710 711 712 |
# File 'ext/posix_mq/posix_mq.c', line 709 static VALUE receive(int argc, VALUE *argv, VALUE self) { return _receive(PMQ_WANTARRAY, argc, argv, self); } |
#send(*args) ⇒ Object
mq.send(string [,priority[, timeout]]) => true
Inserts the given string into the message queue with an optional, unsigned integer priority. If the optional timeout is specified, then Errno::ETIMEDOUT will be raised if the operation cannot complete before timeout seconds have elapsed. Without timeout, this method may block until the queue is writable.
On some older systems, the timeout argument is not currently supported and may raise NotImplementedError if timeout is used.
600 601 602 603 |
# File 'ext/posix_mq/posix_mq.c', line 600 static VALUE my_send(int argc, VALUE *argv, VALUE self) { return _send(0, argc, argv, self); } |
#shift(*args) ⇒ Object
mq.shift([buffer, [timeout]]) => message
Takes the highest priority message off the queue and returns the message as a String.
If the optional buffer is present, then it must be a String which will receive the data.
If the optional timeout is present, then it may be a Float or Integer specifying the timeout in seconds. Errno::ETIMEDOUT will be raised if timeout has elapsed and there are no messages in the queue.
On some older systems, the timeout argument is not currently supported and may raise NotImplementedError if timeout is used.
732 733 734 735 |
# File 'ext/posix_mq/posix_mq.c', line 732 static VALUE shift(int argc, VALUE *argv, VALUE self) { return _receive(0, argc, argv, self); } |
#to_io ⇒ Object
mq.to_io => IO
Returns an IO.select-able IO object. This method is only available under Linux and FreeBSD and is not intended to be portable.
672 673 674 675 676 677 678 679 680 681 682 683 684 685 |
# File 'ext/posix_mq/posix_mq.c', line 672 static VALUE to_io(VALUE self) { struct posix_mq *mq = get(self, 1); int fd = MQD_TO_FD(mq->des); if (NIL_P(mq->io)) { mq->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd)); if (!mq->autoclose) rb_funcall(mq->io, id_setautoclose, 1, Qfalse); } return mq->io; } |
#tryreceive(*args) ⇒ Object
mq.tryreceive([buffer [, timeout]]) => [ message, priority ] or nil
Exactly like POSIX_MQ#receive, except it returns nil instead of raising Errno::EAGAIN when non-blocking operation is desired.
This does not guarantee non-blocking behavior; the message queue must be made non-blocking before calling this method.
1175 1176 1177 1178 |
# File 'ext/posix_mq/posix_mq.c', line 1175 static VALUE tryreceive(int argc, VALUE *argv, VALUE self) { return _receive(PMQ_WANTARRAY|PMQ_TRY, argc, argv, self); } |
#trysend(*args) ⇒ Object
mq.trysend(string [,priority[, timeout]]) => true or false
Exactly like POSIX_MQ#send, except it returns false instead of raising Errno::EAGAIN when non-blocking operation is desired and returns true on success instead of nil.
This does not guarantee non-blocking behavior; the message queue must be made non-blocking before calling this method.
1145 1146 1147 1148 |
# File 'ext/posix_mq/posix_mq.c', line 1145 static VALUE trysend(int argc, VALUE *argv, VALUE self) { return _send(PMQ_TRY, argc, argv, self); } |
#tryshift(*args) ⇒ Object
mq.tryshift([buffer [, timeout]]) => message or nil
Exactly like POSIX_MQ#shift, except it returns nil instead of raising Errno::EAGAIN when non-blocking operation is desired.
This does not guarantee non-blocking behavior; the message queue must be made non-blocking before calling this method.
1160 1161 1162 1163 |
# File 'ext/posix_mq/posix_mq.c', line 1160 static VALUE tryshift(int argc, VALUE *argv, VALUE self) { return _receive(PMQ_TRY, argc, argv, self); } |
#unlink ⇒ Object
mq.unlink => mq
Unlinks the message queue to prevent other processes from accessing it. All existing queue descriptors to this queue including those opened by other processes are unaffected. The queue will only be destroyed when the last process with open descriptors to this queue closes the descriptors.
560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 |
# File 'ext/posix_mq/posix_mq.c', line 560 static VALUE _unlink(VALUE self) { struct posix_mq *mq = get(self, 0); int rv; if (NIL_P(mq->name)) { rb_raise(rb_eArgError, "can not unlink an adopted socket"); } assert(TYPE(mq->name) == T_STRING && "mq->name is not a string"); rv = mq_unlink(RSTRING_PTR(mq->name)); if (rv < 0) rb_sys_fail("mq_unlink"); return self; } |