Module: Iodine
- Defined in:
- lib/iodine.rb,
lib/iodine/tls.rb,
lib/iodine/json.rb,
lib/iodine/pubsub.rb,
lib/iodine/version.rb,
lib/iodine/mustache.rb,
lib/iodine/connection.rb,
lib/iodine/rack_utils.rb,
lib/rack/handler/iodine.rb,
ext/iodine/iodine.c
Overview
Iodine is an HTTP / WebSocket server as well as an Evented Network Tool Library. In essense, Iodine is a Ruby port for the facil.io C library.
Here is a simple telnet based echo server using Iodine (see full list at Connection):
require 'iodine'
# define the protocol for our service
module EchoProtocol
def on_open(client)
# Set a connection timeout
client.timeout = 10
# Write a welcome message
client.write "Echo server running on Iodine #{Iodine::VERSION}.\r\n"
end
# this is called for incoming data - note data might be fragmented.
def (client, data)
# write the data we received
client.write "echo: #{data}"
# close the connection when the time comes
client.close if data =~ /^bye[\n\r]/
end
# called if the connection is still open and the server is shutting down.
def on_shutdown(client)
# write the data we received
client.write "Server going away\r\n"
end
extend self
end
# create the service instance, the block returns a connection handler.
Iodine.listen(port: "3000") { EchoProtocol }
# start the service
Iodine.threads = 1
Iodine.start
Methods for setting up and starting Iodine include Iodine.start, Iodine.threads, Iodine.threads=, Iodine.workers and Iodine.workers=.
Methods for setting startup / operational callbacks include Iodine.on_idle, Iodine.on_state.
Methods for asynchronous execution include Iodine.run (same as Iodine.defer), Iodine.run_after and Iodine.run_every.
Methods for application wide pub/sub include Iodine.subscribe, Iodine.unsubscribe and Iodine.publish. Connection specific pub/sub methods are documented in the Connection class).
Methods for TCP/IP, Unix Sockets and HTTP connections include Iodine.listen and Iodine.connect.
Note that the HTTP server supports both TCP/IP and Unix Sockets as well as SSE / WebSockets extensions.
Iodine doesn't call Iodine.patch_rack automatically, but doing so will improve Rack's performace.
Please read the README file for an introduction to Iodine.
Defined Under Namespace
Modules: Base, JSON, PubSub, Rack Classes: Connection, Mustache, TLS
Constant Summary collapse
- VERSION =
'0.7.58'.freeze
- DEFAULT_SETTINGS =
{}
- DEFAULT_HTTP_ARGS =
Deprecated.
use DEFAULT_SETTINGS.
DEFAULT_SETTINGS
Class Method Summary collapse
-
.after_fork(&block) ⇒ Object
deprecated
Deprecated.
use Iodine.on_state.
-
.after_fork_in_master(&block) ⇒ Object
deprecated
Deprecated.
use Iodine.on_state.
-
.after_fork_in_worker(&block) ⇒ Object
deprecated
Deprecated.
use Iodine.on_state.
-
.attach_fd(fd, handler) ⇒ Object
The Iodine.attach_fd method instructs iodine to attach a socket to the server using it's numerical file descriptor.
-
.before_fork(&block) ⇒ Object
deprecated
Deprecated.
use Iodine.on_state.
-
.connect(args) ⇒ Object
The Iodine.connect method instructs iodine to connect to a server using either TCP/IP or Unix sockets.
-
.defer ⇒ Object
Runs a block of code asyncronously (adds the code to the event queue).
-
.listen(args) ⇒ Object
Iodine.listen can be used to listen to any incoming connections, including HTTP and raw (tcp/ip and unix sockets) connections.
-
.listen2http(args, &block) ⇒ Object
deprecated
Deprecated.
use Iodine.listen with
service: :http
. -
.master? ⇒ Boolean
Returns
true
if this process is the master / root process,false
otherwise. -
.on_idle ⇒ Object
Schedules a single occuring event for the next idle cycle.
-
.on_shutdown(&block) ⇒ Object
deprecated
Deprecated.
use Iodine.on_state.
-
.on_state(event) ⇒ Object
Sets a block of code to run when Iodine's core state is updated.
-
.patch_rack ⇒ Object
Will monkey patch some Rack methods to increase their performance.
-
.publish(*args) ⇒ Object
Publishes a message to a channel.
-
.run ⇒ Object
Runs a block of code asyncronously (adds the code to the event queue).
-
.run_after(milliseconds) ⇒ Object
Runs the required block after the specified number of milliseconds have passed.
-
.run_every(*args) ⇒ Object
Runs the required block after the specified number of milliseconds have passed.
-
.running? ⇒ Boolean
Returns
true
if Iodine is currently running a server. -
.start ⇒ Object
This will block the calling (main) thread and start the Iodine reactor.
-
.stop ⇒ Object
This will stop the iodine server, shutting it down.
-
.subscribe(*args) ⇒ Object
Subscribes to a Pub/Sub stream / channel or replaces an existing subscription.
-
.threads ⇒ FixNum
Returns the number of worker threads that will be used when Iodine.start is called.
-
.threads=(val) ⇒ Object
Sets the number of worker threads that will be used when Iodine.start is called.
-
.unsubscribe(name) ⇒ Object
Unsubscribes from a Pub/Sub stream / channel.
-
.verbosity ⇒ FixNum
Gets the logging level used for Iodine messages.
-
.verbosity=(val) ⇒ Object
Gets the logging level used for Iodine messages.
-
.worker? ⇒ Boolean
Returns
true
if this process is a worker process or if iodine is running in a single process mode (the master is also a worker),false
otherwise. -
.workers ⇒ FixNum
Returns the number of worker processes that will be used when Iodine.start is called.
-
.workers=(val) ⇒ Object
Sets the number of worker processes that will be used when Iodine.start is called.
Class Method Details
.after_fork(&block) ⇒ Object
use on_state.
Sets a block of code to run after a new worker process is forked (cluster mode only).
Code runs in both the parent and the child.
100 101 102 103 |
# File 'lib/iodine.rb', line 100 def self.after_fork(&block) warn "Iodine.after_fork is deprecated, use Iodine.on_state(:after_fork)." Iodine.on_state(:after_fork, &block) end |
.after_fork_in_master(&block) ⇒ Object
use on_state.
Sets a block of code to run in the master / root process, after a new worker process is forked (cluster mode only).
114 115 116 117 |
# File 'lib/iodine.rb', line 114 def self.after_fork_in_master(&block) warn "Iodine.after_fork_in_master is deprecated, use Iodine.on_state(:enter_master)." Iodine.on_state(:enter_master, &block) end |
.after_fork_in_worker(&block) ⇒ Object
use on_state.
Sets a block of code to run in the worker process, after a new worker process is forked (cluster mode only).
107 108 109 110 |
# File 'lib/iodine.rb', line 107 def self.after_fork_in_worker(&block) warn "Iodine.after_fork_in_worker is deprecated, use Iodine.on_state(:enter_child)." Iodine.on_state(:enter_child, &block) end |
.attach_fd(fd, handler) ⇒ Object
The attach_fd method instructs iodine to attach a socket to the server using it's numerical file descriptor.
This is faster than attaching a Ruby IO object since it allows iodine to directly call the system's read/write methods. However, this doesn't support TLS/SSL connections.
This method requires two objects, a file descriptor (fd
) and a callback object.
See listen for details about the callback object.
Returns the callback object (handler) used.
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 |
# File 'ext/iodine/iodine_tcp.c', line 273
static VALUE iodine_tcp_attach_fd(VALUE self, VALUE fd, VALUE handler) {
// clang-format on
Check_Type(fd, T_FIXNUM);
if (handler == Qnil || handler == Qfalse || handler == Qtrue) {
rb_raise(rb_eArgError, "A callback object must be provided.");
}
IodineStore.add(handler);
int other = dup(NUM2INT(fd));
if (other == -1) {
rb_raise(rb_eIOError, "invalid fd.");
}
intptr_t uuid = fio_fd2uuid(other);
iodine_tcp_attch_uuid(uuid, handler);
IodineStore.remove(handler);
return handler;
(void)self;
}
|
.before_fork(&block) ⇒ Object
use on_state.
Sets a block of code to run before a new worker process is forked (cluster mode only).
91 92 93 94 |
# File 'lib/iodine.rb', line 91 def self.before_fork(&block) warn "Iodine.before_fork is deprecated, use Iodine.on_state(:before_fork)." Iodine.on_state(:before_fork, &block) end |
.connect(args) ⇒ Object
The connect method instructs iodine to connect to a server using either TCP/IP or Unix sockets.
Iodine.connect(settings)
Supported Settigs:
:url |
URL indicating service type, host name, port and optional path. |
:handler |
see details below. |
:address |
an IP address or a unix socket address. Only relevant if :url is missing. |
:body |
(HTTP client) the body to be sent. |
:cookies |
(HTTP/WebSocket client) cookie data. |
:headers |
(HTTP/WebSocket client) custom headers. |
:log |
(HTTP only) - logging the requests. |
:max_body |
(HTTP only) - limits HTTP body in the response, see listen. |
:max_headers |
(HTTP only) - limits the header length in the response, see listen. |
:max_msg |
(WebSockets only) maximum incoming message size pre message (in Kb). |
:method |
(HTTP client) a String such as "GET" or "POST". |
:path |
HTTP/WebSocket client) the HTTP path to be used. |
:ping |
ping interval (in seconds). Up to 255 seconds. |
:port |
port number to listen to either a String or Number) |
:public |
(public folder, HTTP server only) |
:service |
(:raw / :tls / :ws / :wss ) |
:timeout |
(HTTP only) keep-alive timeout in seconds. Up to 255 seconds. |
:tls |
an TLS context object for encrypted connections. |
Some connection settings are only valid for HTTP / WebSocket connections.
If :url
is provided, it will overwrite the :address
, :port
and :path
settings (if provided).
Unlike listen, a block can't be used and a :handler
object must be provided.
If the connection fails, only the on_close
callback will be called (with a nil
client).
Here's an example TCP/IP client that sends a simple HTTP GET request:
# use a secure connection?
USE_TLS = false
# remote server details
$port = USE_TLS ? 443 : 80
$address = "google.com"
# require iodine
require 'iodine'
# Iodine runtime settings
Iodine.threads = 1
Iodine.workers = 1
Iodine.verbosity = 3 # warnings only
# a client callback handler
module Client
def self.on_open(connection)
# Set a connection timeout
connection.timeout = 10
# subscribe to the chat channel.
puts "* Sending request..."
connection.write "GET / HTTP/1.1\r\nHost: #{$address}\r\n\r\n"
end
def self.(connection, data)
# publish the data we received
STDOUT.write data
# close the connection after a second... we're not really parsing anything, so it's a guess.
Iodine.run_after(1000) { connection.close }
end
def self.on_close(connection)
# stop iodine
Iodine.stop
puts "Done."
end
# returns the callback object (self).
def self.call
self
end
end
if(USE_TLS)
tls = Iodine::TLS.new
# ALPN blocks should return a valid calback object
tls.on_protocol("http/1.1") { Client }
end
Iodine.connect(address: $address, port: $port, handler: Client, tls: tls)
# start the iodine reactor
Iodine.start
Iodine also supports WebSocket client connections, using either the url
property or the ws
and wss
service names.
The following example establishes a secure (TLS) connects to the WebSocket echo testing server at wss://echo.websocket.org
:
# require iodine
require 'iodine'
# The client class
class EchoClient
def on_open(connection)
@messages = [ "Hello World!",
"I'm alive and sending messages",
"I also receive messages",
"now that we all know this...",
"I can stop.",
"Goodbye." ]
(connection)
end
def (connection, )
puts "Received: #{}"
(connection)
end
def on_close(connection)
# in this example, we stop iodine once the client is closed
puts "* Client closed."
Iodine.stop
end
# We use this method to pop messages from the queue and send them
#
# When the queue is empty, we disconnect the client.
def (connection)
msg = @messages.shift
if(msg)
connection.write msg
else
connection.close
end
end
end
Iodine.threads = 1
Iodine.connect url: "wss://echo.websocket.org", handler: EchoClient.new, ping: 40
Iodine.start
Note*: the on_close
callback is always called, even if a connection couldn't be established.
Returns the handler object used.
1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 |
# File 'ext/iodine/iodine.c', line 1299
static VALUE iodine_connect(VALUE self, VALUE args) {
// clang-format on
iodine_connection_args_s s = iodine_connect_args(args, 0);
intptr_t uuid = -1;
switch (s.service) {
case IODINE_SERVICE_RAW:
uuid = iodine_tcp_connect(s);
break;
case IODINE_SERVICE_HTTP:
iodine_connect_args_cleanup(&s);
rb_raise(rb_eRuntimeError, "HTTP client connections aren't supported yet.");
return Qnil;
break;
case IODINE_SERVICE_WS:
uuid = iodine_ws_connect(s);
break;
}
iodine_connect_args_cleanup(&s);
if (uuid == -1)
rb_raise(rb_eRuntimeError, "Couldn't open client socket.");
return self;
}
|
.defer ⇒ Object
Runs a block of code asyncronously (adds the code to the event queue).
Always returns the block of code to executed (Proc object).
Code will be executed only while Iodine is running (after start).
Code blocks that where scheduled to run before Iodine enters cluster mode will run on all child processes.
202 203 204 205 206 207 208 |
# File 'ext/iodine/iodine_defer.c', line 202
static VALUE iodine_defer_run(VALUE self) {
rb_need_block();
VALUE block = IodineStore.add(rb_block_proc());
fio_defer(iodine_defer_performe_once, (void *)block, NULL);
return block;
(void)self;
}
|
.listen(args) ⇒ Object
listen can be used to listen to any incoming connections, including HTTP and raw (tcp/ip and unix sockets) connections.
Iodine.listen(settings)
Supported Settigs:
:url |
URL indicating service type, host name and port. Path will be parsed as a Unix socket. |
:handler |
(deprecated: :app ) see details below. |
:address |
an IP address or a unix socket address. Only relevant if :url is missing. |
:log |
(HTTP only) request logging. For global verbosity see verbosity |
:max_body |
(HTTP only) maximum upload size allowed per request before disconnection (in Mb). |
:max_headers |
(HTTP only) maximum total header length allowed per request (in Kb). |
:max_msg |
(WebSockets only) maximum message size pre message (in Kb). |
:ping |
(:raw clients and WebSockets only) ping interval (in seconds). Up to 255 seconds. |
:port |
port number to listen to either a String or Number) |
:public |
(HTTP server only) public folder for static file service. |
:service |
(:raw / :tls / :ws / :wss / :http / :https ) a supported service this socket will listen to. |
:timeout |
(HTTP only) keep-alive timeout in seconds. Up to 255 seconds. |
:tls |
an TLS context object for encrypted connections. |
Some connection settings are only valid when listening to HTTP / WebSocket connections.
If :url
is provided, it will overwrite the :address
and :port
settings (if provided).
For HTTP connections, the :handler
must be a valid Rack application object (answers .call(env)
).
Here's an example for an HTTP hello world application:
require 'iodine'
# a handler can be a block
Iodine.listen(service: :http, port: "3000") {|env| [200, {"Content-Length" => "12"}, ["Hello World!"]] }
# start the service
Iodine.threads = 1
Iodine.start
Here's another example, using a Unix Socket instead of a TCP/IP socket for an HTTP hello world application.
This example shows how the :url
option can be used, but the :address
settings could have been used for the same effect (with port: 0
).
require 'iodine'
# note that unix sockets in URL form use an absolute path.
Iodine.listen(url: "http://:0/tmp/sock.sock") {|env| [200, {"Content-Length" => "12"}, ["Hello World!"]] }
# start the service
Iodine.threads = 1
Iodine.start
For raw connections, the :handler
object should be an object that answer .call
and returns a valid callback object that supports the following callbacks (see also Connection):
on_open(client) |
called after a connection was established |
on_message(client,data) |
called when incoming data is available. Data may be fragmented. |
on_drained(client) |
called after pending client.write events have been processed (see Iodine::Connection#pending). |
ping(client) |
called whenever a timeout has occured (see Iodine::Connection#timeout=). |
on_shutdown(client) |
called if the server is shutting down. This is called before the connection is closed. |
on_close(client) |
called when the connection with the client was closed. |
The client
argument passed to the :handler
callbacks is an Connection instance that represents the connection / the client.
Here's an example for a telnet based chat-room example:
require 'iodine'
# define the protocol for our service
module ChatHandler
def self.on_open(client)
# Set a connection timeout
client.timeout = 10
# subscribe to the chat channel.
client.subscribe :chat
# Write a welcome message
client.publish :chat, "new member entered the chat\r\n"
end
# this is called for incoming data - note data might be fragmented.
def self.(client, data)
# publish the data we received
client.publish :chat, data
# close the connection when the time comes
client.close if data =~ /^bye[\n\r]/
end
# called whenever timeout occurs.
def self.ping(client)
client.write "System: quite, isn't it...?\r\n"
end
# called if the connection is still open and the server is shutting down.
def self.on_shutdown(client)
# write the data we received
client.write "Chat server going away. Try again later.\r\n"
end
# returns the callback object (self).
def self.call
self
end
end
# we use can both the `handler` keyword or a block, anything that answers #call.
Iodine.listen(service: :raw, port: "3000", handler: ChatHandler)
# we can listen to more than a single socket at a time.
Iodine.listen(url: "raw://:3030", handler: ChatHandler)
# start the service
Iodine.threads = 1
Iodine.start
Returns the handler object used.
1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 |
# File 'ext/iodine/iodine.c', line 1122
static VALUE iodine_listen(VALUE self, VALUE args) {
// clang-format on
iodine_connection_args_s s = iodine_connect_args(args, 1);
intptr_t uuid = -1;
switch (s.service) {
case IODINE_SERVICE_RAW:
uuid = iodine_tcp_listen(s);
break;
case IODINE_SERVICE_HTTP: /* overflow */
case IODINE_SERVICE_WS:
uuid = iodine_http_listen(s);
break;
}
iodine_connect_args_cleanup(&s);
if (uuid == -1)
rb_raise(rb_eRuntimeError, "Couldn't open listening socket.");
return s.handler;
(void)self;
}
|
.listen2http(args, &block) ⇒ Object
use listen with service: :http
.
Sets a block of code to run once a Worker process shuts down (both in single process mode and cluster mode).
82 83 84 85 86 |
# File 'lib/iodine.rb', line 82 def self.listen2http(args, &block) warn "Iodine.listen2http is deprecated, use Iodine.listen(service: :http)." args[:service] = :http; Iodine.listen(args, &block) end |
.master? ⇒ Boolean
Returns true
if this process is the master / root process, false
otherwise.
Note that the master process might be a worker process as well, when running in single process mode (see workers).
306 307 308 |
# File 'ext/iodine/iodine.c', line 306 static VALUE iodine_master_is(VALUE self) { return fio_is_master() ? Qtrue : Qfalse; } |
.on_idle ⇒ Object
83 84 85 86 87 88 89 90 91 92 |
# File 'ext/iodine/iodine.c', line 83
static VALUE iodine_sched_on_idle(VALUE self) {
// clang-format on
rb_need_block();
VALUE block = rb_block_proc();
IodineStore.add(block);
fio_state_callback_add(FIO_CALL_ON_IDLE, iodine_perform_on_idle_callback,
(void *)block);
return block;
(void)self;
}
|
.on_shutdown(&block) ⇒ Object
use on_state.
Sets a block of code to run once a Worker process shuts down (both in single process mode and cluster mode).
121 122 123 124 |
# File 'lib/iodine.rb', line 121 def self.on_shutdown(&block) warn "Iodine.on_shutdown is deprecated, use Iodine.on_state(:on_finish)." Iodine.on_state(:on_finish, &block) end |
.on_state(event) ⇒ Object
Sets a block of code to run when Iodine's core state is updated.
The state event Symbol can be any of the following:
:pre_start |
the block will be called once before starting up the IO reactor. |
:before_fork |
the block will be called before each time the IO reactor forks a new worker. |
:after_fork |
the block will be called after each fork (both in parent and workers). |
:enter_child |
the block will be called by a worker process right after forking. |
:enter_master |
the block will be called by the master process after spawning a worker (after forking). |
:on_start |
the block will be called every time a worker proceess starts. In single process mode, the master process is also a worker. |
:on_parent_crush |
the block will be called by each worker the moment it detects the master process crashed. |
:on_child_crush |
the block will be called by the parent (master) after a worker process crashed. |
:start_shutdown |
the block will be called before starting the shutdown sequence. |
:on_finish |
the block will be called just before finishing up (both on chlid and parent processes). |
Code runs in both the parent and the child.
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 |
# File 'ext/iodine/iodine_defer.c', line 326
static VALUE iodine_on_state(VALUE self, VALUE event) {
// clang-format on
rb_need_block();
Check_Type(event, T_SYMBOL);
VALUE block = rb_block_proc();
IodineStore.add(block);
ID state = rb_sym2id(event);
if (state == STATE_PRE_START) {
fio_state_callback_add(FIO_CALL_PRE_START,
iodine_perform_state_callback_persist,
(void *)block);
} else if (state == STATE_BEFORE_FORK) {
fio_state_callback_add(FIO_CALL_BEFORE_FORK,
iodine_perform_state_callback_persist,
(void *)block);
} else if (state == STATE_AFTER_FORK) {
fio_state_callback_add(FIO_CALL_AFTER_FORK,
iodine_perform_state_callback_persist,
(void *)block);
} else if (state == STATE_ENTER_CHILD) {
fio_state_callback_add(FIO_CALL_IN_CHILD,
iodine_perform_state_callback_persist,
(void *)block);
} else if (state == STATE_ENTER_MASTER) {
fio_state_callback_add(FIO_CALL_IN_MASTER,
iodine_perform_state_callback_persist,
(void *)block);
} else if (state == STATE_ON_START) {
fio_state_callback_add(FIO_CALL_ON_START,
iodine_perform_state_callback_persist,
(void *)block);
} else if (state == STATE_ON_PARENT_CRUSH) {
fio_state_callback_add(FIO_CALL_ON_PARENT_CRUSH,
iodine_perform_state_callback_persist,
(void *)block);
} else if (state == STATE_ON_CHILD_CRUSH) {
fio_state_callback_add(FIO_CALL_ON_CHILD_CRUSH,
iodine_perform_state_callback_persist,
(void *)block);
} else if (state == STATE_START_SHUTDOWN) {
fio_state_callback_add(FIO_CALL_ON_SHUTDOWN,
iodine_perform_state_callback_persist,
(void *)block);
} else if (state == STATE_ON_FINISH) {
fio_state_callback_add(FIO_CALL_ON_FINISH,
iodine_perform_state_callback_persist,
(void *)block);
} else {
IodineStore.remove(block);
rb_raise(rb_eTypeError, "unknown event in Iodine.on_state");
}
return block;
(void)self;
}
|
.patch_rack ⇒ Object
Will monkey patch some Rack methods to increase their performance.
This is recommended, see Iodine::Rack::Utils for details.
65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/iodine.rb', line 65 def self.patch_rack begin require 'rack' rescue LoadError end ::Rack::Utils.class_eval do Iodine::Base::MonkeyPatch::RackUtils.methods(false).each do |m| ::Rack::Utils.define_singleton_method(m, Iodine::Base::MonkeyPatch::RackUtils.instance_method(m) ) end end end |
.publish(*args) ⇒ Object
Publishes a message to a channel.
Can be used using two Strings:
publish(to, )
The method accepts an optional engine
argument:
publish(to, , my_pubsub_engine)
708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 |
# File 'ext/iodine/iodine_connection.c', line 708
static VALUE iodine_pubsub_publish(int argc, VALUE *argv, VALUE self) {
// clang-format on
VALUE rb_ch, rb_msg, rb_engine = Qnil;
const fio_pubsub_engine_s *engine = NULL;
switch (argc) {
case 3:
/* fallthrough */
rb_engine = argv[2];
case 2:
rb_ch = argv[0];
rb_msg = argv[1];
break;
case 1: {
/* single argument must be a Hash */
Check_Type(argv[0], T_HASH);
rb_ch = rb_hash_aref(argv[0], to_id);
if (rb_ch == Qnil || rb_ch == Qfalse) {
rb_ch = rb_hash_aref(argv[0], channel_id);
}
rb_msg = rb_hash_aref(argv[0], message_id);
rb_engine = rb_hash_aref(argv[0], engine_id);
} break;
default:
rb_raise(rb_eArgError, "method accepts 1-3 arguments.");
}
if (rb_msg == Qnil || rb_msg == Qfalse) {
rb_raise(rb_eArgError, "message is required.");
}
Check_Type(rb_msg, T_STRING);
if (rb_ch == Qnil || rb_ch == Qfalse)
rb_raise(rb_eArgError, "target / channel is required .");
if (TYPE(rb_ch) == T_SYMBOL)
rb_ch = rb_sym2str(rb_ch);
Check_Type(rb_ch, T_STRING);
if (rb_engine == Qfalse) {
engine = FIO_PUBSUB_PROCESS;
} else if (rb_engine != Qnil) {
// collect engine object
iodine_pubsub_s *e = iodine_pubsub_CData(rb_engine);
if (e) {
engine = e->engine;
}
}
fio_publish(.engine = engine, .channel = IODINE_RSTRINFO(rb_ch),
.message = IODINE_RSTRINFO(rb_msg));
return Qtrue;
(void)self;
}
|
.run ⇒ Object
Runs a block of code asyncronously (adds the code to the event queue).
Always returns the block of code to executed (Proc object).
Code will be executed only while Iodine is running (after start).
Code blocks that where scheduled to run before Iodine enters cluster mode will run on all child processes.
202 203 204 205 206 207 208 |
# File 'ext/iodine/iodine_defer.c', line 202
static VALUE iodine_defer_run(VALUE self) {
rb_need_block();
VALUE block = IodineStore.add(rb_block_proc());
fio_defer(iodine_defer_performe_once, (void *)block, NULL);
return block;
(void)self;
}
|
.run_after(milliseconds) ⇒ Object
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'ext/iodine/iodine_defer.c', line 218
static VALUE iodine_defer_run_after(VALUE self, VALUE milliseconds) {
(void)(self);
if (milliseconds == Qnil) {
return iodine_defer_run(self);
}
if (TYPE(milliseconds) != T_FIXNUM) {
rb_raise(rb_eTypeError, "milliseconds must be a number");
return Qnil;
}
size_t milli = FIX2UINT(milliseconds);
if (milli == 0) {
return iodine_defer_run(self);
}
// requires a block to be passed
rb_need_block();
VALUE block = rb_block_proc();
if (block == Qnil)
return Qfalse;
IodineStore.add(block);
if (fio_run_every(milli, 1, iodine_defer_run_timer, (void *)block,
(void (*)(void *))IodineStore.remove) == -1) {
perror("ERROR: Iodine couldn't initialize timer");
return Qnil;
}
return block;
}
|
.run_every(*args) ⇒ Object
Runs the required block after the specified number of milliseconds have passed. Time is counted only once Iodine started running (using start).
Accepts:
:milliseconds |
the number of milliseconds between event repetitions. |
:repetitions |
the number of event repetitions. Defaults to 0 (never ending). |
:block |
(required) a block is required, as otherwise there is nothing to |
perform.
The event will repeat itself until the number of repetitions had been delpeted.
Always returns a copy of the block object.
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 |
# File 'ext/iodine/iodine_defer.c', line 263
static VALUE iodine_defer_run_every(int argc, VALUE *argv, VALUE self) {
// clang-format on
(void)(self);
VALUE milliseconds, repetitions, block;
rb_scan_args(argc, argv, "11&", &milliseconds, &repetitions, &block);
if (TYPE(milliseconds) != T_FIXNUM) {
rb_raise(rb_eTypeError, "milliseconds must be a number.");
return Qnil;
}
if (repetitions != Qnil && TYPE(repetitions) != T_FIXNUM) {
rb_raise(rb_eTypeError, "repetitions must be a number or `nil`.");
return Qnil;
}
size_t milli = FIX2UINT(milliseconds);
size_t repeat = (repetitions == Qnil) ? 0 : FIX2UINT(repetitions);
// requires a block to be passed
rb_need_block();
IodineStore.add(block);
if (fio_run_every(milli, repeat, iodine_defer_run_timer, (void *)block,
(void (*)(void *))IodineStore.remove) == -1) {
perror("ERROR: Iodine couldn't initialize timer");
return Qnil;
}
return block;
}
|
.running? ⇒ Boolean
Returns true
if Iodine is currently running a server
321 322 323 324 325 326 327 |
# File 'ext/iodine/iodine.c', line 321
static VALUE iodine_running(VALUE self) {
if (fio_is_running()) {
return Qtrue;
} else {
return Qfalse;
}
}
|
.start ⇒ Object
This will block the calling (main) thread and start the Iodine reactor.
When using cluster mode (2 or more worker processes), it is important that no other threads are active.
For many reasons, fork
should NOT be called while multi-threading, so
cluster mode must always be initiated from the main thread in a single thread
environment.
For information about why forking in multi-threaded environments should be avoided, see (for example): http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-them
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 |
# File 'ext/iodine/iodine.c', line 272
static VALUE iodine_start(VALUE self) {
if (fio_is_running()) {
rb_raise(rb_eRuntimeError, "Iodine already running!");
}
IodineCaller.set_GVL(1);
VALUE threads_rb = iodine_threads_get(self);
VALUE workers_rb = iodine_workers_get(self);
iodine_start_params_s params = {
.threads = NUM2SHORT(threads_rb),
.workers = NUM2SHORT(workers_rb),
};
iodine_print_startup_message(params);
IodineCaller.leaveGVL(iodine_run_outside_GVL, ¶ms);
return self;
}
|
.stop ⇒ Object
This will stop the iodine server, shutting it down.
If called within a worker process (rather than the root/master process), this will cause a hot-restart for the worker.
294 295 296 297 |
# File 'ext/iodine/iodine.c', line 294 static VALUE iodine_stop(VALUE self) { fio_stop(); return self; } |
.subscribe(*args) ⇒ Object
Subscribes to a Pub/Sub stream / channel or replaces an existing subscription.
The method accepts 1-2 arguments and an optional block. These are all valid ways to call the method:
subscribe("my_stream") {|source, msg| p msg }
subscribe("my_strea*", match: :redis) {|source, msg| p msg }
subscribe(to: "my_stream") {|source, msg| p msg }
# or use any object that answers `#call(source, msg)`
MyProc = Proc.new {|source, msg| p msg }
subscribe to: "my_stream", match: :redis, handler: MyProc
The first argument must be either a String or a Hash.
The second, optional, argument must be a Hash (if given).
The options Hash supports the following possible keys (other keys are ignored, all keys are Symbols):
:match
- The channel / subject name matching type to be used. Valid value is::redis
. Future versions hope to support:nats
and:rabbit
patern matching as well.:to
- The channel / subject to subscribe to.:as
- (only for WebSocket connections) accepts the optional value:binary
. default is:text
. Note that binary transmissions are illegal for some connections (such as SSE) and an attempted binary subscription will fail for these connections.:handler
- Any object that answers.call(source, msg)
where source is the stream / channel name.
Note: if an existing subscription with the same name exists, it will be replaced by this new subscription.
Returns the name of the subscription, which matches the name be used in unsubscribe (or nil on failure).
604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 |
# File 'ext/iodine/iodine_connection.c', line 604
static VALUE iodine_pubsub_subscribe(int argc, VALUE *argv, VALUE self) {
// clang-format on
iodine_sub_args_s args = iodine_subscribe_args(argc, argv);
if (args.channel == Qnil) {
return Qnil;
}
iodine_connection_data_s *c = NULL;
if (TYPE(self) == T_MODULE) {
if (!args.block) {
rb_raise(rb_eArgError,
"block or :handler required for local subscriptions.");
}
} else {
c = iodine_connection_validate_data(self);
if (!c || (c->info.type == IODINE_CONNECTION_SSE && args.binary)) {
if (args.block) {
IodineStore.remove(args.block);
}
return Qnil; /* cannot subscribe a closed / invalid connection. */
}
if (args.block == Qnil) {
if (c->info.type == IODINE_CONNECTION_WEBSOCKET)
websocket_optimize4broadcasts((args.binary
? WEBSOCKET_OPTIMIZE_PUBSUB_BINARY
: WEBSOCKET_OPTIMIZE_PUBSUB),
1);
if (args.binary) {
args.block = Qtrue;
}
}
fio_atomic_add(&c->ref, 1);
}
subscription_s *sub =
fio_subscribe(.channel = IODINE_RSTRINFO(args.channel),
.on_message = iodine_on_pubsub,
.on_unsubscribe = iodine_on_unsubscribe, .udata1 = c,
.udata2 = (void *)args.block, .match = args.pattern);
if (c) {
fio_lock(&c->lock);
if (c->info.uuid == -1) {
fio_unsubscribe(sub);
fio_unlock(&c->lock);
return Qnil;
}
iodine_sub_add(&c->subscriptions, sub);
fio_unlock(&c->lock);
} else {
fio_lock(&sub_lock);
iodine_sub_add(&sub_global, sub);
fio_unlock(&sub_lock);
}
return args.channel_org;
}
|
.threads ⇒ FixNum
Returns the number of worker threads that will be used when start is called.
Negative numbers are translated as fractions of the number of CPU cores. i.e., -2 == half the number of detected CPU cores.
Zero values promise nothing (iodine will decide what to do with them).
124 125 126 127 128 129 |
# File 'ext/iodine/iodine.c', line 124
static VALUE iodine_threads_get(VALUE self) {
VALUE i = rb_ivar_get(self, rb_intern2("@threads", 8));
if (i == Qnil)
i = INT2NUM(0);
return i;
}
|
.threads=(val) ⇒ Object
Sets the number of worker threads that will be used when start is called.
Negative numbers are translated as fractions of the number of CPU cores. i.e., -2 == half the number of detected CPU cores.
Zero values promise nothing (iodine will decide what to do with them).
142 143 144 145 146 147 148 149 |
# File 'ext/iodine/iodine.c', line 142
static VALUE iodine_threads_set(VALUE self, VALUE val) {
Check_Type(val, T_FIXNUM);
if (NUM2SSIZET(val) >= (1 << 12)) {
rb_raise(rb_eRangeError, "requsted thread count is out of range.");
}
rb_ivar_set(self, rb_intern2("@threads", 8), val);
return val;
}
|
.unsubscribe(name) ⇒ Object
Unsubscribes from a Pub/Sub stream / channel.
The method accepts a single arguments, the name used for the subscription. i.e.:
subscribe("my_stream") {|source, msg| p msg }
unsubscribe("my_stream")
Returns true
if the subscription was found.
Returns false
if the subscription didn't exist.
672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 |
# File 'ext/iodine/iodine_connection.c', line 672
static VALUE iodine_pubsub_unsubscribe(VALUE self, VALUE name) {
// clang-format on
iodine_connection_data_s *c = NULL;
fio_lock_i *s_lock = &sub_lock;
fio_subhash_s *subs = &sub_global;
VALUE ret;
if (TYPE(self) != T_MODULE) {
c = iodine_connection_validate_data(self);
if (!c || c->info.uuid == -1) {
return Qnil; /* cannot unsubscribe a closed connection. */
}
s_lock = &c->lock;
subs = &c->subscriptions;
}
if (TYPE(name) == T_SYMBOL)
name = rb_sym2str(name);
Check_Type(name, T_STRING);
fio_lock(s_lock);
ret = iodine_sub_unsubscribe(subs, IODINE_RSTRINFO(name));
fio_unlock(s_lock);
return ret;
}
|
.verbosity ⇒ FixNum
Gets the logging level used for Iodine messages.
Levels range from 0-5, where:
0 == Quite (no messages) 1 == Fatal Errors only. 2 == Errors only (including fatal errors). 3 == Warnings and errors only. 4 == Informational messages, warnings and errors (default). 5 == Everything, including debug information.
Logging is always performed to the process's STDERR and can be piped away.
NOTE: this does NOT effect HTTP logging.
169 170 171 172 |
# File 'ext/iodine/iodine.c', line 169
static VALUE iodine_logging_get(VALUE self) {
return INT2FIX(FIO_LOG_LEVEL);
(void)self;
}
|
.verbosity=(val) ⇒ Object
Gets the logging level used for Iodine messages.
Levels range from 0-5, where:
0 == Quite (no messages) 1 == Fatal Errors only. 2 == Errors only (including fatal errors). 3 == Warnings and errors only. 4 == Informational messages, warnings and errors (default). 5 == Everything, including debug information.
Logging is always performed to the process's STDERR and can be piped away.
NOTE: this does NOT effect HTTP logging.
192 193 194 195 196 |
# File 'ext/iodine/iodine.c', line 192
static VALUE iodine_logging_set(VALUE self, VALUE val) {
Check_Type(val, T_FIXNUM);
FIO_LOG_LEVEL = FIX2INT(val);
return self;
}
|
.worker? ⇒ Boolean
Returns true
if this process is a worker process or if iodine is running in
a single process mode (the master is also a worker), false
otherwise.
314 315 316 |
# File 'ext/iodine/iodine.c', line 314 static VALUE iodine_worker_is(VALUE self) { return fio_is_worker() ? Qtrue : Qfalse; } |
.workers ⇒ FixNum
Returns the number of worker processes that will be used when start is called.
Negative numbers are translated as fractions of the number of CPU cores. i.e., -2 == half the number of detected CPU cores.
Zero values promise nothing (iodine will decide what to do with them).
1 == single process mode, the msater process acts as a worker process.
211 212 213 214 215 216 |
# File 'ext/iodine/iodine.c', line 211
static VALUE iodine_workers_get(VALUE self) {
VALUE i = rb_ivar_get(self, rb_intern2("@workers", 8));
if (i == Qnil)
i = INT2NUM(0);
return i;
}
|
.workers=(val) ⇒ Object
Sets the number of worker processes that will be used when start is called.
Negative numbers are translated as fractions of the number of CPU cores. i.e., -2 == half the number of detected CPU cores.
Zero values promise nothing (iodine will decide what to do with them).
1 == single process mode, the msater process acts as a worker process.
231 232 233 234 235 236 237 238 |
# File 'ext/iodine/iodine.c', line 231
static VALUE iodine_workers_set(VALUE self, VALUE val) {
Check_Type(val, T_FIXNUM);
if (NUM2SSIZET(val) >= (1 << 9)) {
rb_raise(rb_eRangeError, "requsted worker process count is out of range.");
}
rb_ivar_set(self, rb_intern2("@workers", 8), val);
return val;
}
|