Class: Iodine::PubSub::RedisEngine

Inherits:
Engine
  • Object
show all
Defined in:
ext/iodine/iodine_pubsub.c

Instance Method Summary collapse

Methods inherited from Engine

#deregister, #publish, #register, #reset, #subscribe, #unsubscribe

Constructor Details

#initialize(*args) ⇒ Object

Initializes a new RedisEngine for Pub/Sub.

use:

RedisEngine.new(address, port = 6379, ping_interval = 0)

Accepts:

address

the Redis server’s address. Required.

port

the Redis Server port. Default: 6379

ping

the PING interval up to 255 seconds. Default: 0 (~5 minutes).

auth

authentication password. Default: none.



484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
# File 'ext/iodine/iodine_pubsub.c', line 484

static VALUE redis_engine_initialize(int argc, VALUE *argv, VALUE self) {
  if (argc < 1 || argc > 4)
    rb_raise(rb_eArgError,
             "wrong number of arguments (given %d, expected 1..4).", argc);
  VALUE address = argv[0];
  VALUE port = argc >= 2 ? argv[1] : Qnil;
  VALUE ping = argc >= 3 ? argv[2] : Qnil;
  VALUE auth = argc >= 4 ? argv[3] : Qnil;
  Check_Type(address, T_STRING);
  if (port != Qnil) {
    if (TYPE(port) == T_FIXNUM)
      port = rb_fix2str(port, 10);
    Check_Type(port, T_STRING);
  }
  if (ping != Qnil)
    Check_Type(ping, T_FIXNUM);
  if (auth != Qnil) {
    Check_Type(auth, T_STRING);
  }
  size_t iping = FIX2LONG(ping);
  if (iping > 255)
    rb_raise(rb_eRangeError, "ping_interval too big (0..255)");

  iodine_engine_s *engine;
  Data_Get_Struct(self, iodine_engine_s, engine);
  engine->handler = self;
  engine->p =
      redis_engine_create(.address = StringValueCStr(address),
                          .port =
                              (port == Qnil ? "6379" : StringValueCStr(port)),
                          .ping_interval = iping,
                          .auth = (auth == Qnil ? NULL : StringValueCStr(auth)),
                          .auth_len = (auth == Qnil ? 0 : RSTRING_LEN(auth)));
  engine->dealloc = redis_engine_destroy;
  if (!engine->p)
    rb_raise(rb_eRuntimeError, "unknown error, can't initialize RedisEngine.");
  return self;
}

Instance Method Details

#send(*args) ⇒ Object

Sends commands / messages to the underlying Redis Pub connection.

The method accepts an optional callback block. i.e.:

redis.send("Echo", "Hello World!") do |reply|
   p reply # => ["Hello World!"]
end

This connection is only for publishing and database commands. The Sub commands, such as SUBSCRIBE and PSUBSCRIBE, will break the engine.



429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
# File 'ext/iodine/iodine_pubsub.c', line 429

static VALUE redis_send(int argc, VALUE *argv, VALUE self) {
  if (argc < 1)
    rb_raise(rb_eArgError,
             "wrong number of arguments (given %d, expected at least 1).",
             argc);
  Check_Type(argv[0], T_STRING);
  FIOBJ data = FIOBJ_INVALID;
  FIOBJ cmd = FIOBJ_INVALID;
  if (argc > 1) {
    for (int i = 0; i < argc; ++i) {
      if (TYPE(argv[i]) == T_SYMBOL)
        argv[i] = rb_sym2str(argv[i]);
      if (TYPE(argv[i]) != T_FIXNUM)
        Check_Type(argv[i], T_STRING);
    }
    data = fiobj_ary_new();
    for (int i = 0; i < argc; ++i) {
      if (TYPE(argv[i]) == T_FIXNUM)
        fiobj_ary_push(data, fiobj_num_new(FIX2LONG(argv[i])));
      else
        fiobj_ary_push(
            data, fiobj_str_new(RSTRING_PTR(argv[i]), RSTRING_LEN(argv[i])));
    }
  }
  cmd = fiobj_str_new(RSTRING_PTR(argv[0]), RSTRING_LEN(argv[0]));
  iodine_engine_s *e;
  Data_Get_Struct(self, iodine_engine_s, e);

  if (rb_block_given_p()) {
    VALUE block = rb_block_proc();
    Registry.add(block);
    redis_engine_send(e->p, cmd, data, redis_callback, (void *)block);
    return block;
  } else {
    redis_engine_send(e->p, cmd, data, NULL, NULL);
  }
  fiobj_free(cmd);
  fiobj_free(data);
  return Qtrue;
}