Class: Iodine::PubSub::Engine

Inherits:
Object
Defined in:
lib/iodine/pubsub.rb,
ext/iodine/iodine_pubsub.c

Overview

The Engine class makes it easy to leverage Iodine’s pub/sub system using external services.

Iodine comes with two built-in engines:

  • `Iodine::PubSub::Engine::CLUSTER` will distribute messages to all subscribers in the process cluster.

  • `Iodine::PubSub::Engine::PROCESS` will distribute messages to all subscribers sharing the same process.

It’s recommended that Engine instances be initialized only after Iodine has started running (otherwise, the `fork`ing of the engine’s connection will introduce communication issues).

For this reason, the best approach to initialization is:

class MyEngineClass < Iodine::PubSub::Engine
  # ...
end

Iodine.run do
  MyEngine = MyEngineClass.new
end

Engine child classes MUST override #subscribe, #unsubscribe and #publish in order to perform these actions using the backend service (e.g. Redis).
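The three overrides can be sketched as follows. This is a minimal illustration under stated assumptions, not Iodine's own implementation: `FakeBackend` and its methods (`listen`, `ignore`, `send_message`) are hypothetical stand-ins for a real client such as a Redis connection, and `ENGINE_BASE` falls back to `Object` only so that the sketch runs without the iodine gem loaded.

```ruby
# Use the real base class when Iodine is loaded; otherwise fall back to Object
# so this sketch can run on its own (assumption made for illustration only).
ENGINE_BASE = defined?(Iodine::PubSub::Engine) ? Iodine::PubSub::Engine : Object

# Hypothetical in-memory stand-in for an external service client (e.g. Redis).
class FakeBackend
  attr_reader :channels, :sent

  def initialize
    @channels = {} # channel name => match argument it was subscribed with
    @sent = []     # [channel, payload] pairs handed to the backend
  end

  def listen(channel, match)
    @channels[channel] = match
  end

  def ignore(channel)
    @channels.delete(channel)
  end

  def send_message(channel, payload)
    @sent << [channel, payload]
  end
end

class MyEngineClass < ENGINE_BASE
  def initialize(backend)
    super()
    @backend = backend
  end

  # Called when a channel gains its first subscriber.
  def subscribe(to, match)
    @backend.listen(to, match)
  end

  # Called when a channel loses its last subscriber.
  def unsubscribe(to, match)
    @backend.ignore(to)
  end

  # Called when a message is published through this engine.
  def publish(to, message)
    @backend.send_message(to, message)
  end
end
```

In an application the instance would be created inside `Iodine.run`, as in the initialization example earlier in this overview.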

Once an Engine instance receives a message from the backend service, it should forward the message to the Iodine distribution layer using the Iodine.publish method, setting the 3rd argument to `false`.

Iodine will then distribute the message to all registered clients in that specific process (if the engine is cluster-wide, set the 3rd argument to CLUSTER instead).
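The forwarding step might look like the sketch below. `BackendRelay` and `on_backend_message` are hypothetical names introduced for illustration. The delivery step is injected as a block, so the sketch runs without Iodine; in a real engine the block would call `Iodine.publish` with the 3rd argument described above.

```ruby
# Hypothetical helper that relays messages arriving from the backend service
# to a local delivery block.
class BackendRelay
  def initialize(&deliver)
    @deliver = deliver
  end

  # Intended to be called from the backend client's "message received" hook.
  def on_backend_message(channel, payload)
    @deliver.call(channel, payload)
  end
end

# In a real application the block would be, following the text above:
#   BackendRelay.new { |ch, msg| Iodine.publish(ch, msg, false) }
# Here the block only records the call so the sketch stays self-contained.
delivered = []
relay = BackendRelay.new { |ch, msg| delivered << [ch, msg, false] }
relay.on_backend_message("chat", "hello")
```

Injecting the delivery block keeps the backend-facing code independent of whether the message is distributed per process or cluster-wide.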

Direct Known Subclasses

Redis

Instance Method Details

#publish(to, message) ⇒ Object

OVERRIDE this callback - it will be called by Iodine whenever Iodine.publish (or Connection#publish) is called for this engine.

If this Iodine::PubSub::Engine is set as the default Iodine::PubSub::Engine, then any call to Iodine.publish (or Connection#publish) will invoke this callback (unless another Iodine::PubSub::Engine was specified).

NOTE: this callback is called per process event (not per cluster event) and the Iodine::PubSub::Engine is responsible for message propagation.



# File 'ext/iodine/iodine_pubsub.c', line 191

static VALUE iodine_pubsub_publish(VALUE self, VALUE to, VALUE message) {
  iodine_pubsub_s *e = iodine_pubsub_CData(self);
  if (e->engine == &e->do_not_touch) {
    /* this is a Ruby engine, nothing to do. */
    return Qnil;
  }
  FIOBJ ch, msg;
  ch = fiobj_str_new(RSTRING_PTR(to), RSTRING_LEN(to));
  msg = fiobj_str_new(RSTRING_PTR(message), RSTRING_LEN(message));
  e->engine->publish(e->engine, ch, msg);
  fiobj_free(ch);
  fiobj_free(msg);
  return self;
  (void)self;
  (void)to;
  (void)message;
}

#subscribe(to, match) ⇒ Object

OVERRIDE this callback - it will be called by Iodine whenever the process CLUSTER (not just this process) subscribes to a stream / channel.



# File 'ext/iodine/iodine_pubsub.c', line 140

static VALUE iodine_pubsub_subscribe(VALUE self, VALUE to, VALUE match) {
  return Qnil;
#if 0
  iodine_pubsub_s *e = iodine_pubsub_CData(self);
  if (e->engine == &e->do_not_touch) {
    /* this is a Ruby engine, nothing to do. */
    return Qnil;
  }
  FIOBJ ch = fiobj_str_new(RSTRING_PTR(to), RSTRING_LEN(to));
  e->engine->subscribe(e->engine, ch, SYM2ID(match) == redis_id);
  fiobj_free(ch);
  return to;
#endif
  (void)self;
  (void)to;
  (void)match;
}

#unsubscribe(to, match) ⇒ Object

OVERRIDE this callback - it will be called by Iodine whenever the whole process CLUSTER (not just this process) unsubscribes from a stream / channel.



# File 'ext/iodine/iodine_pubsub.c', line 162

static VALUE iodine_pubsub_unsubscribe(VALUE self, VALUE to, VALUE match) {
  return Qnil;
#if 0
  iodine_pubsub_s *e = iodine_pubsub_CData(self);
  if (e->engine == &e->do_not_touch) {
    /* this is a Ruby engine, nothing to do. */
    return Qnil;
  }
  FIOBJ ch = fiobj_str_new(RSTRING_PTR(to), RSTRING_LEN(to));
  e->engine->unsubscribe(e->engine, ch, SYM2ID(match) == redis_id);
  fiobj_free(ch);
  return to;
#endif
  (void)self;
  (void)to;
  (void)match;
}