Module: Iodine
- Defined in:
- lib/iodine.rb,
lib/iodine/cli.rb,
lib/iodine/json.rb,
lib/iodine/pubsub.rb,
lib/iodine/version.rb,
lib/iodine/protocol.rb,
lib/iodine/websocket.rb,
lib/iodine/monkeypatch.rb,
lib/rack/handler/iodine.rb,
ext/iodine/iodine.c
Overview
Iodine is both a Rack server and a platform for writing evented network services on Ruby.
Here is a sample Echo server using Iodine:
require 'iodine'
# define the protocol for our service
class EchoProtocol
  @timeout = 10
  # this is just one possible callback with a recyclable buffer
  def on_message(buffer)
    # write the data we received
    write "echo: #{buffer}"
    # close the connection when the time comes
    close if buffer =~ /^bye[\n\r]/
  end
end
# create the service instance
Iodine.listen 3000, EchoProtocol
# start the service
Iodine.start
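Because the callback only relies on `write` and `close`, its logic can be exercised without opening a socket. The following is a minimal sketch under stated assumptions, not part of Iodine: `FakeConnection` and `EchoLogic` are hypothetical names, and `on_message` is assumed to be the data callback described in the Protocol documentation.

```ruby
# Hypothetical stand-in for the connection methods Iodine provides;
# it records calls instead of touching a real socket.
module FakeConnection
  def written
    @written ||= []
  end

  def closed?
    @closed == true
  end

  def write(data)
    written << data
  end

  def close
    @closed = true
  end
end

# The same callback logic as the echo service, mixed with the stand-in.
class EchoLogic
  include FakeConnection

  def on_message(buffer)
    write "echo: #{buffer}"
    close if buffer =~ /^bye[\n\r]/
  end
end

conn = EchoLogic.new
conn.on_message("hello\n")
conn.on_message("bye\n")
```

After these two calls, `conn.written` holds both echoed lines and `conn.closed?` reports that the second message asked for the connection to be closed.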
Please read the README file for an introduction to Iodine and an overview of its API.
The API
The main API methods for the top-level Iodine namespace are grouped here by subject.
Event Loop / Concurrency
Iodine manages an internal event-loop and reactor pattern. The following API manages Iodine’s behavior.
-
Iodine.threads, Iodine.threads= get or set the number of threads Iodine will use in its worker thread pool.
-
Iodine.processes, Iodine.processes= get or set the number of processes Iodine will use (`fork`) to handle connections.
-
Iodine.start starts iodine’s event loop and reactor pattern. At this point, it’s impossible to change the number of threads or processes used.
Event and Task Scheduling
-
Iodine.run schedules a block of code to run asynchronously.
-
Iodine.run_after, Iodine.run_every schedule a block of code to run (asynchronously) using a timer.
-
Iodine.start starts iodine’s event loop and reactor pattern. At this point, it’s impossible to change the number of threads or processes used.
In addition to the top-level API, there is also the connection class and connection instance API, as specified in the Protocol and Websocket documentation, which allows connection-bound tasks to be scheduled to run within the connection’s lock (for example, Websocket#defer and Websocket#each).
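The three top-level scheduling calls can be combined as in the sketch below. It assumes only the signatures listed on this page (`run` with a block, `run_after(milliseconds)`, `run_every(milliseconds, repetitions)`). The method name `schedule_housekeeping` and the idea of passing the reactor in as an argument are choices made for this illustration, so the logic can be tried with a stand-in object.

```ruby
# `reactor` is expected to be the Iodine module. Passing it in, instead of
# naming the constant, is a choice made for this sketch only.
def schedule_housekeeping(reactor, log)
  # run once, asynchronously
  reactor.run { log << :started }
  # run once, using a 500 millisecond timer
  reactor.run_after(500) { log << :warm }
  # run every 1000 milliseconds, 3 repetitions in total
  reactor.run_every(1000, 3) { log << :tick }
end

# In an application (requires the iodine gem), this would look like:
#   schedule_housekeeping(Iodine, [])
#   Iodine.start
```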
Connection Handling
Iodine handles connections using Protocol objects. The following API manages either built-in or custom Protocol objects (classes / instances) in relation to their network sockets.
-
Iodine.attach_fd, Iodine.attach_io allow Iodine to take control of an IO object (e.g., a TCP/IP socket, a Unix socket or a pipe).
-
Iodine.connect creates a new TCP/IP connection using the specified Protocol.
-
Iodine.listen listens to new TCP/IP connections using the specified Protocol.
-
Iodine.listen2http listens to new TCP/IP connections using the built-in HTTP / Websocket Protocol.
-
Iodine.warmup warms up any HTTP Rack applications.
-
Iodine.count counts the number of connections (including HTTP / Websocket connections).
In addition to the top level API, there’s also the connection class and connection instance API, as specified in the Protocol and Websocket documentation.
Pub/Sub
Iodine offers a native Pub/Sub engine (no database required) that can be easily extended by implementing a PubSub::Engine.
The following methods affect server-side Pub/Sub, which allows the server code to react to channel events.
-
Iodine.subscribe subscribes the process to a channel (which might be different than a connection’s subscription, see Websocket).
-
Iodine.publish publishes a message to a Pub/Sub channel. The message will be sent to all subscribers - connections, other processes in the cluster and possibly other machines (when using an engine such as PubSub::RedisEngine).
-
PubSub.default_engine=, PubSub.default_engine set or get the default PubSub::Engine. For example, when set to a new PubSub::RedisEngine instance, all Pub/Sub method calls will use the Redis engine (unless a different engine is explicitly requested).
Websocket objects have a separate Pub/Sub implementation that manages the subscription’s lifetime to match the connection’s lifetime and allows direct client Pub/Sub (forwards the message to the client directly without invoking the Ruby interpreter).
Note that Iodine.subscribe returns a PubSub::Subscription object that can be used for closing the subscription.
Patching Rack
Although Iodine offers Rack::Utils optimizations using monkey patching, Iodine does NOT monkey patch Rack automatically.
Choosing to monkey patch Rack::Utils could offer significant performance gains for some applications. For example (on my machine):
require 'iodine'
require 'rack'
require 'benchmark'
# a String in need of decoding
s = '%E3%83%AB%E3%83%93%E3%82%A4%E3%82%B9%E3%81%A8'
Benchmark.bm do |bm|
  # Pre-Patch
  bm.report("   Rack.unescape") {1_000_000.times { Rack::Utils.unescape s } }
  bm.report("    Rack.rfc2822") {1_000_000.times { Rack::Utils.rfc2822(Time.now) } }
  bm.report("    Rack.rfc2109") {1_000_000.times { Rack::Utils.rfc2109(Time.now) } }
  # Perform Patch
  Iodine.patch_rack
  puts " --- Monkey Patching Rack ---"
  # Post Patch
  bm.report("Patched.unescape") {1_000_000.times { Rack::Utils.unescape s } }
  bm.report(" Patched.rfc2822") {1_000_000.times { Rack::Utils.rfc2822(Time.now) } }
  bm.report(" Patched.rfc2109") {1_000_000.times { Rack::Utils.rfc2109(Time.now) } }
end && nil
Results:
                      user     system      total        real
   Rack.unescape  8.660000   0.010000   8.670000 (  8.687807)
    Rack.rfc2822  3.730000   0.000000   3.730000 (  3.727732)
    Rack.rfc2109  3.020000   0.010000   3.030000 (  3.031940)
 --- Monkey Patching Rack ---
Patched.unescape  0.340000   0.000000   0.340000 (  0.341506)
 Patched.rfc2822  0.740000   0.000000   0.740000 (  0.737796)
 Patched.rfc2109  0.690000   0.010000   0.700000 (  0.700155)
At the moment, the extent of the monkey patching offered is very limited. As new optimizations are added, the policy regarding monkey patching (benefit vs. risks) might be re-evaluated.
Defined Under Namespace
Modules: Base, JSON, Protocol, PubSub, Rack, Websocket
Constant Summary
- VERSION = '0.5.0'.freeze
Class Method Summary
- .after_fork(*args, &block) ⇒ Object
-
.attach_fd(rbfd, handler) ⇒ Object
Attaches an existing file descriptor (`fd`) (e.g., a pipe or a Unix socket) as if it were a regular connection.
-
.attach_io(io, handler) ⇒ Object
Attaches an existing IO object (e.g., a pipe or a Unix socket) as if it were a regular connection.
- .before_fork(*args, &block) ⇒ Object
-
.connect(address, port, handler) ⇒ Object
Connects (as a TCP/IP client) to a remote TCP/IP server.
-
.count ⇒ Object
Returns the number of total connections managed by Iodine.
-
.default_engine ⇒ Object
Returns the default Pub/Sub engine (if any).
-
.default_engine=(en) ⇒ Object
Sets the default Pub/Sub engine to be used.
-
.default_pubsub ⇒ Object
Deprecated.
-
.default_pubsub=(en) ⇒ Object
Deprecated.
-
.listen(port, handler) ⇒ Object
Sets up a listening socket.
-
.listen2http(opt) ⇒ Object
Listens to incoming HTTP connections and handles incoming requests using the Rack specification.
-
.on_idle ⇒ Object
Schedules a single occurring event for the next idle cycle.
-
.patch_json ⇒ Object
Will monkey patch the default JSON parser to replace the default `JSON.parse` with Iodine::JSON.parse.
-
.patch_rack ⇒ Object
Will monkey patch some Rack methods to increase their performance.
-
.processes ⇒ Object
Get/Set the number of worker processes.
-
.processes=(count) ⇒ Object
Get/Set the number of worker processes.
-
.publish(*args) ⇒ Object
Publishes a message to a channel.
- .run ⇒ Object
-
.run_after(milliseconds) ⇒ Object
Runs the required block after the specified number of milliseconds have passed.
-
.run_every(*args) ⇒ Object
Runs the required block repeatedly, once every time the specified number of milliseconds has passed.
- .running? ⇒ Boolean
-
.start ⇒ Object
Starts the Iodine event loop.
-
.subscribe(*args) ⇒ Object
Subscribes to a Pub/Sub channel.
-
.threads ⇒ Object
Get/Set the number of threads used in the thread pool (a static thread pool).
-
.threads=(count) ⇒ Object
Get/Set the number of threads used in the thread pool (a static thread pool).
-
.warmup(app) ⇒ Object
Runs the warmup sequence.
Class Method Details
.after_fork(*args, &block) ⇒ Object
# File 'lib/iodine.rb', line 193
def self.after_fork(*args, &block)
  if(block)
    @after_fork_blocks << [args, block]
  else
    @after_fork_blocks.each {|b| b[1].call(b[0]) }
  end
end
.attach_fd(rbfd, handler) ⇒ Object
Attaches an existing file descriptor (`fd`) (e.g., a pipe or a Unix socket) as if it were a regular connection.
For example:
Iodine.attach_fd my_io_obj.to_i, MyProtocolClass.new
# File 'ext/iodine/iodine_protocol.c', line 604
static VALUE iodine_attach_fd(VALUE self, VALUE rbfd, VALUE handler) {
  Check_Type(rbfd, T_FIXNUM);
  if (handler == Qnil || handler == Qfalse)
    return Qfalse;
  intptr_t uuid = FIX2INT(rbfd);
  if (!uuid || uuid == -1)
    return Qfalse;
  /* make sure the uuid is connected to the sock library */
  if (sock_fd2uuid(uuid) == -1)
    sock_open(uuid);
  if (TYPE(handler) == T_CLASS) {
    // include the Protocol module, preventing coder errors
    rb_include_module(handler, IodineProtocol);
    handler = RubyCaller.call(handler, iodine_new_func_id);
  } else {
    // include the Protocol module in the object's class
    VALUE p_class = rb_obj_class(handler);
    // include the Protocol module, preventing coder errors
    rb_include_module(p_class, IodineProtocol);
  }
  Registry.add(handler);
  on_open_dyn_protocol_instance(uuid, (void *)handler);
  return self;
}
.attach_io(io, handler) ⇒ Object
Attaches an existing IO object (e.g., a pipe or a Unix socket) as if it were a regular connection.
For example:
Iodine.attach_io my_io_obj, MyProtocolClass.new
# File 'ext/iodine/iodine_protocol.c', line 637
static VALUE iodine_attach_io(VALUE self, VALUE io, VALUE handler) {
  return iodine_attach_fd(self, RubyCaller.call(io, iodine_to_i_func_id),
                          handler);
}
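As the source shows, attach_io only converts the IO object to its file descriptor (by calling `to_i`) and forwards it to attach_fd. The short sketch below illustrates that relationship with a plain pipe. The Iodine calls are left as comments because they need the iodine gem and a protocol class of your own (`MyProtocolClass` is the placeholder name used in the examples above).

```ruby
reader, writer = IO.pipe

# IO#to_i is an alias of IO#fileno: both return the Integer file descriptor.
fd = reader.to_i

# With the iodine gem loaded, these two calls would be interchangeable:
#   Iodine.attach_io reader, MyProtocolClass.new
#   Iodine.attach_fd fd, MyProtocolClass.new
```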
.before_fork(*args, &block) ⇒ Object
# File 'lib/iodine.rb', line 202
def self.before_fork(*args, &block)
  if(block)
    @before_fork_blocks << [args, block]
  else
    @before_fork_blocks.each {|b| b[1].call(b[0]) }
  end
end
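Both fork hooks follow the same pattern: calling the method with a block registers it (together with any arguments), and calling it without a block runs every registered block, passing each one its argument list as a single Array. The stand-alone model below reproduces that pattern for illustration. `ForkHooks` is a hypothetical module, not Iodine's own code.

```ruby
# Hypothetical model of the register / fire pattern used by the fork hooks.
module ForkHooks
  @after_fork_blocks = []

  def self.after_fork(*args, &block)
    if block
      # a block was given: remember it along with its arguments
      @after_fork_blocks << [args, block]
    else
      # no block: fire every registered hook, in registration order
      @after_fork_blocks.each { |b| b[1].call(b[0]) }
    end
  end
end

log = []
ForkHooks.after_fork(:db)    { |args| log << "reconnect #{args.first}" }
ForkHooks.after_fork(:cache) { |args| log << "reconnect #{args.first}" }
ForkHooks.after_fork # fires both hooks
```

A typical use for such hooks is closing shared resources before forking and reopening them in each worker afterwards.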
.connect(address, port, handler) ⇒ Object
Connects (as a TCP/IP client) to a remote TCP/IP server.
For example:
Iodine.connect "example.com", 5000, MyProtocolClass.new
# File 'ext/iodine/iodine_protocol.c', line 565
static VALUE iodine_connect(VALUE self, VALUE address, VALUE port,
                            VALUE handler) {
  if (TYPE(handler) == T_CLASS || TYPE(handler) == T_MODULE) {
    // include the Protocol module, preventing coder errors
    rb_include_module(handler, IodineProtocol);
    handler = RubyCaller.call(handler, iodine_new_func_id);
  } else {
    // include the Protocol module in the object's class
    VALUE p_class = rb_obj_class(handler);
    // include the Protocol module, preventing coder errors
    rb_include_module(p_class, IodineProtocol);
  }
  if (TYPE(port) != T_FIXNUM && TYPE(port) != T_STRING)
    rb_raise(rb_eTypeError, "The port variable must be a Fixnum or a String.");
  Registry.add(handler);
  if (TYPE(port) == T_FIXNUM)
    port = rb_funcall2(port, iodine_to_s_method_id, 0, NULL);
  // connect
  intptr_t uuid = facil_connect(.port = StringValueCStr(port),
                                .address = StringValueCStr(address),
                                .udata = (void *)handler,
                                .on_connect = on_open_dyn_protocol_instance,
                                .on_fail = remove_from_registry);
  if (uuid == -1)
    return Qnil;
  iodine_set_fd(handler, uuid);
  return handler;
  (void)self;
}
.count ⇒ Object
Returns the number of total connections managed by Iodine.
# File 'ext/iodine/iodine.c', line 67
static VALUE iodine_count(VALUE self) {
  size_t count = facil_count(NULL);
  return ULL2NUM(count);
  (void)self;
}
.default_engine ⇒ Object
Returns the default Pub/Sub engine (if any).
See PubSub and Iodine::PubSub::Engine for more details.
# File 'ext/iodine/iodine_pubsub.c', line 557
static VALUE ips_get_default(VALUE self) {
  return rb_ivar_get(Iodine, default_pubsubid);
  (void)self;
}
.default_engine=(en) ⇒ Object
Sets the default Pub/Sub engine to be used.
See PubSub and Iodine::PubSub::Engine for more details.
# File 'ext/iodine/iodine_pubsub.c', line 532
static VALUE ips_set_default(VALUE self, VALUE en) {
  iodine_engine_s *e;
  Data_Get_Struct(en, iodine_engine_s, e);
  if (!e)
    rb_raise(rb_eArgError, "deafult engine must be an Iodine::PubSub::Engine.");
  if (!e->p)
    rb_raise(rb_eArgError, "This Iodine::PubSub::Engine is broken.");
  rb_ivar_set(Iodine, default_pubsubid, en);
  PUBSUB_DEFAULT_ENGINE = e->p;
  return en;
  (void)self;
}
.default_pubsub ⇒ Object
Deprecated. Use Iodine::PubSub.default_engine.
# File 'ext/iodine/iodine_pubsub.c', line 565
static VALUE ips_get_default_dep(VALUE self) {
  fprintf(stderr, "WARNING: Iodine.default_pubsub is deprecated. Use "
                  "Iodine::PubSub.default_engine.\n");
  return ips_get_default(self);
}
.default_pubsub=(en) ⇒ Object
Deprecated. Use Iodine::PubSub.default_engine=.
# File 'ext/iodine/iodine_pubsub.c', line 546
static VALUE ips_set_default_dep(VALUE self, VALUE en) {
  fprintf(stderr, "WARNING: Iodine.default_pubsub is deprecated. Use "
                  "Iodine::PubSub.default_engine.\n");
  return ips_set_default(self, en);
}
.listen(port, handler) ⇒ Object
Sets up a listening socket. Connections received at the assigned port will be handled by the assigned handler.
Multiple services (listening sockets) can be registered before starting the Iodine event loop.
# File 'ext/iodine/iodine_protocol.c', line 499
static VALUE iodine_listen(VALUE self, VALUE port, VALUE handler) {
  // validate that the handler is a class and include the Iodine::Protocol
  if (TYPE(handler) == T_CLASS) {
    // include the Protocol module
    // // do we neet to check?
    // if (rb_mod_include_p(protocol, rDynProtocol) == Qfalse)
    rb_include_module(handler, IodineProtocol);
    rb_extend_object(handler, IodineProtocol);
  } else {
    rb_raise(rb_eTypeError, "The connection handler MUST be of type Class.");
    return Qnil;
  }
  if (TYPE(port) != T_FIXNUM && TYPE(port) != T_STRING)
    rb_raise(rb_eTypeError, "The port variable must be a Fixnum or a String.");
  if (TYPE(port) == T_FIXNUM)
    port = rb_funcall2(port, iodine_to_s_method_id, 0, NULL);
  rb_ivar_set(self, rb_intern("_port"), port);
  // listen
  if (facil_listen(.port = StringValueCStr(port), .udata = (void *)handler,
                   .on_open = on_open_dyn_protocol) == -1)
    return Qnil;
  return self;
}
.listen2http(opt) ⇒ Object
Listens to incoming HTTP connections and handles incoming requests using the Rack specification.
This is delegated to a lower level C HTTP and Websocket implementation; no Ruby object will be created except the `env` object required by the Rack specification.
Accepts a single Hash argument with the following properties:
- app
-
the Rack application that handles incoming requests. Default: `nil`.
- port
-
the port to listen to. Default: 3000.
- address
-
the address to bind to. Default: binds to all possible addresses.
- log
-
enable response logging (Hijacked sockets aren’t logged). Default: off.
- public
-
The root public folder for static file service. Default: none.
- timeout
-
Timeout for inactive HTTP/1.x connections. Default: 40 seconds. Values above 255 are ignored.
- max_body
-
The maximum body size for incoming HTTP messages. Default: ~50MiB.
- max_msg
-
The maximum Websocket message size allowed. Default: ~250KiB.
- ping
-
The Websocket `ping` interval. Default: 40 seconds. Values above 255 are ignored.
Either the `app` or the `public` property is required. If neither exists, the function will fail. If both exist, Iodine will serve static files as well as dynamic requests.
When using the static file server, it’s possible to serve `gzip` versions of the static files by saving a compressed version with the `gz` extension (e.g., `styles.css.gz`).
`gzip` will only be served to clients that support the `gzip` transfer encoding.
Once HTTP/2 is supported (planned, but probably very far away), HTTP/2 timeouts will be dynamically managed by Iodine. The `timeout` option is only relevant to HTTP/1.x connections.
# File 'ext/iodine/iodine_http.c', line 742
VALUE iodine_http_listen(VALUE self, VALUE opt) {
  static int called_once = 0;
  uint8_t log_http = 0;
  size_t ping = 0;
  size_t max_body = 0;
  size_t max_headers = 0;
  size_t max_msg = 0;
  Check_Type(opt, T_HASH);
  VALUE app = rb_hash_aref(opt, ID2SYM(rb_intern("app")));
  VALUE www = rb_hash_aref(opt, ID2SYM(rb_intern("public")));
  VALUE port = rb_hash_aref(opt, ID2SYM(rb_intern("port")));
  VALUE address = rb_hash_aref(opt, ID2SYM(rb_intern("address")));
  VALUE tout = rb_hash_aref(opt, ID2SYM(rb_intern("timeout")));

  VALUE tmp = rb_hash_aref(opt, ID2SYM(rb_intern("max_msg")));
  if (tmp != Qnil && tmp != Qfalse) {
    Check_Type(tmp, T_FIXNUM);
    max_msg = FIX2ULONG(tmp);
  }

  tmp = rb_hash_aref(opt, ID2SYM(rb_intern("max_body")));
  if (tmp != Qnil && tmp != Qfalse) {
    Check_Type(tmp, T_FIXNUM);
    max_body = FIX2ULONG(tmp);
  }

  tmp = rb_hash_aref(opt, ID2SYM(rb_intern("max_headers")));
  if (tmp != Qnil && tmp != Qfalse) {
    Check_Type(tmp, T_FIXNUM);
    max_headers = FIX2ULONG(tmp);
  }

  tmp = rb_hash_aref(opt, ID2SYM(rb_intern("ping")));
  if (tmp != Qnil && tmp != Qfalse) {
    Check_Type(tmp, T_FIXNUM);
    ping = FIX2ULONG(tmp);
  }
  if (ping > 255) {
    fprintf(stderr, "Iodine Warning: Websocket timeout value "
                    "is over 255 and is silently ignored.\n");
    ping = 0;
  }

  tmp = rb_hash_aref(opt, ID2SYM(rb_intern("log")));
  if (tmp != Qnil && tmp != Qfalse)
    log_http = 1;

  if ((app == Qnil || app == Qfalse) && (www == Qnil || www == Qfalse)) {
    fprintf(stderr, "Iodine Warning: HTTP without application or public folder "
                    "(is silently ignored).\n");
    return Qfalse;
  }

  if ((www != Qnil && www != Qfalse)) {
    Check_Type(www, T_STRING);
    Registry.add(www);
    rb_hash_aset(env_template_no_upgrade, XSENDFILE_TYPE, XSENDFILE);
    rb_hash_aset(env_template_no_upgrade, XSENDFILE_TYPE_HEADER, XSENDFILE);
    support_xsendfile = 1;
  } else
    www = 0;

  if ((address != Qnil && address != Qfalse))
    Check_Type(address, T_STRING);
  else
    address = 0;

  if ((tout != Qnil && tout != Qfalse)) {
    Check_Type(tout, T_FIXNUM);
    tout = FIX2ULONG(tout);
  } else
    tout = 0;
  if (tout > 255) {
    fprintf(stderr, "Iodine Warning: HTTP timeout value "
                    "is over 255 and is silently ignored.\n");
    tout = 0;
  }

  if (port != Qnil && port != Qfalse) {
    if (!RB_TYPE_P(port, T_STRING) && !RB_TYPE_P(port, T_FIXNUM))
      rb_raise(rb_eTypeError,
               "The `port` property MUST be either a String or a Number");
    if (RB_TYPE_P(port, T_FIXNUM))
      port = rb_funcall2(port, iodine_to_s_method_id, 0, NULL);
    Registry.add(port);
  } else if (port == Qfalse)
    port = 0;
  else {
    port = rb_str_new("3000", 4);
    Registry.add(port);
  }

  if ((app != Qnil && app != Qfalse))
    Registry.add(app);
  else
    app = 0;

  if (http_listen(
          StringValueCStr(port), (address ? StringValueCStr(address) : NULL),
          .on_request = on_rack_request, .on_upgrade = on_rack_upgrade,
          .udata = (void *)app, .timeout = (tout ? FIX2INT(tout) : tout),
          .ws_timeout = ping, .ws_max_msg_size = max_msg,
          .max_header_size = max_headers, .on_finish = free_iodine_http,
          .log = log_http, .max_body_size = max_body,
          .public_folder = (www ? StringValueCStr(www) : NULL))) {
    fprintf(stderr,
            "ERROR: Failed to initialize a listening HTTP socket for port %s\n",
            port ? StringValueCStr(port) : "3000");
    return Qfalse;
  }

  if ((app == Qnil || app == Qfalse)) {
    fprintf(stderr,
            "* Iodine: (no app) the HTTP service on port %s will only serve "
            "static files.\n",
            (port ? StringValueCStr(port) : "3000"));
  }
  if (called_once) {
    defer(iodine_print_http_msg2, (www ? (void *)www : NULL), (void *)port);
  } else {
    called_once = 1;
    defer(iodine_print_http_msg1, (www ? (void *)www : NULL), (void *)port);
  }

  return Qtrue;
  (void)self;
}
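Two of the checks in the source are easy to miss because the C code only prints a warning: a missing `app` / `public` pair makes the call return false, and `timeout` or `ping` values above 255 are dropped. The sketch below repeats those checks in Ruby so that mistakes raise exceptions instead. `checked_http_options` is a hypothetical helper written for this illustration, not part of Iodine.

```ruby
# Hypothetical helper (not part of Iodine): repeats, in Ruby, two checks that
# the C source performs, so mistakes show up as exceptions.
def checked_http_options(opt)
  # the C code returns false unless an app or a public folder is given
  unless opt[:app] || opt[:public]
    raise ArgumentError, "either :app or :public is required"
  end
  # the C code warns about and ignores :timeout / :ping values above 255
  [:timeout, :ping].each do |key|
    value = opt[key]
    next unless value
    raise TypeError, "#{key} must be an Integer" unless value.is_a?(Integer)
    raise ArgumentError, "#{key} must not exceed 255" if value > 255
  end
  opt
end

# a minimal Rack application
app = ->(env) { [200, { "Content-Type" => "text/plain" }, ["Hello"]] }
options = checked_http_options({ app: app, port: 3000, timeout: 40, ping: 40 })

# With the iodine gem loaded, the service would then be started with:
#   Iodine.listen2http(options)
#   Iodine.start
```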
.on_idle ⇒ Object
Schedules a single occurring event for the next idle cycle.
To schedule a recurring event, simply reschedule the event at the end of its run.
For example:
IDLE_PROC = Proc.new { puts "idle"; Iodine.on_idle &IDLE_PROC }
Iodine.on_idle &IDLE_PROC
# File 'ext/iodine/iodine.c', line 177
VALUE iodine_sched_on_idle(VALUE self) {
  rb_need_block();
  VALUE block = rb_block_proc();
  Registry.add(block);
  spn_lock(&iodine_on_idle_lock);
  fio_ls_push(&iodine_on_idle_list, (void *)block);
  spn_unlock(&iodine_on_idle_lock);
  return block;
  (void)self;
}
.patch_json ⇒ Object
Will monkey patch the default JSON parser to replace the default `JSON.parse` with Iodine::JSON.parse.
# File 'lib/iodine.rb', line 184
def self.patch_json
  ::JSON.class_eval do
    ::JSON.define_singleton_method(:parse, Iodine::JSON.instance_method(:parse) )
  end
end
.patch_rack ⇒ Object
Will monkey patch some Rack methods to increase their performance.
# File 'lib/iodine.rb', line 174
def self.patch_rack
  ::Rack::Utils.class_eval do
    Iodine::Base::MonkeyPatch::RackUtils.methods(false).each do |m|
      ::Rack::Utils.define_singleton_method(m,
            Iodine::Base::MonkeyPatch::RackUtils.instance_method(m) )
    end
  end
end
.processes ⇒ Object
Get/Set the number of worker processes. A value greater than 1 will cause Iodine to “fork” any extra worker processes needed.
# File 'lib/iodine.rb', line 140
def self.processes
  @processes
end
.processes=(count) ⇒ Object
Get/Set the number of worker processes. A value greater than 1 will cause Iodine to “fork” any extra worker processes needed.
# File 'lib/iodine.rb', line 145
def self.processes=(count)
  @processes = count.to_i
end
.publish(*args) ⇒ Object
Publishes a message to a channel.
Can be called with two Strings:
publish(to, message)
The method accepts an optional `engine` argument:
publish(to, message, my_pubsub_engine)
Alternatively, accepts the following named arguments:
- :to
-
The channel to publish to (required).
- :message
-
The message to be published (required).
- :engine
-
If provided, the engine to use for pub/sub. Otherwise the default
engine is used.
# File 'ext/iodine/iodine_pubsub.c', line 796
VALUE iodine_publish(int argc, VALUE *argv, VALUE self) {
  VALUE rb_ch, rb_msg, rb_engine = Qnil;
  uint8_t use_pattern = 0;
  const pubsub_engine_s *engine = NULL;
  switch (argc) {
  case 3:
    /* fallthrough */
    rb_engine = argv[2];
  case 2:
    rb_ch = argv[0];
    rb_msg = argv[1];
    break;
  case 1: {
    /* single argument must be a Hash */
    Check_Type(argv[0], T_HASH);
    rb_ch = rb_hash_aref(argv[0], to_sym_id);
    if (rb_ch == Qnil || rb_ch == Qfalse) {
      use_pattern = 1;
      rb_ch = rb_hash_aref(argv[0], match_sym_id);
    }
    rb_msg = rb_hash_aref(argv[0], message_sym_id);
    rb_engine = rb_hash_aref(argv[0], engine_varid);
  } break;
  default:
    rb_raise(rb_eArgError, "method accepts 1-3 arguments.");
  }

  if (rb_msg == Qnil || rb_msg == Qfalse) {
    rb_raise(rb_eArgError, "message is required.");
  }
  Check_Type(rb_msg, T_STRING);

  if (rb_ch == Qnil || rb_ch == Qfalse)
    rb_raise(rb_eArgError, "channel is required .");
  if (TYPE(rb_ch) == T_SYMBOL)
    rb_ch = rb_sym2str(rb_ch);
  Check_Type(rb_ch, T_STRING);

  if (rb_engine == Qfalse) {
    engine = PUBSUB_PROCESS_ENGINE;
  } else if (rb_engine == Qnil) {
    engine = NULL;
  } else {
    engine = iodine_engine_ruby2facil(rb_engine);
  }

  FIOBJ ch = fiobj_str_new(RSTRING_PTR(rb_ch), RSTRING_LEN(rb_ch));
  FIOBJ msg = fiobj_str_new(RSTRING_PTR(rb_msg), RSTRING_LEN(rb_msg));

  intptr_t ret = pubsub_publish(.engine = engine, .channel = ch, .message = msg);
  fiobj_free(ch);
  fiobj_free(msg);
  if (!ret)
    return Qfalse;
  return Qtrue;
  (void)self;
}
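The argument handling in the source can be easier to follow when written out in Ruby. The sketch below is a rough model of it, under assumptions: `normalize_publish_args` is a hypothetical name, the Hash keys are the documented `:to`, `:message` and `:engine` (plus the `:match` fallback visible in the C code), and nothing here talks to a real Pub/Sub engine.

```ruby
# Hypothetical model (not Iodine's code) of how publish reads its arguments.
def normalize_publish_args(*args)
  case args.length
  when 2, 3
    # positional form: publish(to, message) or publish(to, message, engine)
    channel, message, engine = args
  when 1
    # a single argument must be a Hash of named arguments
    opt = args.first
    raise TypeError, "a single argument must be a Hash" unless opt.is_a?(Hash)
    channel = opt[:to] || opt[:match]
    message = opt[:message]
    engine  = opt[:engine]
  else
    raise ArgumentError, "method accepts 1-3 arguments."
  end
  raise ArgumentError, "message is required." unless message
  raise TypeError, "message must be a String" unless message.is_a?(String)
  raise ArgumentError, "channel is required." unless channel
  # Symbols are accepted for the channel name and converted to Strings
  channel = channel.to_s if channel.is_a?(Symbol)
  raise TypeError, "channel must be a String" unless channel.is_a?(String)
  { to: channel, message: message, engine: engine }
end

positional = normalize_publish_args("chat", "hello")
named      = normalize_publish_args({ to: :chat, message: "hello" })
```

Both calls describe the same publication: the channel `"chat"`, the message `"hello"` and no explicit engine, which in the C code selects the default engine.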
.run ⇒ Object
# File 'ext/iodine/iodine.c', line 146
static VALUE iodine_run(VALUE self) {
  rb_need_block();
  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  Registry.add(block);
  defer(iodine_perform_deferred, (void *)block, NULL);
  return block;
  (void)self;
}
.run_after(milliseconds) ⇒ Object
# File 'ext/iodine/iodine.c', line 81
static VALUE iodine_run_after(VALUE self, VALUE milliseconds) {
  (void)(self);
  if (TYPE(milliseconds) != T_FIXNUM) {
    rb_raise(rb_eTypeError, "milliseconds must be a number");
    return Qnil;
  }
  size_t milli = FIX2UINT(milliseconds);
  // requires a block to be passed
  rb_need_block();
  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  Registry.add(block);
  if (facil_run_every(milli, 1, iodine_run_task, (void *)block,
                      (void (*)(void *))Registry.remove) == -1) {
    perror("ERROR: Iodine couldn't initialize timer");
    return Qnil;
  }
  return block;
}
.run_every(*args) ⇒ Object
Runs the required block repeatedly, once every time the specified number of milliseconds has passed. Time is counted only once Iodine has started running (using start).
Accepts:
- milliseconds
-
the number of milliseconds between event repetitions.
- repetitions
-
the number of event repetitions. Defaults to 0 (never ending).
- block
-
(required) a block is required, as otherwise there is nothing to
perform.
The event will repeat itself until the number of repetitions has been depleted.
Always returns a copy of the block object.
# File 'ext/iodine/iodine.c', line 118
static VALUE iodine_run_every(int argc, VALUE *argv, VALUE self) {
  (void)(self);
  VALUE milliseconds, repetitions, block;

  rb_scan_args(argc, argv, "11&", &milliseconds, &repetitions, &block);

  if (TYPE(milliseconds) != T_FIXNUM) {
    rb_raise(rb_eTypeError, "milliseconds must be a number.");
    return Qnil;
  }
  if (repetitions != Qnil && TYPE(repetitions) != T_FIXNUM) {
    rb_raise(rb_eTypeError, "repetitions must be a number or `nil`.");
    return Qnil;
  }

  size_t milli = FIX2UINT(milliseconds);
  size_t repeat = (repetitions == Qnil) ? 0 : FIX2UINT(repetitions);
  // requires a block to be passed
  rb_need_block();
  Registry.add(block);
  if (facil_run_every(milli, repeat, iodine_run_task, (void *)block,
                      (void (*)(void *))Registry.remove) == -1) {
    perror("ERROR: Iodine couldn't initialize timer");
    return Qnil;
  }
  return block;
}
.running? ⇒ Boolean
# File 'ext/iodine/iodine.c', line 326
static VALUE iodine_is_running(VALUE self) {
  return (facil_is_running() ? Qtrue : Qfalse);
  (void)self;
}
.start ⇒ Object
Starts the Iodine event loop. This will hang the thread until an interrupt (`^C`) signal is received.
Returns the Iodine module.
# File 'ext/iodine/iodine.c', line 296
static VALUE iodine_start(VALUE self) {
  /* re-register the Rack::Push namespace to point at Iodine */
  if (rb_const_defined(rb_cObject, rb_intern("Rack"))) {
    VALUE rack = rb_const_get(rb_cObject, rb_intern("Rack"));
    if (rack != Qnil) {
      if (rb_const_defined(rack, rb_intern("PubSub"))) {
        rb_const_remove(rack, rb_intern("PubSub"));
      }
      rb_const_set(rack, rb_intern("PubSub"), Iodine);
    }
  }
  /* for the special Iodine::Rack object and backwards compatibility */
  if (iodine_review_rack_app()) {
    fprintf(stderr, "ERROR: (iodine) cann't start Iodine::Rack.\n");
    return Qnil;
  }

  VALUE rb_th_i = rb_iv_get(Iodine, "@threads");
  VALUE rb_pr_i = rb_iv_get(Iodine, "@processes");

  iodine_start_settings_s s = {
      .threads = ((TYPE(rb_th_i) == T_FIXNUM) ? FIX2INT(rb_th_i) : 0),
      .processes = ((TYPE(rb_pr_i) == T_FIXNUM) ? FIX2INT(rb_pr_i) : 0)};

  RubyCaller.set_gvl_state(1);
  RubyCaller.leave_gvl(srv_start_no_gvl, (void *)&s);

  return self;
}
.subscribe(*args) ⇒ Object
Subscribes to a Pub/Sub channel.
The method accepts 1-2 arguments and an optional block. These are all valid ways to call the method:
subscribe("my_stream") {|from, msg| p msg }
subscribe("my_stream", match: :redis) {|from, msg| p msg }
subscribe(to: "my_stream") {|from, msg| p msg }
subscribe to: "my_stream", match: :redis, handler: MyProc
The first argument must be either a String or a Hash.
The second, optional, argument must be a Hash (if given).
The options Hash supports the following possible keys (other keys are ignored, all keys are Symbols):
- :match
-
The channel / subject name matching type to be used. The only valid value is `:redis`. Future versions hope to support `:nats` and `:rabbit` pattern matching as well.
- :to
-
The channel / subject to subscribe to.
Returns an Iodine::PubSub::Subscription object that answers to:
- close
-
closes the subscription.
- to_s
-
returns the subscription’s target (stream / channel / subject).
(str) :: returns true if the string is an exact match for the target (even if the target itself is a pattern).
# File 'ext/iodine/iodine_pubsub.c', line 768
static VALUE iodine_subscribe_global(int argc, VALUE *argv, VALUE self) {
  // clang-format on
  return iodine_subscribe(argc, argv, NULL, IODINE_PUBSUB_GLOBAL);
  (void)self;
}
.threads ⇒ Object
Get/Set the number of threads used in the thread pool (a static thread pool). Can be 1 (single working thread, the main thread will sleep) and can be 0 (the main thread will be used as the only active thread).
# File 'lib/iodine.rb', line 130
def self.threads
  @threads
end
.threads=(count) ⇒ Object
Get/Set the number of threads used in the thread pool (a static thread pool). Can be 1 (single working thread, the main thread will sleep) and can be 0 (the main thread will be used as the only active thread).
# File 'lib/iodine.rb', line 135
def self.threads=(count)
  @threads = count.to_i
end
.warmup(app) ⇒ Object
Runs the warmup sequence. warmup attempts to get every “autoloaded” (lazy loaded) file to load now instead of waiting for “first access”. This allows multi-threaded safety and better memory utilization during forking.
However, `warmup` might cause undefined behavior and should be avoided when using gems that initiate network / database connections or gems that spawn threads (e.g., ActiveRecord / ActionCable).
Use warmup when either processes or threads are set to more than 1 and gems don’t spawn threads or initialize network connections.
# File 'lib/iodine.rb', line 155
def self.warmup app
  # load anything marked with `autoload`, since autoload isn't thread safe nor fork friendly.
  Iodine.run do
    Module.constants.each do |n|
      begin
        Object.const_get(n)
      rescue Exception => _e
      end
    end
    ::Rack::Builder.new(app) do |r|
      r.warmup do |a|
        client = ::Rack::MockRequest.new(a)
        client.get('/')
      end
    end
  end
end
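The first half of the warmup sequence relies on a plain Ruby behavior: touching a constant registered with `autoload` forces its file to load. The stand-alone sketch below demonstrates that effect with a throwaway file. The constant `LazyThing` and the file name are made up for the illustration, and no part of it uses Iodine.

```ruby
require 'tmpdir'

before = after = nil

Dir.mktmpdir do |dir|
  path = File.join(dir, "lazy_thing.rb")
  File.write(path, "module LazyThing\n  LOADED = true\nend\n")

  # register a lazily loaded constant, as many gems do
  Object.autoload(:LazyThing, path)
  before = Object.autoload?(:LazyThing) # the pending file path

  # the same trick warmup uses: reading the constant forces the load
  Object.const_get(:LazyThing)
  after = Object.autoload?(:LazyThing)  # nil once the file has been loaded
end
```

Doing this once, before forking or starting worker threads, means no worker has to trigger the load later.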