Class: Iodine::PubSub::RedisEngine
- Defined in:
- ext/iodine/iodine_pubsub.c
Instance Method Summary collapse
-
#initialize(*args) ⇒ Object
constructor
Initializes a new RedisEngine for Pub/Sub.
-
#send(*args) ⇒ Object
Sends commands / messages to the underlying Redis Pub connection.
Methods inherited from Engine
#distribute, #publish, #subscribe, #unsubscribe
Constructor Details
#initialize(*args) ⇒ Object
Initializes a new RedisEngine for Pub/Sub.
use:
RedisEngine.new(address, port = 6379, ping_interval = 0)
Accepts:
- address
-
the Redis server’s address. Required.
- port
-
the Redis Server port. Default: 6379
- ping
-
the PING interval. Default: 0 (~5 minutes).
- auth
-
authentication password. Default: none.
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 |
# File 'ext/iodine/iodine_pubsub.c', line 377
static VALUE redis_engine_initialize(int argc, VALUE *argv, VALUE self) {
if (argc < 1 || argc > 4)
rb_raise(rb_eArgError,
"wrong number of arguments (given %d, expected 1..4).", argc);
VALUE address = argv[0];
VALUE port = argc >= 2 ? argv[1] : Qnil;
VALUE ping = argc >= 3 ? argv[2] : Qnil;
VALUE auth = argc >= 4 ? argv[3] : Qnil;
Check_Type(address, T_STRING);
if (port != Qnil) {
if (TYPE(port) == T_FIXNUM)
port = rb_fix2str(port, 10);
Check_Type(port, T_STRING);
}
if (ping != Qnil)
Check_Type(ping, T_FIXNUM);
if (auth != Qnil) {
Check_Type(auth, T_STRING);
}
size_t iping = FIX2LONG(ping);
if (iping > 255)
rb_raise(rb_eRangeError, "ping_interval too big (0..255)");
iodine_engine_s *engine;
Data_Get_Struct(self, iodine_engine_s, engine);
engine->handler = self;
engine->p =
redis_engine_create(.address = StringValueCStr(address),
.port =
(port == Qnil ? "6379" : StringValueCStr(port)),
.ping_interval = iping,
.auth = (auth == Qnil ? NULL : StringValueCStr(auth)),
.auth_len = (auth == Qnil ? 0 : RSTRING_LEN(auth)));
if (!engine->p)
rb_raise(rb_eRuntimeError, "unknown error, can't initialize RedisEngine.");
engine->dealloc = redis_engine_destroy;
return self;
}
|
Instance Method Details
#send(*args) ⇒ Object
Sends commands / messages to the underlying Redis Pub connection.
The method accepts an optional callback block. i.e.:
redis.send("Echo", "Hello World!") do |reply|
p reply # => ["Hello World!"]
end
This connection is only for publishing and database commands. The Sub commands, such as SUBSCRIBE and PSUBSCRIBE, will break the engine.
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 |
# File 'ext/iodine/iodine_pubsub.c', line 316
static VALUE redis_send(int argc, VALUE *argv, VALUE self) {
if (argc < 1)
rb_raise(rb_eArgError,
"wrong number of arguments (given %d, expected at least 1).",
argc);
resp_object_s *cmd = NULL;
Check_Type(argv[0], T_STRING);
iodine_engine_s *e;
Data_Get_Struct(self, iodine_engine_s, e);
cmd = resp_arr2obj(argc, NULL);
for (int i = 0; i < argc; i++) {
switch (TYPE(argv[i])) {
case T_SYMBOL:
argv[i] = rb_sym2str(argv[i]);
/* Fallthrough */
case T_STRING:
resp_obj2arr(cmd)->array[i] =
resp_str2obj(RSTRING_PTR(argv[i]), RSTRING_LEN(argv[i]));
break;
case T_FIXNUM:
resp_obj2arr(cmd)->array[i] = resp_num2obj(FIX2LONG(argv[i]));
break;
default:
goto error;
break;
}
}
if (rb_block_given_p()) {
VALUE block = rb_block_proc();
Registry.add(block);
redis_engine_send(e->p, cmd, redis_callback, (void *)block);
return block;
} else {
redis_engine_send(e->p, cmd, NULL, NULL);
}
return Qtrue;
error:
if (cmd)
resp_free_object(cmd);
rb_raise(rb_eArgError, "Arguments can only include Strings, Symbols and "
"Integers - no arrays or hashes or other objects can "
"be sent.");
return self;
}
|