Class: Iodine::PubSub::RedisEngine

Inherits:
Engine
  • Object
show all
Defined in:
ext/iodine/iodine_pubsub.c

Instance Method Summary collapse

Methods inherited from Engine

#distribute, #publish, #subscribe, #unsubscribe

Constructor Details

#initialize(*args) ⇒ Object

Initializes a new RedisEngine for Pub/Sub.

use:

RedisEngine.new(address, port = 6379, ping_interval = 0)

Accepts:

address

the Redis server’s address. Required.

port

the Redis Server port. Default: 6379

ping

the PING interval. Default: 0 (~5 minutes).

auth

authentication password. Default: none.



377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
# File 'ext/iodine/iodine_pubsub.c', line 377

static VALUE redis_engine_initialize(int argc, VALUE *argv, VALUE self) {
  if (argc < 1 || argc > 4)
    rb_raise(rb_eArgError,
             "wrong number of arguments (given %d, expected 1..4).", argc);
  VALUE address = argv[0];
  VALUE port = argc >= 2 ? argv[1] : Qnil;
  VALUE ping = argc >= 3 ? argv[2] : Qnil;
  VALUE auth = argc >= 4 ? argv[3] : Qnil;
  Check_Type(address, T_STRING);
  if (port != Qnil) {
    if (TYPE(port) == T_FIXNUM)
      port = rb_fix2str(port, 10);
    Check_Type(port, T_STRING);
  }
  if (ping != Qnil)
    Check_Type(ping, T_FIXNUM);
  if (auth != Qnil) {
    Check_Type(auth, T_STRING);
  }
  size_t iping = FIX2LONG(ping);
  if (iping > 255)
    rb_raise(rb_eRangeError, "ping_interval too big (0..255)");

  iodine_engine_s *engine;
  Data_Get_Struct(self, iodine_engine_s, engine);
  engine->handler = self;
  engine->p =
      redis_engine_create(.address = StringValueCStr(address),
                          .port =
                              (port == Qnil ? "6379" : StringValueCStr(port)),
                          .ping_interval = iping,
                          .auth = (auth == Qnil ? NULL : StringValueCStr(auth)),
                          .auth_len = (auth == Qnil ? 0 : RSTRING_LEN(auth)));
  if (!engine->p)
    rb_raise(rb_eRuntimeError, "unknown error, can't initialize RedisEngine.");
  engine->dealloc = redis_engine_destroy;
  return self;
}

Instance Method Details

#send(*args) ⇒ Object

Sends commands / messages to the underlying Redis Pub connection.

The method accepts an optional callback block. i.e.:

redis.send("Echo", "Hello World!") do |reply|
   p reply # => ["Hello World!"]
end

This connection is only for publishing and database commands. The Sub commands, such as SUBSCRIBE and PSUBSCRIBE, will break the engine.



316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
# File 'ext/iodine/iodine_pubsub.c', line 316

static VALUE redis_send(int argc, VALUE *argv, VALUE self) {
  if (argc < 1)
    rb_raise(rb_eArgError,
             "wrong number of arguments (given %d, expected at least 1).",
             argc);
  resp_object_s *cmd = NULL;
  Check_Type(argv[0], T_STRING);

  iodine_engine_s *e;
  Data_Get_Struct(self, iodine_engine_s, e);
  cmd = resp_arr2obj(argc, NULL);
  for (int i = 0; i < argc; i++) {
    switch (TYPE(argv[i])) {
    case T_SYMBOL:
      argv[i] = rb_sym2str(argv[i]);
    /* Fallthrough */
    case T_STRING:
      resp_obj2arr(cmd)->array[i] =
          resp_str2obj(RSTRING_PTR(argv[i]), RSTRING_LEN(argv[i]));
      break;
    case T_FIXNUM:
      resp_obj2arr(cmd)->array[i] = resp_num2obj(FIX2LONG(argv[i]));
      break;
    default:
      goto error;
      break;
    }
  }

  if (rb_block_given_p()) {
    VALUE block = rb_block_proc();
    Registry.add(block);
    redis_engine_send(e->p, cmd, redis_callback, (void *)block);
    return block;
  } else {
    redis_engine_send(e->p, cmd, NULL, NULL);
  }
  return Qtrue;
error:
  if (cmd)
    resp_free_object(cmd);
  rb_raise(rb_eArgError, "Arguments can only include Strings, Symbols and "
                         "Integers - no arrays or hashes or other objects can "
                         "be sent.");
  return self;
}