Module: Iodine::PubSub

Defined in:
lib/iodine/pubsub.rb,
ext/iodine/iodine_pubsub.c

Overview

Note:

From Wikipedia: publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead characterize published messages into classes without knowledge of which subscribers, if any, there may be. Similarly, subscribers express interest in one or more classes and only receive messages that are of interest, without knowledge of which publishers, if any, there are.

Iodine is equipped with an internal pub/sub service that allows improved resource management from a deployment perspective.

The common paradigm, which is implemented by pub/sub services like Redis, is for a “client” to “subscribe” to one or more “channels”. Messages are streamed to these “channels” by different “publishers” (the application / other clients) and are broadcast to the “clients” through their “subscriptions”.

Iodine’s approach is to offload pub/sub resource costs from the pub/sub service (which is usually expensive to scale) onto the application layer.

For example, the default (`nil`) pub/sub Engine implements an internal pub/sub service that manages subscriptions (clients and channels) throughout an Iodine process cluster without any need to connect to an external pub/sub service.

If Iodine is running with 8 processes and 16 threads per process, a message published in process A will be delivered to subscribers in process B.

In addition, by inheriting from the Engine class, it’s easy to create pub/sub engines that connect to this underlying pub/sub service. This means that Iodine will call the engine’s `subscribe` method only once per channel and, once messages arrive, Iodine will distribute them to all the subscribed clients.
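The "subscribe once per channel, then distribute locally" idea can be sketched in plain Ruby. This is only a toy model under stated assumptions: ToyBackend and ToyHub are hypothetical names invented for illustration, they are not part of Iodine's API, and the sketch says nothing about how Iodine implements this internally.

```ruby
# Toy model of the fan-out described above. ToyBackend and ToyHub are
# hypothetical names and are not part of Iodine's API.

# Stands in for an external pub/sub service; counts upstream subscriptions.
class ToyBackend
  attr_reader :subscribe_calls

  def initialize
    @subscribe_calls = Hash.new(0)
  end

  def subscribe(channel)
    @subscribe_calls[channel] += 1
  end
end

# Stands in for the application layer: it subscribes upstream once per
# channel and distributes incoming messages to every local client.
class ToyHub
  def initialize(backend)
    @backend = backend
    @clients = Hash.new { |hash, channel| hash[channel] = [] }
  end

  def subscribe(channel, &handler)
    # Only the first local client on a channel triggers an upstream subscribe.
    @backend.subscribe(channel) if @clients[channel].empty?
    @clients[channel] << handler
  end

  # Called when the backend delivers a message for a channel.
  def deliver(channel, message)
    @clients[channel].each { |handler| handler.call(message) }
  end
end

backend = ToyBackend.new
hub = ToyHub.new(backend)
received = []
3.times { |i| hub.subscribe("chat") { |msg| received << [i, msg] } }
hub.deliver("chat", "hello")
```

In this model three local clients share a single upstream subscription, which is the resource saving the overview describes.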

Defined Under Namespace

Classes: Engine, Redis

Constant Summary

CLUSTER =

CLUSTER publishes data to all the subscribers in the process cluster.

iodine_pubsub_make_C_engine(PUBSUB_CLUSTER_ENGINE)

PROCESS =

PROCESS publishes data to all the subscribers in a single process.

iodine_pubsub_make_C_engine(PUBSUB_PROCESS_ENGINE)
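
The difference in scope between the two constants can be illustrated with a small model. This is a sketch only: ToyProcess and the two lambdas are hypothetical stand-ins, and real Iodine processes are separate OS processes rather than objects in one interpreter.

```ruby
# Toy model: each "process" is a plain Ruby object holding the messages its
# subscribers received. ToyProcess is a hypothetical name, not an Iodine class.
ToyProcess = Struct.new(:name, :inbox)

processes = [ToyProcess.new("A", []), ToyProcess.new("B", [])]

# PROCESS-like scope: only subscribers in the publishing process see the message.
publish_process = ->(origin, msg) { origin.inbox << msg }

# CLUSTER-like scope: subscribers in every process of the cluster see the message.
publish_cluster = ->(_origin, msg) { processes.each { |process| process.inbox << msg } }

a, b = processes
publish_process.call(a, "local")
publish_cluster.call(a, "global")
```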

Class Method Summary

Class Method Details

.attach(engine) ⇒ Object

Attaches an Engine to the pub/sub system (more than a single engine can be attached at the same time).

After an engine is attached, its callbacks (Iodine::PubSub::Engine#subscribe and Iodine::PubSub::Engine#unsubscribe) will be called in response to pub/sub events.



# File 'ext/iodine/iodine_pubsub.c', line 316

static VALUE iodine_pubsub_attach(VALUE self, VALUE engine) {
  iodine_pubsub_s *e = iodine_pubsub_CData(engine);
  if (!e) {
    rb_raise(rb_eTypeError, "not a valid engine");
    return Qnil;
  }
  if (e->handler == Qnil) {
    e->handler = engine;
  }
  IodineStore.add(engine);
  pubsub_engine_register(e->engine);
  return engine;
  (void)self;
}
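
The attach behavior described above (several engines registered at once, each notified of pub/sub events) can be modeled in plain Ruby. This is a simplified sketch: RecordingEngine and ToyRegistry are hypothetical names, the callbacks take a single channel argument for brevity, and the "first subscriber / last subscriber" trigger is an assumption based on the once-per-channel behavior described in the overview.

```ruby
# Hypothetical model of attaching engines; not Iodine's implementation.

# Records every callback it receives.
class RecordingEngine
  attr_reader :events

  def initialize
    @events = []
  end

  def subscribe(channel)
    @events << [:subscribe, channel]
  end

  def unsubscribe(channel)
    @events << [:unsubscribe, channel]
  end
end

class ToyRegistry
  def initialize
    @engines = []
    @subscribers = Hash.new(0) # channel => number of local subscribers
  end

  # Mirrors the type check in the C function: reject anything that
  # cannot act as an engine.
  def attach(engine)
    unless engine.respond_to?(:subscribe) && engine.respond_to?(:unsubscribe)
      raise TypeError, "not a valid engine"
    end
    @engines << engine unless @engines.include?(engine)
    engine
  end

  def subscribe(channel)
    @subscribers[channel] += 1
    # Engines hear about a channel only when its first subscriber arrives.
    @engines.each { |engine| engine.subscribe(channel) } if @subscribers[channel] == 1
  end

  def unsubscribe(channel)
    return if @subscribers[channel].zero?
    @subscribers[channel] -= 1
    # Engines hear about a channel closing only when its last subscriber leaves.
    @engines.each { |engine| engine.unsubscribe(channel) } if @subscribers[channel].zero?
  end
end

registry = ToyRegistry.new
first = registry.attach(RecordingEngine.new)
second = registry.attach(RecordingEngine.new)
2.times { registry.subscribe("news") }
2.times { registry.unsubscribe("news") }
```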

.default ⇒ Object

Returns the default Engine for pub/sub methods. If no default has been set, CLUSTER is stored as the default and returned.



# File 'ext/iodine/iodine_pubsub.c', line 299

static VALUE iodine_pubsub_default_get(VALUE self) {
  VALUE def = rb_ivar_get(self, rb_intern2("default_engine", 14));
  if (def == Qnil) {
    def = rb_const_get(self, rb_intern2("CLUSTER", 7));
    iodine_pubsub_default_set(self, def);
  }
  return def;
}

.default=(engine) ⇒ Object

Sets the default Engine for pub/sub methods. Passing nil selects CLUSTER; any other object that is not a valid engine raises a TypeError.



# File 'ext/iodine/iodine_pubsub.c', line 281

static VALUE iodine_pubsub_default_set(VALUE self, VALUE engine) {
  if (engine == Qnil) {
    engine = rb_const_get(self, rb_intern2("CLUSTER", 7));
  }
  iodine_pubsub_s *e = iodine_pubsub_CData(engine);
  if (!e) {
    rb_raise(rb_eTypeError, "not a valid engine");
    return Qnil;
  }
  if (e->handler == Qnil) {
    e->handler = engine;
  }
  PUBSUB_DEFAULT_ENGINE = e->engine;
  rb_ivar_set(self, rb_intern2("default_engine", 14), engine);
  return engine;
}
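
The getter and setter logic shown in the two C functions can be restated as a short Ruby model. This is a sketch for reading the C code, not a replacement for it: ToyDefaults is a hypothetical module, and its CLUSTER and PROCESS constants are placeholder objects rather than real engines.

```ruby
# Hypothetical Ruby restatement of the default-engine logic; the constants
# below are placeholders, not the real Iodine engines.
module ToyDefaults
  CLUSTER = Object.new.freeze
  PROCESS = Object.new.freeze
  VALID_ENGINES = [CLUSTER, PROCESS].freeze

  @default_engine = nil

  # Lazily falls back to CLUSTER when no default has been stored yet.
  def self.default
    self.default = CLUSTER if @default_engine.nil?
    @default_engine
  end

  # nil selects CLUSTER; anything that is not a known engine is rejected.
  def self.default=(engine)
    engine = CLUSTER if engine.nil?
    raise TypeError, "not a valid engine" unless VALID_ENGINES.include?(engine)
    @default_engine = engine
  end
end

initial = ToyDefaults.default
ToyDefaults.default = ToyDefaults::PROCESS
after_set = ToyDefaults.default
ToyDefaults.default = nil
after_nil = ToyDefaults.default
```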

.dettach(engine) ⇒ Object

Removes an Engine from the pub/sub system.

After an Engine is detached, Iodine will no longer call the Engine’s callbacks (Iodine::PubSub::Engine#subscribe and Iodine::PubSub::Engine#unsubscribe).



# File 'ext/iodine/iodine_pubsub.c', line 338

static VALUE iodine_pubsub_dettach(VALUE self, VALUE engine) {
  iodine_pubsub_s *e = iodine_pubsub_CData(engine);
  if (!e) {
    rb_raise(rb_eTypeError, "not a valid engine");
    return Qnil;
  }
  if (e->handler == Qnil) {
    e->handler = engine;
  }
  IodineStore.remove(engine);
  pubsub_engine_deregister(e->engine);
  return engine;
  (void)self;
}
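
The effect of detaching, as described above, is that later pub/sub events no longer reach the engine. A minimal model of that contract, assuming hypothetical CountingEngine and ToyDetachRegistry classes that are not part of Iodine:

```ruby
# Hypothetical model of detaching an engine; not Iodine's implementation.

# Counts how many subscribe callbacks it receives.
class CountingEngine
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def subscribe(_channel)
    @calls += 1
  end
end

class ToyDetachRegistry
  def initialize
    @engines = []
  end

  def attach(engine)
    @engines << engine unless @engines.include?(engine)
    engine
  end

  # Spelled like the documented method name.
  def dettach(engine)
    @engines.delete(engine)
    engine
  end

  # Notifies every attached engine about a newly subscribed channel.
  def subscribe(channel)
    @engines.each { |engine| engine.subscribe(channel) }
  end
end

registry = ToyDetachRegistry.new
engine = registry.attach(CountingEngine.new)
registry.subscribe("before")
registry.dettach(engine)
registry.subscribe("after") # the detached engine is not notified
```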

.reset(engine) ⇒ Object

Forces Iodine to call the Iodine::PubSub::Engine#subscribe callback for all existing subscriptions (e.g., when reconnecting to a Pub/Sub backend such as Redis).



# File 'ext/iodine/iodine_pubsub.c', line 358

static VALUE iodine_pubsub_reset(VALUE self, VALUE engine) {
  iodine_pubsub_s *e = iodine_pubsub_CData(engine);
  if (!e) {
    rb_raise(rb_eTypeError, "not a valid engine");
    return Qnil;
  }
  if (e->handler == Qnil) {
    e->handler = engine;
  }
  pubsub_engine_resubscribe(e->engine);
  return engine;
  (void)self;
}
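
The reconnection scenario mentioned above can be sketched as follows. This is an illustrative model under stated assumptions: ToyRemoteEngine and ToyResetHub are hypothetical names, and drop_connection! merely simulates a backend that forgets its subscriptions when the connection is lost.

```ruby
# Hypothetical model of resubscribing after a reconnect; not Iodine's code.

# Stands in for an engine backed by a remote service.
class ToyRemoteEngine
  attr_reader :remote_channels

  def initialize
    @remote_channels = []
  end

  def subscribe(channel)
    @remote_channels << channel unless @remote_channels.include?(channel)
  end

  # Simulates a dropped connection: the backend forgets every subscription.
  def drop_connection!
    @remote_channels.clear
  end
end

class ToyResetHub
  def initialize(engine)
    @engine = engine
    @active = [] # channels that still have local subscribers
  end

  def subscribe(channel)
    return if @active.include?(channel)
    @active << channel
    @engine.subscribe(channel)
  end

  # Replays the subscribe callback for every existing subscription.
  def reset
    @active.each { |channel| @engine.subscribe(channel) }
    @engine
  end
end

engine = ToyRemoteEngine.new
hub = ToyResetHub.new(engine)
hub.subscribe("a")
hub.subscribe("b")
engine.drop_connection!
lost = engine.remote_channels.dup # snapshot while the backend is empty
hub.reset
```

After the reset, the simulated backend again knows about every channel that still has local subscribers, without any client having to subscribe again.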