Module: Iodine::Websocket

Defined in:
lib/iodine/websocket.rb,
ext/iodine/iodine_websockets.c

Overview

This module lists the available API for Websocket connections and classes.

This module is mixed in (using `extend` and `include`) with the Websocket Callback Object (as specified by the proposed Rack specification).

The websocket API is divided into three main groups:

Notice that Websocket callback objects (as specified by the proposed Rack specification) MUST provide an `on_message(data)` callback.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.countObject

Returns the number of active websocket connections (including connections that are in the process of closing down).



162
163
164
165
# File 'ext/iodine/iodine_websockets.c', line 162

/** Reports the value of `websocket_count()` to Ruby as a Fixnum. */
static VALUE iodine_ws_count(VALUE self) {
  (void)self; // the receiver is not needed
  return LONG2FIX(websocket_count());
}

.defer(ws_uuid) ⇒ Object

Schedules a block of code to run for the specified websocket at a later time, (if the connection is open). The block will run within the connection’s lock, offering a fast concurrency synchronizing tool.

The block of code will receive the websocket’s callback object. i.e.

Iodine::Websocket.defer(uuid) {|ws| ws.write "I'm doing this" }

On success returns the block, otherwise (connection invalid) returns `false`.

A successful event registration doesn’t guarantee that the block will be called (the connection might close between the event registration and the execution).



615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
# File 'ext/iodine/iodine_websockets.c', line 615

/**
 * Class-level `defer`: schedules the given block as a task for the connection
 * identified by `ws_uuid`.
 *
 * Returns the block once the task was handed to `facil_defer`, or `Qfalse`
 * when `sock_isvalid` rejects the identifier. Raises a TypeError when
 * `ws_uuid` is not a Fixnum, and `rb_need_block` raises when no block was
 * passed.
 */
static VALUE iodine_class_defer(VALUE self, VALUE ws_uuid) {
  (void)(self);
  // FIX2LONG performs no type checking, so validate first (the instance
  // `defer` does the same); otherwise an arbitrary object would be
  // reinterpreted as a connection identifier.
  Check_Type(ws_uuid, T_FIXNUM);
  intptr_t fd = FIX2LONG(ws_uuid);
  if (!sock_isvalid(fd))
    return Qfalse;
  // requires a block to be passed
  rb_need_block();
  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  // presumably keeps the proc alive until the task or its fallback runs —
  // TODO confirm against the Registry implementation
  Registry.add(block);

  facil_defer(.uuid = fd, .task = iodine_perform_defer, .arg = (void *)block,
              .fallback = iodine_defer_fallback);
  return block;
}

.eachObject

Runs the required block for each websocket.

Tasks will be performed asynchronously, within each connection’s lock, so no connection will have more than one task being performed at the same time (similar to #defer).

Also, unlike Iodine.run, the block will not be called unless the websocket is still open at the time its execution begins.

Always returns `self`.



590
591
592
593
594
595
596
597
598
599
# File 'ext/iodine/iodine_websockets.c', line 590

/**
 * Hands the given block to `iodine_ws_run_each` (with `-1` as its first
 * argument) after adding it to the Registry.
 *
 * Returns `self`; `Qfalse` is returned only if `rb_block_proc` yields `nil`.
 */
static VALUE iodine_ws_class_each(VALUE self) {
  rb_need_block(); // a block is mandatory
  VALUE task = rb_block_proc();
  if (task != Qnil) {
    Registry.add(task);
    iodine_ws_run_each(-1, task);
    return self;
  }
  return Qfalse;
}

Instance Method Details

#closeObject

Closes the websocket connection. The connection is only closed after existing data in the outgoing buffer is sent.



128
129
130
131
132
133
134
# File 'ext/iodine/iodine_websockets.c', line 128

/**
 * Asks `websocket_close` to close the connection behind `self`.
 *
 * Returns `self` after issuing the request, or `Qfalse` when no websocket
 * object is attached or its protocol `service` is not `WEBSOCKET_ID_STR`.
 */
static VALUE iodine_ws_close(VALUE self) {
  ws_s *conn = get_ws(self);
  if (conn && ((protocol_s *)conn)->service == WEBSOCKET_ID_STR) {
    websocket_close(conn);
    return self;
  }
  return Qfalse;
}

#conn_idObject

Returns a connection’s UUID which is valid for *this process* (not a machine or internet unique value).

This can be used together with a true process wide UUID to uniquely identify a connection across the internet.



184
185
186
187
# File 'ext/iodine/iodine_websockets.c', line 184

/** Returns the value of `get_uuid(self)` converted to a Ruby Fixnum. */
static VALUE iodine_ws_uuid(VALUE self) {
  intptr_t conn_id = get_uuid(self);
  VALUE rb_id = LONG2FIX(conn_id);
  return rb_id;
}

#defer(*args) ⇒ Object

Schedules a block of code to execute at a later time, if the connection is still open, while preventing concurrent code from running for the same connection object.

An optional `conn_id` argument can be passed along, so that the block of code will run for the requested connection rather than this connection.

Careful: this might cause this connection’s object to run code concurrently when data owned by this connection is accessed from within the block of code.

On success returns the block, otherwise (connection invalid) returns `false`. A successful event registration doesn’t guarantee that the block will be called (the connection might close between the event registration and the execution).



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'ext/iodine/iodine_websockets.c', line 223

/**
 * Instance-level `defer`: schedules the given block as a task for either the
 * connection id passed as the single optional argument or, without arguments,
 * the id returned by `iodine_get_fd(self)`.
 *
 * Returns the block once the task was handed to `facil_defer`, or `Qfalse`
 * when an explicitly passed id fails `sock_isvalid`. Raises ArgumentError for
 * more than one argument or when the resolved id is zero, and a TypeError
 * when the argument is not a Fixnum.
 */
static VALUE iodine_defer(int argc, VALUE *argv, VALUE self) {
  intptr_t target;

  if (argc > 1)
    rb_raise(rb_eArgError, "this function expects no more then 1 (optional) "
                           "argument.");

  if (argc == 1) {
    // an explicit connection id was supplied
    Check_Type(*argv, T_FIXNUM);
    target = FIX2LONG(*argv);
    if (!sock_isvalid(target))
      return Qfalse;
  } else {
    // default to the connection attached to this object
    target = iodine_get_fd(self);
  }

  if (!target)
    rb_raise(rb_eArgError, "This method requires a target connection.");

  rb_need_block(); // a block is mandatory
  VALUE task = rb_block_proc();
  if (task == Qnil)
    return Qfalse;
  Registry.add(task);

  facil_defer(.uuid = target, .task = iodine_perform_defer,
              .arg = (void *)task, .fallback = iodine_defer_fallback);
  return task;
}

#has_pending?Boolean

Returns a weak indication as to the state of the socket’s buffer. If the server has data in the buffer that wasn’t written to the socket, `has_pending?` will return `true`, otherwise `false` will be returned.

Returns:

  • (Boolean)


172
173
174
175
# File 'ext/iodine/iodine_websockets.c', line 172

/** Maps the result of `sock_has_pending` for this object's uuid to
 * `true` / `false`. */
static VALUE iodine_ws_has_pending(VALUE self) {
  intptr_t conn_id = get_uuid(self);
  if (sock_has_pending(conn_id))
    return Qtrue;
  return Qfalse;
}

#on_closeObject

Please implement your own callback for this event.



697
698
699
700
# File 'ext/iodine/iodine_websockets.c', line 697

/** No-op callback body: ignores the receiver and returns `nil`. */
static VALUE empty_func(VALUE self) {
  (void)self;
  return Qnil;
}

#on_openObject

Please implement your own callback for this event.



697
698
699
700
# File 'ext/iodine/iodine_websockets.c', line 697

/** No-op callback body: ignores the receiver and returns `nil`. */
static VALUE empty_func(VALUE self) {
  (void)self;
  return Qnil;
}

#on_readyObject

Please implement your own callback for this event.



697
698
699
700
# File 'ext/iodine/iodine_websockets.c', line 697

/** No-op callback body: ignores the receiver and returns `nil`. */
static VALUE empty_func(VALUE self) {
  (void)self;
  return Qnil;
}

#on_shutdownObject

Please implement your own callback for this event.



697
698
699
700
# File 'ext/iodine/iodine_websockets.c', line 697

/** No-op callback body: ignores the receiver and returns `nil`. */
static VALUE empty_func(VALUE self) {
  (void)self;
  return Qnil;
}

#publish(args) ⇒ Object

Publishes a message to a channel.

Accepts a single Hash argument with the following possible options:

:engine

If provided, the engine to use for pub/sub. Otherwise the default

engine is used.

:channel

Required (unless :pattern). The channel to publish to.

:pattern

An alternative to the required :channel, publishes to a pattern.

This is NOT supported by Redis and it’s limited to the local process cluster.

:message

REQUIRED. The message to be published.

:



433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
# File 'ext/iodine/iodine_websockets.c', line 433

/**
 * Publishes the `:message` String from the `args` Hash through
 * `pubsub_publish`.
 *
 * The target is taken from `:channel`; when that entry is nil/false the
 * `:pattern` entry is used instead and the request is flagged as a pattern.
 * A Symbol target is converted to a String. The `:engine` entry is translated
 * by `iodine_engine_ruby2facil`.
 *
 * Returns `true` when `pubsub_publish` yields a non-zero value, `false`
 * otherwise. Raises ArgumentError when neither target nor message is present
 * and a TypeError when `args`, the target or the message has the wrong type.
 */
static VALUE iodine_ws_publish(VALUE self, VALUE args) {
  (void)self; // the receiver is not used
  Check_Type(args, T_HASH);

  // resolve the target: :channel first, :pattern as the fallback
  uint8_t is_pattern = 0;
  VALUE target = rb_hash_aref(args, channel_var_id);
  if (target == Qnil || target == Qfalse) {
    is_pattern = 1;
    target = rb_hash_aref(args, pattern_var_id);
    if (target == Qnil || target == Qfalse)
      rb_raise(rb_eArgError, "channel is required for pub/sub methods.");
  }
  if (TYPE(target) == T_SYMBOL)
    target = rb_sym2str(target);
  Check_Type(target, T_STRING);

  // the payload is mandatory and must be a String
  VALUE payload = rb_hash_aref(args, message_var_id);
  if (payload == Qnil || payload == Qfalse)
    rb_raise(rb_eArgError, "message is required for the :publish method.");
  Check_Type(payload, T_STRING);

  pubsub_engine_s *engine =
      iodine_engine_ruby2facil(rb_hash_aref(args, engine_var_id));

  intptr_t result =
      pubsub_publish(.engine = engine, .channel.name = (RSTRING_PTR(target)),
                     .channel.len = (RSTRING_LEN(target)),
                     .msg.data = (RSTRING_PTR(payload)),
                     .msg.len = (RSTRING_LEN(payload)),
                     .use_pattern = is_pattern);
  return result ? Qtrue : Qfalse;
}

#subscribe(args) ⇒ Object

Subscribes the websocket to a channel belonging to a specific pub/sub service (using a PubSub::Engine to connect Iodine to the service).

The function accepts a single argument (a Hash) and an optional block.

If no block is provided, the message is sent directly to the websocket client.

Accepts a single Hash argument with the following possible options:

:engine

If provided, the engine to use for pub/sub. Otherwise the default

engine is used.

:channel

Required (unless :pattern). The channel to subscribe to.

:pattern

An alternative to the required :channel, subscribes to a pattern.

:force

This can be set to either nil, :text or :binary and controls the way

the message will be forwarded to the websocket client. This is only valid if no block was provided. Defaults to smart encoding based testing.



298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# File 'ext/iodine/iodine_websockets.c', line 298

/**
 * Subscribes the websocket behind `self` via `websocket_subscribe`, using the
 * options in the `args` Hash.
 *
 * The target is taken from `:channel`; when that entry is nil/false the
 * `:pattern` entry is used instead and the request is flagged as a pattern.
 * A Symbol target is converted to a String. `:force` selects the text or
 * binary flag when it equals `text_var_id` / `binary_var_id`. If a block was
 * given it is added to the Registry and passed along as `udata`, together
 * with the `on_pubsub_notificationin` and `iodine_on_unsubscribe` callbacks.
 *
 * Returns the subscription id as a Ruby Integer, `nil` when
 * `websocket_subscribe` yields zero, or `false` when no websocket object is
 * attached or its protocol `service` is not `WEBSOCKET_ID_STR`. Raises
 * ArgumentError when no target is present.
 */
static VALUE iodine_ws_subscribe(VALUE self, VALUE args) {
  Check_Type(args, T_HASH);
  ws_s *conn = get_ws(self);
  if (!conn || ((protocol_s *)conn)->service != WEBSOCKET_ID_STR)
    return Qfalse;

  // resolve the target: :channel first, :pattern as the fallback
  uint8_t is_pattern = 0;
  VALUE target = rb_hash_aref(args, channel_var_id);
  if (target == Qnil || target == Qfalse) {
    is_pattern = 1;
    target = rb_hash_aref(args, pattern_var_id);
    if (target == Qnil || target == Qfalse)
      rb_raise(rb_eArgError, "channel is required for pub/sub methods.");
  }
  if (TYPE(target) == T_SYMBOL)
    target = rb_sym2str(target);
  Check_Type(target, T_STRING);

  // at most one of the two flags is set
  const VALUE forced = rb_hash_aref(args, force_var_id);
  const uint8_t as_text = (forced == text_var_id);
  const uint8_t as_binary = (!as_text && forced == binary_var_id);

  // an optional block becomes the message handler
  VALUE handler = 0;
  if (rb_block_given_p()) {
    handler = rb_block_proc();
    Registry.add(handler);
  }

  pubsub_engine_s *engine =
      iodine_engine_ruby2facil(rb_hash_aref(args, engine_var_id));

  uintptr_t subid = websocket_subscribe(
      conn, .channel.name = RSTRING_PTR(target),
      .channel.len = RSTRING_LEN(target), .engine = engine,
      .use_pattern = is_pattern, .force_text = as_text,
      .force_binary = as_binary,
      .on_message = (handler ? on_pubsub_notificationin : NULL),
      .on_unsubscribe = (handler ? iodine_on_unsubscribe : NULL),
      .udata = (void *)handler);
  return subid ? ULL2NUM(subid) : Qnil;
}

#subscribed?(args) ⇒ Boolean

Searches for the subscription ID of the described subscription.

Takes the same arguments as #subscribe, a single Hash argument with the following possible options:

:engine

If provided, the engine to use for pub/sub. Otherwise the default

engine is used.

:channel

The subscription’s channel.

:pattern

An alternative to the required :channel, subscribes to a pattern.

:force

This can be set to either nil, :text or :binary and controls the way

the message will be forwarded to the websocket client. This is only valid if no block was provided. Defaults to smart encoding based testing.

Returns:

  • (Boolean)


360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
# File 'ext/iodine/iodine_websockets.c', line 360

/**
 * Looks up an existing subscription of the websocket behind `self` via
 * `websocket_find_sub`, using the options in the `args` Hash.
 *
 * The target is taken from `:channel`; when that entry is nil/false the
 * `:pattern` entry is used instead and the request is flagged as a pattern.
 * A Symbol target is converted to a String. `:force` selects the text or
 * binary flag when it equals `text_var_id` / `binary_var_id`. A given block
 * is passed along as `udata` together with `on_pubsub_notificationin`.
 *
 * Returns the subscription id as a Ruby Integer, `nil` when
 * `websocket_find_sub` yields zero, or `false` when no websocket object is
 * attached or its protocol `service` is not `WEBSOCKET_ID_STR`. Raises
 * ArgumentError when no target is present.
 */
static VALUE iodine_ws_is_subscribed(VALUE self, VALUE args) {
  Check_Type(args, T_HASH);
  ws_s *conn = get_ws(self);
  if (!conn || ((protocol_s *)conn)->service != WEBSOCKET_ID_STR)
    return Qfalse;

  // resolve the target: :channel first, :pattern as the fallback
  uint8_t is_pattern = 0;
  VALUE target = rb_hash_aref(args, channel_var_id);
  if (target == Qnil || target == Qfalse) {
    is_pattern = 1;
    target = rb_hash_aref(args, pattern_var_id);
    if (target == Qnil || target == Qfalse)
      rb_raise(rb_eArgError, "channel is required for pub/sub methods.");
  }
  if (TYPE(target) == T_SYMBOL)
    target = rb_sym2str(target);
  Check_Type(target, T_STRING);

  // at most one of the two flags is set
  const VALUE forced = rb_hash_aref(args, force_var_id);
  const uint8_t as_text = (forced == text_var_id);
  const uint8_t as_binary = (!as_text && forced == binary_var_id);

  // the block (if any) is part of the lookup key only
  VALUE handler = 0;
  if (rb_block_given_p())
    handler = rb_block_proc();

  pubsub_engine_s *engine =
      iodine_engine_ruby2facil(rb_hash_aref(args, engine_var_id));

  uintptr_t subid = websocket_find_sub(
      conn, .channel.name = RSTRING_PTR(target),
      .channel.len = RSTRING_LEN(target), .engine = engine,
      .use_pattern = is_pattern, .force_text = as_text,
      .force_binary = as_binary,
      .on_message = (handler ? on_pubsub_notificationin : NULL),
      .udata = (void *)handler);
  // NOTE(review): iodine_ws_subscribe converts its id with ULL2NUM while this
  // uses LONG2NUM on an unsigned value — confirm the two should match.
  return subid ? LONG2NUM(subid) : Qnil;
}

#unsubscribe(sub_id) ⇒ Object

Cancels the subscription matching `sub_id`.



406
407
408
409
410
411
412
413
414
415
# File 'ext/iodine/iodine_websockets.c', line 406

/**
 * Passes `sub_id` to `websocket_unsubscribe` for the websocket behind `self`.
 *
 * A nil/false `sub_id` is ignored. Returns `nil`, or `false` when no
 * websocket object is attached or its protocol `service` is not
 * `WEBSOCKET_ID_STR`. Raises a TypeError when `sub_id` is not a Fixnum.
 */
static VALUE iodine_ws_unsubscribe(VALUE self, VALUE sub_id) {
  if (sub_id != Qnil && sub_id != Qfalse) {
    ws_s *conn = get_ws(self);
    if (!conn || ((protocol_s *)conn)->service != WEBSOCKET_ID_STR)
      return Qfalse;
    Check_Type(sub_id, T_FIXNUM);
    websocket_unsubscribe(conn, NUM2LONG(sub_id));
  }
  return Qnil;
}

#write(data) ⇒ Object

Writes data to the websocket.

Returns `true` on success or `false` if the websocket was closed or an error occurred.

`write` will return immediately UNLESS resources are insufficient. If the global `write` buffer is full, `write` will block until a buffer “packet” becomes available and can be assigned to the socket.



145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'ext/iodine/iodine_websockets.c', line 145

/**
 * Passes the bytes of the String `data` to `websocket_write` for the
 * websocket behind `self`.
 *
 * The last argument tells `websocket_write` whether the String's encoding is
 * `IodineUTF8Encoding` (presumably selecting a text rather than a binary
 * message — confirm against `websocket_write`).
 *
 * Returns `true` after handing the data over, or `false` when no websocket
 * object is attached or its protocol `service` is not `WEBSOCKET_ID_STR`.
 * Raises a TypeError when `data` is not a String.
 */
static VALUE iodine_ws_write(VALUE self, VALUE data) {
  Check_Type(data, T_STRING);
  ws_s *conn = get_ws(self);
  if (conn && ((protocol_s *)conn)->service == WEBSOCKET_ID_STR) {
    websocket_write(conn, RSTRING_PTR(data), RSTRING_LEN(data),
                    rb_enc_get(data) == IodineUTF8Encoding);
    return Qtrue;
  }
  return Qfalse;
}