Module: Iodine::Websocket

Defined in:
ext/iodine/iodine_websocket.c

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.defer(ws_uuid) ⇒ Object

Schedules a block of code to run for the specified connection at a later time (if the connection is open), while preventing concurrent code from running for the same connection object.

The block of code will receive the connection’s object. i.e.

Iodine::Websocket.defer(uuid) {|ws| ws.write "I'm doing this" }

On success returns the block, otherwise (connection invalid) returns `false`. A successful event registration doesn’t guarantee that the block will be called (the connection might close between the event registration and the execution).



388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
# File 'ext/iodine/iodine_websocket.c', line 388

/**
 * Class-level `defer`: schedules the given block to run for the connection
 * identified by `ws_uuid`.
 *
 * @param self     the receiver (unused).
 * @param ws_uuid  a Fixnum holding the connection identifier.
 * @return the block on success; `Qfalse` when `sock_isvalid` rejects the
 *         identifier.
 *
 * Raises a TypeError (through `Check_Type`) when `ws_uuid` is not a Fixnum and
 * raises (through `rb_need_block`) when no block was given.
 */
static VALUE iodine_class_defer(VALUE self, VALUE ws_uuid) {
  (void)(self);
  // FIX2LONG performs no type checking, so validate first (this mirrors the
  // check performed by the instance-level `defer`).
  Check_Type(ws_uuid, T_FIXNUM);
  intptr_t fd = FIX2LONG(ws_uuid);
  if (!sock_isvalid(fd))
    return Qfalse;
  // requires a block to be passed
  rb_need_block();
  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  // NOTE(review): presumably keeps the block alive (GC-wise) until the task or
  // its fallback runs - confirm against Registry and the callbacks.
  Registry.add(block);

  facil_defer(.uuid = fd, .task = iodine_perform_defer, .arg = (void *)block,
              .fallback = iodine_defer_fallback);
  return block;
}

.each ⇒ Object

Runs the required block for each dynamic protocol connection.

Tasks will be performed asynchronously, within each connection’s lock, so no connection will have more than one task being performed at the same time (similar to #defer).

Also, unlike Iodine.run, the block will not be called unless the connection remains open at the time its execution is scheduled.

Always returns `self`.



364
365
366
367
368
369
370
371
372
373
# File 'ext/iodine/iodine_websocket.c', line 364

/**
 * Class-level `each`: hands the given block to `iodine_ws_run_each`.
 *
 * @param self  the receiver; returned once the block has been handed over.
 * @return `self`, or `Qfalse` if no proc object could be obtained.
 *
 * Raises (through `rb_need_block`) when called without a block.
 */
static VALUE iodine_ws_class_each(VALUE self) {
  rb_need_block();
  VALUE callback = rb_block_proc();
  if (callback != Qnil) {
    Registry.add(callback);
    // NOTE(review): -1 presumably means "no originating connection" - confirm
    // against iodine_ws_run_each.
    iodine_ws_run_each(-1, callback);
    return self;
  }
  return Qfalse;
}

.each_write(data) ⇒ Object

Writes data to all the Websocket connections sharing the same process (worker) except `self`.

If a block is given, it will be passed each Websocket connection in turn (much like `each`) and send the data only if the block returns a “truthy” value (i.e. NOT `false` or `nil`).

See both #write and #each for more details.



274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'ext/iodine/iodine_websocket.c', line 274

/**
 * Implements `each_write`: passes `data` to `websocket_write_each`, optionally
 * filtered by the given block.
 *
 * @param self  the receiver; used as the write's origin only when it resolves
 *              to a protocol whose service is WEBSOCKET_ID_STR.
 * @param data  a Ruby String (a TypeError is raised otherwise); flagged as
 *              text when its encoding is UTF8Encoding.
 * @return always `Qtrue`.
 */
static VALUE iodine_ws_multiwrite(VALUE self, VALUE data) {
  Check_Type(data, T_STRING);

  // A receiver that isn't a websocket protocol is treated as "no origin".
  ws_s *origin = get_ws(self);
  if (origin && ((protocol_s *)origin)->service != WEBSOCKET_ID_STR)
    origin = NULL;

  // The optional block acts as a per-connection filter.
  VALUE filter_block = rb_block_given_p() ? rb_block_proc() : Qnil;
  const int has_filter = (filter_block != Qnil);
  if (has_filter)
    Registry.add(filter_block);

  websocket_write_each(.origin = origin, .data = RSTRING_PTR(data),
                       .length = RSTRING_LEN(data),
                       .is_text = (rb_enc_get(data) == UTF8Encoding),
                       .on_finished = iodine_ws_write_each_complete,
                       .filter = (has_filter ? iodine_ws_if_callback : NULL),
                       .arg = (void *)filter_block);
  return Qtrue;
}

Instance Method Details

#close ⇒ Object

Closes the websocket connection. The connection is only closed after existing data in the outgoing buffer is sent.



122
123
124
125
126
127
128
# File 'ext/iodine/iodine_websocket.c', line 122

/**
 * Closes the websocket connection that belongs to `self`.
 *
 * @param self  the Ruby connection object.
 * @return `self` after `websocket_close` was called; `Qfalse` when no
 *         protocol object could be resolved or when it isn't a websocket
 *         protocol (service != WEBSOCKET_ID_STR).
 */
static VALUE iodine_ws_close(VALUE self) {
  ws_s *ws = get_ws(self);
  // Guard against a missing protocol object before dereferencing it, the same
  // way the write functions do.
  if (!ws || ((protocol_s *)ws)->service != WEBSOCKET_ID_STR)
    return Qfalse;
  websocket_close(ws);
  return self;
}

#conn_id ⇒ Object

Returns a connection’s UUID which is valid for **this process** (not a machine or internet unique value).

This can be used together with a true process wide UUID to uniquely identify a connection across the internet.



178
179
180
181
# File 'ext/iodine/iodine_websocket.c', line 178

/**
 * Implements `conn_id`: returns the connection identifier of `self`, as
 * obtained from `get_uuid`, converted to a Fixnum.
 */
static VALUE iodine_ws_uuid(VALUE self) {
  const intptr_t conn_uuid = get_uuid(self);
  VALUE as_fixnum = LONG2FIX(conn_uuid);
  return as_fixnum;
}

#count ⇒ Object

Returns the number of active websocket connections (including connections that are in the process of closing down).



156
157
158
159
# File 'ext/iodine/iodine_websocket.c', line 156

/**
 * Implements `count`: returns the value of `websocket_count()` as a Fixnum.
 * The receiver is not used.
 */
static VALUE iodine_ws_count(VALUE self) {
  (void)self; // unused
  VALUE total = LONG2FIX(websocket_count());
  return total;
}

#defer(*args) ⇒ Object

Schedules a block of code to execute at a later time, if the connection is still open, while preventing concurrent code from running for the same connection object.

An optional `uuid` argument can be passed along, so that the block of code will run for the requested connection rather than this connection.

**Careful**: this might cause this connection’s object to run code concurrently when data owned by this connection is accessed from within the block of code.

On success returns the block, otherwise (connection invalid) returns `false`. A successful event registration doesn’t guarantee that the block will be called (the connection might close between the event registration and the execution).



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'ext/iodine/iodine_websocket.c', line 219

/**
 * Instance-level `defer`: schedules the given block through `facil_defer`.
 *
 * @param argc  number of Ruby arguments (0 or 1).
 * @param argv  when present, argv[0] is a Fixnum identifying the target
 *              connection; otherwise the receiver's own connection is used.
 * @param self  the Ruby connection object.
 * @return the block on success; `Qfalse` when an explicitly supplied
 *         identifier fails `sock_isvalid`.
 *
 * Raises ArgumentError for more than one argument, TypeError for a non-Fixnum
 * argument, and (through `rb_need_block`) when no block was given.
 */
static VALUE iodine_defer(int argc, VALUE *argv, VALUE self) {
  // Argument validation: at most one (optional) connection identifier.
  if (argc > 1)
    rb_raise(rb_eArgError, "this function expects no more then 1 (optional) "
                           "argument.");

  intptr_t target;
  if (argc == 1) {
    Check_Type(argv[0], T_FIXNUM);
    target = FIX2LONG(argv[0]);
    if (!sock_isvalid(target))
      return Qfalse;
  } else {
    target = iodine_get_fd(self);
  }

  // A block is mandatory.
  rb_need_block();
  VALUE task_block = rb_block_proc();
  if (task_block == Qnil)
    return Qfalse;
  Registry.add(task_block);

  facil_defer(.uuid = target, .task = iodine_perform_defer,
              .arg = (void *)task_block, .fallback = iodine_defer_fallback);
  return task_block;
}

#each ⇒ Object

Performs a block of code for each websocket connection. The function returns the block of code.

The block of code should accept a single variable which is the websocket connection.

i.e.:

def on_message data
  msg = data.dup; # data will be overwritten once the function exits.
  each {|ws| ws.write msg}
end

The block of code will be executed asynchronously, to avoid having two blocks of code running at the same time and to minimize race conditions when using multiple threads.



340
341
342
343
344
345
346
347
348
349
350
# File 'ext/iodine/iodine_websocket.c', line 340

/**
 * Instance-level `each`: hands the given block, together with this
 * connection's identifier, to `iodine_ws_run_each`.
 *
 * @param self  the Ruby connection object.
 * @return the block, or `Qnil` if no proc object could be obtained.
 *
 * Raises (through `rb_need_block`) when called without a block.
 */
static VALUE iodine_ws_each(VALUE self) {
  rb_need_block();
  VALUE callback = rb_block_proc();
  if (callback != Qnil) {
    Registry.add(callback);
    intptr_t origin = get_uuid(self);
    iodine_ws_run_each(origin, callback);
    return callback;
  }
  return Qnil;
}

#each_write(data) ⇒ Object

Writes data to all the Websocket connections sharing the same process (worker) except `self`.

If a block is given, it will be passed each Websocket connection in turn (much like `each`) and send the data only if the block returns a “truthy” value (i.e. NOT `false` or `nil`).

See both #write and #each for more details.



274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'ext/iodine/iodine_websocket.c', line 274

/**
 * Implements `each_write`: passes `data` to `websocket_write_each`, optionally
 * filtered by the given block.
 *
 * @param self  the receiver; used as the write's origin only when it resolves
 *              to a protocol whose service is WEBSOCKET_ID_STR.
 * @param data  a Ruby String (a TypeError is raised otherwise); flagged as
 *              text when its encoding is UTF8Encoding.
 * @return always `Qtrue`.
 */
static VALUE iodine_ws_multiwrite(VALUE self, VALUE data) {
  Check_Type(data, T_STRING);

  // A receiver that isn't a websocket protocol is treated as "no origin".
  ws_s *origin = get_ws(self);
  if (origin && ((protocol_s *)origin)->service != WEBSOCKET_ID_STR)
    origin = NULL;

  // The optional block acts as a per-connection filter.
  VALUE filter_block = rb_block_given_p() ? rb_block_proc() : Qnil;
  const int has_filter = (filter_block != Qnil);
  if (has_filter)
    Registry.add(filter_block);

  websocket_write_each(.origin = origin, .data = RSTRING_PTR(data),
                       .length = RSTRING_LEN(data),
                       .is_text = (rb_enc_get(data) == UTF8Encoding),
                       .on_finished = iodine_ws_write_each_complete,
                       .filter = (has_filter ? iodine_ws_if_callback : NULL),
                       .arg = (void *)filter_block);
  return Qtrue;
}

#has_pending? ⇒ Boolean

Returns a weak indication as to the state of the socket’s buffer. If the server has data in the buffer that wasn’t written to the socket, `has_pending?` will return `true`, otherwise `false` will be returned.

Returns:

  • (Boolean)


166
167
168
169
# File 'ext/iodine/iodine_websocket.c', line 166

/**
 * Implements `has_pending?`: returns `Qtrue` when `sock_has_pending` reports
 * a non-zero value for this connection's identifier, `Qfalse` otherwise.
 */
static VALUE iodine_ws_has_pending(VALUE self) {
  const intptr_t conn_uuid = get_uuid(self);
  if (sock_has_pending(conn_uuid))
    return Qtrue;
  return Qfalse;
}

#on_close ⇒ Object

Please implement your own callback for this event.



489
490
491
492
# File 'ext/iodine/iodine_websocket.c', line 489

/**
 * No-op callback: ignores the receiver and returns `Qnil`.
 */
static VALUE empty_func(VALUE self) {
  (void)self; // intentionally unused
  return Qnil;
}

#on_open ⇒ Object

Please implement your own callback for this event.



489
490
491
492
# File 'ext/iodine/iodine_websocket.c', line 489

/**
 * No-op callback: ignores the receiver and returns `Qnil`.
 */
static VALUE empty_func(VALUE self) {
  (void)self; // intentionally unused
  return Qnil;
}

#on_ready ⇒ Object

Please implement your own callback for this event.



489
490
491
492
# File 'ext/iodine/iodine_websocket.c', line 489

/**
 * No-op callback: ignores the receiver and returns `Qnil`.
 */
static VALUE empty_func(VALUE self) {
  (void)self; // intentionally unused
  return Qnil;
}

#on_shutdown ⇒ Object

Please implement your own callback for this event.



489
490
491
492
# File 'ext/iodine/iodine_websocket.c', line 489

/**
 * No-op callback: ignores the receiver and returns `Qnil`.
 */
static VALUE empty_func(VALUE self) {
  (void)self; // intentionally unused
  return Qnil;
}

#write(data) ⇒ Object

Writes data to the websocket.

Returns `true` on success or `false` if the websocket was closed or an error occurred.

`write` will return immediately UNLESS resources are insufficient. If the global `write` buffer is full, `write` will block until a buffer “packet” becomes available and can be assigned to the socket.



139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'ext/iodine/iodine_websocket.c', line 139

/**
 * Implements `write`: sends `data` over the websocket that belongs to `self`.
 *
 * @param self  the Ruby connection object.
 * @param data  a Ruby String (a TypeError is raised otherwise); sent as text
 *              when its encoding is UTF8Encoding.
 * @return `Qtrue` once `websocket_write` has been called; `Qfalse` when no
 *         protocol object could be resolved or it isn't a websocket protocol.
 *         The result of `websocket_write` itself is not inspected here.
 */
static VALUE iodine_ws_write(VALUE self, VALUE data) {
  Check_Type(data, T_STRING);
  ws_s *conn = get_ws(self);
  if (conn && ((protocol_s *)conn)->service == WEBSOCKET_ID_STR) {
    websocket_write(conn, RSTRING_PTR(data), RSTRING_LEN(data),
                    rb_enc_get(data) == UTF8Encoding);
    return Qtrue;
  }
  return Qfalse;
}