Module: Iodine
- Defined in:
- lib/iodine.rb,
lib/iodine/json.rb,
lib/iodine/pubsub.rb,
lib/iodine/version.rb,
lib/iodine/connection.rb,
lib/iodine/rack_utils.rb,
lib/rack/handler/iodine.rb,
ext/iodine/iodine.c
Overview
Iodine is an HTTP / WebSocket server as well as an Evented Network Tool Library.
Here is a simple telnet based echo server using Iodine (see full list at Connection):
require 'iodine'
# define the protocol for our service
module EchoProtocol
def on_open(client)
# Set a connection timeout
client.timeout = 10
# Write a welcome message
client.write "Echo server running on Iodine #{Iodine::VERSION}.\r\n"
end
# this is called for incoming data - note data might be fragmented.
def (client, data)
# write the data we received
client.write "echo: #{data}"
# close the connection when the time comes
client.close if data =~ /^bye[\n\r]/
end
# called if the connection is still open and the server is shutting down.
def on_shutdown(client)
# write the data we received
client.write "Server going away\r\n"
end
extend self
end
# create the service instance, the block returns a connection handler.
Iodine.listen(port: "3000") { EchoProtocol }
# start the service
Iodine.threads = 1
Iodine.start
Methods for setting up and starting Iodine include Iodine.start, Iodine.threads, Iodine.threads=, Iodine.workers and Iodine.workers=.
Methods for setting startup / operational callbacks include Iodine.on_idle, Iodine.on_shutdown, Iodine.before_fork and Iodine.after_fork.
Methods for asynchronous execution include Iodine.run (same as Iodine.defer), Iodine.run_after and Iodine.run_every.
Methods for application wide pub/sub include Iodine.subscribe, Iodine.unsubscribe and Iodine.publish. Connection specific pub/sub methods are documented in the Connection class).
Methods for TCP/IP and Unix Sockets connections include Iodine.listen and Iodine.connect.
Methods for HTTP connections include Iodine.listen2http.
Note that the HTTP server supports both TCP/IP and Unix Sockets as well as SSE / WebSockets extensions.
Iodine doesn’t call Iodine.patch_rack automatically, but doing so will improve Rack’s performace.
Please read the README file for an introduction to Iodine.
Defined Under Namespace
Modules: Base, JSON, PubSub, Rack Classes: Connection
Constant Summary collapse
- VERSION =
'0.6.0'.freeze
Class Method Summary collapse
-
.after_fork ⇒ Object
Sets a block of code to run after a new worker process is forked (cluster mode only).
-
.attach_fd(fd, handler) ⇒ Object
The Iodine.attach_fd method instructs iodine to attach a socket to the server using it’s numerical file descriptor.
-
.before_fork ⇒ Object
Sets a block of code to run before a new worker process is forked (cluster mode only).
-
.connect(args) ⇒ Object
The Iodine.connect method instructs iodine to connect to a server using either TCP/IP or Unix sockets.
-
.defer ⇒ Object
Runs a block of code asyncronously (adds the code to the event queue).
-
.listen(args) ⇒ Object
The Iodine.listen method instructs iodine to listen to incoming connections using either TCP/IP or Unix sockets.
-
.listen2http(opt) ⇒ Object
Listens to incoming HTTP connections and handles incoming requests using the Rack specification.
-
.on_idle ⇒ Object
Schedules a single occuring event for the next idle cycle.
-
.on_shutdown ⇒ Object
Sets a block of code to run once a Worker process shuts down (both in single process mode and cluster mode).
-
.patch_rack ⇒ Object
Will monkey patch some Rack methods to increase their performance.
-
.publish(*args) ⇒ Object
Publishes a message to a channel.
-
.run ⇒ Object
Runs a block of code asyncronously (adds the code to the event queue).
-
.run_after(milliseconds) ⇒ Object
Runs the required block after the specified number of milliseconds have passed.
-
.run_every(*args) ⇒ Object
Runs the required block after the specified number of milliseconds have passed.
-
.start ⇒ Object
This will block the calling (main) thread and start the Iodine reactor.
-
.subscribe(*args) ⇒ Object
Subscribes to a Pub/Sub stream / channel or replaces an existing subscription.
-
.threads ⇒ Object
Returns the number of worker threads that will be used when Iodine.start is called.
-
.threads=(val) ⇒ Object
Sets the number of worker threads that will be used when Iodine.start is called.
-
.unsubscribe(name) ⇒ Object
Unsubscribes from a Pub/Sub stream / channel.
-
.workers ⇒ Object
Returns the number of worker processes that will be used when Iodine.start is called.
-
.workers=(val) ⇒ Object
Sets the number of worker processes that will be used when Iodine.start is called.
Class Method Details
.after_fork ⇒ Object
Sets a block of code to run after a new worker process is forked (cluster mode only).
308 309 310 311 312 313 314 315 316 317 |
# File 'ext/iodine/iodine_defer.c', line 308
VALUE iodine_after_fork_add(VALUE self) {
rb_need_block();
VALUE block = rb_block_proc();
IodineStore.add(block);
spn_lock(&iodine_after_fork_lock);
fio_ls_push(&iodine_after_fork_list, (void *)block);
spn_unlock(&iodine_after_fork_lock);
return block;
(void)self;
}
|
.attach_fd(fd, handler) ⇒ Object
The attach_fd method instructs iodine to attach a socket to the server using it’s numerical file descriptor.
This is faster than attaching a Ruby IO object since it allows iodine to directly call the system’s read/write methods. However, this doesn’t support TLS/SSL connections.
This method requires two objects, a file descriptor (‘fd`) and a callback object.
See listen for details about the callback object.
Returns the callback object (handler) used.
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 |
# File 'ext/iodine/iodine_tcp.c', line 314
static VALUE iodine_tcp_attach_fd(VALUE self, VALUE fd, VALUE handler) {
// clang-format on
Check_Type(fd, T_FIXNUM);
if (handler == Qnil || handler == Qfalse || handler == Qtrue) {
rb_raise(rb_eArgError, "A callback object must be provided.");
}
IodineStore.add(handler);
int other = dup(NUM2INT(fd));
if (other == -1) {
rb_raise(rb_eIOError, "invalid fd.");
}
intptr_t uuid = sock_open(other);
iodine_tcp_attch_uuid(uuid, handler);
IodineStore.remove(handler);
return handler;
(void)self;
}
|
.before_fork ⇒ Object
Sets a block of code to run before a new worker process is forked (cluster mode only).
293 294 295 296 297 298 299 300 301 302 |
# File 'ext/iodine/iodine_defer.c', line 293
VALUE iodine_before_fork_add(VALUE self) {
rb_need_block();
VALUE block = rb_block_proc();
IodineStore.add(block);
spn_lock(&iodine_before_fork_lock);
fio_ls_push(&iodine_before_fork_list, (void *)block);
spn_unlock(&iodine_before_fork_lock);
return block;
(void)self;
}
|
.connect(args) ⇒ Object
The connect method instructs iodine to connect to a server using either TCP/IP or Unix sockets.
The method accepts a single Hash argument with the following optional keys:
- :port
-
The port to listen to, deafults to 0 (using a Unix socket)
- :address
-
The address to listen to, which could be a Unix Socket path as well as an IPv4 / IPv6 address. Deafults to 0.0.0.0 (or the IPv6 equivelant).
- :handler
-
A connection callback object that supports the following same callbacks listen in the listen method’s documentation.
- :timeout
-
An integer timeout for connection establishment (doen’t effect the new connection’s timeout. Should be in the rand of 0..255.
The method also accepts an optional block.
Either a block or the :handler key MUST be present.
If the connection fails, only the ‘on_close` callback will be called (with a `nil` client).
Returns the handler object used.
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 |
# File 'ext/iodine/iodine_tcp.c', line 270
static VALUE iodine_tcp_connect(VALUE self, VALUE args) {
// clang-format on
Check_Type(args, T_HASH);
VALUE rb_port = rb_hash_aref(args, port_id);
VALUE rb_address = rb_hash_aref(args, address_id);
VALUE rb_handler = rb_hash_aref(args, handler_id);
VALUE rb_timeout = rb_hash_aref(args, timeout_id);
uint8_t timeout = 0;
if (rb_handler == Qnil || rb_handler == Qfalse || rb_handler == Qtrue) {
rb_raise(rb_eArgError, "A callback object (:handler) must be provided.");
}
IodineStore.add(rb_handler);
if (rb_address != Qnil) {
Check_Type(rb_address, T_STRING);
}
if (rb_port != Qnil) {
Check_Type(rb_port, T_STRING);
}
if (rb_timeout != Qnil) {
Check_Type(rb_timeout, T_FIXNUM);
timeout = NUM2USHORT(rb_timeout);
}
facil_connect(.port = (rb_port == Qnil ? NULL : StringValueCStr(rb_port)),
.address =
(rb_address == Qnil ? NULL : StringValueCStr(rb_address)),
.on_connect = iodine_tcp_on_connect,
.on_fail = iodine_tcp_on_fail, .timeout = timeout,
.udata = (void *)rb_handler);
return rb_handler;
(void)self;
}
|
.defer ⇒ Object
Runs a block of code asyncronously (adds the code to the event queue).
Always returns the block of code to executed (Proc object).
Code will be executed only while Iodine is running (after start).
Code blocks that where scheduled to run before Iodine enters cluster mode will run on all child processes.
189 190 191 192 193 194 195 |
# File 'ext/iodine/iodine_defer.c', line 189
static VALUE iodine_defer_run(VALUE self) {
rb_need_block();
VALUE block = IodineStore.add(rb_block_proc());
defer(iodine_defer_performe_once, (void *)block, NULL);
return block;
(void)self;
}
|
.listen(args) ⇒ Object
The listen method instructs iodine to listen to incoming connections using either TCP/IP or Unix sockets.
The method accepts a single Hash argument with the following optional keys:
- :port
-
The port to listen to, deafults to 0 (using a Unix socket)
- :address
-
The address to listen to, which could be a Unix Socket path as well as an IPv4 / IPv6 address. Deafults to 0.0.0.0 (or the IPv6 equivelant).
- :handler
-
An object that answers the ‘call` method (i.e., a Proc).
The method also accepts an optional block.
Either a block or the :handler key MUST be present.
The handler Proc (or object) should return a connection callback object that supports the following callbacks (see also Connection):
- on_open(client)
-
called after a connection was established
- on_message(client, data)
-
called when incoming data is available. Data may be fragmented.
- on_drained(client)
-
called when all the pending ‘client.write` events have been processed (see Iodine::Connection#pending).
- ping(client)
-
called whenever a timeout has occured (see Iodine::Connection#timeout=).
- on_shutdown(client)
-
called if the server is shutting down. This is called before the connection is closed.
- on_close(client)
-
called when the connection with the client was closed.
The ‘client` argument is an Connection instance that represents the connection / the client.
Here’s a telnet based chat-room example:
require 'iodine'
# define the protocol for our service
module ChatHandler
def self.on_open(client)
# Set a connection timeout
client.timeout = 10
# subscribe to the chat channel.
client.subscribe :chat
# Write a welcome message
client.publish :chat, "new member entered the chat\r\n"
end
# this is called for incoming data - note data might be fragmented.
def self.(client, data)
# publish the data we received
client.publish :chat, data
# close the connection when the time comes
client.close if data =~ /^bye[\n\r]/
end
# called whenever timeout occurs.
def self.ping(client)
client.write "System: quite, isn't it...?\r\n"
end
# called if the connection is still open and the server is shutting down.
def self.on_shutdown(client)
# write the data we received
client.write "Chat server going away. Try again later.\r\n"
end
# returns the callback object (self).
def self.call
self
end
end
# we can bothe the `handler` keuword or a block, anything that answers #call.
Iodine.listen(port: "3000", handler: ChatHandler)
# start the service
Iodine.threads = 1
Iodine.start
Returns the handler object used.
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 |
# File 'ext/iodine/iodine_tcp.c', line 219
static VALUE iodine_tcp_listen(VALUE self, VALUE args) {
// clang-format on
Check_Type(args, T_HASH);
VALUE rb_port = rb_hash_aref(args, port_id);
VALUE rb_address = rb_hash_aref(args, address_id);
VALUE rb_handler = rb_hash_aref(args, handler_id);
if (rb_handler == Qnil || rb_handler == Qfalse || rb_handler == Qtrue) {
rb_need_block();
rb_handler = rb_block_proc();
}
IodineStore.add(rb_handler);
if (rb_address != Qnil) {
Check_Type(rb_address, T_STRING);
}
if (rb_port != Qnil) {
Check_Type(rb_port, T_STRING);
}
if (facil_listen(.port = (rb_port == Qnil ? NULL : StringValueCStr(rb_port)),
.address =
(rb_address == Qnil ? NULL
: StringValueCStr(rb_address)),
.on_open = iodine_tcp_on_open,
.on_finish = iodine_tcp_on_finish,
.udata = (void *)rb_handler) == -1) {
IodineStore.remove(rb_handler);
rb_raise(rb_eRuntimeError,
"failed to listen to requested address, unknown error.");
}
return rb_handler;
(void)self;
}
|
.listen2http(opt) ⇒ Object
Listens to incoming HTTP connections and handles incoming requests using the Rack specification.
This is delegated to a lower level C HTTP and Websocket implementation, no Ruby object will be crated except the ‘env` object required by the Rack specifications.
Accepts a single Hash argument with the following properties:
(it’s possible to set default values using the DEFAULT_HTTP_ARGS Hash)
- app
-
the Rack application that handles incoming requests. Default: ‘nil`.
- port
-
the port to listen to. Default: 3000.
- address
-
the address to bind to. Default: binds to all possible addresses.
- log
-
enable response logging (Hijacked sockets aren’t logged). Default: off.
- public
-
The root public folder for static file service. Default: none.
- timeout
-
Timeout for inactive HTTP/1.x connections. Defaults: 40 seconds.
- max_body
-
The maximum body size for incoming HTTP messages. Default: ~50Mib.
- max_headers
-
The maximum total header length for incoming HTTP messages. Default: ~64Kib.
- max_msg
-
The maximum Websocket message size allowed. Default: ~250Kib.
- ping
-
The Websocket ‘ping` interval. Default: 40 seconds.
Either the ‘app` or the `public` properties are required. If niether exists, the function will fail. If both exist, Iodine will serve static files as well as dynamic requests.
When using the static file server, it’s possible to serve ‘gzip` versions of the static files by saving a compressed version with the `gz` extension (i.e. `styles.css.gz`).
‘gzip` will only be served to clients tat support the `gzip` transfer encoding.
Once HTTP/2 is supported (planned, but probably very far away), HTTP/2 timeouts will be dynamically managed by Iodine. The ‘timeout` option is only relevant to HTTP/1.x connections.
859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 |
# File 'ext/iodine/iodine_http.c', line 859
VALUE iodine_http_listen(VALUE self, VALUE opt) {
// clang-format on
uint8_t log_http = 0;
size_t ping = 0;
size_t max_body = 0;
size_t max_headers = 0;
size_t max_msg = 0;
Check_Type(opt, T_HASH);
/* copy from deafult hash */
/* test arguments */
VALUE app = rb_hash_aref(opt, ID2SYM(rb_intern("app")));
VALUE www = rb_hash_aref(opt, ID2SYM(rb_intern("public")));
VALUE port = rb_hash_aref(opt, ID2SYM(rb_intern("port")));
VALUE address = rb_hash_aref(opt, ID2SYM(rb_intern("address")));
VALUE tout = rb_hash_aref(opt, ID2SYM(rb_intern("timeout")));
if (www == Qnil) {
www = rb_hash_aref(iodine_default_args, ID2SYM(rb_intern("public")));
}
if (port == Qnil) {
port = rb_hash_aref(iodine_default_args, ID2SYM(rb_intern("port")));
}
if (address == Qnil) {
address = rb_hash_aref(iodine_default_args, ID2SYM(rb_intern("address")));
}
if (tout == Qnil) {
tout = rb_hash_aref(iodine_default_args, ID2SYM(rb_intern("timeout")));
}
VALUE tmp = rb_hash_aref(opt, ID2SYM(rb_intern("max_msg")));
if (tmp == Qnil) {
tmp = rb_hash_aref(iodine_default_args, ID2SYM(rb_intern("max_msg")));
}
if (tmp != Qnil && tmp != Qfalse) {
Check_Type(tmp, T_FIXNUM);
max_msg = FIX2ULONG(tmp);
}
tmp = rb_hash_aref(opt, ID2SYM(rb_intern("max_body")));
if (tmp == Qnil) {
tmp = rb_hash_aref(iodine_default_args, ID2SYM(rb_intern("max_body")));
}
if (tmp != Qnil && tmp != Qfalse) {
Check_Type(tmp, T_FIXNUM);
max_body = FIX2ULONG(tmp);
}
tmp = rb_hash_aref(opt, ID2SYM(rb_intern("max_headers")));
if (tmp == Qnil) {
tmp = rb_hash_aref(iodine_default_args, ID2SYM(rb_intern("max_headers")));
}
if (tmp != Qnil && tmp != Qfalse) {
Check_Type(tmp, T_FIXNUM);
max_headers = FIX2ULONG(tmp);
}
tmp = rb_hash_aref(opt, ID2SYM(rb_intern("ping")));
if (tmp == Qnil) {
tmp = rb_hash_aref(iodine_default_args, ID2SYM(rb_intern("ping")));
}
if (tmp != Qnil && tmp != Qfalse) {
Check_Type(tmp, T_FIXNUM);
ping = FIX2ULONG(tmp);
}
if (ping > 255) {
fprintf(stderr, "Iodine Warning: Websocket timeout value "
"is over 255 and will be ignored.\n");
ping = 0;
}
tmp = rb_hash_aref(opt, ID2SYM(rb_intern("log")));
if (tmp == Qnil) {
tmp = rb_hash_aref(iodine_default_args, ID2SYM(rb_intern("log")));
}
if (tmp != Qnil && tmp != Qfalse)
log_http = 1;
if ((app == Qnil || app == Qfalse) && (www == Qnil || www == Qfalse)) {
fprintf(stderr, "Iodine Warning: HTTP without application or public folder "
"(ignored).\n");
return Qfalse;
}
if ((www != Qnil && www != Qfalse)) {
Check_Type(www, T_STRING);
IodineStore.add(www);
rb_hash_aset(env_template_no_upgrade, XSENDFILE_TYPE, XSENDFILE);
rb_hash_aset(env_template_no_upgrade, XSENDFILE_TYPE_HEADER, XSENDFILE);
support_xsendfile = 1;
} else
www = 0;
if ((address != Qnil && address != Qfalse))
Check_Type(address, T_STRING);
else
address = 0;
if ((tout != Qnil && tout != Qfalse)) {
Check_Type(tout, T_FIXNUM);
tout = FIX2ULONG(tout);
} else
tout = 0;
if (tout > 255) {
fprintf(stderr, "Iodine Warning: HTTP timeout value "
"is over 255 and is silently ignored.\n");
tout = 0;
}
if (port != Qnil && port != Qfalse) {
if (!RB_TYPE_P(port, T_STRING) && !RB_TYPE_P(port, T_FIXNUM))
rb_raise(rb_eTypeError,
"The `port` property MUST be either a String or a Number");
if (RB_TYPE_P(port, T_FIXNUM))
port = rb_funcall2(port, iodine_to_s_method_id, 0, NULL);
IodineStore.add(port);
} else if (port == Qfalse)
port = 0;
else {
port = rb_str_new("3000", 4);
IodineStore.add(port);
}
if ((app != Qnil && app != Qfalse))
IodineStore.add(app);
else
app = 0;
if (http_listen(
StringValueCStr(port), (address ? StringValueCStr(address) : NULL),
.on_request = on_rack_request, .on_upgrade = on_rack_upgrade,
.udata = (void *)app, .timeout = (tout ? FIX2INT(tout) : tout),
.ws_timeout = ping, .ws_max_msg_size = max_msg,
.max_header_size = max_headers, .on_finish = free_iodine_http,
.log = log_http, .max_body_size = max_body,
.public_folder = (www ? StringValueCStr(www) : NULL))) {
fprintf(stderr,
"ERROR: Failed to initialize a listening HTTP socket for port %s\n",
port ? StringValueCStr(port) : "3000");
return Qfalse;
}
if ((app == Qnil || app == Qfalse)) {
fprintf(stderr,
"* Iodine: (no app) the HTTP service on port %s will only serve "
"static files.\n",
(port ? StringValueCStr(port) : "3000"));
}
defer(iodine_print_http_msg, (www ? (void *)www : NULL), (void *)port);
return Qtrue;
(void)self;
}
|
.on_idle ⇒ Object
56 57 58 59 60 61 62 63 64 65 |
# File 'ext/iodine/iodine.c', line 56
VALUE iodine_sched_on_idle(VALUE self) {
rb_need_block();
VALUE block = rb_block_proc();
IodineStore.add(block);
spn_lock(&iodine_on_idle_lock);
fio_ls_push(&iodine_on_idle_list, (void *)block);
spn_unlock(&iodine_on_idle_lock);
return block;
(void)self;
}
|
.on_shutdown ⇒ Object
Sets a block of code to run once a Worker process shuts down (both in single process mode and cluster mode).
323 324 325 326 327 328 329 330 331 332 333 |
# File 'ext/iodine/iodine_defer.c', line 323
VALUE iodine_on_shutdown_add(VALUE self) {
// clang-format on
rb_need_block();
VALUE block = rb_block_proc();
IodineStore.add(block);
spn_lock(&iodine_on_shutdown_lock);
fio_ls_push(&iodine_on_shutdown_list, (void *)block);
spn_unlock(&iodine_on_shutdown_lock);
return block;
(void)self;
}
|
.patch_rack ⇒ Object
Will monkey patch some Rack methods to increase their performance.
This is recommended, see Iodine::Rack::Utils for details.
65 66 67 68 69 70 71 72 |
# File 'lib/iodine.rb', line 65 def self.patch_rack ::Rack::Utils.class_eval do Iodine::Base::MonkeyPatch::RackUtils.methods(false).each do |m| ::Rack::Utils.define_singleton_method(m, Iodine::Base::MonkeyPatch::RackUtils.instance_method(m) ) end end end |
.publish(*args) ⇒ Object
Publishes a message to a channel.
Can be used using two Strings:
publish(to, )
The method accepts an optional ‘engine` argument:
publish(to, , my_pubsub_engine)
Alternatively, accepts the following named arguments:
- :to
-
The channel to publish to (required).
- :message
-
The message to be published (required).
- :engine
-
If provided, the engine to use for pub/sub. Otherwise the default
engine is used.
608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 |
# File 'ext/iodine/iodine_connection.c', line 608
static VALUE iodine_pubsub_publish(int argc, VALUE *argv, VALUE self) {
VALUE rb_ch, rb_msg, rb_engine = Qnil;
const pubsub_engine_s *engine = NULL;
switch (argc) {
case 3:
/* fallthrough */
rb_engine = argv[2];
case 2:
rb_ch = argv[0];
rb_msg = argv[1];
break;
case 1: {
/* single argument must be a Hash */
Check_Type(argv[0], T_HASH);
rb_ch = rb_hash_aref(argv[0], to_id);
if (rb_ch == Qnil || rb_ch == Qfalse) {
rb_ch = rb_hash_aref(argv[0], channel_id);
}
rb_msg = rb_hash_aref(argv[0], message_id);
rb_engine = rb_hash_aref(argv[0], engine_id);
} break;
default:
rb_raise(rb_eArgError, "method accepts 1-3 arguments.");
}
if (rb_msg == Qnil || rb_msg == Qfalse) {
rb_raise(rb_eArgError, "message is required.");
}
Check_Type(rb_msg, T_STRING);
if (rb_ch == Qnil || rb_ch == Qfalse)
rb_raise(rb_eArgError, "target / channel is required .");
if (TYPE(rb_ch) == T_SYMBOL)
rb_ch = rb_sym2str(rb_ch);
Check_Type(rb_ch, T_STRING);
if (rb_engine == Qfalse) {
engine = PUBSUB_PROCESS_ENGINE;
} else if (rb_engine != Qnil) {
// collect engine object
iodine_pubsub_s *e = iodine_pubsub_CData(rb_engine);
if (e) {
engine = e->engine;
}
}
FIOBJ ch = fiobj_str_new(RSTRING_PTR(rb_ch), RSTRING_LEN(rb_ch));
FIOBJ msg = fiobj_str_new(RSTRING_PTR(rb_msg), RSTRING_LEN(rb_msg));
intptr_t ret =
pubsub_publish(.engine = engine, .channel = ch, .message = msg);
fiobj_free(ch);
fiobj_free(msg);
if (!ret)
return Qfalse;
return Qtrue;
(void)self;
}
|
.run ⇒ Object
Runs a block of code asyncronously (adds the code to the event queue).
Always returns the block of code to executed (Proc object).
Code will be executed only while Iodine is running (after start).
Code blocks that where scheduled to run before Iodine enters cluster mode will run on all child processes.
189 190 191 192 193 194 195 |
# File 'ext/iodine/iodine_defer.c', line 189
static VALUE iodine_defer_run(VALUE self) {
rb_need_block();
VALUE block = IodineStore.add(rb_block_proc());
defer(iodine_defer_performe_once, (void *)block, NULL);
return block;
(void)self;
}
|
.run_after(milliseconds) ⇒ Object
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'ext/iodine/iodine_defer.c', line 205
static VALUE iodine_defer_run_after(VALUE self, VALUE milliseconds) {
(void)(self);
if (milliseconds == Qnil) {
return iodine_defer_run(self);
}
if (TYPE(milliseconds) != T_FIXNUM) {
rb_raise(rb_eTypeError, "milliseconds must be a number");
return Qnil;
}
size_t milli = FIX2UINT(milliseconds);
if (milli == 0) {
return iodine_defer_run(self);
}
// requires a block to be passed
rb_need_block();
VALUE block = rb_block_proc();
if (block == Qnil)
return Qfalse;
IodineStore.add(block);
if (facil_run_every(milli, 1, iodine_defer_run_timer, (void *)block,
(void (*)(void *))IodineStore.remove) == -1) {
perror("ERROR: Iodine couldn't initialize timer");
return Qnil;
}
return block;
}
|
.run_every(*args) ⇒ Object
Runs the required block after the specified number of milliseconds have passed. Time is counted only once Iodine started running (using start).
Accepts:
- milliseconds
-
the number of milliseconds between event repetitions.
- repetitions
-
the number of event repetitions. Defaults to 0 (never ending).
- block
-
(required) a block is required, as otherwise there is nothing to
perform.
The event will repeat itself until the number of repetitions had been delpeted.
Always returns a copy of the block object.
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
# File 'ext/iodine/iodine_defer.c', line 248
static VALUE iodine_defer_run_every(int argc, VALUE *argv, VALUE self) {
(void)(self);
VALUE milliseconds, repetitions, block;
rb_scan_args(argc, argv, "11&", &milliseconds, &repetitions, &block);
if (TYPE(milliseconds) != T_FIXNUM) {
rb_raise(rb_eTypeError, "milliseconds must be a number.");
return Qnil;
}
if (repetitions != Qnil && TYPE(repetitions) != T_FIXNUM) {
rb_raise(rb_eTypeError, "repetitions must be a number or `nil`.");
return Qnil;
}
size_t milli = FIX2UINT(milliseconds);
size_t repeat = (repetitions == Qnil) ? 0 : FIX2UINT(repetitions);
// requires a block to be passed
rb_need_block();
IodineStore.add(block);
if (facil_run_every(milli, repeat, iodine_defer_run_timer, (void *)block,
(void (*)(void *))IodineStore.remove) == -1) {
perror("ERROR: Iodine couldn't initialize timer");
return Qnil;
}
return block;
}
|
.start ⇒ Object
This will block the calling (main) thread and start the Iodine reactor.
When using cluster mode (2 or more worker processes), it is important that no other threads are active.
For many reasons, ‘fork` should NOT be called while multi-threading, so cluster mode must always be initiated from the main thread in a single thread environment.
For information about why forking in multi-threaded environments should be avoided, see (for example): www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-them
195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'ext/iodine/iodine.c', line 195
static VALUE iodine_start(VALUE self) {
if (facil_is_running()) {
rb_raise(rb_eRuntimeError, "Iodine already running!");
}
VALUE threads_rb = iodine_threads_get(self);
VALUE workers_rb = iodine_workers_get(self);
iodine_start_params_s params = {
.threads = NUM2SHORT(threads_rb), .workers = NUM2SHORT(workers_rb),
};
iodine_print_startup_message(params);
IodineCaller.leaveGVL(iodine_run_outside_GVL, ¶ms);
return self;
}
|
.subscribe(*args) ⇒ Object
Subscribes to a Pub/Sub stream / channel or replaces an existing subscription.
The method accepts 1-2 arguments and an optional block. These are all valid ways to call the method:
subscribe("my_stream") {|source, msg| p msg }
subscribe("my_strea*", match: :redis) {|source, msg| p msg }
subscribe(to: "my_stream") {|source, msg| p msg }
# or use any object that answers `#call(source, msg)`
MyProc = Proc.new {|source, msg| p msg }
subscribe to: "my_stream", match: :redis, handler: MyProc
The first argument must be either a String or a Hash.
The second, optional, argument must be a Hash (if given).
The options Hash supports the following possible keys (other keys are ignored, all keys are Symbols):
- :match
-
The channel / subject name matching type to be used. Valid value is: ‘:redis`. Future versions hope to support `:nats` and `:rabbit` patern matching as well.
- :to
-
The channel / subject to subscribe to.
- :as
-
(only for WebSocket connections) accepts the optional value ‘:binary`. default is `:text`. Note that binary transmissions are illegal for some connections (such as SSE) and an attempted binary subscription will fail for these connections.
- :handler
-
Any object that answers ‘#call(source, msg)` where source is the stream / channel name.
Note: if an existing subscription with the same name exists, it will be replaced by this new subscription.
Returns the name of the subscription, which matches the name be used in unsubscribe (or nil on failure).
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 |
# File 'ext/iodine/iodine_connection.c', line 499
static VALUE iodine_pubsub_subscribe(int argc, VALUE *argv, VALUE self) {
// clang-format on
iodine_sub_args_s args = iodine_subscribe_args(argc, argv);
if (args.channel == Qnil) {
return Qnil;
}
iodine_connection_data_s *c = NULL;
if (TYPE(self) == T_MODULE) {
if (!args.block) {
rb_raise(rb_eArgError,
"block or :handler required for local subscriptions.");
}
} else {
c = iodine_connection_validate_data(self);
if (!c || (c->info.type == IODINE_CONNECTION_SSE && args.binary)) {
if (args.block) {
IodineStore.remove(args.block);
}
return Qnil; /* cannot subscribe a closed connection. */
}
if (args.block == Qnil && args.binary) {
args.block = Qtrue;
}
spn_add(&c->ref, 1);
}
FIOBJ channel =
fiobj_str_new(RSTRING_PTR(args.channel), RSTRING_LEN(args.channel));
pubsub_sub_pt sub =
pubsub_subscribe(.channel = channel, .on_message = iodine_on_pubsub,
.on_unsubscribe = iodine_on_unsubscribe, .udata1 = c,
.udata2 = (void *)args.block,
.use_pattern = args.pattern);
fiobj_free(channel);
if (c) {
spn_lock(&c->lock);
if (c->info.uuid == -1) {
pubsub_unsubscribe(sub);
spn_unlock(&c->lock);
return Qnil;
} else {
iodine_sub_add(&c->subscriptions, sub);
}
spn_unlock(&c->lock);
} else {
spn_lock(&sub_lock);
iodine_sub_add(&sub_global, sub);
spn_unlock(&sub_lock);
}
return args.channel;
}
|
.threads ⇒ Object
Returns the number of worker threads that will be used when start is called.
Negative numbers are translated as fractions of the number of CPU cores. i.e., -2 == half the number of detected CPU cores.
Zero values promise nothing (iodine will decide what to do with them).
106 107 108 109 110 111 |
# File 'ext/iodine/iodine.c', line 106
static VALUE iodine_threads_get(VALUE self) {
VALUE i = rb_ivar_get(self, rb_intern2("@threads", 8));
if (i == Qnil)
i = INT2NUM(0);
return i;
}
|
.threads=(val) ⇒ Object
Sets the number of worker threads that will be used when start is called.
Negative numbers are translated as fractions of the number of CPU cores. i.e., -2 == half the number of detected CPU cores.
Zero values promise nothing (iodine will decide what to do with them).
122 123 124 125 126 127 128 129 |
# File 'ext/iodine/iodine.c', line 122
static VALUE iodine_threads_set(VALUE self, VALUE val) {
Check_Type(val, T_FIXNUM);
if (NUM2SSIZET(val) >= (1 << 12)) {
rb_raise(rb_eRangeError, "requsted thread count is out of range.");
}
rb_ivar_set(self, rb_intern2("@threads", 8), val);
return val;
}
|
.unsubscribe(name) ⇒ Object
Unsubscribes from a Pub/Sub stream / channel.
The method accepts a single arguments, the name used for the subscription. i.e.:
subscribe("my_stream") {|source, msg| p msg }
unsubscribe("my_stream")
Returns ‘true` if the subscription was found.
Returns ‘false` if the subscription didn’t exist.
564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 |
# File 'ext/iodine/iodine_connection.c', line 564
static VALUE iodine_pubsub_unsubscribe(VALUE self, VALUE name) {
// clang-format on
iodine_connection_data_s *c = NULL;
FIOBJ channel = fiobj_str_new(RSTRING_PTR(name), RSTRING_LEN(name));
VALUE ret;
if (TYPE(self) == T_MODULE) {
spn_lock(&sub_lock);
ret = iodine_sub_unsubscribe(&sub_global, channel);
spn_unlock(&sub_lock);
} else {
c = iodine_connection_validate_data(self);
if (!c) {
return Qnil; /* cannot subscribe a closed connection. */
}
spn_lock(&sub_lock);
ret = iodine_sub_unsubscribe(&sub_global, channel);
spn_unlock(&sub_lock);
}
fiobj_free(channel);
return ret;
}
|
.workers ⇒ Object
Returns the number of worker processes that will be used when start
is called.
Negative numbers are translated as fractions of the number of CPU cores.
i.e., -2 == half the number of detected CPU cores.
Zero values promise nothing (iodine will decide what to do with them).
140 141 142 143 144 145 |
# File 'ext/iodine/iodine.c', line 140
static VALUE iodine_workers_get(VALUE self) {
VALUE i = rb_ivar_get(self, rb_intern2("@workers", 8));
if (i == Qnil)
i = INT2NUM(0);
return i;
}
|
.workers=(val) ⇒ Object
Sets the number of worker processes that will be used when start is called.
Negative numbers are translated as fractions of the number of CPU cores. i.e., -2 == half the number of detected CPU cores.
Zero values promise nothing (iodine will decide what to do with them).
156 157 158 159 160 161 162 163 |
# File 'ext/iodine/iodine.c', line 156
static VALUE iodine_workers_set(VALUE self, VALUE val) {
Check_Type(val, T_FIXNUM);
if (NUM2SSIZET(val) >= (1 << 9)) {
rb_raise(rb_eRangeError, "requsted worker process count is out of range.");
}
rb_ivar_set(self, rb_intern2("@workers", 8), val);
return val;
}
|