Module: Iodine::Protocol
- Defined in:
- lib/iodine/protocol.rb
- ext/iodine/iodine_protocol.c
Overview
The Protocol module is used only for documenting the Protocol API; it will not be included when requiring `iodine`.
A dynamic (stateful) protocol is defined as a Ruby class instance which is in control of one single connection.
It is called dynamic because it is dynamically allocated for each connection and then discarded. The name also sounded better than “the stateful protocol”, even though that’s what it actually is.
It is (mostly) thread-safe as long as its operations are limited to the scope of the object.
The Callbacks
A protocol class MUST contain ONE of the following callbacks:
- on_data
-
called when data is available to be read but none has been read yet. `on_data` will not be called again until the entire existing network buffer has been read (edge-triggered event).
- on_message(buffer)
-
the default `on_data` implementation creates a 1KB buffer and reads data while recycling the same String memory space. The buffer is forwarded to the `on_message` callback before being recycled. The buffer object will be overwritten once `on_message` returns, so creating a persistent copy requires `buffer.dup`.
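The buffer recycling described for `on_message` can be sketched as follows. This is a minimal illustration, not part of the library: the class name `ChunkCollector` is hypothetical, and `write` is assumed to be supplied by Iodine once the connection is linked to the Protocol API.

```ruby
# A minimal sketch of a protocol class built on `on_message`.
# `ChunkCollector` is a hypothetical name. `write` is NOT defined here:
# it is assumed to be provided by Iodine's Protocol API at runtime.
class ChunkCollector
  def on_message(buffer)
    # `buffer` is overwritten after this method returns, so anything
    # that must outlive the callback has to be copied first.
    (@chunks ||= []) << buffer.dup
    write "stored #{@chunks.size} chunk(s)\n"
  end
end
```

Storing `buffer` itself instead of `buffer.dup` would leave every stored entry pointing at the same recycled String.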
A protocol class MAY contain any of the following optional callbacks:
- on_open
-
called after a new connection was accepted and the protocol was linked with Iodine’s Protocol API. Initialization should be performed here.
- ping
-
called whenever the timeout is reached. The default implementation will close the connection unless a protocol task (#defer, `on_data` or `on_message`) is busy in the background.
- on_shutdown
-
called if the connection is still open while the server is shutting down. This allows the protocol to send a “going away” frame before the connection is closed and `on_close` is called.
- on_close
-
called after the connection was closed, to perform cleanup (if any).
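The optional callbacks listed above might be combined as in the sketch below. The class name `HeartbeatProtocol` and the message strings are hypothetical, and `write` is assumed to come from Iodine's Protocol API at runtime.

```ruby
# A sketch of the optional lifecycle callbacks on one protocol class.
# `HeartbeatProtocol` is a hypothetical name; `write` is assumed to be
# provided by Iodine's Protocol API and is not defined here.
class HeartbeatProtocol
  # Called once the connection is linked to the Protocol API.
  def on_open
    @messages = 0
    write "hello\n"
  end

  # One of `on_data` / `on_message` is required.
  def on_message(buffer)
    @messages += 1
  end

  # Replaces the default `ping`, which would close the connection.
  def ping
    write "still there?\n"
  end

  # The server is shutting down while this connection is still open.
  def on_shutdown
    write "going away\n"
  end

  # The connection is already closed; only release state here.
  def on_close
    @messages = nil
  end
end
```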
WARNING: for thread safety and connection management, `on_open`, `on_shutdown`, `on_close` and `ping` will all be performed within the reactor’s main thread. Do not run long-running tasks within these callbacks, or the server might block while you do. Use #defer to run protocol related tasks (this locks the connection, preventing it from running more than one task at a time and offering thread safety), or #run to run asynchronous tasks that aren’t protocol related.
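One way to follow this warning is to keep `on_open` short and hand the slow work to #defer. The sketch below assumes `defer` and `write` are provided by Iodine at runtime; `ReportProtocol` and `build_report` are hypothetical names used only for illustration.

```ruby
# A sketch of moving slow work out of a main-thread callback.
# `ReportProtocol` and `build_report` are hypothetical. `defer` and
# `write` are assumed to be provided by Iodine and are not defined here.
class ReportProtocol
  def on_open
    # Runs on the reactor's main thread: only schedule the work.
    defer { write build_report }
  end

  def on_message(buffer); end

  private

  # Stand-in for an expensive, protocol-related computation.
  def build_report
    (1..5).map { |i| "row #{i}" }.join("\n") + "\n"
  end
end
```

Because the block is scheduled through the instance method, it runs under the connection lock, so it will not overlap with `on_message` for the same connection.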
Connection timeouts
By setting a class-level instance variable called `@timeout`, it is possible to define a default timeout for new connections. However, changing the timeout of an existing connection should be performed using the #timeout methods.
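A minimal sketch of both approaches follows. The class name `UploadProtocol` and the values 10 and 40 are hypothetical; `timeout=` is assumed to be provided by Iodine's Protocol API once the connection is open.

```ruby
# A sketch of the two ways to control the timeout.
# `UploadProtocol` and the numbers are hypothetical. `timeout=` is
# assumed to be provided by Iodine and is not defined here.
class UploadProtocol
  # Default for every new connection that uses this class.
  @timeout = 10

  def on_open
    # Per-connection override through the #timeout= method.
    self.timeout = 40
  end

  def on_message(buffer); end
end
```

Note the explicit `self.` receiver: a bare `timeout = 40` would only create a local variable.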
The API:
After a new connection is accepted and a new protocol object is created, the protocol will be linked with Iodine’s Protocol API. Only the main protocol will be able to access the API within `initialize`, so it’s best to use `on_open` for any initialization required.
Class Method Summary
-
.count ⇒ Object
Returns the number of total connections managed by Iodine.
-
.defer(*args) ⇒ Object
Schedules a block to execute (defers the block’s execution).
-
.each ⇒ Object
Runs a task for each dynamic connection (not websockets or HTTP).
Instance Method Summary
-
#close ⇒ Object
Closes a connection.
-
#conn_id ⇒ Object
Returns a connection’s localized ID which is valid for *this process* (not a machine or internet unique value).
-
#count ⇒ Object
Returns the number of total connections managed by Iodine.
-
#defer(*args) ⇒ Object
Schedules a block to execute (defers the block’s execution).
-
#on_close ⇒ Object
Override this callback to handle the event.
- #on_data ⇒ Object
-
#on_message(data) ⇒ Object
Override this callback to handle the event.
-
#on_open ⇒ Object
Override this callback to handle the event.
-
#on_ready ⇒ Object
Override this callback to handle the event.
-
#on_shutdown ⇒ Object
Override this callback to handle the event.
-
#open? ⇒ Boolean
Returns true if the connection is open and false if closed.
-
#ping ⇒ Object
Override this callback to handle the event.
-
#read(*args) ⇒ Object
Reads `n` bytes from the network connection.
- #switch_protocol(handler) ⇒ Object
-
#timeout ⇒ Object
Returns the connection’s timeout.
-
#timeout=(timeout) ⇒ Object
Updates a connection’s timeout.
-
#write(data) ⇒ Object
Writes data to the connection.
-
#write!(data) ⇒ Object
Moves a String to iodine’s socket’s buffer.
-
#write_urgent(data) ⇒ Object
Writes data to the connection.
Class Method Details
.count ⇒ Object
Returns the number of total connections managed by Iodine.
# File 'ext/iodine/iodine_protocol.c', line 102
static VALUE dyn_count(VALUE self) {
  size_t count = facil_count(iodine_protocol_service);
  return ULL2NUM(count);
  (void)self;
}
.defer(*args) ⇒ Object
Schedules a block to execute (defers the block’s execution).
When this function is called by a Protocol instance, a lock on the connection will be used to prevent multiple tasks / callbacks from running concurrently. i.e.
defer { write "this will run in a lock" }
Otherwise, the deferred task will run according to the requested concurrency model.
Iodine.defer { puts "this will run concurrently" }
Iodine.run { puts "this will run concurrently" }
Tasks scheduled before calling Iodine.start will run once for every process.
Returns the block given (or `false`).
Notice: There’s a possibility that the task will never be called if it was associated with a specific connection (the method was called as an instance method) and the connection was closed before the deferred task was performed.
# File 'ext/iodine/iodine_protocol.c', line 333
static VALUE dyn_defer(int argc, VALUE *argv, VALUE self) {
  rb_need_block();
  intptr_t fd;
  // check arguments.
  if (argc > 1)
    rb_raise(rb_eArgError, "this function expects no more then 1 (optional) "
                           "argument.");
  else if (argc == 1) {
    Check_Type(*argv, T_FIXNUM);
    fd = FIX2LONG(*argv);
  } else
    fd = iodine_get_fd(self);
  if (!sock_isvalid(fd))
    return Qfalse;
  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  Registry.add(block);
  facil_defer(.uuid = fd, .task = iodine_perform_task_and_free,
              .task_type = FIO_PR_LOCK_TASK, .arg = (void *)block,
              .fallback = iodine_clear_task);
  return block;
}
.each ⇒ Object
Runs a task for each dynamic connection (not websockets or HTTP).
Requires a block and returns it as an object.
For example, the following writes to every open dynamic connection:
Iodine.each {|obj| obj.write "hello!" }
# File 'ext/iodine/iodine_protocol.c', line 118
static VALUE dyn_run_each(VALUE self) {
  rb_need_block();
  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  intptr_t origin = iodine_get_fd(self);
  if (!origin)
    origin = -1;
  Registry.add(block);
  facil_each(.arg = (void *)block, .service = "IodineDynamic", .origin = origin,
             .task_type = FIO_PR_LOCK_TASK, .task = iodine_perform_task,
             .on_complete = iodine_clear_task);
  return block;
}
Instance Method Details
#close ⇒ Object
Closes a connection.
The connection will be closed only after all pending data has been sent.
Returns self.
# File 'ext/iodine/iodine_protocol.c', line 277
static VALUE dyn_close(VALUE self) {
  intptr_t fd = iodine_get_fd(self);
  sock_close(fd);
  return self;
}
#conn_id ⇒ Object
Returns a connection’s localized ID which is valid for *this process* (not a machine or internet unique value).
Once the connection is closed and the `on_close` callback was called, this method returns `nil`.
This can be used together with a true process wide UUID to uniquely identify a connection across the internet.
# File 'ext/iodine/iodine_protocol.c', line 293
static VALUE dyn_uuid(VALUE self) {
  intptr_t uuid = iodine_get_fd(self);
  if (!uuid || uuid == -1)
    return Qnil;
  return LONG2FIX(uuid);
}
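Combining `conn_id` with a process-wide UUID could look like the sketch below. `PROCESS_UUID`, `TrackedProtocol` and `global_id` are hypothetical names; `conn_id` is assumed to be provided by Iodine at runtime. The sketch also assumes the constant is created inside each worker process, since a value generated before forking would be shared by all workers.

```ruby
require 'securerandom'

# Hypothetical process-wide identifier, generated once per process.
PROCESS_UUID = SecureRandom.uuid

# `TrackedProtocol` and `global_id` are hypothetical names. `conn_id`
# is assumed to be provided by Iodine and is not defined here.
class TrackedProtocol
  attr_reader :global_id

  def on_open
    # Capture the ID while the connection is open: `conn_id` returns
    # nil once `on_close` has been called.
    @global_id = "#{PROCESS_UUID}-#{conn_id}"
  end

  def on_message(buffer); end
end
```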
#count ⇒ Object
Returns the number of total connections managed by Iodine.
# File 'ext/iodine/iodine_protocol.c', line 102
static VALUE dyn_count(VALUE self) {
  size_t count = facil_count(iodine_protocol_service);
  return ULL2NUM(count);
  (void)self;
}
#defer(*args) ⇒ Object
Schedules a block to execute (defers the block’s execution).
When this function is called by a Protocol instance, a lock on the connection will be used to prevent multiple tasks / callbacks from running concurrently. i.e.
defer { write "this will run in a lock" }
Otherwise, the deferred task will run according to the requested concurrency model.
Iodine.defer { puts "this will run concurrently" }
Iodine.run { puts "this will run concurrently" }
Tasks scheduled before calling Iodine.start will run once for every process.
Returns the block given (or `false`).
Notice: There’s a possibility that the task will never be called if it was associated with a specific connection (the method was called as an instance method) and the connection was closed before the deferred task was performed.
# File 'ext/iodine/iodine_protocol.c', line 333
static VALUE dyn_defer(int argc, VALUE *argv, VALUE self) {
  rb_need_block();
  intptr_t fd;
  // check arguments.
  if (argc > 1)
    rb_raise(rb_eArgError, "this function expects no more then 1 (optional) "
                           "argument.");
  else if (argc == 1) {
    Check_Type(*argv, T_FIXNUM);
    fd = FIX2LONG(*argv);
  } else
    fd = iodine_get_fd(self);
  if (!sock_isvalid(fd))
    return Qfalse;
  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  Registry.add(block);
  facil_defer(.uuid = fd, .task = iodine_perform_task_and_free,
              .task_type = FIO_PR_LOCK_TASK, .arg = (void *)block,
              .fallback = iodine_clear_task);
  return block;
}
#on_close ⇒ Object
Override this callback to handle the event.
# File 'ext/iodine/iodine_protocol.c', line 63
static VALUE not_implemented(VALUE self) {
  (void)(self);
  return Qnil;
}
#on_data ⇒ Object
# File 'ext/iodine/iodine_protocol.c', line 83
static VALUE default_on_data(VALUE self) {
  VALUE buff = rb_ivar_get(self, iodine_buff_var_id);
  if (buff == Qnil) {
    rb_ivar_set(self, iodine_buff_var_id, (buff = rb_str_buf_new(1024)));
  }
  do {
    dyn_read(1, &buff, self);
    if (!RSTRING_LEN(buff))
      return Qnil;
    rb_funcall(self, , 1, buff);
  } while (RSTRING_LEN(buff) == (ssize_t)rb_str_capacity(buff));
  return Qnil;
}
#on_message(data) ⇒ Object
Override this callback to handle the event.
# File 'ext/iodine/iodine_protocol.c', line 69
static VALUE not_implemented2(VALUE self, VALUE data) {
  (void)(self);
  (void)(data);
  return Qnil;
}
#on_open ⇒ Object
Override this callback to handle the event.
# File 'ext/iodine/iodine_protocol.c', line 63
static VALUE not_implemented(VALUE self) {
  (void)(self);
  return Qnil;
}
#on_ready ⇒ Object
Override this callback to handle the event.
# File 'ext/iodine/iodine_protocol.c', line 63
static VALUE not_implemented(VALUE self) {
  (void)(self);
  return Qnil;
}
#on_shutdown ⇒ Object
Override this callback to handle the event.
# File 'ext/iodine/iodine_protocol.c', line 63
static VALUE not_implemented(VALUE self) {
  (void)(self);
  return Qnil;
}
#open? ⇒ Boolean
Returns true if the connection is open and false if closed.
# File 'ext/iodine/iodine_protocol.c', line 303
static VALUE dyn_is_open(VALUE self) {
  intptr_t uuid = iodine_get_fd(self);
  if (uuid && sock_isvalid(uuid))
    return Qtrue;
  return Qfalse;
}
#ping ⇒ Object
Override this callback to handle the event. The default implementation will close the connection.
# File 'ext/iodine/iodine_protocol.c', line 58
static VALUE not_implemented_ping(VALUE self) {
  sock_close(iodine_get_fd(self));
  return Qnil;
}
#read(*args) ⇒ Object
Reads `n` bytes from the network connection. The number of bytes to be read (n) is:
-
the number of bytes set in the optional `buffer_or_length` argument.
-
the String capacity (not length) of the String passed as the optional `buffer_or_length` argument.
-
1024 bytes (1KB) if the optional `buffer_or_length` is either missing or contains a String whose capacity is less than 1KB.
Returns a String (either the same one used as the buffer or a new one) on a successful read. Returns `nil` if no data was available.
# File 'ext/iodine/iodine_protocol.c', line 145
static VALUE dyn_read(int argc, VALUE *argv, VALUE self) {
  if (argc > 1) {
    rb_raise(
        rb_eArgError,
        "read accepts only one argument - a Fixnum (buffer length) or a String "
        "(it's capacity - or 1Kb, whichever's the higher - will be used as "
        "buffer's length).");
    return Qnil;
  }
  VALUE buffer = (argc == 1 ? argv[0] : Qnil);
  if (buffer != Qnil && TYPE(buffer) != T_FIXNUM && TYPE(buffer) != T_STRING) {
    rb_raise(rb_eTypeError,
             "buffer should either be a length (a new string will be created) "
             "or a string (reading will be limited to the original string's "
             "capacity or 1Kb - whichever the larger).");
    return Qnil;
  }
  VALUE str;
  long len;
  intptr_t fd = iodine_get_fd(self);
  if (buffer == Qnil) {
    buffer = LONG2FIX(1024);
  }
  if (TYPE(buffer) == T_FIXNUM) {
    len = FIX2LONG(buffer);
    if (len <= 0)
      len = 1024;
    str = rb_str_buf_new(len);
    // create a rb_String with X length and take it's pointer
    // rb_str_resize(VALUE str, long len)
    // RSTRING_PTR(str)
  } else {
    // take the string's pointer and length
    len = rb_str_capacity(buffer);
    // make sure the string is modifiable
    rb_str_modify(buffer);
    // resize the string if needed.
    if (len < 1024)
      rb_str_resize(buffer, (len = 1024));
    str = buffer;
  }
  ssize_t in = sock_read(fd, RSTRING_PTR(str), len);
  // make sure it's binary encoded
  rb_enc_associate_index(str, IodineBinaryEncodingIndex);
  // set actual size....
  if (in > 0)
    rb_str_set_len(str, (long)in);
  else {
    rb_str_set_len(str, 0);
    str = Qnil;
  }
  // return empty string? or fix above if to return Qnil?
  return str;
}
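A custom `on_data` callback built on #read might look like the sketch below. `RawReader` and `handle_chunk` are hypothetical names, the 4096-byte capacity is arbitrary, and `read` is assumed to be provided by Iodine at runtime. Because `on_data` is edge triggered, the sketch keeps reading until `read` returns `nil`.

```ruby
# A sketch of a custom `on_data` that drains the connection with #read.
# `RawReader` and `handle_chunk` are hypothetical. `read` is assumed to
# be provided by Iodine's Protocol API and is not defined here.
class RawReader
  def on_data
    # Reuse one String as the read buffer; its capacity sets the
    # maximum number of bytes requested per call.
    @buffer ||= String.new(capacity: 4096)
    # Keep reading until no data is left (read returns nil).
    while (chunk = read(@buffer))
      handle_chunk(chunk)
    end
  end

  # `chunk` may be the same object as the reusable buffer, so its
  # bytes are appended (copied) rather than stored by reference.
  def handle_chunk(chunk)
    (@received ||= String.new) << chunk
  end
end
```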
#switch_protocol(handler) ⇒ Object
# File 'ext/iodine/iodine_protocol.c', line 459
VALUE dyn_switch_prot(VALUE self, VALUE handler) {
  uint8_t timeout;
  intptr_t fd = iodine_get_fd(self);
  if (TYPE(handler) == T_CLASS) {
    // get the timeout
    VALUE rb_tout = rb_ivar_get(handler, iodine_timeout_var_id);
    timeout = (TYPE(rb_tout) == T_FIXNUM) ? FIX2UINT(rb_tout) : 0;
    // include the Protocol module, preventing coder errors
    rb_include_module(handler, IodineProtocol);
    handler = RubyCaller.call(handler, iodine_new_func_id);
  } else {
    // include the Protocol module in the object's class
    VALUE p_class = rb_obj_class(handler);
    // include the Protocol module, preventing coder errors
    rb_include_module(p_class, IodineProtocol);
    // get the timeout
    VALUE rb_tout = rb_ivar_get(p_class, iodine_timeout_var_id);
    if (rb_tout == Qnil)
      rb_tout = rb_ivar_get(handler, iodine_timeout_var_id);
    timeout = (TYPE(rb_tout) == T_FIXNUM) ? FIX2UINT(rb_tout) : 0;
  }
  if (facil_attach(fd, dyn_set_protocol(fd, handler, timeout)))
    return Qnil;
  return handler;
}
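Judging from the listing above, `switch_protocol` appears to accept either a protocol class (instantiated without arguments) or an existing instance, and to return the new protocol object or `nil` on failure. The sketch below passes an instance so that state can be handed over. `LoginProtocol` and `SessionProtocol` are hypothetical names, and `write` and `switch_protocol` are assumed to be provided by Iodine at runtime.

```ruby
# A sketch of handing a connection from one protocol to another.
# Both class names are hypothetical. `write` and `switch_protocol` are
# assumed to be provided by Iodine and are not defined here.
class SessionProtocol
  # Assumed to be picked up as the new connection timeout on switch.
  @timeout = 30

  def initialize(user)
    @user = user
  end

  def on_message(buffer)
    write "#{@user}: #{buffer}"
  end
end

class LoginProtocol
  def on_message(buffer)
    user = buffer.strip
    return write("name required\n") if user.empty?
    # Pass an instance so the user name travels with the connection.
    switch_protocol SessionProtocol.new(user)
  end
end
```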
#timeout ⇒ Object
Returns the connection’s timeout.
# File 'ext/iodine/iodine_protocol.c', line 263
static VALUE dyn_get_timeout(VALUE self) {
  intptr_t fd = iodine_get_fd(self);
  uint8_t tout = facil_get_timeout(fd);
  unsigned int tout_int = tout;
  return UINT2NUM(tout_int);
}
#timeout=(timeout) ⇒ Object
Updates a connection’s timeout. Values above 255 are clamped to 255.
Returns self.
# File 'ext/iodine/iodine_protocol.c', line 251
static VALUE dyn_set_timeout(VALUE self, VALUE timeout) {
  intptr_t fd = iodine_get_fd(self);
  unsigned int tout = FIX2UINT(timeout);
  if (tout > 255)
    tout = 255;
  facil_set_timeout(fd, tout);
  return self;
}
#write(data) ⇒ Object
Writes data to the connection. Returns `false` on error and `self` on success.
# File 'ext/iodine/iodine_protocol.c', line 203
static VALUE dyn_write(VALUE self, VALUE data) {
  Check_Type(data, T_STRING);
  intptr_t fd = iodine_get_fd(self);
  if (sock_write(fd, RSTRING_PTR(data), RSTRING_LEN(data))) {
    return Qfalse;
  }
  return self;
}
#write!(data) ⇒ Object
Moves a String to iodine’s socket’s buffer. This is a zero-copy write and requires that the string remain unchanged during the process.
For example, Strings received by `on_message` can’t be used, because they use a recyclable buffer and they will be destroyed once `on_message` returns.
# File 'ext/iodine/iodine_protocol.c', line 219
static VALUE dyn_write_move(VALUE self, VALUE data) {
  Check_Type(data, T_STRING);
  Registry.add(data);
  intptr_t fd = iodine_get_fd(self);
  if (sock_write2(.uuid = fd, .buffer = RSTRING_PTR(data),
                  .length = RSTRING_LEN(data), .move = 1,
                  .dealloc = (void (*)(void *))Registry.remove))
    return Qfalse;
  return self;
}
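The difference between the copying #write and the zero-copy #write! can be sketched as follows. `BannerProtocol` and its strings are hypothetical; `write` and `write!` are assumed to be provided by Iodine at runtime.

```ruby
# A sketch of choosing between `write` and `write!`.
# `BannerProtocol` is a hypothetical name. `write` and `write!` are
# assumed to be provided by Iodine and are not defined here.
class BannerProtocol
  # A frozen String cannot change while it sits in the socket buffer.
  BANNER = "welcome\n".freeze

  def on_open
    write! BANNER
  end

  def on_message(buffer)
    # `buffer` is recycled after this callback, so use the copying write.
    write buffer
    # A freshly built String that is never touched again is a
    # reasonable candidate for the zero-copy write.
    write! "received #{buffer.bytesize} bytes\n"
  end
end
```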
#write_urgent(data) ⇒ Object
Writes data to the connection. The data will be sent as soon as possible without fragmentation of previously scheduled data.
Returns `false` on error and `self` on success.
# File 'ext/iodine/iodine_protocol.c', line 236
static VALUE dyn_write_urgent(VALUE self, VALUE data) {
  Check_Type(data, T_STRING);
  intptr_t fd = iodine_get_fd(self);
  Registry.add(data);
  if (sock_write(fd, RSTRING_PTR(data), RSTRING_LEN(data))) {
    return Qfalse;
  }
  return self;
}