Class: SysVMQ
- Inherits:
-
Object
- Object
- SysVMQ
- Defined in:
- ext/sysvmq.c
Constant Summary collapse
- IPC_CREAT =
Define platform-specific constants from headers
INT2NUM(IPC_CREAT)
- IPC_EXCL =
INT2NUM(IPC_EXCL)
- IPC_NOWAIT =
INT2NUM(IPC_NOWAIT)
- IPC_RMID =
INT2NUM(IPC_RMID)
- IPC_SET =
INT2NUM(IPC_SET)
- IPC_STAT =
INT2NUM(IPC_STAT)
- IPC_INFO =
INT2NUM(IPC_INFO)
Instance Method Summary collapse
-
#destroy ⇒ Object
Proxies a call with IPC_RMID to
sysvmq_stats to remove the queue. -
#initialize(key, buffer_size, flags) ⇒ Object
constructor
other calls that require a
msgid, for convenience and to share the buffer. -
#receive(*args) ⇒ Object
Receive a message from the message queue.
-
#send(*args) ⇒ Object
Sends a message to the message queue.
-
#stats(*args) ⇒ Object
TODO: IPC_SET is currently not supported.
Constructor Details
#initialize(key, buffer_size, flags) ⇒ Object
other calls that require a msgid, for convenience and to share the buffer.
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 |
# File 'ext/sysvmq.c', line 346 VALUE sysvmq_initialize(VALUE self, VALUE key, VALUE buffer_size, VALUE flags) { sysvmq_t* sysv; size_t msgbuf_size; // TODO: Also support string keys, so you can pass '0xDEADC0DE' Check_Type(key, T_FIXNUM); Check_Type(flags, T_FIXNUM); Check_Type(buffer_size, T_FIXNUM); TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv); // (key_t) is a 32-bit integer (int). It's defined as `int` (at least on OS X // and Linux). However, `FIX2INT()` (from Ruby) will complain if the key is // something in the range 2^31-2^32, because of the sign bit. We use UINT to // trick Ruby, so it won't complain. sysv->key = (key_t) FIX2UINT(key); while ((sysv->id = msgget(sysv->key, FIX2INT(flags))) < 0) { if (errno == EINTR) { rb_thread_wait_for(polling_interval); // TODO: Really necessary here? continue; } rb_sys_fail("Failed opening the message queue."); } // Allocate the msgbuf buffer once for the instance, to not allocate a buffer // for each sent. This makes SysVMQ not thread-safe (requiring a // buffer for each thread), but is a reasonable trade-off for now for the // performance. sysv->buffer_size = (size_t) FIX2LONG(buffer_size + 1); msgbuf_size = sysv->buffer_size * sizeof(char) + sizeof(long); // Note that this is a zero-length array, so we size the struct to size of the // header (long, the mtype) and then the rest of the space for buffer. sysv->msgbuf = (sysvmq_msgbuf_t*) xmalloc(msgbuf_size); return self; } |
Instance Method Details
#destroy ⇒ Object
Proxies a call with IPC_RMID to sysvmq_stats to remove the queue.
152 153 154 155 156 157 158 |
# File 'ext/sysvmq.c', line 152 static VALUE sysvmq_destroy(VALUE self) { VALUE argv[1]; argv[0] = INT2FIX(IPC_RMID); return sysvmq_stats(1, argv, self); } |
#receive(*args) ⇒ Object
Receive a message from the message queue.
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 |
# File 'ext/sysvmq.c', line 193 VALUE sysvmq_receive(int argc, VALUE *argv, VALUE self) { VALUE type = INT2FIX(0); VALUE flags = INT2FIX(0); sysvmq_t* sysv; sysvmq_blocking_call_t blocking; if (argc > 2) { rb_raise(rb_eArgError, "Wrong number of arguments (0..2)"); } if (argc >= 1) type = argv[0]; if (argc == 2) flags = argv[1]; TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv); Check_Type(type, T_FIXNUM); Check_Type(flags, T_FIXNUM); // Attach blocking call parameters to the struct passed to the blocking // function wrapper. blocking.flags = FIX2INT(flags); blocking.type = FIX2LONG(type); blocking.sysv = sysv; // Initialize error so it's never a garbage value, if // `sysvmq_maybe_blocking_receive` was interrupted at a non-nice time. blocking.error = UNINITIALIZED_ERROR; blocking.length = UNINITIALIZED_ERROR; if ((blocking.flags & IPC_NOWAIT) == IPC_NOWAIT) { while(sysvmq_maybe_blocking_receive(&blocking) == NULL && blocking.error < 0) { if (errno == EINTR) { continue; } rb_sys_fail("Failed recieving message from queue"); } } else { // msgrcv(2) can block sending a message, if IPC_NOWAIT is not passed. // We unlock the GVL waiting for the call so other threads (e.g. signal // handling) can continue to work. Sets `length` on `blocking` with the size // of the message returned. while (WITHOUT_GVL(sysvmq_maybe_blocking_receive, &blocking, RUBY_UBF_IO, NULL) == NULL && blocking.error < 0) { if (errno == EINTR || blocking.error == UNINITIALIZED_ERROR) { continue; } rb_sys_fail("Failed receiving message from queue"); } } // Guard it.. assert(blocking.length != UNINITIALIZED_ERROR); // Reencode with default external encoding return rb_str_new(sysv->msgbuf->mtext, blocking.length); } |
#send(*args) ⇒ Object
Sends a message to the message queue.
268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 |
# File 'ext/sysvmq.c', line 268 VALUE sysvmq_send(int argc, VALUE *argv, VALUE self) { VALUE ; VALUE priority = INT2FIX(1); VALUE flags = INT2FIX(0); sysvmq_blocking_call_t blocking; sysvmq_t* sysv; if (argc > 3 || argc == 0) { rb_raise(rb_eArgError, "Wrong number of arguments (1..3)"); } = argv[0]; if (argc >= 2) priority = argv[1]; if (argc == 3) flags = argv[2]; = rb_funcall(, rb_intern("to_s"), 0); TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv); Check_Type(flags, T_FIXNUM); Check_Type(priority, T_FIXNUM); // TODO: Call to_s on if it responds to // Attach blocking call parameters to the struct passed to the blocking // function wrapper. blocking.flags = FIX2INT(flags); blocking.size = RSTRING_LEN(); blocking.sysv = sysv; // See msgrcv(2) wrapper blocking.error = UNINITIALIZED_ERROR; blocking.length = UNINITIALIZED_ERROR; // The buffer can be obtained from `sysvmq_maybe_blocking_send`, instead of // passing it, set it directly on the instance struct. sysv->msgbuf->mtype = FIX2INT(priority); if (blocking.size > sysv->buffer_size) { rb_raise(rb_eArgError, "Size of message is bigger than buffer size."); } // TODO: Can a string copy be avoided? memcpy(sysv->msgbuf->mtext, RSTRING_PTR(), blocking.size); // Non-blocking call, skip the expensive GVL release/acquire if ((blocking.flags & IPC_NOWAIT) == IPC_NOWAIT) { while(sysvmq_maybe_blocking_send(&blocking) == NULL && blocking.error < 0) { if (errno == EINTR) { continue; } rb_sys_fail("Failed sending message to queue"); } } else { // msgsnd(2) can block waiting for a , if IPC_NOWAIT is not passed. // We unlock the GVL waiting for the call so other threads (e.g. signal // handling) can continue to work. while (WITHOUT_GVL(sysvmq_maybe_blocking_send, &blocking, RUBY_UBF_IO, NULL) == NULL && blocking.error < 0) { if (errno == EINTR || blocking.error == UNINITIALIZED_ERROR) { continue; } rb_sys_fail("Failed sending message to queue"); } } return ; } |
#stats(*args) ⇒ Object
TODO: IPC_SET is currently not supported.
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'ext/sysvmq.c', line 103 static VALUE sysvmq_stats(int argc, VALUE *argv, VALUE self) { struct msqid_ds info; VALUE info_hash; VALUE cmd; sysvmq_t* sysv; // Optional argument handling if (argc > 1) { rb_raise(rb_eArgError, "Wrong number of arguments (0..1)"); } // Default to IPC_STAT cmd = argc == 1 ? argv[0] : INT2FIX(IPC_STAT); TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv); // TODO: Does FIX2INT actually perform this check already? Check_Type(cmd, T_FIXNUM); while (msgctl(sysv->id, FIX2INT(cmd), &info) < 0) { if (errno == EINTR) { rb_thread_wait_for(polling_interval); continue; } rb_sys_fail("Failed executing msgctl(2) command."); } // Map values from struct to a hash // TODO: Add all the fields // TODO: They are probably not ints.. info_hash = rb_hash_new(); rb_hash_aset(info_hash, ID2SYM(rb_intern("count")), INT2FIX(info.msg_qnum)); rb_hash_aset(info_hash, ID2SYM(rb_intern("maximum_size")), INT2FIX(info.msg_qbytes)); // TODO: Can probably make a better checker here for whether the struct // actually has the member. // TODO: BSD support? #ifdef __linux__ rb_hash_aset(info_hash, ID2SYM(rb_intern("size")), INT2FIX(info.__msg_cbytes)); #elif __APPLE__ rb_hash_aset(info_hash, ID2SYM(rb_intern("size")), INT2FIX(info.msg_cbytes)); #endif return info_hash; } |