Class: Iodine::PubSub::Engine

Inherits:
Object
  • Object
show all
Defined in:
lib/iodine/pubsub.rb,
ext/iodine/iodine_pubsub.c

Overview

The Engine class makes it easy to use leverage Iodine’s pub/sub system using external services.

Iodine comes with two built-in engines:

  • ‘Iodine::PubSub::Engine::CLUSTER` will distribute messages to all subscribers in the process cluster.

  • ‘Iodine::PubSub::Engine::PROCESS` will distribute messages to all subscribers sharing the same process.

It’s recommended that Engine instances be initialized only after Iodine started running (or the ‘fork`ing of the engine’s connection will introduce communication issues).

For this reason, the best approcah to initialization would be:

class MyEngineClass < Iodine::PubSub::Engine
     # ...
end

Iodine.run do
   MyEngine = MyEngineClass.new
end

Engine child classes MUST override the #subscribe, #unsubscribe and #publish in order to perform this actions using the backend service (i.e. using Redis).

Once an Engine instance receives a message from the backend service, it should forward the message to the Iodine distribution layer using the Iodine.publish method, setting the 3rd argument to ‘false`.

Iodine will than distribute the message to all registered clients in that specific process (if the engine is cluster wide, set the 3rd argument to CLUSTER.

Direct Known Subclasses

Redis

Instance Method Summary collapse

Instance Method Details

#publish(to, message) ⇒ Object

OVERRIDE this callback - it will be called by Iodine whenever the Iodine.publish (or Connection#publish) is called for this engine.

If this Iodine::PubSub::Engine is set as the default Iodine::PubSub::Engine, then any call to Iodine.publish (or Connection#publish will invoke this callback (unless another Iodine::PubSub::Engine was specified).

NOTE: this callback is called per process event (not per cluster event) and the Iodine::PubSub::Engine is responsible for message propagation.



182
183
184
185
186
187
188
189
190
191
# File 'ext/iodine/iodine_pubsub.c', line 182

static VALUE iodine_pubsub_publish(VALUE self, VALUE to, VALUE message) {
  iodine_pubsub_s *e = iodine_pubsub_CData(self);
  if (e->engine == &e->do_not_touch) {
    /* this is a Ruby engine, nothing to do. */
    return Qnil;
  }
  e->engine->publish(e->engine, IODINE_RSTRINFO(to), IODINE_RSTRINFO(message),
                     0);
  return self;
}

#subscribe(to, match) ⇒ Object

OVERRIDE this callback - it will be called by Iodine whenever the process CLUSTER (not just this process) subscribes to a stream / channel.



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'ext/iodine/iodine_pubsub.c', line 131

static VALUE iodine_pubsub_subscribe(VALUE self, VALUE to, VALUE match) {
  return Qnil;
#if 0
  iodine_pubsub_s *e = iodine_pubsub_CData(self);
  if (e->engine == &e->do_not_touch) {
    /* this is a Ruby engine, nothing to do. */
    return Qnil;
  }
  FIOBJ ch = fiobj_str_new(RSTRING_PTR(to), RSTRING_LEN(to));
  e->engine->subscribe(e->engine, ch, SYM2ID(match) == redis_id);
  fiobj_free(ch);
  return to;
#endif
  (void)self;
  (void)to;
  (void)match;
}

#unsubscribe(to, match) ⇒ Object

OVERRIDE this callback - it will be called by Iodine whenever the whole process CLUSTER (not just this process) unsubscribes from a stream / channel.



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'ext/iodine/iodine_pubsub.c', line 153

static VALUE iodine_pubsub_unsubscribe(VALUE self, VALUE to, VALUE match) {
  return Qnil;
#if 0
  iodine_pubsub_s *e = iodine_pubsub_CData(self);
  if (e->engine == &e->do_not_touch) {
    /* this is a Ruby engine, nothing to do. */
    return Qnil;
  }
  FIOBJ ch = fiobj_str_new(RSTRING_PTR(to), RSTRING_LEN(to));
  e->engine->unsubscribe(e->engine, ch, SYM2ID(match) == redis_id);
  fiobj_free(ch);
  return to;
#endif
  (void)self;
  (void)to;
  (void)match;
}