Module: Iodine::Protocol

Defined in:
lib/iodine/protocol.rb,
ext/iodine/iodine_protocol.c

Overview

The Protocol class is used only for documenting the Protocol API; it will not be included when requiring `iodine`.

A dynamic (stateful) protocol is defined as a Ruby class instance which is in control of one single connection.

It is called dynamic because it is dynamically allocated for each connection and then discarded. The name also sounded better than “the stateful protocol”, even though that’s what it actually is.

It is (mostly) thread-safe as long as its operations are limited to the scope of the object.

The Callbacks

A protocol class MUST contain ONE of the following callbacks:

on_data

called when there’s data available to be read, but no data was read just yet. `on_data` will not be called again until the existing network buffer has been read in full (edge triggered event).

on_message(buffer)

the default `on_data` implementation creates a 1Kb buffer and reads data while recycling the same String memory space. The buffer is forwarded to the `on_message` callback before being recycled. The buffer object will be over-written once `on_message` returns, so creating a persistent copy requires `buffer.dup`.
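The effect of the recycled buffer can be sketched in plain Ruby, without iodine. `RecyclingReader` and `Collector` are hypothetical names used only for this illustration; the reader reuses a single String the way the text describes, which is an assumption about behavior rather than iodine's actual code.

```ruby
# Hypothetical stand-in for the default on_data loop: one String is reused
# for every chunk, the way the text describes the recycled 1Kb buffer.
class RecyclingReader
  def initialize(handler)
    @handler = handler
    @buffer = String.new(capacity: 1024)
  end

  # Overwrites the shared buffer in place, then forwards it.
  def feed(chunk)
    @buffer.replace(chunk)
    @handler.on_message(@buffer)
  end
end

class Collector
  attr_reader :kept_refs, :kept_copies

  def initialize
    @kept_refs = []
    @kept_copies = []
  end

  def on_message(buffer)
    @kept_refs << buffer        # same object every time; its content will change
    @kept_copies << buffer.dup  # independent copy; survives recycling
  end
end

collector = Collector.new
reader = RecyclingReader.new(collector)
reader.feed("first")
reader.feed("second")

p collector.kept_refs    # ["second", "second"]
p collector.kept_copies  # ["first", "second"]
```

Keeping a reference to the buffer only keeps a reference to whatever was read last; `buffer.dup` is what preserves each message.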

A protocol class MAY contain any of the following optional callbacks:

on_open

called after a new connection was accepted and the protocol was linked with Iodine’s Protocol API. Initialization should be performed here.

ping

called whenever the timeout was reached. The default implementation will close the connection unless a protocol task (#defer, `on_data` or `on_message`) is busy in the background.

on_shutdown

called if the connection is still open while the server is shutting down. This allows the protocol to send a “going away” frame before the connection is closed and `on_close` is called.

on_close

called after a connection was closed, for any cleanup (if any).

WARNING: for thread safety and connection management, `on_open`, `on_shutdown`, `on_close` and `ping` will all be performed within the reactor’s main thread. Do not run long running tasks within these callbacks, or the server might block while you do. Use #defer to run protocol related tasks (this locks the connection, preventing it from running more than one task at a time and offering thread safety), or #run to run asynchronous tasks that aren’t protocol related.
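A minimal sketch of a protocol class using these callbacks follows. So that it runs without iodine, `FakeConnection` is a hypothetical stub that fakes only #write; with iodine, the real Protocol API would be linked to the object instead. `EchoProtocol` and its messages are illustrative names, not part of the library.

```ruby
# Hypothetical stub standing in for the API that iodine links into a
# protocol object. Only #write is faked, so the sketch runs without iodine.
module FakeConnection
  def sent
    @sent ||= []
  end

  def write(data)
    sent << data.dup
    self
  end
end

class EchoProtocol
  include FakeConnection # assumption: with iodine, the real API is linked instead

  # Initialization goes here rather than in initialize.
  def on_open
    write "Welcome\n"
  end

  # Receives the recycled buffer; interpolation copies its content.
  def on_message(buffer)
    write "echo: #{buffer}"
  end

  # Chance to say goodbye before the server closes the connection.
  def on_shutdown
    write "Server going away\n"
  end

  # Cleanup only; keep it short, it runs in the reactor's main thread.
  def on_close
    @closed = true
  end

  def closed?
    @closed == true
  end
end

# Drive the callbacks by hand, in the order the server would call them.
conn = EchoProtocol.new
conn.on_open
conn.on_message("hi\n")
conn.on_shutdown
conn.on_close
p conn.sent     # ["Welcome\n", "echo: hi\n", "Server going away\n"]
p conn.closed?  # true
```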

Connection timeouts

By setting a class-level instance variable called `@timeout` it is possible to define a default timeout for new connections. However, changing the timeout away from this default should be performed using the #timeout= method.
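The distinction matters in Ruby: the C source for #switch_protocol reads the value with `rb_ivar_get` on the class, so the default lives in an instance variable of the class object, not in `@@timeout`. A small sketch, where `SlowProtocol` and the value 40 are illustrative assumptions:

```ruby
class SlowProtocol
  # Instance variable of the class object itself (not @@timeout).
  @timeout = 40

  def on_message(buffer); end
end

# The value is stored on the class...
p SlowProtocol.instance_variable_get(:@timeout)      # 40
# ...and is not an instance variable of each protocol object.
p SlowProtocol.new.instance_variable_get(:@timeout)  # nil
```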

The API:

After a new connection is accepted and a new protocol object is created, the protocol will be linked with Iodine’s Protocol API. Only the main protocol will be able to access the API within `initialize`, so it’s best to use `on_open` for any initialization required.

Class Method Details

.defer(*args) ⇒ Object

Schedules a block to execute (defers the block’s execution).

When this function is called by a Protocol instance, a lock on the connection will be used to prevent multiple tasks / callbacks from running concurrently. For example:

defer { write "this will run in a lock" }

Otherwise, the deferred task will run according to the requested concurrency model.

Iodine.defer { puts "this will run concurrently" }
Iodine.run { puts "this will run concurrently" }

Tasks scheduled before calling Iodine.start will run once for every process.

Returns the block given (or `false`).

Notice: There’s a possibility that the task will never be called if it was associated with a specific connection (the method was called as an instance method) and the connection was closed before the deferred task was performed.
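The locking behavior can be illustrated with a plain Ruby sketch. This is not iodine's implementation: `LockedConnection`, `handle` and `wait` are hypothetical names, and a `Mutex` with worker threads is assumed here only to show what "one task at a time per connection" means.

```ruby
# Hypothetical stand-in for a connection; NOT iodine's implementation.
class LockedConnection
  attr_reader :log

  def initialize
    @lock = Mutex.new # plays the role of the per-connection lock
    @log = []
    @workers = []
  end

  # Schedules the block on a worker thread, guarded by the connection lock.
  # Returns the block, as the text says #defer does.
  def defer(&block)
    @workers << Thread.new { @lock.synchronize(&block) }
    block
  end

  def handle(id)
    defer do
      @log << "start #{id}"
      sleep 0.01 # simulate work while holding the lock
      @log << "end #{id}"
    end
  end

  # Sketch-only helper: wait for all scheduled tasks to finish.
  def wait
    @workers.each(&:join)
  end
end

conn = LockedConnection.new
4.times { |id| conn.handle(id) }
conn.wait
p conn.log # every "start N" is immediately followed by its "end N"
```

The order in which the four tasks run may vary, but because each holds the lock for its whole duration, their log entries never interleave.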



# File 'ext/iodine/iodine_protocol.c', line 379

static VALUE dyn_defer(int argc, VALUE *argv, VALUE self) {
  rb_need_block();
  intptr_t fd;
  // check arguments.
  if (argc > 1)
    rb_raise(rb_eArgError, "this function expects no more then 1 (optional) "
                           "argument.");
  else if (argc == 1) {
    Check_Type(*argv, T_FIXNUM);
    fd = FIX2LONG(*argv);
  } else
    fd = iodine_get_fd(self);

  if (!sock_isvalid(fd))
    return Qfalse;

  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  Registry.add(block);
  facil_defer(.uuid = fd, .task = iodine_perform_task_and_free,
              .type = FIO_PR_LOCK_TASK, .arg = (void *)block,
              .fallback = iodine_clear_task);
  return block;
}

Instance Method Details

#close ⇒ Object

Closes a connection.

The connection will be closed only after all pending data has been sent.

Returns self.



# File 'ext/iodine/iodine_protocol.c', line 323

static VALUE dyn_close(VALUE self) {
  intptr_t fd = iodine_get_fd(self);
  sock_close(fd);
  return self;
}

#conn_id ⇒ Object

Returns a connection’s localized ID which is valid for *this process* (not a machine or internet unique value).

Once the connection is closed and the `on_close` callback was called, this method returns `nil`.

This can be used together with a true process wide UUID to uniquely identify a connection across the internet.



# File 'ext/iodine/iodine_protocol.c', line 339

static VALUE dyn_uuid(VALUE self) {
  intptr_t uuid = iodine_get_fd(self);
  if (!uuid || uuid == -1)
    return Qnil;
  return LONG2FIX(uuid);
}

#defer(*args) ⇒ Object

Schedules a block to execute (defers the block’s execution).

When this function is called by a Protocol instance, a lock on the connection will be used to prevent multiple tasks / callbacks from running concurrently. For example:

defer { write "this will run in a lock" }

Otherwise, the deferred task will run according to the requested concurrency model.

Iodine.defer { puts "this will run concurrently" }
Iodine.run { puts "this will run concurrently" }

Tasks scheduled before calling Iodine.start will run once for every process.

Returns the block given (or `false`).

Notice: There’s a possibility that the task will never be called if it was associated with a specific connection (the method was called as an instance method) and the connection was closed before the deferred task was performed.



# File 'ext/iodine/iodine_protocol.c', line 379

static VALUE dyn_defer(int argc, VALUE *argv, VALUE self) {
  rb_need_block();
  intptr_t fd;
  // check arguments.
  if (argc > 1)
    rb_raise(rb_eArgError, "this function expects no more then 1 (optional) "
                           "argument.");
  else if (argc == 1) {
    Check_Type(*argv, T_FIXNUM);
    fd = FIX2LONG(*argv);
  } else
    fd = iodine_get_fd(self);

  if (!sock_isvalid(fd))
    return Qfalse;

  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  Registry.add(block);
  facil_defer(.uuid = fd, .task = iodine_perform_task_and_free,
              .type = FIO_PR_LOCK_TASK, .arg = (void *)block,
              .fallback = iodine_clear_task);
  return block;
}

#on_close ⇒ Object

Override this callback to handle the event.



# File 'ext/iodine/iodine_protocol.c', line 74

static VALUE not_implemented(VALUE self) {
  (void)(self);
  return Qnil;
}

#on_data ⇒ Object

A default on_data implementation will read up to 1Kb into a reusable buffer from the socket and call the `on_message` callback.

It is recommended that you implement this callback if messages might require more than 1Kb of space.
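One way a custom `on_data` might look is sketched here, under stated assumptions. `FakeReader` is a hypothetical stub for #read so the example runs without iodine; it returns up to the requested number of bytes, or `nil` when nothing is available, as the #read documentation describes. `LineProtocol`, the 4096-byte read size and the newline framing are illustrative choices.

```ruby
# Hypothetical stub for #read: serves queued bytes, up to `length` per call,
# and returns nil when nothing is left.
module FakeReader
  def incoming
    @incoming ||= String.new
  end

  def read(length = 1024)
    return nil if incoming.empty?
    incoming.slice!(0, length)
  end
end

class LineProtocol
  include FakeReader # assumption: with iodine, the real #read is used instead
  attr_reader :lines

  def initialize
    @lines = []
    @pending = String.new
  end

  # Custom on_data: drain everything that is available (the event is edge
  # triggered), then split complete lines out of the accumulated data.
  def on_data
    while (chunk = read(4096))
      @pending << chunk
    end
    while (line = @pending.slice!(/\A[^\n]*\n/))
      @lines << line.chomp
    end
  end
end

conn = LineProtocol.new
conn.incoming << ("a" * 5000) << "\nshort\npartial"
conn.on_data
p conn.lines.map(&:length) # [5000, 5]
```

The trailing `partial` stays in the pending buffer until a later `on_data` delivers its newline, which is why the protocol keeps its own accumulator instead of relying on a single 1Kb read.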



# File 'ext/iodine/iodine_protocol.c', line 108

static VALUE default_on_data(VALUE self) {
  VALUE buff = rb_ivar_get(self, iodine_buff_var_id);
  if (buff == Qnil) {
    rb_ivar_set(self, iodine_buff_var_id, (buff = rb_str_buf_new(1024)));
  }
  do {
    dyn_read(1, &buff, self);
    if (!RSTRING_LEN(buff))
      return Qnil;
    rb_funcall(self, iodine_on_message_func_id, 1, buff);
  } while (RSTRING_LEN(buff) == (ssize_t)rb_str_capacity(buff));
  return Qnil;
}

#on_drained ⇒ Object

Override this callback to handle the event.



# File 'ext/iodine/iodine_protocol.c', line 87

static VALUE not_implemented_drained(VALUE self) {
  RubyCaller.call(self, rb_intern2("on_ready", 8));
  (void)(self);
  return Qnil;
}

#on_message(data) ⇒ Object

Override this callback to handle the event.



# File 'ext/iodine/iodine_protocol.c', line 94

static VALUE not_implemented2(VALUE self, VALUE data) {
  (void)(self);
  (void)(data);
  return Qnil;
}

#on_open ⇒ Object

Override this callback to handle the event.



# File 'ext/iodine/iodine_protocol.c', line 74

static VALUE not_implemented(VALUE self) {
  (void)(self);
  return Qnil;
}

#on_ready(data) ⇒ Object

DEPRECATED! Please override #on_drained instead.



# File 'ext/iodine/iodine_protocol.c', line 80

static VALUE not_implemented_on_ready(VALUE self, VALUE data) {
  (void)(self);
  (void)(data);
  return Qnil;
}

#on_shutdown ⇒ Object

Override this callback to handle the event.



# File 'ext/iodine/iodine_protocol.c', line 74

static VALUE not_implemented(VALUE self) {
  (void)(self);
  return Qnil;
}

#open? ⇒ Boolean

Returns true if the connection is open and false if closed.

Returns:

  • (Boolean)


# File 'ext/iodine/iodine_protocol.c', line 349

static VALUE dyn_is_open(VALUE self) {
  intptr_t uuid = iodine_get_fd(self);
  if (uuid && sock_isvalid(uuid))
    return Qtrue;
  return Qfalse;
}

#ping ⇒ Object

Override this callback to handle the event. The default implementation will close the connection.



# File 'ext/iodine/iodine_protocol.c', line 69

static VALUE not_implemented_ping(VALUE self) {
  sock_close(iodine_get_fd(self));
  return Qnil;
}

#publish(*args) ⇒ Object

Publishes a message to a channel.

Can be called with two Strings:

publish(to, message)

The method accepts an optional `engine` argument:

publish(to, message, my_pubsub_engine)

Alternatively, accepts the following named arguments:

:to

The channel to publish to (required).

:message

The message to be published (required).

:engine

If provided, the engine to use for pub/sub. Otherwise the default engine is used.
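The accepted call shapes can be summarized with a plain Ruby sketch that mirrors the argument handling in the C source for this method. `normalize_publish_args` is a hypothetical helper, not part of iodine, and the `:channel` fallback key is an assumption inferred from the `channel_sym_id` lookup in that source.

```ruby
# Hypothetical helper (not part of iodine) mirroring how the C source
# interprets the arguments to #publish.
def normalize_publish_args(*args)
  case args.length
  when 2, 3
    # publish(to, message) or publish(to, message, engine)
    channel, message, engine = args
  when 1
    # publish(to: ..., message: ..., engine: ...)
    opts = args.first
    raise TypeError, "single argument must be a Hash" unless opts.is_a?(Hash)
    channel = opts[:to] || opts[:channel] # :channel fallback is an assumption
    message = opts[:message]
    engine = opts[:engine]
  else
    raise ArgumentError, "method accepts 1-3 arguments."
  end

  raise ArgumentError, "message is required." unless message
  raise TypeError, "message must be a String" unless message.is_a?(String)
  raise ArgumentError, "target / channel is required." unless channel
  channel = channel.to_s if channel.is_a?(Symbol) # Symbols are converted
  raise TypeError, "channel must be a String" unless channel.is_a?(String)

  { to: channel, message: message, engine: engine }
end

p normalize_publish_args("chat", "hello")
p normalize_publish_args({ to: :chat, message: "hello" })
```

Both calls resolve to the channel `"chat"` and the message `"hello"` with no explicit engine, which in the real method means the default engine is used.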



# File 'ext/iodine/iodine_pubsub.c', line 796

VALUE iodine_publish(int argc, VALUE *argv, VALUE self) {
  VALUE rb_ch, rb_msg, rb_engine = Qnil;
  const pubsub_engine_s *engine = NULL;
  switch (argc) {
  case 3:
    /* fallthrough */
    rb_engine = argv[2];
  case 2:
    rb_ch = argv[0];
    rb_msg = argv[1];
    break;
  case 1: {
    /* single argument must be a Hash */
    Check_Type(argv[0], T_HASH);
    rb_ch = rb_hash_aref(argv[0], to_sym_id);
    if (rb_ch == Qnil || rb_ch == Qfalse) {
      rb_ch = rb_hash_aref(argv[0], channel_sym_id);
    }
    rb_msg = rb_hash_aref(argv[0], message_sym_id);
    rb_engine = rb_hash_aref(argv[0], engine_varid);
  } break;
  default:
    rb_raise(rb_eArgError, "method accepts 1-3 arguments.");
  }

  if (rb_msg == Qnil || rb_msg == Qfalse) {
    rb_raise(rb_eArgError, "message is required.");
  }
  Check_Type(rb_msg, T_STRING);

  if (rb_ch == Qnil || rb_ch == Qfalse)
    rb_raise(rb_eArgError, "target / channel is required .");
  if (TYPE(rb_ch) == T_SYMBOL)
    rb_ch = rb_sym2str(rb_ch);
  Check_Type(rb_ch, T_STRING);

  if (rb_engine == Qfalse) {
    engine = PUBSUB_PROCESS_ENGINE;
  } else if (rb_engine == Qnil) {
    engine = NULL;
  } else {
    engine = iodine_engine_ruby2facil(rb_engine);
  }

  FIOBJ ch = fiobj_str_new(RSTRING_PTR(rb_ch), RSTRING_LEN(rb_ch));
  FIOBJ msg = fiobj_str_new(RSTRING_PTR(rb_msg), RSTRING_LEN(rb_msg));

  intptr_t ret =
      pubsub_publish(.engine = engine, .channel = ch, .message = msg);
  fiobj_free(ch);
  fiobj_free(msg);
  if (!ret)
    return Qfalse;
  return Qtrue;
  (void)self;
}

#read(*args) ⇒ Object

Reads up to `n` bytes from the network connection. The number of bytes to be read (n) is:

  • the number of bytes set in the optional `buffer_or_length` argument.

  • the String capacity (not length) of the String passed as the optional `buffer_or_length` argument.

  • 1024 Bytes (1Kb) if the optional `buffer_or_length` is either missing or contains a String with a capacity less than 1Kb.

Returns a String (either the same one used as the buffer or a new one) on a successful read. Returns `nil` if no data was available.



# File 'ext/iodine/iodine_protocol.c', line 191

static VALUE dyn_read(int argc, VALUE *argv, VALUE self) {
  if (argc > 1) {
    rb_raise(
        rb_eArgError,
        "read accepts only one argument - a Fixnum (buffer length) or a String "
        "(it's capacity - or 1Kb, whichever's the higher - will be used as "
        "buffer's length).");
    return Qnil;
  }
  VALUE buffer = (argc == 1 ? argv[0] : Qnil);
  if (buffer != Qnil && TYPE(buffer) != T_FIXNUM && TYPE(buffer) != T_STRING) {
    rb_raise(rb_eTypeError,
             "buffer should either be a length (a new string will be created) "
             "or a string (reading will be limited to the original string's "
             "capacity or 1Kb - whichever the larger).");
    return Qnil;
  }
  VALUE str;
  long len;
  intptr_t fd = iodine_get_fd(self);
  if (buffer == Qnil) {
    buffer = LONG2FIX(1024);
  }
  if (TYPE(buffer) == T_FIXNUM) {
    len = FIX2LONG(buffer);
    if (len <= 0)
      len = 1024;
    str = rb_str_buf_new(len);
    // create a rb_String with X length and take it's pointer
    // rb_str_resize(VALUE str, long len)
    // RSTRING_PTR(str)
  } else {
    // take the string's pointer and length
    len = rb_str_capacity(buffer);
    // make sure the string is modifiable
    rb_str_modify(buffer);
    // resize the string if needed.
    if (len < 1024)
      rb_str_resize(buffer, (len = 1024));
    str = buffer;
  }
  ssize_t in = sock_read(fd, RSTRING_PTR(str), len);
  // make sure it's binary encoded
  rb_enc_associate_index(str, IodineBinaryEncodingIndex);
  // set actual size....
  if (in > 0)
    rb_str_set_len(str, (long)in);
  else {
    rb_str_set_len(str, 0);
    str = Qnil;
  }
  // return empty string? or fix above if to return Qnil?
  return str;
}

#subscribe(*args) ⇒ Object

Subscribes the connection to a Pub/Sub channel.

Since this connection’s data packaging is unknown, a block (or handler) is required to handle pub/sub events.

The method accepts 1-2 arguments and an optional block. These are all valid ways to call the method:

subscribe("my_stream") {|from, msg| p msg }
subscribe("my_stream", match: :redis) {|from, msg| p msg }
subscribe(to: "my_stream")  {|from, msg| p msg }
subscribe to: "my_stream", match: :redis, handler: MyProc

The first argument must be either a String or a Hash.

The second, optional, argument must be a Hash (if given).

The options Hash supports the following possible keys (other keys are ignored, all keys are Symbols):

:match

The channel / subject name matching type to be used. The only valid value is `:redis`. Future versions hope to support `:nats` and `:rabbit` pattern matching as well.

:to

The channel / subject to subscribe to.

:handler

any Proc-like object; it must respond to `call(from, msg)`.

Returns an Iodine::PubSub::Subscription object that answers to:

close

closes the connection.

to_s

returns the subscription’s target (stream / channel / subject).

==(str)

returns true if the string is an exact match for the target (even if the target itself is a pattern).
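The `:handler` requirement is only that the object responds to `call(from, msg)`, so a Proc or any custom object qualifies. A small sketch without iodine follows; `ChatLogger` is a hypothetical class, and the handlers are invoked directly here to show the expected signature rather than being passed to a real `subscribe` call.

```ruby
# Hypothetical handler object: anything with #call(from, msg) will do.
class ChatLogger
  attr_reader :lines

  def initialize
    @lines = []
  end

  def call(from, msg)
    @lines << "[#{from}] #{msg}"
  end
end

object_handler = ChatLogger.new
proc_handler = proc { |from, msg| "#{from}: #{msg}" }

# Both satisfy the documented contract.
[object_handler, proc_handler].each do |handler|
  raise ArgumentError, "handler must respond to call" unless handler.respond_to?(:call)
end

# Inside a protocol instance these would be passed as, for example:
#   subscribe to: "my_stream", handler: object_handler
# Here they are called by hand, the way a pub/sub event would call them.
object_handler.call("my_stream", "hello")
p object_handler.lines                     # ["[my_stream] hello"]
p proc_handler.call("my_stream", "hello")  # "my_stream: hello"
```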



# File 'ext/iodine/iodine_protocol.c', line 159

static VALUE iodine_proto_subscribe(int argc, VALUE *argv, VALUE self) {
  // clang-format on
  intptr_t uuid = iodine_get_fd(self);
  if (!uuid || (VALUE)uuid == Qnil || uuid < 0)
    return Qfalse;
  VALUE sub = iodine_subscribe(argc, argv, NULL, IODINE_PUBSUB_GLOBAL);
  if (sub == Qnil || sub == Qfalse)
    return Qfalse;
  Registry.add(sub);

  iodine_protocol_s *pr = iodine_get_cdata(self);

  fio_ls_push(&pr->subscriptions, (void *)sub);
  return sub;
}

#switch_protocol(handler) ⇒ Object



# File 'ext/iodine/iodine_protocol.c', line 523

VALUE dyn_switch_prot(VALUE self, VALUE handler) {
  uint8_t timeout;
  intptr_t fd = iodine_get_fd(self);
  if (TYPE(handler) == T_CLASS) {
    // get the timeout
    VALUE rb_tout = rb_ivar_get(handler, iodine_timeout_var_id);
    timeout = (TYPE(rb_tout) == T_FIXNUM) ? FIX2UINT(rb_tout) : 0;
    // include the Protocol module, preventing coder errors
    rb_include_module(handler, IodineProtocol);
    handler = RubyCaller.call(handler, iodine_new_func_id);
  } else {
    // include the Protocol module in the object's class
    VALUE p_class = rb_obj_class(handler);
    // include the Protocol module, preventing coder errors
    rb_include_module(p_class, IodineProtocol);
    // get the timeout
    VALUE rb_tout = rb_ivar_get(p_class, iodine_timeout_var_id);
    if (rb_tout == Qnil)
      rb_tout = rb_ivar_get(handler, iodine_timeout_var_id);
    timeout = (TYPE(rb_tout) == T_FIXNUM) ? FIX2UINT(rb_tout) : 0;
  }
  if (facil_attach(fd, dyn_set_protocol(fd, handler, timeout)))
    return Qnil;
  return handler;
}

#timeout ⇒ Object

Returns the connection’s timeout.



# File 'ext/iodine/iodine_protocol.c', line 309

static VALUE dyn_get_timeout(VALUE self) {
  intptr_t fd = iodine_get_fd(self);
  uint8_t tout = facil_get_timeout(fd);
  unsigned int tout_int = tout;
  return UINT2NUM(tout_int);
}

#timeout=(timeout) ⇒ Object

Updates a connection’s timeout. Values above 255 are clamped to 255.

Returns self.



# File 'ext/iodine/iodine_protocol.c', line 297

static VALUE dyn_set_timeout(VALUE self, VALUE timeout) {
  intptr_t fd = iodine_get_fd(self);
  unsigned int tout = FIX2UINT(timeout);
  if (tout > 255)
    tout = 255;
  facil_set_timeout(fd, tout);
  return self;
}

#write(data) ⇒ Object

Writes data to the connection. Returns `false` on error and `self` on success.



# File 'ext/iodine/iodine_protocol.c', line 249

static VALUE dyn_write(VALUE self, VALUE data) {
  Check_Type(data, T_STRING);
  intptr_t fd = iodine_get_fd(self);
  if (sock_write(fd, RSTRING_PTR(data), RSTRING_LEN(data))) {
    return Qfalse;
  }
  return self;
}

#write!(data) ⇒ Object

Moves a String to iodine’s socket buffer. This is a zero-copy write and requires that the string remain unchanged during the process.

For example, Strings received by `on_message` can’t be used, because they use a recyclable buffer and they will be destroyed once `on_message` returns.



# File 'ext/iodine/iodine_protocol.c', line 265

static VALUE dyn_write_move(VALUE self, VALUE data) {
  Check_Type(data, T_STRING);
  Registry.add(data);
  intptr_t fd = iodine_get_fd(self);
  if (sock_write2(.uuid = fd, .buffer = RSTRING_PTR(data),
                  .length = RSTRING_LEN(data),
                  .dealloc = (void (*)(void *))Registry.remove))
    return Qfalse;
  return self;
}

#write_urgent(data) ⇒ Object

Writes data to the connection. The data will be sent as soon as possible without fragmentation of previously scheduled data.

Returns `false` on error and `self` on success.



# File 'ext/iodine/iodine_protocol.c', line 282

static VALUE dyn_write_urgent(VALUE self, VALUE data) {
  Check_Type(data, T_STRING);
  intptr_t fd = iodine_get_fd(self);
  Registry.add(data);
  if (sock_write(fd, RSTRING_PTR(data), RSTRING_LEN(data))) {
    return Qfalse;
  }
  return self;
}