Module: Iodine::PubSub

Defined in:
lib/iodine/pubsub.rb,
ext/iodine/iodine_pubsub.c

Overview

Note:

From Wikipedia (Publish–subscribe pattern): publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead characterize published messages into classes without knowledge of which subscribers, if any, there may be. Similarly, subscribers express interest in one or more classes and only receive messages that are of interest, without knowledge of which publishers, if any, there are.

Iodine is equipped with an internal pub/sub service that allows improved resource management from a deployment perspective.

The common paradigm, which is implemented by pub/sub services like Redis, is for a “client” to “subscribe” to one or more “channels”. Messages are streamed to these “channels” by different “publishers” (the application / other clients) and are broadcasted to the “clients” through their “subscription”.

Iodine’s approach is to offload pub/sub resource costs from the pub/sub service (which is usually expensive to scale) onto the application layer.

For example, the default (`nil`) pub/sub Engine implements an internal pub/sub service that manages subscriptions (clients and channels) throughout an Iodine process cluster without any need to connect to an external pub/sub service.

If Iodine is running with 8 processes and 16 threads per process, a message published in process A will be delivered to subscribers in process B.

In addition, by inheriting from the Engine class, it’s easy to create pub/sub engines that connect to this underlying pub/sub service. This means that Iodine will call the engine’s `subscribe` method only once per channel and, once messages arrive, Iodine will distribute the messages to all the subscribed clients.

Defined Under Namespace

Classes: Engine, RedisEngine, Subscription

Constant Summary collapse

CLUSTER =

This is the (currently) default pub/sub engine. It will distribute messages to all subscribers in the process cluster.

engine_in_c
SINGLE_PROCESS =

This is a single process pub/sub engine. It will distribute messages to all subscribers sharing the same process.

engine_in_c

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.default_engineObject

Returns the default Pub/Sub engine (if any).

See Iodine::PubSub and Engine for more details.



557
558
559
560
# File 'ext/iodine/iodine_pubsub.c', line 557

/**
 * Reads the instance variable identified by `default_pubsubid` from the
 * `Iodine` object and returns it unchanged.
 *
 * `self` only exists to satisfy the Ruby method signature and is not used.
 */
static VALUE ips_get_default(VALUE self) {
  (void)self; /* unused receiver */
  VALUE current_default = rb_ivar_get(Iodine, default_pubsubid);
  return current_default;
}

.default_engine=(en) ⇒ Object

Sets the default Pub/Sub engine to be used.

See Iodine::PubSub and Engine for more details.



532
533
534
535
536
537
538
539
540
541
542
543
# File 'ext/iodine/iodine_pubsub.c', line 532

/**
 * Sets the default Pub/Sub engine.
 *
 * `en` is unwrapped into an `iodine_engine_s`. On success the Ruby object is
 * stored on `Iodine` under `default_pubsubid` and `PUBSUB_DEFAULT_ENGINE` is
 * pointed at the struct's `p` member.
 *
 * Raises ArgumentError when the unwrapped pointer is NULL or when the struct
 * has no `p` engine attached.
 *
 * Returns `en`. `self` is not used.
 */
static VALUE ips_set_default(VALUE self, VALUE en) {
  (void)self; /* unused receiver */
  iodine_engine_s *e;
  Data_Get_Struct(en, iodine_engine_s, e);
  if (!e) {
    rb_raise(rb_eArgError, "default engine must be an Iodine::PubSub::Engine.");
  }
  if (!e->p) {
    rb_raise(rb_eArgError, "This Iodine::PubSub::Engine is broken.");
  }
  /* Record the Ruby object first, then switch the C-level default over. */
  rb_ivar_set(Iodine, default_pubsubid, en);
  PUBSUB_DEFAULT_ENGINE = e->p;
  return en;
}

.publish(*args) ⇒ Object

Publishes a message to a channel.

Can be called with two Strings:

publish(to, message)

The method accepts an optional `engine` argument:

publish(to, message, my_pubsub_engine)

Alternatively, accepts the following named arguments:

:to

The channel to publish to (required).

:message

The message to be published (required).

:engine

If provided, the engine to use for pub/sub. Otherwise the default

engine is used.



796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
# File 'ext/iodine/iodine_pubsub.c', line 796

/**
 * Publishes a message to a channel.
 *
 * Accepted call shapes:
 *  - 2 arguments: (channel, message)
 *  - 3 arguments: (channel, message, engine)
 *  - 1 argument:  a Hash read through the `to_sym_id` (falling back to
 *                 `channel_sym_id`), `message_sym_id` and `engine_varid` keys.
 *
 * Any other argument count raises ArgumentError, as does a missing (nil /
 * false) message or channel. The message must be a String; the channel must
 * be a String or a Symbol (Symbols are converted to Strings).
 *
 * Engine selection: `false` selects `PUBSUB_PROCESS_ENGINE`, `nil` passes a
 * NULL engine to `pubsub_publish`, anything else is converted with
 * `iodine_engine_ruby2facil`.
 *
 * Returns Qfalse when `pubsub_publish` returns 0 and Qtrue otherwise.
 * `self` is not used.
 */
VALUE iodine_publish(int argc, VALUE *argv, VALUE self) {
  (void)self; /* unused receiver */
  VALUE target, payload, engine_arg = Qnil;
  const pubsub_engine_s *backend = NULL;

  if (argc == 1) {
    /* single argument must be a Hash */
    VALUE opts = argv[0];
    Check_Type(opts, T_HASH);
    target = rb_hash_aref(opts, to_sym_id);
    if (target == Qnil || target == Qfalse) {
      target = rb_hash_aref(opts, channel_sym_id);
    }
    payload = rb_hash_aref(opts, message_sym_id);
    /* NOTE(review): the other keys are `*_sym_id` values while this one is
     * named `engine_varid` - confirm it really is a Symbol key. */
    engine_arg = rb_hash_aref(opts, engine_varid);
  } else if (argc == 2 || argc == 3) {
    target = argv[0];
    payload = argv[1];
    if (argc == 3) {
      engine_arg = argv[2];
    }
  } else {
    rb_raise(rb_eArgError, "method accepts 1-3 arguments.");
  }

  /* The message is validated before the channel. */
  if (payload == Qnil || payload == Qfalse) {
    rb_raise(rb_eArgError, "message is required.");
  }
  Check_Type(payload, T_STRING);

  if (target == Qnil || target == Qfalse) {
    rb_raise(rb_eArgError, "target / channel is required .");
  }
  if (TYPE(target) == T_SYMBOL) {
    target = rb_sym2str(target);
  }
  Check_Type(target, T_STRING);

  /* Resolve the engine before allocating the FIOBJ strings, so a failure in
   * the conversion cannot leave them allocated. */
  if (engine_arg == Qnil) {
    backend = NULL;
  } else if (engine_arg == Qfalse) {
    backend = PUBSUB_PROCESS_ENGINE;
  } else {
    backend = iodine_engine_ruby2facil(engine_arg);
  }

  FIOBJ channel_obj = fiobj_str_new(RSTRING_PTR(target), RSTRING_LEN(target));
  FIOBJ message_obj =
      fiobj_str_new(RSTRING_PTR(payload), RSTRING_LEN(payload));

  intptr_t published = pubsub_publish(.engine = backend,
                                      .channel = channel_obj,
                                      .message = message_obj);
  fiobj_free(channel_obj);
  fiobj_free(message_obj);

  return published ? Qtrue : Qfalse;
}

.subscribe(*args) ⇒ Object

Subscribes to a Pub/Sub channel.

The method accepts 1-2 arguments and an optional block. These are all valid ways to call the method:

subscribe("my_stream") {|from, msg| p msg }
subscribe("my_stream", match: :redis) {|from, msg| p msg }
subscribe(to: "my_stream")  {|from, msg| p msg }
subscribe to: "my_stream", match: :redis, handler: MyProc

The first argument must be either a String or a Hash.

The second, optional, argument must be a Hash (if given).

The options Hash supports the following possible keys (other keys are ignored, all keys are Symbols):

:match

The channel / subject name matching type to be used. Valid value is: `:redis`. Future versions hope to support `:nats` and `:rabbit` pattern matching as well.

:to

The channel / subject to subscribe to.

Returns a Subscription object that answers to:

close

closes the connection.

to_s

returns the subscription’s target (stream / channel / subject).

==(str)

returns true if the string is an exact match for the target (even if the target itself is a pattern).



768
769
770
771
772
# File 'ext/iodine/iodine_pubsub.c', line 768

/**
 * Forwards `argc` / `argv` to `iodine_subscribe` with a NULL third argument
 * and the `IODINE_PUBSUB_GLOBAL` flag, returning whatever that call returns.
 *
 * `self` is not used.
 */
static VALUE iodine_subscribe_global(int argc, VALUE *argv, VALUE self) {
  // clang-format on
  (void)self; /* unused receiver */
  VALUE subscription =
      iodine_subscribe(argc, argv, NULL, IODINE_PUBSUB_GLOBAL);
  return subscription;
}

Instance Method Details

#deregister(engine) ⇒ Object

This method removes the engine from the pub/sub system.



118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'ext/iodine/iodine_pubsub.c', line 118

/**
 * Removes `engine` from the pub/sub system.
 *
 * When the wrapped `iodine_engine_s` has a `p` engine attached, that engine
 * is passed to `pubsub_engine_deregister` and Qtrue is returned; otherwise
 * Qfalse is returned. In both cases `engine` is removed from `Registry`
 * afterwards.
 *
 * `self` is not used.
 */
static VALUE iodine_engine_deregister2(VALUE self, VALUE engine) {
  (void)self; /* unused receiver */
  iodine_engine_s *wrapped;
  Data_Get_Struct(engine, iodine_engine_s, wrapped);

  VALUE outcome = Qfalse;
  if (wrapped->p) {
    pubsub_engine_deregister(wrapped->p);
    outcome = Qtrue;
  }
  /* The Registry entry is dropped whether or not an engine was attached. */
  Registry.remove(engine);
  return outcome;
}

#register(engine) ⇒ Object

This method adds the engine to the pub/sub system, allowing it to receive system wide notifications.



93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'ext/iodine/iodine_pubsub.c', line 93

/**
 * Adds `engine` to the pub/sub system.
 *
 * `engine` is unwrapped into an `iodine_engine_s` and added to `Registry`.
 * When the struct has a `p` engine attached, the struct's `handler` is set
 * to `engine`, `p` is passed to `pubsub_engine_register` and Qtrue is
 * returned. Otherwise Qfalse is returned (the Registry entry is kept, as
 * before).
 *
 * `self` is not used.
 */
static VALUE iodine_engine_register2(VALUE self, VALUE engine) {
  (void)self; /* unused receiver */
  iodine_engine_s *e;
  /* Unwrap before touching the Registry: if `engine` is not a wrapped data
   * object the unwrap raises, and the object must not be left behind in the
   * Registry with nothing to remove it. */
  Data_Get_Struct(engine, iodine_engine_s, e);
  Registry.add(engine);
  if (!e->p) {
    return Qfalse;
  }
  e->handler = engine;
  pubsub_engine_register(e->p);
  return Qtrue;
}

#reset(engine) ⇒ Object

This method resets the engine, (re)sending all the current subscription data as if the #register method was just called.



143
144
145
146
147
148
149
150
151
152
153
154
# File 'ext/iodine/iodine_pubsub.c', line 143

/**
 * Resets `engine`.
 *
 * When the wrapped `iodine_engine_s` has a `p` engine attached, the struct's
 * `handler` is set to `engine`, `p` is passed to `pubsub_engine_resubscribe`
 * and Qtrue is returned. Without an attached engine nothing is done and
 * Qfalse is returned.
 *
 * `self` is not used.
 */
static VALUE iodine_engine_reset2(VALUE self, VALUE engine) {
  (void)self; /* unused receiver */
  iodine_engine_s *wrapped;
  Data_Get_Struct(engine, iodine_engine_s, wrapped);

  if (!wrapped->p) {
    return Qfalse;
  }
  wrapped->handler = engine;
  pubsub_engine_resubscribe(wrapped->p);
  return Qtrue;
}