Class: POSIX::Mqueue

Inherits:
Object
  • Object
show all
Defined in:
lib/posix/mqueue/version.rb,
ext/posix/mqueue.c

Constant Summary collapse

VERSION =
"0.0.3"

Instance Method Summary collapse

Constructor Details

#initialize(queue) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'ext/posix/mqueue.c', line 127

VALUE posix_mqueue_initialize(VALUE self, VALUE queue)
{
  // TODO: Modify these options from initialize arguments
  // TODO: Set nonblock and handle error in #push
  struct mq_attr attr = {
    .mq_flags   = 0,          // Flags, 0 or O_NONBLOCK
    .mq_maxmsg  = 10,         // Max messages in queue
    .mq_msgsize = 4096,       // Max message size (bytes)
    .mq_curmsgs = 0           // # currently in queue
  };

  mqueue_t* data;
  TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);

  if (data->fd != -1) {
    // This would cause a memleak otherwise
    rb_raise(rb_eRuntimeError, "Illegal reinitialization");
  }

  data->attr = attr;
  data->queue_len = RSTRING_LEN(queue);
  data->queue = ruby_strdup(StringValueCStr(queue));
  data->fd = mq_open(data->queue, O_CREAT | O_RDWR, S_IRWXU | S_IRWXO | S_IRWXG, &data->attr);

  if (data->fd == (mqd_t)-1) {
    rb_sys_fail("Failed opening the message queue");
  }

  return self;
}

Instance Method Details

#receiveObject



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'ext/posix/mqueue.c', line 98

VALUE posix_mqueue_receive(VALUE self)
{
  int err;
  size_t buf_size;
  char *buf;
  VALUE str;

  mqueue_t* data;

  TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);

  buf_size = data->attr.mq_msgsize + 1;

  // Make sure the buffer is capable
  buf = (char*)malloc(buf_size);

  // TODO: Specify priority
  err = mq_receive(data->fd, buf, buf_size, NULL);

  if (err < 0) {
    rb_sys_fail("Message retrieval failed");
  }

  str = rb_str_new(buf, err);
  free(buf);

  return str;
}

#send(message) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'ext/posix/mqueue.c', line 77

VALUE posix_mqueue_send(VALUE self, VALUE message)
{
  int err;
  mqueue_t* data;

  TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);

  if (!RB_TYPE_P(message, T_STRING)) { 
    rb_raise(rb_eTypeError, "Message must be a string"); 
  }

  // TODO: Custom priority
  err = mq_send(data->fd, RSTRING_PTR(message), RSTRING_LEN(message), 10);

  if (err < 0) {
    rb_sys_fail("Message sending failed");
  }
  
  return Qtrue;
}


64
65
66
67
68
69
70
71
72
73
74
75
# File 'ext/posix/mqueue.c', line 64

VALUE posix_mqueue_unlink(VALUE self)
{
  mqueue_t* data;

  TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);

  if (mq_unlink(data->queue) == -1) {
    rb_sys_fail("Message queue unlinking failed");
  }

  return Qtrue;
}