Module: Iodine::Websocket

Defined in:
lib/iodine/websocket.rb,
ext/iodine/iodine_websockets.c

Overview

This module lists the available API for WebSocket and EventSource (SSE) connections.

This module is mixed in (using ‘extend` and `include`) with the WebSocket Callback Object (as specified by the proposed Rack specification.

The server performs ‘extend` to allow the application to be namespace agnostic (so the server can be replaced without effecting the application).

The websocket API is divided into three main groups:

Notice that Websocket callback objects (as specified by the proposed Rack specification MUST provide an ‘on_message(data)` callback.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.publish(*args) ⇒ Object

Publishes a message to a channel.

Can be used using two Strings:

publish(to, message)

The method accepts an optional ‘engine` argument:

publish(to, message, my_pubsub_engine)

Alternatively, accepts the following named arguments:

:to

The channel to publish to (required).

:message

The message to be published (required).

:engine

If provided, the engine to use for pub/sub. Otherwise the default

engine is used.



796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
# File 'ext/iodine/iodine_pubsub.c', line 796

VALUE iodine_publish(int argc, VALUE *argv, VALUE self) {
  VALUE rb_ch, rb_msg, rb_engine = Qnil;
  const pubsub_engine_s *engine = NULL;
  switch (argc) {
  case 3:
    /* fallthrough */
    rb_engine = argv[2];
  case 2:
    rb_ch = argv[0];
    rb_msg = argv[1];
    break;
  case 1: {
    /* single argument must be a Hash */
    Check_Type(argv[0], T_HASH);
    rb_ch = rb_hash_aref(argv[0], to_sym_id);
    if (rb_ch == Qnil || rb_ch == Qfalse) {
      rb_ch = rb_hash_aref(argv[0], channel_sym_id);
    }
    rb_msg = rb_hash_aref(argv[0], message_sym_id);
    rb_engine = rb_hash_aref(argv[0], engine_varid);
  } break;
  default:
    rb_raise(rb_eArgError, "method accepts 1-3 arguments.");
  }

  if (rb_msg == Qnil || rb_msg == Qfalse) {
    rb_raise(rb_eArgError, "message is required.");
  }
  Check_Type(rb_msg, T_STRING);

  if (rb_ch == Qnil || rb_ch == Qfalse)
    rb_raise(rb_eArgError, "target / channel is required .");
  if (TYPE(rb_ch) == T_SYMBOL)
    rb_ch = rb_sym2str(rb_ch);
  Check_Type(rb_ch, T_STRING);

  if (rb_engine == Qfalse) {
    engine = PUBSUB_PROCESS_ENGINE;
  } else if (rb_engine == Qnil) {
    engine = NULL;
  } else {
    engine = iodine_engine_ruby2facil(rb_engine);
  }

  FIOBJ ch = fiobj_str_new(RSTRING_PTR(rb_ch), RSTRING_LEN(rb_ch));
  FIOBJ msg = fiobj_str_new(RSTRING_PTR(rb_msg), RSTRING_LEN(rb_msg));

  intptr_t ret =
      pubsub_publish(.engine = engine, .channel = ch, .message = msg);
  fiobj_free(ch);
  fiobj_free(msg);
  if (!ret)
    return Qfalse;
  return Qtrue;
  (void)self;
}

Instance Method Details

#closeObject

Closes the websocket connection. The connection is only closed after existing data in the outgoing buffer is sent.



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'ext/iodine/iodine_websockets.c', line 52

static VALUE iodine_ws_close(VALUE self) {
  void *ws = get_ws(self);
  if (!ws) {
    return Qfalse;
  }
  iodine_pubsub_type_e c_type = (iodine_pubsub_type_e)iodine_get_cdata(self);
  switch (c_type) {
  case IODINE_PUBSUB_WEBSOCKET:
    /* WebSockets*/
    if (((protocol_s *)ws)->service != WEBSOCKET_ID_STR) {
      return Qfalse;
    }
    websocket_close(ws);
    break;
  case IODINE_PUBSUB_SSE:
    /* SSE */
    http_sse_close(ws);
    break;
  case IODINE_PUBSUB_GLOBAL:
  /* fallthrough */
  default:
    return Qfalse;
    break;
  }
  return self;
}

#defer(*args) ⇒ Object

Schedules a block of code to execute at a later time, if the connection is still open and while preventing concurent code from running for the same connection object.

An optional ‘conn_id` argument can be passed along, so that the block of code will run for the requested connection rather then this connection.

Careful*: as this might cause this connection’s object to run code concurrently when data owned by this connection is accessed from within the block of code.

On success returns the block, otherwise (connection invalid) returns ‘false`. A sucessful event registration doesn’t guaranty that the block will be called (the connection might close between the event registration and the execution).



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'ext/iodine/iodine_websockets.c', line 185

static VALUE iodine_defer(int argc, VALUE *argv, VALUE self) {
  intptr_t fd;
  // check arguments.
  if (argc > 1)
    rb_raise(rb_eArgError, "this function expects no more then 1 (optional) "
                           "argument.");
  else if (argc == 1) {
    Check_Type(*argv, T_FIXNUM);
    fd = FIX2LONG(*argv);
    if (!sock_isvalid(fd))
      return Qfalse;
  } else
    fd = iodine_get_fd(self);
  if (!fd)
    rb_raise(rb_eArgError, "This method requires a target connection.");
  // requires a block to be passed
  rb_need_block();
  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  Registry.add(block);

  facil_defer(.uuid = fd, .task = iodine_perform_defer, .arg = (void *)block,
              .fallback = iodine_defer_fallback);
  return block;
}

#on_closeObject

Please implement your own callback for this event.



363
364
365
366
# File 'ext/iodine/iodine_websockets.c', line 363

static VALUE empty_func(VALUE self) {
  (void)(self);
  return Qnil;
}

#on_drainedObject

Please implement your own callback for this event.



369
370
371
372
373
# File 'ext/iodine/iodine_websockets.c', line 369

static VALUE empty_func_drained(VALUE self) {
  RubyCaller.call(self, iodine_on_ready_id);
  (void)(self);
  return Qnil;
}

#on_openObject

Please implement your own callback for this event.



363
364
365
366
# File 'ext/iodine/iodine_websockets.c', line 363

static VALUE empty_func(VALUE self) {
  (void)(self);
  return Qnil;
}

#on_readyObject

DEPRECATED! Please override #on_drained instead.



376
377
378
379
# File 'ext/iodine/iodine_websockets.c', line 376

static VALUE empty_func_on_ready(VALUE self) {
  (void)(self);
  return Qnil;
}

#on_shutdownObject

Please implement your own callback for this event.



363
364
365
366
# File 'ext/iodine/iodine_websockets.c', line 363

static VALUE empty_func(VALUE self) {
  (void)(self);
  return Qnil;
}

#open?Boolean

Returns true is the connection is open, false if it isn’t.

Returns:

  • (Boolean)


135
136
137
138
139
140
# File 'ext/iodine/iodine_websockets.c', line 135

static VALUE iodine_ws_is_open(VALUE self) {
  intptr_t uuid = get_uuid(self);
  if (uuid && sock_isvalid(uuid))
    return Qtrue;
  return Qfalse;
}

#pendingObject

Returns the number of pending writes or -1 if the connection is closed



125
126
127
128
129
130
# File 'ext/iodine/iodine_websockets.c', line 125

static VALUE iodine_ws_has_pending(VALUE self) {
  intptr_t uuid = get_uuid(self);
  if (!uuid || sock_isclosed(uuid))
    return INT2NUM(-1);
  return SIZET2NUM(sock_pending(uuid));
}

#publish(*args) ⇒ Object

Publishes a message to a channel.

Can be used using two Strings:

publish(to, message)

The method accepts an optional ‘engine` argument:

publish(to, message, my_pubsub_engine)

Alternatively, accepts the following named arguments:

:to

The channel to publish to (required).

:message

The message to be published (required).

:engine

If provided, the engine to use for pub/sub. Otherwise the default

engine is used.



796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
# File 'ext/iodine/iodine_pubsub.c', line 796

VALUE iodine_publish(int argc, VALUE *argv, VALUE self) {
  VALUE rb_ch, rb_msg, rb_engine = Qnil;
  const pubsub_engine_s *engine = NULL;
  switch (argc) {
  case 3:
    /* fallthrough */
    rb_engine = argv[2];
  case 2:
    rb_ch = argv[0];
    rb_msg = argv[1];
    break;
  case 1: {
    /* single argument must be a Hash */
    Check_Type(argv[0], T_HASH);
    rb_ch = rb_hash_aref(argv[0], to_sym_id);
    if (rb_ch == Qnil || rb_ch == Qfalse) {
      rb_ch = rb_hash_aref(argv[0], channel_sym_id);
    }
    rb_msg = rb_hash_aref(argv[0], message_sym_id);
    rb_engine = rb_hash_aref(argv[0], engine_varid);
  } break;
  default:
    rb_raise(rb_eArgError, "method accepts 1-3 arguments.");
  }

  if (rb_msg == Qnil || rb_msg == Qfalse) {
    rb_raise(rb_eArgError, "message is required.");
  }
  Check_Type(rb_msg, T_STRING);

  if (rb_ch == Qnil || rb_ch == Qfalse)
    rb_raise(rb_eArgError, "target / channel is required .");
  if (TYPE(rb_ch) == T_SYMBOL)
    rb_ch = rb_sym2str(rb_ch);
  Check_Type(rb_ch, T_STRING);

  if (rb_engine == Qfalse) {
    engine = PUBSUB_PROCESS_ENGINE;
  } else if (rb_engine == Qnil) {
    engine = NULL;
  } else {
    engine = iodine_engine_ruby2facil(rb_engine);
  }

  FIOBJ ch = fiobj_str_new(RSTRING_PTR(rb_ch), RSTRING_LEN(rb_ch));
  FIOBJ msg = fiobj_str_new(RSTRING_PTR(rb_msg), RSTRING_LEN(rb_msg));

  intptr_t ret =
      pubsub_publish(.engine = engine, .channel = ch, .message = msg);
  fiobj_free(ch);
  fiobj_free(msg);
  if (!ret)
    return Qfalse;
  return Qtrue;
  (void)self;
}

#subscribe(*args) ⇒ Object

Subscribes the connection to a Pub/Sub channel.

The method accepts 1-2 arguments and an optional block. These are all valid ways to call the method:

subscribe("my_stream") {|from, msg| p msg }
subscribe("my_stream", match: :redis) {|from, msg| p msg }
subscribe(to: "my_stream")  {|from, msg| p msg }
subscribe to: "my_stream", match: :redis, handler: MyProc

The first argument must be either a String or a Hash.

The second, optional, argument must be a Hash (if given).

The options Hash supports the following possible keys (other keys are ignored, all keys are Symbols):

:match

The channel / subject name matching type to be used. Valid value is: ‘:redis`. Future versions hope to support `:nats` and `:rabbit` patern matching as well.

:to

The channel / subject to subscribe to.

:as

valid for WebSocket connections only. can be either ‘:text` or `:binary`. `:text` is the default transport for pub/sub events.

Returns an PubSub::Subscription object that answers to:

#.close

closes the connection.

#.to_s

returns the subscription’s target (stream / channel / subject).

#.==(str)

returns true if the string is an exact match for the target (even if the target itself is a pattern).



284
285
286
287
288
289
# File 'ext/iodine/iodine_websockets.c', line 284

static VALUE iodine_ws_subscribe(int argc, VALUE *argv, VALUE self) {
  // clang-format on
  ws_s *owner = get_ws(self);
  return iodine_subscribe(argc, argv, owner,
                          (iodine_pubsub_type_e)iodine_get_cdata(self));
}

#write(data) ⇒ Object

Writes data to the websocket.

Returns ‘true` on success or `false if the websocket was closed or an error occurred.

‘write` will return immediately, adding the data to the outgoing queue.

If the connection is closed, ‘write` will raise an exception.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'ext/iodine/iodine_websockets.c', line 89

static VALUE iodine_ws_write(VALUE self, VALUE data) {
  Check_Type(data, T_STRING);
  void *ws = get_ws(self);
  iodine_pubsub_type_e c_type = (iodine_pubsub_type_e)iodine_get_cdata(self);
  if (!ws || !c_type) {
    rb_raise(rb_eIOError, "Connection is closed");
    return Qfalse;
  }
  switch (c_type) {
  case IODINE_PUBSUB_WEBSOCKET:
    /* WebSockets*/
    if (((protocol_s *)ws)->service != WEBSOCKET_ID_STR)
      goto error;
    websocket_write(ws, RSTRING_PTR(data), RSTRING_LEN(data),
                    rb_enc_get(data) == IodineUTF8Encoding);
    return Qtrue;
    break;
  case IODINE_PUBSUB_SSE:
    /* SSE */
    http_sse_write(
        ws, .data = {.data = RSTRING_PTR(data), .len = RSTRING_LEN(data)});
    return Qtrue;
    break;
  case IODINE_PUBSUB_GLOBAL:
  /* fallthrough */
  default:
  error:
    rb_raise(rb_eIOError, "Connection is closed");
    return Qfalse;
  }
  return Qfalse;
}