Module: Iodine::Websocket
- Defined in:
- lib/iodine/websocket.rb,
ext/iodine/iodine_websockets.c
Overview
This module lists the available API for Websocket connections and classes.
This module is mixed in (using `extend` and `include`) with the Websocket Callback Object (as specified by the proposed Rack specification).
The websocket API is divided into three main groups:
-
Client <=> Server <=> Client relations (#subscribe, #publish, etc.).
-
Task scheduling (Websocket.defer, #defer, Websocket.each).
Notice that Websocket callback objects (as specified by the proposed Rack specification) MUST provide an `on_message(data)` callback.
Class Method Summary collapse
-
.count ⇒ Object
Returns the number of active websocket connections (including connections that are in the process of closing down).
-
.defer(ws_uuid) ⇒ Object
Schedules a block of code to run for the specified websocket at a later time, (if the connection is open).
-
.each ⇒ Object
Runs the required block for each websocket.
Instance Method Summary collapse
-
#close ⇒ Object
Closes the websocket connection.
-
#conn_id ⇒ Object
Returns a connection’s UUID which is valid for *this process* (not a machine or internet unique value).
-
#defer(*args) ⇒ Object
Schedules a block of code to execute at a later time, if the connection is still open and while preventing concurrent code from running for the same connection object.
-
#has_pending? ⇒ Boolean
Returns a weak indication as to the state of the socket’s buffer.
-
#on_close ⇒ Object
Please implement your own callback for this event.
-
#on_open ⇒ Object
Please implement your own callback for this event.
-
#on_ready ⇒ Object
Please implement your own callback for this event.
-
#on_shutdown ⇒ Object
Please implement your own callback for this event.
-
#publish(args) ⇒ Object
Publishes a message to a channel.
-
#subscribe(args) ⇒ Object
Subscribes the websocket to a channel belonging to a specific pub/sub service (using a PubSub::Engine to connect Iodine to the service).
-
#subscribed?(args) ⇒ Boolean
Searches for the subscription ID for the described subscription.
-
#unsubscribe(sub_id) ⇒ Object
Cancels the subscription matching `sub_id`.
-
#write(data) ⇒ Object
Writes data to the websocket.
Class Method Details
.count ⇒ Object
Returns the number of active websocket connections (including connections that are in the process of closing down).
162 163 164 165 |
# File 'ext/iodine/iodine_websockets.c', line 162
/**
 * Returns, as a Fixnum, the value reported by `websocket_count()`: the number
 * of active websocket connections.
 */
static VALUE iodine_ws_count(VALUE self) {
  (void)self; /* the receiver is not used */
  return LONG2FIX(websocket_count());
}
|
.defer(ws_uuid) ⇒ Object
Schedules a block of code to run for the specified websocket at a later time, (if the connection is open). The block will run within the connection’s lock, offering a fast concurrency synchronizing tool.
The block of code will receive the websocket’s callback object. i.e.
Iodine::Websocket.defer(uuid) {|ws| ws.write "I'm doing this" }
On success returns the block, otherwise (connection invalid) returns `false`.
A successful event registration doesn’t guarantee that the block will be called (the connection might close between the event registration and the execution).
615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 |
# File 'ext/iodine/iodine_websockets.c', line 615
/**
 * Schedules the given block to run for the websocket identified by `ws_uuid`.
 *
 * Returns the block once the task has been handed to `facil_defer`, or
 * `Qfalse` when `sock_isvalid` rejects the UUID. Raises a TypeError when
 * `ws_uuid` is not a Fixnum (the same check `iodine_defer` applies to its
 * optional argument) and raises through `rb_need_block` when no block was
 * given.
 */
static VALUE iodine_class_defer(VALUE self, VALUE ws_uuid) {
  (void)(self);
  // FIX2LONG only untags the VALUE; without this check any non-Fixnum object
  // would be silently reinterpreted as a connection UUID.
  Check_Type(ws_uuid, T_FIXNUM);
  intptr_t fd = FIX2LONG(ws_uuid);
  if (!sock_isvalid(fd))
    return Qfalse;
  // requires a block to be passed
  rb_need_block();
  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  // NOTE(review): presumably the Registry keeps the block referenced until the
  // deferred task (or its fallback) has run - confirm against Registry.
  Registry.add(block);
  facil_defer(.uuid = fd, .task = iodine_perform_defer, .arg = (void *)block,
              .fallback = iodine_defer_fallback);
  return block;
}
|
.each ⇒ Object
Runs the required block for each websocket.
Tasks will be performed asynchronously, within each connection’s lock, so no connection will have more than one task being performed at the same time (similar to #defer).
Also, unlike Iodine.run, the block will not be called unless the websocket is still open at the time its execution begins.
Always returns `self`.
590 591 592 593 594 595 596 597 598 599 |
# File 'ext/iodine/iodine_websockets.c', line 590
/**
 * Hands the given block to `iodine_ws_run_each` (called with -1 as its first
 * argument) and returns `self`.
 *
 * A block is mandatory: `rb_need_block` raises when none was supplied.
 */
static VALUE iodine_ws_class_each(VALUE self) {
  rb_need_block();
  VALUE task = rb_block_proc();
  if (task != Qnil) {
    Registry.add(task);
    iodine_ws_run_each(-1, task);
    return self;
  }
  return Qfalse;
}
|
Instance Method Details
#close ⇒ Object
Closes the websocket connection. The connection is only closed after existing data in the outgoing buffer is sent.
128 129 130 131 132 133 134 |
# File 'ext/iodine/iodine_websockets.c', line 128
/**
 * Closes the websocket connection attached to `self`.
 *
 * Returns `self` after calling `websocket_close`, or `Qfalse` when no
 * connection is attached or its protocol's service is not WEBSOCKET_ID_STR.
 */
static VALUE iodine_ws_close(VALUE self) {
  ws_s *conn = get_ws(self);
  if (conn && ((protocol_s *)conn)->service == WEBSOCKET_ID_STR) {
    websocket_close(conn);
    return self;
  }
  return Qfalse;
}
|
#conn_id ⇒ Object
Returns a connection’s UUID which is valid for *this process* (not a machine or internet unique value).
This can be used together with a true process wide UUID to uniquely identify a connection across the internet.
184 185 186 187 |
# File 'ext/iodine/iodine_websockets.c', line 184
static VALUE iodine_ws_uuid(VALUE self) {
intptr_t uuid = get_uuid(self);
return LONG2FIX(uuid);
}
|
#defer(*args) ⇒ Object
Schedules a block of code to execute at a later time, if the connection is still open and while preventing concurrent code from running for the same connection object.
An optional `conn_id` argument can be passed along, so that the block of code will run for the requested connection rather than this connection.
**Careful**: this might cause this connection’s object to run code concurrently when data owned by this connection is accessed from within the block of code.
On success returns the block, otherwise (connection invalid) returns `false`. A successful event registration doesn’t guarantee that the block will be called (the connection might close between the event registration and the execution).
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 |
# File 'ext/iodine/iodine_websockets.c', line 223
/**
 * Schedules the given block through `facil_defer` for a target connection.
 *
 * With one (Fixnum) argument the target is that connection UUID; with no
 * arguments the target is taken from `iodine_get_fd(self)`.
 *
 * Returns the block on success and `Qfalse` when `sock_isvalid` rejects an
 * explicitly passed UUID. Raises ArgumentError for more than one argument or
 * when no target could be resolved.
 */
static VALUE iodine_defer(int argc, VALUE *argv, VALUE self) {
  intptr_t target;
  // check arguments.
  if (argc > 1) {
    rb_raise(rb_eArgError, "this function expects no more then 1 (optional) "
                           "argument.");
  }
  if (argc == 1) {
    Check_Type(*argv, T_FIXNUM);
    target = FIX2LONG(*argv);
    if (!sock_isvalid(target)) {
      return Qfalse;
    }
  } else {
    target = iodine_get_fd(self);
  }
  if (!target) {
    rb_raise(rb_eArgError, "This method requires a target connection.");
  }
  // a block is mandatory; rb_need_block raises when it is missing
  rb_need_block();
  VALUE task = rb_block_proc();
  if (task == Qnil) {
    return Qfalse;
  }
  Registry.add(task);
  facil_defer(.uuid = target, .task = iodine_perform_defer,
              .arg = (void *)task, .fallback = iodine_defer_fallback);
  return task;
}
|
#has_pending? ⇒ Boolean
Returns a weak indication as to the state of the socket’s buffer. If the server has data in the buffer that wasn’t written to the socket, `has_pending?` will return `true`, otherwise `false` will be returned.
172 173 174 175 |
# File 'ext/iodine/iodine_websockets.c', line 172
static VALUE iodine_ws_has_pending(VALUE self) {
intptr_t uuid = get_uuid(self);
return sock_has_pending(uuid) ? Qtrue : Qfalse;
}
|
#on_close ⇒ Object
Please implement your own callback for this event.
697 698 699 700 |
# File 'ext/iodine/iodine_websockets.c', line 697
/** Default no-op callback: ignores the receiver and returns `nil`. */
static VALUE empty_func(VALUE self) {
  (void)self; /* unused */
  return Qnil;
}
|
#on_open ⇒ Object
Please implement your own callback for this event.
697 698 699 700 |
# File 'ext/iodine/iodine_websockets.c', line 697
/** Default no-op callback: ignores the receiver and returns `nil`. */
static VALUE empty_func(VALUE self) {
  (void)self; /* unused */
  return Qnil;
}
|
#on_ready ⇒ Object
Please implement your own callback for this event.
697 698 699 700 |
# File 'ext/iodine/iodine_websockets.c', line 697
/** Default no-op callback: ignores the receiver and returns `nil`. */
static VALUE empty_func(VALUE self) {
  (void)self; /* unused */
  return Qnil;
}
|
#on_shutdown ⇒ Object
Please implement your own callback for this event.
697 698 699 700 |
# File 'ext/iodine/iodine_websockets.c', line 697
/** Default no-op callback: ignores the receiver and returns `nil`. */
static VALUE empty_func(VALUE self) {
  (void)self; /* unused */
  return Qnil;
}
|
#publish(args) ⇒ Object
Publishes a message to a channel.
Accepts a single Hash argument with the following possible options:
- :engine
-
If provided, the engine to use for pub/sub. Otherwise the default
engine is used.
- :channel
-
Required (unless :pattern). The channel to publish to.
- :pattern
-
An alternative to the required :channel, publishes to a pattern.
This is NOT supported by Redis and it’s limited to the local process cluster.
- :message
-
REQUIRED. The message to be published.
:
433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 |
# File 'ext/iodine/iodine_websockets.c', line 433
/**
 * Publishes a message to a channel (or pattern) through `pubsub_publish`.
 *
 * `args` must be a Hash. The target is read from the channel key and, when
 * that is nil/false, from the pattern key (which also enables pattern
 * publishing). A Symbol target is converted to a String. The message must be
 * a String.
 *
 * Returns `Qtrue` when `pubsub_publish` reports a non-zero result and
 * `Qfalse` otherwise. Raises ArgumentError when the target or the message is
 * missing.
 */
static VALUE iodine_ws_publish(VALUE self, VALUE args) {
  (void)self; /* the receiver is not used */
  Check_Type(args, T_HASH);

  // resolve the target: prefer the channel, fall back to the pattern
  uint8_t is_pattern = 0;
  VALUE target = rb_hash_aref(args, channel_var_id);
  if (target == Qnil || target == Qfalse) {
    is_pattern = 1;
    target = rb_hash_aref(args, pattern_var_id);
    if (target == Qnil || target == Qfalse) {
      rb_raise(rb_eArgError, "channel is required for pub/sub methods.");
    }
  }
  if (TYPE(target) == T_SYMBOL) {
    target = rb_sym2str(target);
  }
  Check_Type(target, T_STRING);

  VALUE payload = rb_hash_aref(args, message_var_id);
  if (payload == Qnil || payload == Qfalse) {
    rb_raise(rb_eArgError, "message is required for the :publish method.");
  }
  Check_Type(payload, T_STRING);

  pubsub_engine_s *engine =
      iodine_engine_ruby2facil(rb_hash_aref(args, engine_var_id));
  intptr_t published =
      pubsub_publish(.engine = engine, .channel.name = (RSTRING_PTR(target)),
                     .channel.len = (RSTRING_LEN(target)),
                     .msg.data = (RSTRING_PTR(payload)),
                     .msg.len = (RSTRING_LEN(payload)),
                     .use_pattern = is_pattern);
  return published ? Qtrue : Qfalse;
}
|
#subscribe(args) ⇒ Object
Subscribes the websocket to a channel belonging to a specific pub/sub service (using a PubSub::Engine to connect Iodine to the service).
The function accepts a single argument (a Hash) and an optional block.
If no block is provided, the message is sent directly to the websocket client.
Accepts a single Hash argument with the following possible options:
- :engine
-
If provided, the engine to use for pub/sub. Otherwise the default
engine is used.
- :channel
-
Required (unless :pattern). The channel to subscribe to.
- :pattern
-
An alternative to the required :channel, subscribes to a pattern.
- :force
-
This can be set to either nil, :text or :binary and controls the way
the message will be forwarded to the websocket client. This is only valid if no block was provided. Defaults to smart encoding based testing.
298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 |
# File 'ext/iodine/iodine_websockets.c', line 298
/**
 * Subscribes the websocket attached to `self` to the channel (or pattern)
 * described by the `args` Hash, via `websocket_subscribe`.
 *
 * When a block is given it is registered with the Registry and passed along as
 * `udata`, with `on_pubsub_notificationin` / `iodine_on_unsubscribe` as the
 * callbacks; without a block both callbacks are NULL.
 *
 * Returns the subscription ID as an Integer, `Qnil` when
 * `websocket_subscribe` returns 0, or `Qfalse` when `self` has no valid
 * websocket connection. Raises ArgumentError when neither a channel nor a
 * pattern was supplied.
 */
static VALUE iodine_ws_subscribe(VALUE self, VALUE args) {
  Check_Type(args, T_HASH);
  ws_s *conn = get_ws(self);
  if (!conn || ((protocol_s *)conn)->service != WEBSOCKET_ID_STR) {
    return Qfalse;
  }

  // resolve the target: prefer the channel, fall back to the pattern
  uint8_t is_pattern = 0;
  VALUE target = rb_hash_aref(args, channel_var_id);
  if (target == Qnil || target == Qfalse) {
    is_pattern = 1;
    target = rb_hash_aref(args, pattern_var_id);
    if (target == Qnil || target == Qfalse) {
      rb_raise(rb_eArgError, "channel is required for pub/sub methods.");
    }
  }
  if (TYPE(target) == T_SYMBOL) {
    target = rb_sym2str(target);
  }
  Check_Type(target, T_STRING);

  // the force option selects at most one of the two encodings
  VALUE forced = rb_hash_aref(args, force_var_id);
  uint8_t as_text = (forced == text_var_id) ? 1 : 0;
  uint8_t as_binary = (!as_text && forced == binary_var_id) ? 1 : 0;

  VALUE handler = 0;
  if (rb_block_given_p()) {
    handler = rb_block_proc();
    Registry.add(handler);
  }

  pubsub_engine_s *engine =
      iodine_engine_ruby2facil(rb_hash_aref(args, engine_var_id));

  uintptr_t sub_id = websocket_subscribe(
      conn, .channel.name = RSTRING_PTR(target),
      .channel.len = RSTRING_LEN(target), .engine = engine,
      .use_pattern = is_pattern, .force_text = as_text,
      .force_binary = as_binary,
      .on_message = (handler ? on_pubsub_notificationin : NULL),
      .on_unsubscribe = (handler ? iodine_on_unsubscribe : NULL),
      .udata = (void *)handler);
  return sub_id ? ULL2NUM(sub_id) : Qnil;
}
|
#subscribed?(args) ⇒ Boolean
Searches for the subscription ID for the described subscription.
Takes the same arguments as #subscribe, a single Hash argument with the following possible options:
- :engine
-
If provided, the engine to use for pub/sub. Otherwise the default
engine is used.
- :channel
-
The subscription’s channel.
- :pattern
-
An alternative to the required :channel, subscribes to a pattern.
- :force
-
This can be set to either nil, :text or :binary and controls the way
the message will be forwarded to the websocket client. This is only valid if no block was provided. Defaults to smart encoding based testing.
360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 |
# File 'ext/iodine/iodine_websockets.c', line 360
/**
 * Looks up an existing subscription on the websocket attached to `self`,
 * using the same `args` Hash layout as `iodine_ws_subscribe`, via
 * `websocket_find_sub`.
 *
 * Returns the subscription ID as an Integer, `Qnil` when
 * `websocket_find_sub` returns 0, or `Qfalse` when `self` has no valid
 * websocket connection. Raises ArgumentError when neither a channel nor a
 * pattern was supplied.
 *
 * Unlike `iodine_ws_subscribe`, a given block is only used for the lookup and
 * is not added to the Registry.
 */
static VALUE iodine_ws_is_subscribed(VALUE self, VALUE args) {
  Check_Type(args, T_HASH);
  ws_s *ws = get_ws(self);
  if (!ws || ((protocol_s *)ws)->service != WEBSOCKET_ID_STR)
    return Qfalse;
  uint8_t use_pattern = 0, force_text = 0, force_binary = 0;

  // resolve the target: prefer the channel, fall back to the pattern
  VALUE rb_ch = rb_hash_aref(args, channel_var_id);
  if (rb_ch == Qnil || rb_ch == Qfalse) {
    use_pattern = 1;
    rb_ch = rb_hash_aref(args, pattern_var_id);
    if (rb_ch == Qnil || rb_ch == Qfalse)
      rb_raise(rb_eArgError, "channel is required for pub/sub methods.");
  }
  if (TYPE(rb_ch) == T_SYMBOL)
    rb_ch = rb_sym2str(rb_ch);
  Check_Type(rb_ch, T_STRING);

  VALUE tmp = rb_hash_aref(args, force_var_id);
  if (tmp == text_var_id)
    force_text = 1;
  else if (tmp == binary_var_id)
    force_binary = 1;

  VALUE block = 0;
  if (rb_block_given_p()) {
    block = rb_block_proc();
  }
  pubsub_engine_s *engine =
      iodine_engine_ruby2facil(rb_hash_aref(args, engine_var_id));
  uintptr_t subid = websocket_find_sub(
      ws, .channel.name = RSTRING_PTR(rb_ch), .channel.len = RSTRING_LEN(rb_ch),
      .engine = engine, .use_pattern = use_pattern, .force_text = force_text,
      .force_binary = force_binary,
      .on_message = (block ? on_pubsub_notificationin : NULL),
      .udata = (void *)block);
  if (!subid)
    return Qnil;
  // Use the same unsigned conversion as iodine_ws_subscribe, so the ID
  // reported here always equals the one handed out on subscription (a signed
  // conversion would turn IDs above LONG_MAX into negative Integers).
  return ULL2NUM(subid);
}
|
#unsubscribe(sub_id) ⇒ Object
Cancels the subscription matching `sub_id`.
406 407 408 409 410 411 412 413 414 415 |
# File 'ext/iodine/iodine_websockets.c', line 406
/**
 * Cancels the subscription identified by `sub_id` on the websocket attached
 * to `self`.
 *
 * A nil/false `sub_id` is ignored and `Qnil` is returned. Returns `Qfalse`
 * when `self` has no valid websocket connection, and `Qnil` after calling
 * `websocket_unsubscribe`. Raises a TypeError when `sub_id` is not a Fixnum.
 */
static VALUE iodine_ws_unsubscribe(VALUE self, VALUE sub_id) {
  if (sub_id != Qnil && sub_id != Qfalse) {
    ws_s *conn = get_ws(self);
    if (!conn || ((protocol_s *)conn)->service != WEBSOCKET_ID_STR) {
      return Qfalse;
    }
    Check_Type(sub_id, T_FIXNUM);
    websocket_unsubscribe(conn, NUM2LONG(sub_id));
  }
  return Qnil;
}
|
#write(data) ⇒ Object
Writes data to the websocket.
Returns `true` on success or `false` if the websocket was closed or an error occurred.
`write` will return immediately UNLESS resources are insufficient. If the global `write` buffer is full, `write` will block until a buffer “packet” becomes available and can be assigned to the socket.
145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'ext/iodine/iodine_websockets.c', line 145
/**
 * Writes the String `data` to the websocket attached to `self`.
 *
 * The last argument passed to `websocket_write` is true when the String's
 * encoding is `IodineUTF8Encoding`.
 *
 * Returns `Qfalse` when `self` has no valid websocket connection and `Qtrue`
 * otherwise. Raises a TypeError when `data` is not a String.
 *
 * NOTE(review): the result of `websocket_write` is not inspected here, so
 * `Qtrue` is returned whenever the connection check passes.
 */
static VALUE iodine_ws_write(VALUE self, VALUE data) {
  Check_Type(data, T_STRING);
  ws_s *conn = get_ws(self);
  if (conn && ((protocol_s *)conn)->service == WEBSOCKET_ID_STR) {
    websocket_write(conn, RSTRING_PTR(data), RSTRING_LEN(data),
                    rb_enc_get(data) == IodineUTF8Encoding);
    return Qtrue;
  }
  return Qfalse;
}
|