Module: Iodine::Websocket
- Defined in:
- lib/iodine/websocket.rb,
ext/iodine/iodine_websockets.c
Overview
This module lists the available API for WebSocket and EventSource (SSE) connections.
This module is mixed in (using `extend` and `include`) with the WebSocket Callback Object (as specified by the proposed Rack specification).
The server performs `extend` to allow the application to be namespace agnostic (so the server can be replaced without affecting the application).
The websocket API is divided into three main groups:
-
Client <=> Server <=> Pub/Sub relations (#subscribe, #publish).
-
Task scheduling (defer, #defer, each).
Notice that WebSocket callback objects (as specified by the proposed Rack specification) MUST provide an `on_message(data)` callback.
Class Method Summary collapse
-
.publish(*args) ⇒ Object
Publishes a message to a channel.
Instance Method Summary collapse
-
#close ⇒ Object
Closes the websocket connection.
-
#defer(*args) ⇒ Object
Schedules a block of code to execute at a later time, if the connection is still open and while preventing concurrent code from running for the same connection object.
-
#on_close ⇒ Object
Please implement your own callback for this event.
-
#on_drained ⇒ Object
Please implement your own callback for this event.
-
#on_open ⇒ Object
Please implement your own callback for this event.
-
#on_ready ⇒ Object
DEPRECATED! Please override #on_drained instead.
-
#on_shutdown ⇒ Object
Please implement your own callback for this event.
-
#open? ⇒ Boolean
Returns true if the connection is open, false if it isn’t.
-
#pending ⇒ Object
Returns the number of pending writes or -1 if the connection is closed.
-
#publish(*args) ⇒ Object
Publishes a message to a channel.
-
#subscribe(*args) ⇒ Object
Subscribes the connection to a Pub/Sub channel.
-
#write(data) ⇒ Object
Writes data to the websocket.
Class Method Details
.publish(*args) ⇒ Object
Publishes a message to a channel.
Can be called with two Strings:
publish(to, message)
The method accepts an optional `engine` argument:
publish(to, message, my_pubsub_engine)
Alternatively, accepts the following named arguments:
- :to
-
The channel to publish to (required).
- :message
-
The message to be published (required).
- :engine
-
If provided, the engine to use for pub/sub. Otherwise the default
engine is used.
796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 |
# File 'ext/iodine/iodine_pubsub.c', line 796
/**
 * Publishes a message to a pub/sub channel.
 *
 * Accepted call shapes, selected by `argc`:
 *   - 2 arguments: (channel, message)
 *   - 3 arguments: (channel, message, engine)
 *   - 1 argument:  a Hash. The channel is looked up under `to_sym_id`
 *                  (falling back to `channel_sym_id`), the message under
 *                  `message_sym_id` and the engine under `engine_varid`.
 *
 * The channel may be a String or a Symbol (Symbols are converted with
 * rb_sym2str); the message must be a String (enforced by Check_Type).
 *
 * Engine selection: `false` selects PUBSUB_PROCESS_ENGINE, `nil` (or no
 * engine at all) passes NULL to pubsub_publish, and any other value is
 * converted with iodine_engine_ruby2facil.
 *
 * Raises ArgumentError when `argc` is outside 1-3, or when the message or
 * the channel is nil / false.
 *
 * Returns Qfalse when pubsub_publish returns 0 and Qtrue otherwise.
 */
VALUE iodine_publish(int argc, VALUE *argv, VALUE self) {
  (void)self; /* the receiver is not used; silences unused-parameter warnings */
  VALUE rb_ch = Qnil, rb_msg = Qnil, rb_engine = Qnil;
  const pubsub_engine_s *engine = NULL;
  switch (argc) {
  case 3:
    rb_engine = argv[2];
    /* fallthrough */
  case 2:
    rb_ch = argv[0];
    rb_msg = argv[1];
    break;
  case 1: {
    /* single argument must be a Hash */
    Check_Type(argv[0], T_HASH);
    rb_ch = rb_hash_aref(argv[0], to_sym_id);
    if (rb_ch == Qnil || rb_ch == Qfalse) {
      rb_ch = rb_hash_aref(argv[0], channel_sym_id);
    }
    rb_msg = rb_hash_aref(argv[0], message_sym_id);
    /* NOTE(review): confirm `engine_varid` holds a Symbol VALUE (like the
     * other `*_sym_id` keys) rather than an ID. */
    rb_engine = rb_hash_aref(argv[0], engine_varid);
  } break;
  default:
    rb_raise(rb_eArgError, "method accepts 1-3 arguments.");
  }
  if (rb_msg == Qnil || rb_msg == Qfalse) {
    rb_raise(rb_eArgError, "message is required.");
  }
  Check_Type(rb_msg, T_STRING);
  if (rb_ch == Qnil || rb_ch == Qfalse) {
    rb_raise(rb_eArgError, "target / channel is required .");
  }
  if (TYPE(rb_ch) == T_SYMBOL) {
    rb_ch = rb_sym2str(rb_ch);
  }
  Check_Type(rb_ch, T_STRING);
  if (rb_engine == Qfalse) {
    engine = PUBSUB_PROCESS_ENGINE;
  } else if (rb_engine == Qnil) {
    engine = NULL;
  } else {
    engine = iodine_engine_ruby2facil(rb_engine);
  }
  /* Copy both Ruby strings into FIOBJ strings for the duration of the call;
   * both copies are released right after publishing. */
  FIOBJ ch = fiobj_str_new(RSTRING_PTR(rb_ch), RSTRING_LEN(rb_ch));
  FIOBJ msg = fiobj_str_new(RSTRING_PTR(rb_msg), RSTRING_LEN(rb_msg));
  intptr_t ret =
      pubsub_publish(.engine = engine, .channel = ch, .message = msg);
  fiobj_free(ch);
  fiobj_free(msg);
  /* NOTE(review): a zero result maps to `false`; confirm this matches
   * pubsub_publish's success / failure convention. */
  return ret ? Qtrue : Qfalse;
}
|
Instance Method Details
#close ⇒ Object
Closes the websocket connection. The connection is only closed after existing data in the outgoing buffer is sent.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'ext/iodine/iodine_websockets.c', line 52
/**
 * Closes the connection attached to `self`.
 *
 * WebSocket connections are closed with websocket_close (only when the
 * protocol's `service` field equals WEBSOCKET_ID_STR); SSE connections are
 * closed with http_sse_close.
 *
 * Returns `self` when a close call was issued, and Qfalse when no connection
 * is attached, the service check fails, or the connection type is anything
 * else.
 */
static VALUE iodine_ws_close(VALUE self) {
  void *conn = get_ws(self);
  if (!conn)
    return Qfalse;

  const iodine_pubsub_type_e kind =
      (iodine_pubsub_type_e)iodine_get_cdata(self);

  if (kind == IODINE_PUBSUB_WEBSOCKET) {
    /* WebSockets: make sure the pointer really is a WebSocket protocol */
    if (((protocol_s *)conn)->service != WEBSOCKET_ID_STR)
      return Qfalse;
    websocket_close(conn);
    return self;
  }

  if (kind == IODINE_PUBSUB_SSE) {
    /* SSE */
    http_sse_close(conn);
    return self;
  }

  /* IODINE_PUBSUB_GLOBAL and any unknown type: nothing to close */
  return Qfalse;
}
|
#defer(*args) ⇒ Object
Schedules a block of code to execute at a later time, if the connection is still open and while preventing concurrent code from running for the same connection object.
An optional `conn_id` argument can be passed along, so that the block of code will run for the requested connection rather than this connection.
**Careful**: this might cause this connection’s object to run code concurrently when data owned by this connection is accessed from within the block of code.
On success returns the block, otherwise (connection invalid) returns `false`. A successful event registration doesn’t guarantee that the block will be called (the connection might close between the event registration and the execution).
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'ext/iodine/iodine_websockets.c', line 185
/**
 * Schedules the given block to run for a connection.
 *
 * With no arguments the target is the value returned by iodine_get_fd(self).
 * With one (Fixnum) argument that value is used as the target instead, and
 * Qfalse is returned if sock_isvalid rejects it.
 *
 * Raises ArgumentError when more than one argument is given or when the
 * resolved target is zero. A block is required (rb_need_block).
 *
 * The block is added to the Registry and passed to facil_defer together with
 * iodine_perform_defer (task) and iodine_defer_fallback (fallback).
 *
 * Returns the block on success.
 */
static VALUE iodine_defer(int argc, VALUE *argv, VALUE self) {
  intptr_t target;

  /* validate the argument count first */
  if (argc > 1) {
    rb_raise(rb_eArgError, "this function expects no more then 1 (optional) "
                           "argument.");
  }

  if (argc == 1) {
    /* explicit connection id */
    Check_Type(argv[0], T_FIXNUM);
    target = FIX2LONG(argv[0]);
    if (!sock_isvalid(target)) {
      return Qfalse;
    }
  } else {
    /* default to the connection owned by this object */
    target = iodine_get_fd(self);
  }

  if (!target) {
    rb_raise(rb_eArgError, "This method requires a target connection.");
  }

  /* a block is mandatory */
  rb_need_block();
  VALUE task = rb_block_proc();
  if (task == Qnil) {
    return Qfalse;
  }

  /* NOTE(review): Registry.add presumably protects the block from the GC
   * until the task or its fallback runs; confirm against the Registry API. */
  Registry.add(task);
  facil_defer(.uuid = target, .task = iodine_perform_defer,
              .arg = (void *)task, .fallback = iodine_defer_fallback);
  return task;
}
|
#on_close ⇒ Object
Please implement your own callback for this event.
363 364 365 366 |
# File 'ext/iodine/iodine_websockets.c', line 363
/** Default no-op event callback: ignores the receiver and returns nil. */
static VALUE empty_func(VALUE self) {
  (void)self; /* unused */
  return Qnil;
}
|
#on_drained ⇒ Object
Please implement your own callback for this event.
369 370 371 372 373 |
# File 'ext/iodine/iodine_websockets.c', line 369
/**
 * Default `on_drained` callback: calls the method identified by
 * `iodine_on_ready_id` on the receiver (presumably the deprecated
 * `on_ready` callback) and returns nil.
 */
static VALUE empty_func_drained(VALUE self) {
  RubyCaller.call(self, iodine_on_ready_id);
  return Qnil;
}
|
#on_open ⇒ Object
Please implement your own callback for this event.
363 364 365 366 |
# File 'ext/iodine/iodine_websockets.c', line 363
/** Default no-op event callback: ignores the receiver and returns nil. */
static VALUE empty_func(VALUE self) {
  (void)self; /* unused */
  return Qnil;
}
|
#on_ready ⇒ Object
DEPRECATED! Please override #on_drained instead.
376 377 378 379 |
# File 'ext/iodine/iodine_websockets.c', line 376
/** Default no-op `on_ready` callback: ignores the receiver and returns nil. */
static VALUE empty_func_on_ready(VALUE self) {
  (void)self; /* unused */
  return Qnil;
}
|
#on_shutdown ⇒ Object
Please implement your own callback for this event.
363 364 365 366 |
# File 'ext/iodine/iodine_websockets.c', line 363
/** Default no-op event callback: ignores the receiver and returns nil. */
static VALUE empty_func(VALUE self) {
  (void)self; /* unused */
  return Qnil;
}
|
#open? ⇒ Boolean
Returns true if the connection is open, false if it isn’t.
135 136 137 138 139 140 |
# File 'ext/iodine/iodine_websockets.c', line 135
/**
 * Returns Qtrue when the receiver has a non-zero uuid that sock_isvalid
 * accepts, and Qfalse otherwise.
 */
static VALUE iodine_ws_is_open(VALUE self) {
  const intptr_t conn_id = get_uuid(self);
  return (conn_id && sock_isvalid(conn_id)) ? Qtrue : Qfalse;
}
|
#pending ⇒ Object
Returns the number of pending writes or -1 if the connection is closed
125 126 127 128 129 130 |
# File 'ext/iodine/iodine_websockets.c', line 125
/**
 * Returns the value of sock_pending for the receiver's connection as a Ruby
 * number, or -1 when the receiver has no uuid or sock_isclosed reports the
 * connection as closed.
 */
static VALUE iodine_ws_has_pending(VALUE self) {
  const intptr_t conn_id = get_uuid(self);
  if (conn_id && !sock_isclosed(conn_id)) {
    return SIZET2NUM(sock_pending(conn_id));
  }
  return INT2NUM(-1);
}
|
#publish(*args) ⇒ Object
Publishes a message to a channel.
Can be called with two Strings:
publish(to, message)
The method accepts an optional `engine` argument:
publish(to, message, my_pubsub_engine)
Alternatively, accepts the following named arguments:
- :to
-
The channel to publish to (required).
- :message
-
The message to be published (required).
- :engine
-
If provided, the engine to use for pub/sub. Otherwise the default
engine is used.
796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 |
# File 'ext/iodine/iodine_pubsub.c', line 796
/**
 * Publishes a message to a pub/sub channel.
 *
 * Accepted call shapes, selected by `argc`:
 *   - 2 arguments: (channel, message)
 *   - 3 arguments: (channel, message, engine)
 *   - 1 argument:  a Hash. The channel is looked up under `to_sym_id`
 *                  (falling back to `channel_sym_id`), the message under
 *                  `message_sym_id` and the engine under `engine_varid`.
 *
 * The channel may be a String or a Symbol (Symbols are converted with
 * rb_sym2str); the message must be a String (enforced by Check_Type).
 *
 * Engine selection: `false` selects PUBSUB_PROCESS_ENGINE, `nil` (or no
 * engine at all) passes NULL to pubsub_publish, and any other value is
 * converted with iodine_engine_ruby2facil.
 *
 * Raises ArgumentError when `argc` is outside 1-3, or when the message or
 * the channel is nil / false.
 *
 * Returns Qfalse when pubsub_publish returns 0 and Qtrue otherwise.
 */
VALUE iodine_publish(int argc, VALUE *argv, VALUE self) {
  (void)self; /* the receiver is not used; silences unused-parameter warnings */
  VALUE rb_ch = Qnil, rb_msg = Qnil, rb_engine = Qnil;
  const pubsub_engine_s *engine = NULL;
  switch (argc) {
  case 3:
    rb_engine = argv[2];
    /* fallthrough */
  case 2:
    rb_ch = argv[0];
    rb_msg = argv[1];
    break;
  case 1: {
    /* single argument must be a Hash */
    Check_Type(argv[0], T_HASH);
    rb_ch = rb_hash_aref(argv[0], to_sym_id);
    if (rb_ch == Qnil || rb_ch == Qfalse) {
      rb_ch = rb_hash_aref(argv[0], channel_sym_id);
    }
    rb_msg = rb_hash_aref(argv[0], message_sym_id);
    /* NOTE(review): confirm `engine_varid` holds a Symbol VALUE (like the
     * other `*_sym_id` keys) rather than an ID. */
    rb_engine = rb_hash_aref(argv[0], engine_varid);
  } break;
  default:
    rb_raise(rb_eArgError, "method accepts 1-3 arguments.");
  }
  if (rb_msg == Qnil || rb_msg == Qfalse) {
    rb_raise(rb_eArgError, "message is required.");
  }
  Check_Type(rb_msg, T_STRING);
  if (rb_ch == Qnil || rb_ch == Qfalse) {
    rb_raise(rb_eArgError, "target / channel is required .");
  }
  if (TYPE(rb_ch) == T_SYMBOL) {
    rb_ch = rb_sym2str(rb_ch);
  }
  Check_Type(rb_ch, T_STRING);
  if (rb_engine == Qfalse) {
    engine = PUBSUB_PROCESS_ENGINE;
  } else if (rb_engine == Qnil) {
    engine = NULL;
  } else {
    engine = iodine_engine_ruby2facil(rb_engine);
  }
  /* Copy both Ruby strings into FIOBJ strings for the duration of the call;
   * both copies are released right after publishing. */
  FIOBJ ch = fiobj_str_new(RSTRING_PTR(rb_ch), RSTRING_LEN(rb_ch));
  FIOBJ msg = fiobj_str_new(RSTRING_PTR(rb_msg), RSTRING_LEN(rb_msg));
  intptr_t ret =
      pubsub_publish(.engine = engine, .channel = ch, .message = msg);
  fiobj_free(ch);
  fiobj_free(msg);
  /* NOTE(review): a zero result maps to `false`; confirm this matches
   * pubsub_publish's success / failure convention. */
  return ret ? Qtrue : Qfalse;
}
|
#subscribe(*args) ⇒ Object
Subscribes the connection to a Pub/Sub channel.
The method accepts 1-2 arguments and an optional block. These are all valid ways to call the method:
subscribe("my_stream") {|from, msg| p msg }
subscribe("my_stream", match: :redis) {|from, msg| p msg }
subscribe(to: "my_stream") {|from, msg| p msg }
subscribe to: "my_stream", match: :redis, handler: MyProc
The first argument must be either a String or a Hash.
The second, optional, argument must be a Hash (if given).
The options Hash supports the following possible keys (other keys are ignored, all keys are Symbols):
- :match
-
The channel / subject name matching type to be used. The only valid value is `:redis`. Future versions hope to support `:nats` and `:rabbit` pattern matching as well.
- :to
-
The channel / subject to subscribe to.
- :as
-
Valid for WebSocket connections only. Can be either `:text` or `:binary`. `:text` is the default transport for pub/sub events.
Returns a PubSub::Subscription object that answers to:
- #.close
-
closes the connection.
- #.to_s
-
returns the subscription’s target (stream / channel / subject).
- #.==(str)
-
returns true if the string is an exact match for the target (even if the target itself is a pattern).
284 285 286 287 288 289 |
# File 'ext/iodine/iodine_websockets.c', line 284
/**
 * Subscribes the receiver's connection to a pub/sub channel by forwarding the
 * arguments to iodine_subscribe, together with the connection pointer
 * (get_ws) and the connection type stored on the receiver (iodine_get_cdata).
 *
 * Returns whatever iodine_subscribe returns.
 */
static VALUE iodine_ws_subscribe(int argc, VALUE *argv, VALUE self) {
  // clang-format on
  ws_s *conn = get_ws(self);
  iodine_pubsub_type_e kind = (iodine_pubsub_type_e)iodine_get_cdata(self);
  return iodine_subscribe(argc, argv, conn, kind);
}
|
#write(data) ⇒ Object
Writes data to the websocket.
Returns `true` once the data has been added to the outgoing queue.
`write` will return immediately, adding the data to the outgoing queue.
If the connection is closed (or is neither a WebSocket nor an SSE connection), `write` will raise an `IOError` instead of returning `false`.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'ext/iodine/iodine_websockets.c', line 89
/**
 * Writes `data` (must be a String) to the receiver's connection.
 *
 * WebSocket connections use websocket_write; the last argument is whether the
 * string's encoding equals IodineUTF8Encoding (presumably selecting a text
 * rather than a binary frame -- confirm against websocket_write).
 * SSE connections use http_sse_write with the string as the event data.
 *
 * Raises IOError ("Connection is closed") when no connection is attached,
 * the stored connection type is zero, a WebSocket fails the service check,
 * or the connection type is anything else.
 *
 * Returns Qtrue after the write call; the write functions' own return values
 * are not inspected.
 */
static VALUE iodine_ws_write(VALUE self, VALUE data) {
  Check_Type(data, T_STRING);
  void *conn = get_ws(self);
  iodine_pubsub_type_e kind = (iodine_pubsub_type_e)iodine_get_cdata(self);
  if (!conn || !kind) {
    rb_raise(rb_eIOError, "Connection is closed");
    return Qfalse;
  }

  /* WebSockets: only write when the pointer really is a WebSocket protocol */
  if (kind == IODINE_PUBSUB_WEBSOCKET &&
      ((protocol_s *)conn)->service == WEBSOCKET_ID_STR) {
    websocket_write(conn, RSTRING_PTR(data), RSTRING_LEN(data),
                    rb_enc_get(data) == IodineUTF8Encoding);
    return Qtrue;
  }

  /* SSE */
  if (kind == IODINE_PUBSUB_SSE) {
    http_sse_write(
        conn, .data = {.data = RSTRING_PTR(data), .len = RSTRING_LEN(data)});
    return Qtrue;
  }

  /* failed service check, IODINE_PUBSUB_GLOBAL, or unknown type */
  rb_raise(rb_eIOError, "Connection is closed");
  return Qfalse;
}
|