Class: SysVMQ
- Inherits:
-
Object
- Object
- SysVMQ
- Defined in:
- ext/sysvmq.c
Constant Summary collapse
- IPC_CREAT =
Define platform specific constants from headers
INT2NUM(IPC_CREAT)
- IPC_EXCL =
INT2NUM(IPC_EXCL)
- IPC_NOWAIT =
INT2NUM(IPC_NOWAIT)
- IPC_RMID =
INT2NUM(IPC_RMID)
- IPC_SET =
INT2NUM(IPC_SET)
- IPC_STAT =
INT2NUM(IPC_STAT)
- IPC_INFO =
INT2NUM(IPC_INFO)
Instance Method Summary collapse
-
#destroy ⇒ Object
Proxies a call with IPC_RMID to
sysvmq_stats to remove the queue. -
#initialize(key, buffer_size, flags) ⇒ Object
constructor
other calls that require a
msgid, for convenience and to share the buffer. -
#receive(*args) ⇒ Object
Receive a message from the message queue.
-
#send(*args) ⇒ Object
Sends a message to the message queue.
-
#stats(*args) ⇒ Object
TODO: IPC_SET is currently not supported.
Constructor Details
#initialize(key, buffer_size, flags) ⇒ Object
other calls that require a msgid, for convenience and to share the buffer.
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 |
# File 'ext/sysvmq.c', line 295
/*
 * SysVMQ#initialize(key, buffer_size, flags)
 *
 * Opens (or creates, depending on `flags`) the message queue identified by
 * `key` via msgget(2), stores the resulting queue id on the instance, and
 * allocates the per-instance message buffer that send/receive share.
 *
 * key         - Fixnum IPC key.
 * buffer_size - Fixnum, number of payload bytes the shared buffer can hold.
 *               Must not be negative.
 * flags       - Fixnum flags handed to msgget(2).
 *
 * Returns self.
 * Raises TypeError if an argument is not a Fixnum, ArgumentError if
 * buffer_size is negative, and a SystemCallError if msgget(2) fails with
 * anything other than EINTR.
 */
VALUE
sysvmq_initialize(VALUE self, VALUE key, VALUE buffer_size, VALUE flags)
{
  sysvmq_t* sysv;
  size_t msgbuf_size;
  int requested_size;

  // TODO: Also support string keys, so you can pass '0xDEADC0DE'
  Check_Type(key, T_FIXNUM);
  Check_Type(flags, T_FIXNUM);
  Check_Type(buffer_size, T_FIXNUM);

  TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv);

  // Validate the buffer size before touching the queue, so that an invalid
  // size neither creates a queue nor feeds a negative number into the
  // allocation size arithmetic below (which would produce an undersized or
  // wrapped-around buffer).
  requested_size = FIX2INT(buffer_size);
  if (requested_size < 0) {
    rb_raise(rb_eArgError, "Buffer size must not be negative.");
  }

  // TODO: This probably doesn't hold on all platforms.
  sysv->key = FIX2LONG(key);

  // Retry msgget(2) for as long as it is interrupted by a signal.
  while ((sysv->id = msgget(sysv->key, FIX2INT(flags))) < 0) {
    if (errno == EINTR) {
      rb_thread_wait_for(polling_interval); // TODO: Really necessary here?
      continue;
    }

    rb_sys_fail("Failed opening the message queue.");
  }

  // Allocate the msgbuf buffer once for the instance, to not allocate a buffer
  // for each message sent. This makes SysVMQ not thread-safe (requiring a
  // buffer for each thread), but is a reasonable trade-off for now for the
  // performance.
  sysv->buffer_size = requested_size;

  // The message struct ends in a zero-length array, so the allocation is the
  // header (long, the mtype) plus the requested number of payload bytes.
  msgbuf_size = (size_t) requested_size + sizeof(long);

  // NOTE(review): if #initialize is invoked a second time on the same object
  // the previous buffer is not released here -- confirm against the
  // allocator/free functions of sysvmq_type.
  sysv->msgbuf = xmalloc(msgbuf_size);

  return self;
}
|
Instance Method Details
#destroy ⇒ Object
Proxies a call with IPC_RMID to sysvmq_stats to remove the queue.
136 137 138 139 140 141 142 |
# File 'ext/sysvmq.c', line 136
/*
 * SysVMQ#destroy
 *
 * Removes the queue by delegating to sysvmq_stats with IPC_RMID as its single
 * argument. Returns whatever sysvmq_stats returns.
 */
static VALUE
sysvmq_destroy(VALUE self)
{
  // One-element argument vector carrying the IPC_RMID command.
  VALUE rmid_cmd = INT2FIX(IPC_RMID);

  return sysvmq_stats(1, &rmid_cmd, self);
}
|
#receive(*args) ⇒ Object
Receive a message from the message queue.
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'ext/sysvmq.c', line 172
/*
 * SysVMQ#receive(type = 0, flags = 0)
 *
 * Receives one message from the queue into the instance's shared buffer and
 * returns it as a new String tagged with the default external encoding.
 *
 * type  - optional Fixnum message type selector (defaults to 0).
 * flags - optional Fixnum flags (defaults to 0).
 *
 * Raises ArgumentError when more than two arguments are given, TypeError when
 * an argument is not a Fixnum, and a SystemCallError when the receive fails
 * with anything other than EINTR.
 */
VALUE
sysvmq_receive(int argc, VALUE *argv, VALUE self)
{
  sysvmq_t* queue;
  sysvmq_blocking_call_t call;
  VALUE msg_type  = INT2FIX(0);
  VALUE msg_flags = INT2FIX(0);

  if (argc > 2) {
    rb_raise(rb_eArgError, "Wrong number of arguments (0..2)");
  }

  // Both arguments are optional; override the defaults with what was passed.
  if (argc >= 1) {
    msg_type = argv[0];
  }
  if (argc == 2) {
    msg_flags = argv[1];
  }

  TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, queue);

  Check_Type(msg_type, T_FIXNUM);
  Check_Type(msg_flags, T_FIXNUM);

  // Bundle the call parameters into the struct handed to the wrapper that
  // runs without the GVL.
  call.flags = FIX2INT(msg_flags);
  call.type  = FIX2INT(msg_type);
  call.sysv  = queue;

  // Receiving can block waiting for a message to arrive unless IPC_NOWAIT is
  // passed, so the call is made with the GVL released to let other threads
  // (e.g. signal handling) keep running. The wrapper reports its result in
  // `call.retval`, which is used below as the length of the received message.
  for (;;) {
    void *outcome = rb_thread_call_without_gvl2(sysvmq_maybe_blocking_receive, &call, RUBY_UBF_IO, NULL);

    if (outcome != NULL || call.retval >= 0) {
      break;
    }

    if (errno != EINTR) {
      rb_sys_fail("Failed receiving message from queue");
    }

    // Interrupted: let Ruby process pending interrupts, then try again.
    rb_thread_check_ints();
  }

  // Reencode with default external encoding
  return rb_enc_str_new(queue->msgbuf->mtext, call.retval, rb_default_external_encoding());
}
|
#send(*args) ⇒ Object
Sends a message to the message queue.
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 |
# File 'ext/sysvmq.c', line 232
/*
 * SysVMQ#send(message, priority = 1, flags = 0)
 *
 * Copies `message` into the instance's shared buffer and sends it to the
 * queue.
 *
 * message  - String payload. Copied by length, so it may contain NUL bytes.
 * priority - optional Fixnum stored as the message type (defaults to 1).
 * flags    - optional Fixnum flags (defaults to 0).
 *
 * Returns the message that was passed in.
 * Raises ArgumentError on a wrong argument count or when the message is
 * larger than the instance buffer, TypeError when message is not a String or
 * priority/flags are not Fixnums, and a SystemCallError when the send fails
 * with anything other than EINTR.
 */
VALUE
sysvmq_send(int argc, VALUE *argv, VALUE self)
{
  VALUE message;
  VALUE priority = INT2FIX(1);
  VALUE flags = INT2FIX(0);
  sysvmq_blocking_call_t blocking;
  sysvmq_t* sysv;

  if (argc > 3 || argc == 0) {
    rb_raise(rb_eArgError, "Wrong number of arguments (1..3)");
  }

  message = argv[0];
  if (argc >= 2) priority = argv[1];
  if (argc == 3) flags    = argv[2];

  TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv);

  // RSTRING_LEN/RSTRING_PTR below are only valid on a String, so reject
  // anything else up front instead of reading a non-String object as one.
  // TODO: Call to_s on message if it responds to
  Check_Type(message,  T_STRING);
  Check_Type(flags,    T_FIXNUM);
  Check_Type(priority, T_FIXNUM);

  // Attach blocking call parameters to the struct passed to the blocking
  // function wrapper.
  blocking.flags = FIX2INT(flags);
  blocking.size  = RSTRING_LEN(message);
  blocking.sysv  = sysv;

  // The buffer is not passed separately; the wrapper reaches it through the
  // instance struct, so the message is staged directly on `sysv->msgbuf`.
  sysv->msgbuf->mtype = FIX2INT(priority);

  if (blocking.size > sysv->buffer_size) {
    rb_raise(rb_eArgError, "Size of message is bigger than buffer size.");
  }

  // Copy exactly `blocking.size` bytes. A length-based copy keeps payloads
  // with embedded NUL bytes intact, matching the length-based read in receive.
  // TODO: Can a string copy be avoided?
  memcpy(sysv->msgbuf->mtext, RSTRING_PTR(message), blocking.size);

  // Sending can block (e.g. while the queue is full) unless IPC_NOWAIT is
  // passed. We unlock the GVL waiting for the call so other threads (e.g.
  // signal handling) can continue to work.
  // NOTE(review): `blocking.retval` is only meaningful once the wrapper has
  // actually run -- confirm how an early return of
  // rb_thread_call_without_gvl2 (pending interrupt) should be handled.
  while (rb_thread_call_without_gvl2(sysvmq_maybe_blocking_send, &blocking, RUBY_UBF_IO, NULL) == NULL
          && blocking.retval < 0) {
    if (errno == EINTR) {
      rb_thread_check_ints();
      continue;
    }

    rb_sys_fail("Failed sending message to queue");
  }

  return message;
}
|
#stats(*args) ⇒ Object
TODO: IPC_SET is currently not supported.
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'ext/sysvmq.c', line 87
/*
 * SysVMQ#stats(cmd = IPC_STAT)
 *
 * Runs msgctl(2) with `cmd` against the instance's queue and returns a Hash
 * built from the msqid_ds struct handed to the call:
 *
 *   :count        - msg_qnum
 *   :maximum_size - msg_qbytes
 *   :size         - bytes currently in the queue (Linux and macOS only)
 *
 * The struct is zero-initialised, so for commands that do not fill it (such
 * as IPC_RMID, used by #destroy) the hash contains zeros.
 *
 * TODO: IPC_SET is currently not supported.
 *
 * Raises ArgumentError when more than one argument is given, TypeError when
 * cmd is not a Fixnum, and a SystemCallError when msgctl(2) fails with
 * anything other than EINTR.
 */
static VALUE
sysvmq_stats(int argc, VALUE *argv, VALUE self)
{
  // Zero-initialise: not every command makes msgctl(2) write to the struct,
  // and every field read below must hold a defined value.
  struct msqid_ds info = {0};
  VALUE info_hash;
  VALUE cmd;
  sysvmq_t* sysv;

  // Optional argument handling
  if (argc > 1) {
    rb_raise(rb_eArgError, "Wrong number of arguments (0..1)");
  }

  // Default to IPC_STAT
  cmd = argc == 1 ? argv[0] : INT2FIX(IPC_STAT);

  TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv);

  // TODO: Does FIX2INT actually perform this check already?
  Check_Type(cmd, T_FIXNUM);

  // Retry msgctl(2) for as long as it is interrupted by a signal.
  while (msgctl(sysv->id, FIX2INT(cmd), &info) < 0) {
    if (errno == EINTR) {
      rb_thread_wait_for(polling_interval);
      continue;
    }

    rb_sys_fail("Failed executing msgctl(2) command.");
  }

  // Map values from struct to a hash. The counters are unsigned integer
  // types, so convert them with a range-aware unsigned conversion instead of
  // squeezing them through INT2FIX.
  // TODO: Add all the fields
  info_hash = rb_hash_new();
  rb_hash_aset(info_hash, ID2SYM(rb_intern("count")),        ULONG2NUM((unsigned long) info.msg_qnum));
  rb_hash_aset(info_hash, ID2SYM(rb_intern("maximum_size")), ULONG2NUM((unsigned long) info.msg_qbytes));

  // The name of the "bytes currently in queue" member differs per platform.
  // TODO: Can probably make a better checker here for whether the struct
  // actually has the member.
  // TODO: BSD support?
#if defined(__linux__)
  rb_hash_aset(info_hash, ID2SYM(rb_intern("size")), ULONG2NUM((unsigned long) info.__msg_cbytes));
#elif defined(__APPLE__)
  rb_hash_aset(info_hash, ID2SYM(rb_intern("size")), ULONG2NUM((unsigned long) info.msg_cbytes));
#endif

  return info_hash;
}
|