Module: Iodine::PubSub
- Defined in:
- lib/iodine/pubsub.rb,
ext/iodine/iodine_pubsub.c
Overview
From Wikipedia: publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead characterize published messages into classes without knowledge of which subscribers, if any, there may be. Similarly, subscribers express interest in one or more classes and only receive messages that are of interest, without knowledge of which publishers, if any, there are.
Iodine is equipped with an internal pub/sub service that allows improved resource management from a deployment perspective.
The common paradigm, which is implemented by pub/sub services like Redis, is for a “client” to “subscribe” to one or more “channels”. Messages are streamed to these “channels” by different “publishers” (the application / other clients) and are broadcasted to the “clients” through their “subscription”.
Iodine’s approach is to offload pub/sub resource costs from the pub/sub service (which is usually expensive to scale) onto the application layer.
For example, the default (`nil`) pub/sub Engine implements an internal pub/sub service that manages subscriptions (clients and channels) throughout an Iodine process cluster without any need to connect to an external pub/sub service.
If Iodine is running with 8 processes and 16 threads per process, a message published in process A will be delivered to subscribers in process B.
In addition, by inheriting the Engine class, it’s easy to create pub/sub engines that connect to this underlying pub/sub service. This means that Iodine will call the engine’s `subscribe` method only once per channel and once messages arrive, Iodine will distribute the messages to all the subscribed clients.
Defined Under Namespace
Classes: Engine, RedisEngine, Subscription
Constant Summary collapse
- CLUSTER =
This is the (currently) default pub/sub engine. It will distribute messages to all subscribers in the process cluster.
engine_in_c
- SINGLE_PROCESS =
This is a single process pub/sub engine. It will distribute messages to all subscribers sharing the same process.
engine_in_c
Class Method Summary collapse
-
.default_engine ⇒ Object
Returns the default Pub/Sub engine (if any).
-
.default_engine=(en) ⇒ Object
Sets the default Pub/Sub engine to be used.
-
.publish(*args) ⇒ Object
Publishes a message to a channel.
-
.subscribe(*args) ⇒ Object
Subscribes to a Pub/Sub channel.
Instance Method Summary collapse
-
#deregister(engine) ⇒ Object
This method removes the engine from the pub/sub system.
-
#register(engine) ⇒ Object
This method adds the engine to the pub/sub system, allowing it to receive system wide notifications.
-
#reset(engine) ⇒ Object
This method resets the engine, (re)sending all the current subscription data as if the #register method was just called.
Class Method Details
.default_engine ⇒ Object
Returns the default Pub/Sub engine (if any).
See Iodine::PubSub and Engine for more details.
557 558 559 560 |
# File 'ext/iodine/iodine_pubsub.c', line 557
/**
 * Returns the value stored in the `default_pubsubid` instance variable of
 * `Iodine`, i.e. whatever was last stored there by the default-engine setter.
 */
static VALUE ips_get_default(VALUE self) {
  (void)self; /* the receiver is not needed */
  VALUE current_default = rb_ivar_get(Iodine, default_pubsubid);
  return current_default;
}
|
.default_engine=(en) ⇒ Object
Sets the default Pub/Sub engine to be used.
See Iodine::PubSub and Engine for more details.
532 533 534 535 536 537 538 539 540 541 542 543 |
# File 'ext/iodine/iodine_pubsub.c', line 532
/**
 * Sets the default Pub/Sub engine.
 *
 * Unwraps the `iodine_engine_s` carried by `en`, stores `en` in the
 * `default_pubsubid` instance variable of `Iodine` and points
 * `PUBSUB_DEFAULT_ENGINE` at the engine's `p` member.
 *
 * Raises ArgumentError when `en` carries no struct or when the struct's `p`
 * member is NULL. Returns `en`.
 */
static VALUE ips_set_default(VALUE self, VALUE en) {
  (void)self; /* the receiver is not needed */
  iodine_engine_s *e;
  Data_Get_Struct(en, iodine_engine_s, e);
  if (!e) {
    rb_raise(rb_eArgError, "default engine must be an Iodine::PubSub::Engine.");
  }
  if (!e->p) {
    rb_raise(rb_eArgError, "This Iodine::PubSub::Engine is broken.");
  }
  /* Record the Ruby object first, then switch the C-level default. */
  rb_ivar_set(Iodine, default_pubsubid, en);
  PUBSUB_DEFAULT_ENGINE = e->p;
  return en;
}
|
.publish(*args) ⇒ Object
Publishes a message to a channel.
Can be used using two Strings:
publish(to, message)
The method accepts an optional `engine` argument:
publish(to, message, my_pubsub_engine)
Alternatively, accepts the following named arguments:
- :to
-
The channel to publish to (required).
- :message
-
The message to be published (required).
- :engine
-
If provided, the engine to use for pub/sub. Otherwise the default
engine is used.
796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 |
# File 'ext/iodine/iodine_pubsub.c', line 796
/**
 * Publishes a message to a channel.
 *
 * Accepted call shapes:
 *   - 2 arguments: channel, message
 *   - 3 arguments: channel, message, engine
 *   - 1 argument:  a Hash, read with the `to_sym_id` key (falling back to
 *                  `channel_sym_id`), `message_sym_id` and `engine_varid`.
 *
 * The message must be a String; the channel must be a String or a Symbol
 * (Symbols are converted to Strings). Raises ArgumentError on a bad argument
 * count or a missing message / channel.
 *
 * Returns Qfalse when `pubsub_publish` returns 0 and Qtrue otherwise.
 */
VALUE iodine_publish(int argc, VALUE *argv, VALUE self) {
  (void)self; /* the receiver is not needed */
  VALUE rb_ch = Qnil, rb_msg = Qnil, rb_engine = Qnil;

  if (argc == 2 || argc == 3) {
    /* positional form: (channel, message[, engine]) */
    if (argc == 3) {
      rb_engine = argv[2];
    }
    rb_ch = argv[0];
    rb_msg = argv[1];
  } else if (argc == 1) {
    /* single argument must be a Hash */
    VALUE options = argv[0];
    Check_Type(options, T_HASH);
    rb_ch = rb_hash_aref(options, to_sym_id);
    if (rb_ch == Qnil || rb_ch == Qfalse) {
      rb_ch = rb_hash_aref(options, channel_sym_id);
    }
    rb_msg = rb_hash_aref(options, message_sym_id);
    rb_engine = rb_hash_aref(options, engine_varid);
  } else {
    rb_raise(rb_eArgError, "method accepts 1-3 arguments.");
  }

  /* validate the message before the channel (same order as error reporting) */
  if (rb_msg == Qnil || rb_msg == Qfalse) {
    rb_raise(rb_eArgError, "message is required.");
  }
  Check_Type(rb_msg, T_STRING);

  if (rb_ch == Qnil || rb_ch == Qfalse) {
    rb_raise(rb_eArgError, "target / channel is required .");
  }
  if (TYPE(rb_ch) == T_SYMBOL) {
    rb_ch = rb_sym2str(rb_ch);
  }
  Check_Type(rb_ch, T_STRING);

  /* nil keeps a NULL engine (presumably "use the default" - confirm against
   * pubsub_publish), false selects PUBSUB_PROCESS_ENGINE, anything else is
   * converted through iodine_engine_ruby2facil. */
  const pubsub_engine_s *engine = NULL;
  if (rb_engine == Qfalse) {
    engine = PUBSUB_PROCESS_ENGINE;
  } else if (rb_engine != Qnil) {
    engine = iodine_engine_ruby2facil(rb_engine);
  }

  /* copy both Ruby strings into FIOBJ strings for the duration of the call */
  FIOBJ ch = fiobj_str_new(RSTRING_PTR(rb_ch), RSTRING_LEN(rb_ch));
  FIOBJ msg = fiobj_str_new(RSTRING_PTR(rb_msg), RSTRING_LEN(rb_msg));
  intptr_t ret =
      pubsub_publish(.engine = engine, .channel = ch, .message = msg);
  fiobj_free(ch);
  fiobj_free(msg);

  /* NOTE(review): a zero result maps to Qfalse here - confirm this matches
   * pubsub_publish's success/failure convention. */
  return ret ? Qtrue : Qfalse;
}
|
.subscribe(*args) ⇒ Object
Subscribes to a Pub/Sub channel.
The method accepts 1-2 arguments and an optional block. These are all valid ways to call the method:
subscribe("my_stream") {|from, msg| p msg }
subscribe("my_stream", match: :redis) {|from, msg| p msg }
subscribe(to: "my_stream") {|from, msg| p msg }
subscribe to: "my_stream", match: :redis, handler: MyProc
The first argument must be either a String or a Hash.
The second, optional, argument must be a Hash (if given).
The options Hash supports the following possible keys (other keys are ignored, all keys are Symbols):
- :match
-
The channel / subject name matching type to be used. Valid value is: `:redis`. Future versions hope to support `:nats` and `:rabbit` pattern matching as well.
- :to
-
The channel / subject to subscribe to.
Returns a Subscription object that answers to:
- close
-
closes the connection.
- to_s
-
returns the subscription’s target (stream / channel / subject).
==(str) :: returns true if the string is an exact match for the target (even if the target itself is a pattern).
768 769 770 771 772 |
# File 'ext/iodine/iodine_pubsub.c', line 768
/**
 * Module-level `subscribe`: forwards the arguments to `iodine_subscribe` with
 * a NULL third argument and the `IODINE_PUBSUB_GLOBAL` flag, and returns
 * whatever that call returns.
 */
static VALUE iodine_subscribe_global(int argc, VALUE *argv, VALUE self) {
  // clang-format on
  (void)self; /* the receiver is not needed */
  VALUE subscription = iodine_subscribe(argc, argv, NULL, IODINE_PUBSUB_GLOBAL);
  return subscription;
}
|
Instance Method Details
#deregister(engine) ⇒ Object
This method removes the engine from the pub/sub system.
118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'ext/iodine/iodine_pubsub.c', line 118
/**
 * Removes `engine` from the pub/sub system.
 *
 * When the wrapped `iodine_engine_s` has a non-NULL `p` member it is passed
 * to `pubsub_engine_deregister` and Qtrue is returned; otherwise Qfalse is
 * returned. In both cases `engine` is removed from `Registry`.
 *
 * A NULL struct pointer is treated like a missing `p` (returns Qfalse)
 * instead of being dereferenced.
 */
static VALUE iodine_engine_deregister2(VALUE self, VALUE engine) {
  (void)self; /* the receiver is not needed */
  iodine_engine_s *e;
  Data_Get_Struct(engine, iodine_engine_s, e);

  VALUE result = Qfalse;
  if (e && e->p) {
    pubsub_engine_deregister(e->p);
    result = Qtrue;
  }
  /* the registry entry is dropped whether or not the engine was live */
  Registry.remove(engine);
  return result;
}
|
#register(engine) ⇒ Object
This method adds the engine to the pub/sub system, allowing it to receive system wide notifications.
93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'ext/iodine/iodine_pubsub.c', line 93
/**
 * Adds `engine` to the pub/sub system.
 *
 * `engine` is added to `Registry`; when the wrapped `iodine_engine_s` has a
 * non-NULL `p` member, the struct's `handler` is set to `engine`, `p` is
 * passed to `pubsub_engine_register` and Qtrue is returned. Otherwise Qfalse
 * is returned.
 *
 * The struct is unwrapped before touching `Registry`, so an argument that
 * fails `Data_Get_Struct`'s type check is not left behind in the registry.
 * A NULL struct pointer is treated like a missing `p` (returns Qfalse)
 * instead of being dereferenced.
 */
static VALUE iodine_engine_register2(VALUE self, VALUE engine) {
  (void)self; /* the receiver is not needed */
  iodine_engine_s *e;
  /* validate first: this may raise for a non-data object */
  Data_Get_Struct(engine, iodine_engine_s, e);
  Registry.add(engine);
  if (e && e->p) {
    e->handler = engine;
    pubsub_engine_register(e->p);
    return Qtrue;
  }
  return Qfalse;
}
|
#reset(engine) ⇒ Object
This method resets the engine, (re)sending all the current subscription data as if the #register method was just called.
143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'ext/iodine/iodine_pubsub.c', line 143
/**
 * Resets `engine`.
 *
 * When the wrapped `iodine_engine_s` has a non-NULL `p` member, the struct's
 * `handler` is set to `engine`, `p` is passed to `pubsub_engine_resubscribe`
 * and Qtrue is returned. Otherwise Qfalse is returned.
 *
 * A NULL struct pointer is treated like a missing `p` (returns Qfalse)
 * instead of being dereferenced.
 */
static VALUE iodine_engine_reset2(VALUE self, VALUE engine) {
  (void)self; /* the receiver is not needed */
  iodine_engine_s *e;
  Data_Get_Struct(engine, iodine_engine_s, e);
  if (!e || !e->p) {
    return Qfalse;
  }
  e->handler = engine;
  pubsub_engine_resubscribe(e->p);
  return Qtrue;
}
|