Class: Iodine::PubSub::RedisEngine
- Defined in:
- ext/iodine/iodine_pubsub.c
Instance Method Summary collapse
-
#initialize(*args) ⇒ Object
constructor
Initializes a new RedisEngine for Pub/Sub.
-
#send(*args) ⇒ Object
Sends commands / messages to the underlying Redis Pub connection.
Methods inherited from Engine
#deregister, #publish, #register, #reset, #subscribe, #unsubscribe
Constructor Details
#initialize(*args) ⇒ Object
Initializes a new RedisEngine for Pub/Sub.
use:
RedisEngine.new(address, port = 6379, ping_interval = 0)
Accepts:
- address
-
the Redis server’s address. Required.
- port
-
the Redis Server port. Default: 6379
- ping
-
the PING interval up to 255 seconds. Default: 0 (~5 minutes).
- auth
-
authentication password. Default: none.
484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 |
# File 'ext/iodine/iodine_pubsub.c', line 484
static VALUE redis_engine_initialize(int argc, VALUE *argv, VALUE self) {
if (argc < 1 || argc > 4)
rb_raise(rb_eArgError,
"wrong number of arguments (given %d, expected 1..4).", argc);
VALUE address = argv[0];
VALUE port = argc >= 2 ? argv[1] : Qnil;
VALUE ping = argc >= 3 ? argv[2] : Qnil;
VALUE auth = argc >= 4 ? argv[3] : Qnil;
Check_Type(address, T_STRING);
if (port != Qnil) {
if (TYPE(port) == T_FIXNUM)
port = rb_fix2str(port, 10);
Check_Type(port, T_STRING);
}
if (ping != Qnil)
Check_Type(ping, T_FIXNUM);
if (auth != Qnil) {
Check_Type(auth, T_STRING);
}
size_t iping = FIX2LONG(ping);
if (iping > 255)
rb_raise(rb_eRangeError, "ping_interval too big (0..255)");
iodine_engine_s *engine;
Data_Get_Struct(self, iodine_engine_s, engine);
engine->handler = self;
engine->p =
redis_engine_create(.address = StringValueCStr(address),
.port =
(port == Qnil ? "6379" : StringValueCStr(port)),
.ping_interval = iping,
.auth = (auth == Qnil ? NULL : StringValueCStr(auth)),
.auth_len = (auth == Qnil ? 0 : RSTRING_LEN(auth)));
engine->dealloc = redis_engine_destroy;
if (!engine->p)
rb_raise(rb_eRuntimeError, "unknown error, can't initialize RedisEngine.");
return self;
}
|
Instance Method Details
#send(*args) ⇒ Object
Sends commands / messages to the underlying Redis Pub connection.
The method accepts an optional callback block. i.e.:
redis.send("Echo", "Hello World!") do |reply|
p reply # => ["Hello World!"]
end
This connection is only for publishing and database commands. The Sub commands, such as SUBSCRIBE and PSUBSCRIBE, will break the engine.
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 |
# File 'ext/iodine/iodine_pubsub.c', line 429
static VALUE redis_send(int argc, VALUE *argv, VALUE self) {
if (argc < 1)
rb_raise(rb_eArgError,
"wrong number of arguments (given %d, expected at least 1).",
argc);
Check_Type(argv[0], T_STRING);
FIOBJ data = FIOBJ_INVALID;
FIOBJ cmd = FIOBJ_INVALID;
if (argc > 1) {
for (int i = 0; i < argc; ++i) {
if (TYPE(argv[i]) == T_SYMBOL)
argv[i] = rb_sym2str(argv[i]);
if (TYPE(argv[i]) != T_FIXNUM)
Check_Type(argv[i], T_STRING);
}
data = fiobj_ary_new();
for (int i = 0; i < argc; ++i) {
if (TYPE(argv[i]) == T_FIXNUM)
fiobj_ary_push(data, fiobj_num_new(FIX2LONG(argv[i])));
else
fiobj_ary_push(
data, fiobj_str_new(RSTRING_PTR(argv[i]), RSTRING_LEN(argv[i])));
}
}
cmd = fiobj_str_new(RSTRING_PTR(argv[0]), RSTRING_LEN(argv[0]));
iodine_engine_s *e;
Data_Get_Struct(self, iodine_engine_s, e);
if (rb_block_given_p()) {
VALUE block = rb_block_proc();
Registry.add(block);
redis_engine_send(e->p, cmd, data, redis_callback, (void *)block);
return block;
} else {
redis_engine_send(e->p, cmd, data, NULL, NULL);
}
fiobj_free(cmd);
fiobj_free(data);
return Qtrue;
}
|