Class: POSIX_MQ

Inherits:
Object
  • Object
show all
Defined in:
lib/posix_mq.rb,
ext/posix_mq/posix_mq.c

Overview

This class represents an POSIX message queue descriptor (mqd_t) object. It matches the C API for POSIX messages queues closely.

See the README for examples on how to use it.

Defined Under Namespace

Classes: Attr

Constant Summary collapse

OPEN_MAX =

The maximum number of open message descriptors supported by the system. This may be -1, in which case it is dynamically set at runtime. Consult your operating system documentation for system-specific information about this.

LONG2NUM(sysconf(_SC_MQ_OPEN_MAX))
PRIO_MAX =

The maximum priority that may be specified for POSIX_MQ#send On POSIX-compliant systems, this is at least 31, but some systems allow higher limits. The minimum priority is always zero.

LONG2NUM(sysconf(_SC_MQ_PRIO_MAX))

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.for_fd(socket) ⇒ Object

POSIX_MQ.for_fd(socket) => mq

Adopts a socket as a POSIX message queue. Argument will be checked to ensure it is a POSIX message queue socket.

This is useful for adopting systemd sockets passed via the ListenMessageQueue directive. Returns a POSIX_MQ instance. This method is only available under Linux and FreeBSD and is not intended to be portable.



406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
# File 'ext/posix_mq/posix_mq.c', line 406

static VALUE for_fd(VALUE klass, VALUE socket)
{
  VALUE mqv = alloc(klass);
  struct posix_mq *mq = get(mqv, 0);
  mqd_t mqd;

  mq->name = Qnil;
  mqd = FD_TO_MQD(NUM2INT(socket));

  if (mq_getattr(mqd, &mq->attr) < 0)
    rb_sys_fail("provided file descriptor is not a POSIX MQ");

  mq->des = mqd;
  return mqv;
}

.open(*args) ⇒ Object

Opens a POSIX message queue and performs operations on the given block, closing the message queue at exit. All all arguments are passed to POSIX_MQ.new.



20
21
22
23
24
25
26
27
28
# File 'lib/posix_mq.rb', line 20

def self.open(*args)
  mq = new(*args)
  block_given? or return mq
  begin
    yield mq
  ensure
    mq.close unless mq.closed?
  end
end

POSIX_MQ.unlink(name) => 1

Unlinks the message queue given by name. The queue will be destroyed when the last process with the queue open closes its queue descriptors.



540
541
542
543
544
545
546
547
548
# File 'ext/posix_mq/posix_mq.c', line 540

static VALUE s_unlink(VALUE self, VALUE name)
{
  int rv = mq_unlink(StringValueCStr(name));

  if (rv < 0)
    rb_sys_fail("mq_unlink");

  return INT2NUM(1);
}

Instance Method Details

#<<(buffer) ⇒ Object

mq << string => mq

Inserts the given string into the message queue with a default priority of 0 and no timeout.

Returns itself so its calls may be chained. This use is only recommended only for users who expect blocking behavior from the queue.



643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
# File 'ext/posix_mq/posix_mq.c', line 643

static VALUE send0(VALUE self, VALUE buffer)
{
  struct posix_mq *mq = get(self, 1);
  struct rw_args x;

  setup_send_buffer(&x, buffer);
  x.des = mq->des;
  x.timeout = NULL;
  x.msg_prio = 0;

retry:
  WITHOUT_GVL(xsend, &x, RUBY_UBF_IO, 0);
  if (x.retval < 0) {
    if (errno == EINTR)
      goto retry;
    rb_sys_fail("mq_send");
  }

  return self;
}

#attrObject

mq.attr => mq_attr

Returns a POSIX_MQ::Attr struct containing the attributes of the message queue. See the mq_getattr(3) manpage for more details.



789
790
791
792
793
794
795
796
797
798
799
800
801
# File 'ext/posix_mq/posix_mq.c', line 789

static VALUE getattr(VALUE self)
{
  struct posix_mq *mq = get(self, 1);

  if (mq_getattr(mq->des, &mq->attr) < 0)
    rb_sys_fail("mq_getattr");

  return rb_funcall(cAttr, id_new, 4,
                    LONG2NUM(mq->attr.mq_flags),
                    LONG2NUM(mq->attr.mq_maxmsg),
                    LONG2NUM(mq->attr.mq_msgsize),
                    LONG2NUM(mq->attr.mq_curmsgs));
}

#attr=(astruct) ⇒ Object

mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr

Only the IO::NONBLOCK flag may be set or unset (zero) in this manner. See the mq_setattr(3) manpage for more details.

Consider using the POSIX_MQ#nonblock= method as it is easier and more natural to use.



813
814
815
816
817
818
819
820
821
822
823
824
# File 'ext/posix_mq/posix_mq.c', line 813

static VALUE setattr(VALUE self, VALUE astruct)
{
  struct posix_mq *mq = get(self, 1);
  struct mq_attr newattr;

  rstruct2mqattr(&newattr, astruct, 0);

  if (mq_setattr(mq->des, &newattr, NULL) < 0)
    rb_sys_fail("mq_setattr");

  return astruct;
}

#autoclose=(autoclose) ⇒ Object

mq.autoclose = boolean => boolean

Determines whether or not the mq will be closed automatically at finalization.



1111
1112
1113
1114
1115
1116
1117
1118
# File 'ext/posix_mq/posix_mq.c', line 1111

static VALUE setautoclose(VALUE self, VALUE autoclose)
{
  struct posix_mq *mq = get(self, 1);

  MQ_IO_SET_AUTOCLOSE(mq, autoclose);
  mq->autoclose = RTEST(autoclose) ? 1 : 0;
  return autoclose;
}

#autoclose?Boolean

mq.autoclose? => boolean

Returns whether or not the mq will be closed automatically at finalization.

Returns:

  • (Boolean)


1127
1128
1129
1130
1131
1132
# File 'ext/posix_mq/posix_mq.c', line 1127

static VALUE autoclose_p(VALUE self)
{
  struct posix_mq *mq = get(self, 1);

  return mq->autoclose ? Qtrue : Qfalse;
}

#closeObject

mq.close => nil

Closes the underlying message queue descriptor. If this descriptor had a registered notification request, the request will be removed so another descriptor or process may register a notification request. Message queue descriptors are automatically closed by garbage collection.



836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
# File 'ext/posix_mq/posix_mq.c', line 836

static VALUE _close(VALUE self)
{
  struct posix_mq *mq;

  if (IDEMPOTENT_IO_CLOSE) { /* defined in extconf.rb */
    mq = get(self, 0);
    if (!mq || (mq->des == MQD_INVALID))
      return Qnil;
  } else {
    mq = get(self, 1);
  }

  if (! MQ_IO_CLOSE(mq)) {
    if (mq_close(mq->des) < 0)
      rb_sys_fail("mq_close");
  }
  mq->des = MQD_INVALID;

  return Qnil;
}

#closed?Boolean

mq.closed? => true or false

Returns true if the message queue descriptor is closed and therefore unusable, otherwise false

Returns:

  • (Boolean)


864
865
866
867
868
869
# File 'ext/posix_mq/posix_mq.c', line 864

static VALUE closed(VALUE self)
{
  struct posix_mq *mq = get(self, 0);

  return mq->des == MQD_INVALID ? Qtrue : Qfalse;
}

#dupObject Also known as: clone

There’s no point in ever duping a POSIX_MQ object. All send/receive operations are atomic and only one native thread may be notified at a time



74
75
76
# File 'lib/posix_mq.rb', line 74

def dup
  self
end

#nameObject

mq.name => string

Returns the string name of message queue associated with mq



877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
# File 'ext/posix_mq/posix_mq.c', line 877

static VALUE name(VALUE self)
{
  struct posix_mq *mq = get(self, 0);

  if (NIL_P(mq->name)) {
    /*
     * We could use readlink(2) on /proc/self/fd/N, but lots of
     * care required.
     * http://stackoverflow.com/questions/1188757/
     */
    rb_raise(rb_eArgError, "can not get name of an adopted socket");
  }

  /* XXX compatibility: in retrospect, we could return a frozen string */
  return rb_str_dup(mq->name);
}

#nonblock=(nb) ⇒ Object

mq.nonblock = boolean => boolean

Enables or disables non-blocking operation for the message queue descriptor. Errno::EAGAIN will be raised in situations where the queue would block. This is not compatible with timeout arguments to POSIX_MQ#send and POSIX_MQ#receive.



1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
# File 'ext/posix_mq/posix_mq.c', line 1084

static VALUE setnonblock(VALUE self, VALUE nb)
{
  struct mq_attr newattr;
  struct posix_mq *mq = get(self, 1);

  if (nb == Qtrue)
    newattr.mq_flags = O_NONBLOCK;
  else if (nb == Qfalse)
    newattr.mq_flags = 0;
  else
    rb_raise(rb_eArgError, "must be true or false");

  if (mq_setattr(mq->des, &newattr, &mq->attr) < 0)
    rb_sys_fail("mq_setattr");

  mq->attr.mq_flags = newattr.mq_flags;

  return nb;
}

#nonblock?Boolean

mq.nonblock? => true or false

Returns the current non-blocking state of the message queue descriptor.

Returns:

  • (Boolean)


1066
1067
1068
1069
1070
1071
1072
1073
# File 'ext/posix_mq/posix_mq.c', line 1066

static VALUE nonblock_p(VALUE self)
{
  struct posix_mq *mq = get(self, 1);

  if (mq_getattr(mq->des, &mq->attr) < 0)
    rb_sys_fail("mq_getattr");
  return mq->attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
}

#notify(&block) ⇒ Object

Executes the given block upon reception of the next message in an empty queue. If the message queue is not empty, then this block will only be fired after the queue is emptied and repopulated with one message.

This block will only be executed upon the arrival of the first message and must be reset/reenabled for subsequent notifications. This block will execute in a separate Ruby Thread (and thus will safely have the GVL by default).

This method is only supported on platforms that implement SIGEV_THREAD functionality in mq_notify(3). So far we only know of glibc + Linux supporting this. Please let us know if your platform can support this functionality and are willing to test for us <[email protected]>

As far as we can tell, this method is not very useful nor efficient. You would be better served using signals or just blocking. On Linux and FreeBSD, you can use POSIX_MQ with I/O multiplexing (IO.select, EventMachine), too.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/posix_mq.rb', line 50

def notify(&block)
  block.arity == 1 or
    raise ArgumentError, "arity of notify block must be 1"
  r, w = IO.pipe
  notify_exec(w, Thread.new(block) do |blk|
    begin
      begin
        r.read(1) or raise Errno::EINTR
      rescue Errno::EINTR, Errno::EAGAIN
        retry
      end
      blk.call(self)
    ensure
      notify_cleanup
      r.close rescue nil
      w.close rescue nil
    end
  end)
  nil
end

#notify=(arg) ⇒ Object

mq.notify = signal => signal

Registers the notification request to deliver a given signal to the current process when message is received. If signal is nil, it will unregister and disable the notification request to allow other processes to register a request. If signal is false, it will register a no-op notification request which will prevent other processes from registering a notification. If signal is an IO object, it will spawn a thread upon the arrival of the next message and write one “\0” byte to the file descriptor belonging to that IO object. Only one process may have a notification request for a queue at a time, Errno::EBUSY will be raised if there is already a notification request registration for the queue.

Notifications are only fired once and processes must reregister for subsequent notifications.

For readers of the mq_notify(3) manpage, passing false is equivalent to SIGEV_NONE, and passing nil is equivalent of passing a NULL notification pointer to mq_notify(3).



1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
# File 'ext/posix_mq/posix_mq.c', line 1026

static VALUE setnotify(VALUE self, VALUE arg)
{
  struct posix_mq *mq = get(self, 1);
  struct sigevent not;
  struct sigevent * notification = &not;
  VALUE rv = arg;

  notify_cleanup(self);
  not.sigev_notify = SIGEV_SIGNAL;

  switch (TYPE(arg)) {
  case T_FALSE:
    not.sigev_notify = SIGEV_NONE;
    break;
  case T_NIL:
    notification = NULL;
    break;
  case T_FIXNUM:
    not.sigev_signo = NUM2INT(arg);
    break;
  case T_SYMBOL:
  case T_STRING:
    not.sigev_signo = lookup_sig(arg);
    rv = INT2NUM(not.sigev_signo);
    break;
  default:
    rb_raise(rb_eArgError, "must be a signal or nil");
  }

  my_mq_notify(mq->des, notification);

  return rv;
}

#receive(*args) ⇒ Object

mq.receive([buffer, [timeout]]) => [ message, priority ]

Takes the highest priority message off the queue and returns an array containing the message as a String and the Integer priority of the message.

If the optional buffer is present, then it must be a String which will receive the data.

If the optional timeout is present, then it may be a Float or Integer specifying the timeout in seconds. Errno::ETIMEDOUT will be raised if timeout has elapsed and there are no messages in the queue.

On some older systems, the timeout argument is not currently supported and may raise NotImplementedError if timeout is used.



709
710
711
712
# File 'ext/posix_mq/posix_mq.c', line 709

static VALUE receive(int argc, VALUE *argv, VALUE self)
{
  return _receive(PMQ_WANTARRAY, argc, argv, self);
}

#send(*args) ⇒ Object

mq.send(string [,priority[, timeout]]) => true

Inserts the given string into the message queue with an optional, unsigned integer priority. If the optional timeout is specified, then Errno::ETIMEDOUT will be raised if the operation cannot complete before timeout seconds has elapsed. Without timeout, this method may block until the queue is writable.

On some older systems, the timeout argument is not currently supported and may raise NotImplementedError if timeout is used.



600
601
602
603
# File 'ext/posix_mq/posix_mq.c', line 600

static VALUE my_send(int argc, VALUE *argv, VALUE self)
{
  return _send(0, argc, argv, self);
}

#shift(*args) ⇒ Object

mq.shift([buffer, [timeout]]) => message

Takes the highest priority message off the queue and returns the message as a String.

If the optional buffer is present, then it must be a String which will receive the data.

If the optional timeout is present, then it may be a Float or Integer specifying the timeout in seconds. Errno::ETIMEDOUT will be raised if timeout has elapsed and there are no messages in the queue.

On some older systems, the timeout argument is not currently supported and may raise NotImplementedError if timeout is used.



732
733
734
735
# File 'ext/posix_mq/posix_mq.c', line 732

static VALUE shift(int argc, VALUE *argv, VALUE self)
{
  return _receive(0, argc, argv, self);
}

#to_ioObject

mq.to_io => IO

Returns an IO.select-able IO object. This method is only available under Linux and FreeBSD and is not intended to be portable.



672
673
674
675
676
677
678
679
680
681
682
683
684
685
# File 'ext/posix_mq/posix_mq.c', line 672

static VALUE to_io(VALUE self)
{
  struct posix_mq *mq = get(self, 1);
  int fd = MQD_TO_FD(mq->des);

  if (NIL_P(mq->io)) {
    mq->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd));

    if (!mq->autoclose)
      rb_funcall(mq->io, id_setautoclose, 1, Qfalse);
  }

  return mq->io;
}

#tryreceive(*args) ⇒ Object

mq.tryreceive([buffer [, timeout]]) => [ message, priority ] or nil

Exactly like POSIX_MQ#receive, except it returns nil instead of raising Errno::EAGAIN when non-blocking operation is desired.

This does not guarantee non-blocking behavior, the message queue must be made non-blocking before calling this method.



1175
1176
1177
1178
# File 'ext/posix_mq/posix_mq.c', line 1175

static VALUE tryreceive(int argc, VALUE *argv, VALUE self)
{
  return _receive(PMQ_WANTARRAY|PMQ_TRY, argc, argv, self);
}

#trysend(*args) ⇒ Object

mq.trysend(string [,priority[, timeout]]) => true or false

Exactly like POSIX_MQ#send, except it returns false instead of raising Errno::EAGAIN when non-blocking operation is desired and returns true on success instead of nil.

This does not guarantee non-blocking behavior, the message queue must be made non-blocking before calling this method.



1145
1146
1147
1148
# File 'ext/posix_mq/posix_mq.c', line 1145

static VALUE trysend(int argc, VALUE *argv, VALUE self)
{
  return _send(PMQ_TRY, argc, argv, self);
}

#tryshift(*args) ⇒ Object

mq.tryshift([buffer [, timeout]]) => message or nil

Exactly like POSIX_MQ#shift, except it returns nil instead of raising Errno::EAGAIN when non-blocking operation is desired.

This does not guarantee non-blocking behavior, the message queue must be made non-blocking before calling this method.



1160
1161
1162
1163
# File 'ext/posix_mq/posix_mq.c', line 1160

static VALUE tryshift(int argc, VALUE *argv, VALUE self)
{
  return _receive(PMQ_TRY, argc, argv, self);
}

mq.unlink => mq

Unlinks the message queue to prevent other processes from accessing it. All existing queue descriptors to this queue including those opened by other processes are unaffected. The queue will only be destroyed when the last process with open descriptors to this queue closes the descriptors.



560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
# File 'ext/posix_mq/posix_mq.c', line 560

static VALUE _unlink(VALUE self)
{
  struct posix_mq *mq = get(self, 0);
  int rv;

  if (NIL_P(mq->name)) {
    rb_raise(rb_eArgError, "can not unlink an adopted socket");
  }

  assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");

  rv = mq_unlink(RSTRING_PTR(mq->name));
  if (rv < 0)
    rb_sys_fail("mq_unlink");

  return self;
}