Module: Iodine

Defined in:
lib/iodine.rb,
lib/iodine/cli.rb,
lib/iodine/json.rb,
lib/iodine/pubsub.rb,
lib/iodine/version.rb,
lib/iodine/protocol.rb,
lib/iodine/websocket.rb,
lib/iodine/monkeypatch.rb,
lib/rack/handler/iodine.rb,
ext/iodine/iodine.c

Overview

Iodine is both a Rack server and a platform for writing evented network services on Ruby.

Here is a sample Echo server using Iodine:

# define the protocol for our service
class EchoProtocol
  @timeout = 10
  # this is just one possible callback with a recyclable buffer
  def on_message buffer
    # write the data we received
    write "echo: #{buffer}"
    # close the connection when the time comes
    close if buffer =~ /^bye[\n\r]/
  end
end
# create the service instance
Iodine.listen 3000, EchoProtocol
# start the service
Iodine.start

Please read the README file for an introduction to Iodine and an overview of it’s API.

The API

The main API methods for the top Iodine namesapce are grouped here by subject.

Event Loop / Concurrency

Iodine manages an internal event-loop and reactor pattern. The following API manages Iodine’s behavior.

  • Iodine.threads, Iodine.threads= gets or sets the amount of threads iodine will use in it’s working thread pool.

  • Iodine.processes, Iodine.processes gets or sets the amount of processes iodine will utilize (‘fork`) to handle connections.

  • Iodine.start starts iodine’s event loop and reactor pattern. At this point, it’s impossible to change the number of threads or processes used.

Event and Task Scheduling

  • Iodine.run schedules a block of code to run asynchronously.

  • Iodine.run_after, Iodine.run_every schedules a block of code to run (asynchronously) using a timer.

  • Iodine.start starts iodine’s event loop and reactor pattern. At this point, it’s impossible to change the number of threads or processes used.

In addition to the top level API, there’s also the connection class and connection instance API, as specified in the Protocol and Websocket documentation, which allows for a connection bound task(s) to be scheduled to run within the connection’s lock (for example, Websocket#defer and Websocket#each).

Connection Handling

Iodine handles connections using Protocol objects. The following API manages either built-in or custom Protocol objects (classes / instances) in relation to their network sockets.

  • Iodine.attach_fd, Iodine.attach_io allows Iodine to take controll of an IO object (i.e., a TCP/IP Socket, a Unix Socket or a pipe).

  • Iodine.connect creates a new TCP/IP connection using the specified Protocol.

  • Iodine.listen listens to new TCP/IP connections using the specified Protocol.

  • Iodine.listen2http listens to new TCP/IP connections using the buildin HTTP / Websocket Protocol.

  • Iodine.warmup warms up any HTTP Rack applications.

  • Iodine.count counts the number of connections (including HTTP / Websocket connections).

In addition to the top level API, there’s also the connection class and connection instance API, as specified in the Protocol and Websocket documentation.

Pub/Sub

Iodine offers a native Pub/Sub engine (no database required) that can be easily extended by implementing a Pub/Sub PubSub::Engine.

The following methods offect server side Pub/Sub that allows the server code to react to channel event.

Websocket objects have a seperate Pub/Sub implementation that manages the subscription’s lifetime to match the connection’s lifetime and allows direct client Pub/Sub (forwards the message to the client directly without invoking the Ruby interpreter).

Note that Iodine.subscribe returns an PubSub::Subscription object that can be used for closing the subscription.

Patching Rack

Although Iodine offers Rack::Utils optimizations using monkey patching, Iodine does NOT monkey patch Rack automatically.

Choosing to monkey patch Rack::Utils could offer significant performance gains for some applications. i.e. (on my machine):

require 'iodine'
require 'rack'
# a String in need of decoding
s = '%E3%83%AB%E3%83%93%E3%82%A4%E3%82%B9%E3%81%A8'
Benchmark.bm do |bm|
  # Pre-Patch
  bm.report("   Rack.unescape")    {1_000_000.times { Rack::Utils.unescape s } }
  bm.report("    Rack.rfc2822")    {1_000_000.times { Rack::Utils.rfc2822(Time.now) } }
  bm.report("    Rack.rfc2109")    {1_000_000.times { Rack::Utils.rfc2109(Time.now) } }
  # Perform Patch
  Iodine.patch_rack
  puts "            --- Monkey Patching Rack ---"
  # Post Patch
  bm.report("Patched.unescape")    {1_000_000.times { Rack::Utils.unescape s } }
  bm.report(" Patched.rfc2822")    {1_000_000.times { Rack::Utils.rfc2822(Time.now) } }
  bm.report(" Patched.rfc2109")    {1_000_000.times { Rack::Utils.rfc2109(Time.now) } }
end && nil

Results:

user     system      total        real
Rack.unescape  8.660000   0.010000   8.670000 (  8.687807)
Rack.rfc2822  3.730000   0.000000   3.730000 (  3.727732)
Rack.rfc2109  3.020000   0.010000   3.030000 (  3.031940)
             --- Monkey Patching Rack ---
Patched.unescape  0.340000   0.000000   0.340000 (  0.341506)
Patched.rfc2822  0.740000   0.000000   0.740000 (  0.737796)
Patched.rfc2109  0.690000   0.010000   0.700000 (  0.700155)

At the moment, the extent of the monkey patching offered is very limited. As new optimizations are added, the policy regarding monkey patching (benifit vs. risks) might be re-evaluated.

Defined Under Namespace

Modules: Base, JSON, Protocol, PubSub, Rack, Websocket

Constant Summary collapse

VERSION =
'0.5.0'.freeze

Class Method Summary collapse

Class Method Details

.after_fork(*args, &block) ⇒ Object



193
194
195
196
197
198
199
# File 'lib/iodine.rb', line 193

def self.after_fork(*args, &block)
  if(block)
    @after_fork_blocks << [args, block]
  else
    @after_fork_blocks.each {|b| b[1].call(b[0]) }
  end
end

.attach_fd(rbfd, handler) ⇒ Object

Attaches an existing file descriptor (‘fd`) (i.e., a pipe, a unix socket, etc’) as if it were a regular connection.

i.e.

Iodine.attach my_io_obj.to_i, MyProtocolClass.new


604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
# File 'ext/iodine/iodine_protocol.c', line 604

static VALUE iodine_attach_fd(VALUE self, VALUE rbfd, VALUE handler) {
  Check_Type(rbfd, T_FIXNUM);
  if (handler == Qnil || handler == Qfalse)
    return Qfalse;
  intptr_t uuid = FIX2INT(rbfd);
  if (!uuid || uuid == -1)
    return Qfalse;
  /* make sure the uuid is connected to the sock library */
  if (sock_fd2uuid(uuid) == -1)
    sock_open(uuid);
  if (TYPE(handler) == T_CLASS) {
    // include the Protocol module, preventing coder errors
    rb_include_module(handler, IodineProtocol);
    handler = RubyCaller.call(handler, iodine_new_func_id);
  } else {
    // include the Protocol module in the object's class
    VALUE p_class = rb_obj_class(handler);
    // include the Protocol module, preventing coder errors
    rb_include_module(p_class, IodineProtocol);
  }
  Registry.add(handler);
  on_open_dyn_protocol_instance(uuid, (void *)handler);
  return self;
}

.attach_io(io, handler) ⇒ Object

Attaches an existing IO object (i.e., a pipe, a unix socket, etc’) as if it were a regular connection.

i.e.

Iodine.attach my_io_obj, MyProtocolClass.new


637
638
639
640
# File 'ext/iodine/iodine_protocol.c', line 637

static VALUE iodine_attach_io(VALUE self, VALUE io, VALUE handler) {
  return iodine_attach_fd(self, RubyCaller.call(io, iodine_to_i_func_id),
                          handler);
}

.before_fork(*args, &block) ⇒ Object



202
203
204
205
206
207
208
# File 'lib/iodine.rb', line 202

def self.before_fork(*args, &block)
  if(block)
    @before_fork_blocks << [args, block]
  else
    @before_fork_blocks.each {|b| b[1].call(b[0]) }
  end
end

.connect(address, port, handler) ⇒ Object

Connects (as a TCP/IP client) to a remote TCP/IP server.

i.e.

Iodine.connect "example.com", 5000, MyProtocolClass.new


565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
# File 'ext/iodine/iodine_protocol.c', line 565

static VALUE iodine_connect(VALUE self, VALUE address, VALUE port,
                            VALUE handler) {
  if (TYPE(handler) == T_CLASS || TYPE(handler) == T_MODULE) {
    // include the Protocol module, preventing coder errors
    rb_include_module(handler, IodineProtocol);
    handler = RubyCaller.call(handler, iodine_new_func_id);
  } else {
    // include the Protocol module in the object's class
    VALUE p_class = rb_obj_class(handler);
    // include the Protocol module, preventing coder errors
    rb_include_module(p_class, IodineProtocol);
  }
  if (TYPE(port) != T_FIXNUM && TYPE(port) != T_STRING)
    rb_raise(rb_eTypeError, "The port variable must be a Fixnum or a String.");
  Registry.add(handler);
  if (TYPE(port) == T_FIXNUM)
    port = rb_funcall2(port, iodine_to_s_method_id, 0, NULL);
  // connect
  intptr_t uuid = facil_connect(.port = StringValueCStr(port),
                                .address = StringValueCStr(address),
                                .udata = (void *)handler,
                                .on_connect = on_open_dyn_protocol_instance,
                                .on_fail = remove_from_registry);
  if (uuid == -1)
    return Qnil;
  iodine_set_fd(handler, uuid);
  return handler;
  (void)self;
}

.countObject

Returns the number of total connections managed by Iodine.



67
68
69
70
71
# File 'ext/iodine/iodine.c', line 67

static VALUE iodine_count(VALUE self) {
  size_t count = facil_count(NULL);
  return ULL2NUM(count);
  (void)self;
}

.default_engineObject

Returns the default Pub/Sub engine (if any).

See PubSub and Iodine::PubSub::Engine for more details.



557
558
559
560
# File 'ext/iodine/iodine_pubsub.c', line 557

static VALUE ips_get_default(VALUE self) {
  return rb_ivar_get(Iodine, default_pubsubid);
  (void)self;
}

.default_engine=(en) ⇒ Object

Sets the default Pub/Sub engine to be used.

See PubSub and Iodine::PubSub::Engine for more details.



532
533
534
535
536
537
538
539
540
541
542
543
# File 'ext/iodine/iodine_pubsub.c', line 532

static VALUE ips_set_default(VALUE self, VALUE en) {
  iodine_engine_s *e;
  Data_Get_Struct(en, iodine_engine_s, e);
  if (!e)
    rb_raise(rb_eArgError, "deafult engine must be an Iodine::PubSub::Engine.");
  if (!e->p)
    rb_raise(rb_eArgError, "This Iodine::PubSub::Engine is broken.");
  rb_ivar_set(Iodine, default_pubsubid, en);
  PUBSUB_DEFAULT_ENGINE = e->p;
  return en;
  (void)self;
}

.default_pubsubObject



565
566
567
568
569
# File 'ext/iodine/iodine_pubsub.c', line 565

static VALUE ips_get_default_dep(VALUE self) {
  fprintf(stderr, "WARNING: Iodine.default_pubsub is deprecated. Use "
                  "Iodine::PubSub.default_engine.\n");
  return ips_get_default(self);
}

.default_pubsub=(en) ⇒ Object



546
547
548
549
550
# File 'ext/iodine/iodine_pubsub.c', line 546

static VALUE ips_set_default_dep(VALUE self, VALUE en) {
  fprintf(stderr, "WARNING: Iodine.default_pubsub is deprecated. Use "
                  "Iodine::PubSub.default_engine.\n");
  return ips_set_default(self, en);
}

.listen(port, handler) ⇒ Object

Sets up a listening socket. Conncetions received at the assigned port will be handled by the assigned handler.

Multiple services (listening sockets) can be registered before starting the Iodine event loop.



499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
# File 'ext/iodine/iodine_protocol.c', line 499

static VALUE iodine_listen(VALUE self, VALUE port, VALUE handler) {
  // validate that the handler is a class and include the Iodine::Protocol
  if (TYPE(handler) == T_CLASS) {
    // include the Protocol module
    // // do we neet to check?
    // if (rb_mod_include_p(protocol, rDynProtocol) == Qfalse)
    rb_include_module(handler, IodineProtocol);
    rb_extend_object(handler, IodineProtocol);
  } else {
    rb_raise(rb_eTypeError, "The connection handler MUST be of type Class.");
    return Qnil;
  }
  if (TYPE(port) != T_FIXNUM && TYPE(port) != T_STRING)
    rb_raise(rb_eTypeError, "The port variable must be a Fixnum or a String.");
  if (TYPE(port) == T_FIXNUM)
    port = rb_funcall2(port, iodine_to_s_method_id, 0, NULL);
  rb_ivar_set(self, rb_intern("_port"), port);
  // listen
  if (facil_listen(.port = StringValueCStr(port), .udata = (void *)handler,
                   .on_open = on_open_dyn_protocol) == -1)
    return Qnil;
  return self;
}

.listen2http(opt) ⇒ Object

Listens to incoming HTTP connections and handles incoming requests using the Rack specification.

This is delegated to a lower level C HTTP and Websocket implementation, no Ruby object will be crated except the ‘env` object required by the Rack specifications.

Accepts a single Hash argument with the following properties:

app

the Rack application that handles incoming requests. Default: ‘nil`.

port

the port to listen to. Default: 3000.

address

the address to bind to. Default: binds to all possible addresses.

log

enable response logging (Hijacked sockets aren’t logged). Default: off.

public

The root public folder for static file service. Default: none.

timeout

Timeout for inactive HTTP/1.x connections. Defaults: 40 seconds.

max_body

The maximum body size for incoming HTTP messages. Default: ~50Mib.

max_msg

The maximum Websocket message size allowed. Default: ~250Kib.

ping

The Websocket ‘ping` interval. Default: 40 seconds.

Either the ‘app` or the `public` properties are required. If niether exists, the function will fail. If both exist, Iodine will serve static files as well as dynamic requests.

When using the static file server, it’s possible to serve ‘gzip` versions of the static files by saving a compressed version with the `gz` extension (i.e. `styles.css.gz`).

‘gzip` will only be served to clients tat support the `gzip` transfer encoding.

Once HTTP/2 is supported (planned, but probably very far away), HTTP/2 timeouts will be dynamically managed by Iodine. The ‘timeout` option is only relevant to HTTP/1.x connections.



742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
# File 'ext/iodine/iodine_http.c', line 742

VALUE iodine_http_listen(VALUE self, VALUE opt) {
  static int called_once = 0;
  uint8_t log_http = 0;
  size_t ping = 0;
  size_t max_body = 0;
  size_t max_headers = 0;
  size_t max_msg = 0;
  Check_Type(opt, T_HASH);
  VALUE app = rb_hash_aref(opt, ID2SYM(rb_intern("app")));
  VALUE www = rb_hash_aref(opt, ID2SYM(rb_intern("public")));
  VALUE port = rb_hash_aref(opt, ID2SYM(rb_intern("port")));
  VALUE address = rb_hash_aref(opt, ID2SYM(rb_intern("address")));
  VALUE tout = rb_hash_aref(opt, ID2SYM(rb_intern("timeout")));

  VALUE tmp = rb_hash_aref(opt, ID2SYM(rb_intern("max_msg")));
  if (tmp != Qnil && tmp != Qfalse) {
    Check_Type(tmp, T_FIXNUM);
    max_msg = FIX2ULONG(tmp);
  }

  tmp = rb_hash_aref(opt, ID2SYM(rb_intern("max_body")));
  if (tmp != Qnil && tmp != Qfalse) {
    Check_Type(tmp, T_FIXNUM);
    max_body = FIX2ULONG(tmp);
  }
  tmp = rb_hash_aref(opt, ID2SYM(rb_intern("max_headers")));
  if (tmp != Qnil && tmp != Qfalse) {
    Check_Type(tmp, T_FIXNUM);
    max_headers = FIX2ULONG(tmp);
  }

  tmp = rb_hash_aref(opt, ID2SYM(rb_intern("ping")));
  if (tmp != Qnil && tmp != Qfalse) {
    Check_Type(tmp, T_FIXNUM);
    ping = FIX2ULONG(tmp);
  }
  if (ping > 255) {
    fprintf(stderr, "Iodine Warning: Websocket timeout value "
                    "is over 255 and is silently ignored.\n");
    ping = 0;
  }

  tmp = rb_hash_aref(opt, ID2SYM(rb_intern("log")));
  if (tmp != Qnil && tmp != Qfalse)
    log_http = 1;

  if ((app == Qnil || app == Qfalse) && (www == Qnil || www == Qfalse)) {
    fprintf(stderr, "Iodine Warning: HTTP without application or public folder "
                    "(is silently ignored).\n");
    return Qfalse;
  }

  if ((www != Qnil && www != Qfalse)) {
    Check_Type(www, T_STRING);
    Registry.add(www);
    rb_hash_aset(env_template_no_upgrade, XSENDFILE_TYPE, XSENDFILE);
    rb_hash_aset(env_template_no_upgrade, XSENDFILE_TYPE_HEADER, XSENDFILE);
    support_xsendfile = 1;
  } else
    www = 0;

  if ((address != Qnil && address != Qfalse))
    Check_Type(address, T_STRING);
  else
    address = 0;

  if ((tout != Qnil && tout != Qfalse)) {
    Check_Type(tout, T_FIXNUM);
    tout = FIX2ULONG(tout);
  } else
    tout = 0;
  if (tout > 255) {
    fprintf(stderr, "Iodine Warning: HTTP timeout value "
                    "is over 255 and is silently ignored.\n");
    tout = 0;
  }

  if (port != Qnil && port != Qfalse) {
    if (!RB_TYPE_P(port, T_STRING) && !RB_TYPE_P(port, T_FIXNUM))
      rb_raise(rb_eTypeError,
               "The `port` property MUST be either a String or a Number");
    if (RB_TYPE_P(port, T_FIXNUM))
      port = rb_funcall2(port, iodine_to_s_method_id, 0, NULL);
    Registry.add(port);
  } else if (port == Qfalse)
    port = 0;
  else {
    port = rb_str_new("3000", 4);
    Registry.add(port);
  }

  if ((app != Qnil && app != Qfalse))
    Registry.add(app);
  else
    app = 0;

  if (http_listen(
          StringValueCStr(port), (address ? StringValueCStr(address) : NULL),
          .on_request = on_rack_request, .on_upgrade = on_rack_upgrade,
          .udata = (void *)app, .timeout = (tout ? FIX2INT(tout) : tout),
          .ws_timeout = ping, .ws_max_msg_size = max_msg,
          .max_header_size = max_headers, .on_finish = free_iodine_http,
          .log = log_http, .max_body_size = max_body,
          .public_folder = (www ? StringValueCStr(www) : NULL))) {
    fprintf(stderr,
            "ERROR: Failed to initialize a listening HTTP socket for port %s\n",
            port ? StringValueCStr(port) : "3000");
    return Qfalse;
  }

  if ((app == Qnil || app == Qfalse)) {
    fprintf(stderr,
            "* Iodine: (no app) the HTTP service on port %s will only serve "
            "static files.\n",
            (port ? StringValueCStr(port) : "3000"));
  }
  if (called_once) {
    defer(iodine_print_http_msg2, (www ? (void *)www : NULL), (void *)port);
  } else {
    called_once = 1;
    defer(iodine_print_http_msg1, (www ? (void *)www : NULL), (void *)port);
  }

  return Qtrue;
  (void)self;
}

.on_idleObject

Schedules a single occuring event for the next idle cycle.

To schedule a reoccuring event, simply reschedule the event at the end of it’s run.

i.e.

IDLE_PROC = Proc.new { puts "idle"; Iodine.on_idle &IDLE_PROC }
Iodine.on_idle &IDLE_PROC


177
178
179
180
181
182
183
184
185
186
# File 'ext/iodine/iodine.c', line 177

VALUE iodine_sched_on_idle(VALUE self) {
  rb_need_block();
  VALUE block = rb_block_proc();
  Registry.add(block);
  spn_lock(&iodine_on_idle_lock);
  fio_ls_push(&iodine_on_idle_list, (void *)block);
  spn_unlock(&iodine_on_idle_lock);
  return block;
  (void)self;
}

.patch_jsonObject

Will monkey patch the default JSON parser to replace the default ‘JSON.parse` with Iodine::JSON.parse.



184
185
186
187
188
189
# File 'lib/iodine.rb', line 184

def self.patch_json
  ::JSON.class_eval do
      ::JSON.define_singleton_method(:parse,
            Iodine::JSON.instance_method(:parse) )
  end
end

.patch_rackObject

Will monkey patch some Rack methods to increase their performance.



174
175
176
177
178
179
180
181
# File 'lib/iodine.rb', line 174

def self.patch_rack
  ::Rack::Utils.class_eval do
    Iodine::Base::MonkeyPatch::RackUtils.methods(false).each do |m|
      ::Rack::Utils.define_singleton_method(m,
            Iodine::Base::MonkeyPatch::RackUtils.instance_method(m) )
    end
  end
end

.processesObject

Get/Set the number of worker processes. A value greater then 1 will cause the Iodine to “fork” any extra worker processes needed.



140
141
142
# File 'lib/iodine.rb', line 140

def self.processes
  @processes
end

.processes=(count) ⇒ Object

Get/Set the number of worker processes. A value greater then 1 will cause the Iodine to “fork” any extra worker processes needed.



145
146
147
# File 'lib/iodine.rb', line 145

def self.processes=(count)
  @processes = count.to_i
end

.publish(*args) ⇒ Object

Publishes a message to a channel.

Can be used using two Strings:

publish(to, message)

The method accepts an optional ‘engine` argument:

publish(to, message, my_pubsub_engine)

Alternatively, accepts the following named arguments:

:to

The channel to publish to (required).

:message

The message to be published (required).

:engine

If provided, the engine to use for pub/sub. Otherwise the default

engine is used.



796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
# File 'ext/iodine/iodine_pubsub.c', line 796

VALUE iodine_publish(int argc, VALUE *argv, VALUE self) {
  VALUE rb_ch, rb_msg, rb_engine = Qnil;
  uint8_t use_pattern = 0;
  const pubsub_engine_s *engine = NULL;
  switch (argc) {
  case 3:
    /* fallthrough */
    rb_engine = argv[2];
  case 2:
    rb_ch = argv[0];
    rb_msg = argv[1];
    break;
  case 1: {
    /* single argument must be a Hash */
    Check_Type(argv[0], T_HASH);
    rb_ch = rb_hash_aref(argv[0], to_sym_id);
    if (rb_ch == Qnil || rb_ch == Qfalse) {
      use_pattern = 1;
      rb_ch = rb_hash_aref(argv[0], match_sym_id);
    }
    rb_msg = rb_hash_aref(argv[0], message_sym_id);
    rb_engine = rb_hash_aref(argv[0], engine_varid);
  } break;
  default:
    rb_raise(rb_eArgError, "method accepts 1-3 arguments.");
  }

  if (rb_msg == Qnil || rb_msg == Qfalse) {
    rb_raise(rb_eArgError, "message is required.");
  }
  Check_Type(rb_msg, T_STRING);

  if (rb_ch == Qnil || rb_ch == Qfalse)
    rb_raise(rb_eArgError, "channel is required .");
  if (TYPE(rb_ch) == T_SYMBOL)
    rb_ch = rb_sym2str(rb_ch);
  Check_Type(rb_ch, T_STRING);

  if (rb_engine == Qfalse) {
    engine = PUBSUB_PROCESS_ENGINE;
  } else if (rb_engine == Qnil) {
    engine = NULL;
  } else {
    engine = iodine_engine_ruby2facil(rb_engine);
  }

  FIOBJ ch = fiobj_str_new(RSTRING_PTR(rb_ch), RSTRING_LEN(rb_ch));
  FIOBJ msg = fiobj_str_new(RSTRING_PTR(rb_msg), RSTRING_LEN(rb_msg));

  intptr_t ret =
      pubsub_publish(.engine = engine, .channel = ch, .message = msg);
  fiobj_free(ch);
  fiobj_free(msg);
  if (!ret)
    return Qfalse;
  return Qtrue;
  (void)self;
}

.runObject



146
147
148
149
150
151
152
153
154
155
# File 'ext/iodine/iodine.c', line 146

static VALUE iodine_run(VALUE self) {
  rb_need_block();
  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  Registry.add(block);
  defer(iodine_perform_deferred, (void *)block, NULL);
  return block;
  (void)self;
}

.run_after(milliseconds) ⇒ Object

Runs the required block after the specified number of milliseconds have passed. Time is counted only once Iodine started running (using start).

Tasks scheduled before calling start will run once for every process.

Always returns a copy of the block object.



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'ext/iodine/iodine.c', line 81

static VALUE iodine_run_after(VALUE self, VALUE milliseconds) {
  (void)(self);
  if (TYPE(milliseconds) != T_FIXNUM) {
    rb_raise(rb_eTypeError, "milliseconds must be a number");
    return Qnil;
  }
  size_t milli = FIX2UINT(milliseconds);
  // requires a block to be passed
  rb_need_block();
  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  Registry.add(block);
  if (facil_run_every(milli, 1, iodine_run_task, (void *)block,
                      (void (*)(void *))Registry.remove) == -1) {
    perror("ERROR: Iodine couldn't initialize timer");
    return Qnil;
  }
  return block;
}

.run_every(*args) ⇒ Object

Runs the required block after the specified number of milliseconds have passed. Time is counted only once Iodine started running (using start).

Accepts:

milliseconds

the number of milliseconds between event repetitions.

repetitions

the number of event repetitions. Defaults to 0 (never ending).

block

(required) a block is required, as otherwise there is nothing to

perform.

The event will repeat itself until the number of repetitions had been delpeted.

Always returns a copy of the block object.



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'ext/iodine/iodine.c', line 118

static VALUE iodine_run_every(int argc, VALUE *argv, VALUE self) {
  (void)(self);
  VALUE milliseconds, repetitions, block;

  rb_scan_args(argc, argv, "11&", &milliseconds, &repetitions, &block);

  if (TYPE(milliseconds) != T_FIXNUM) {
    rb_raise(rb_eTypeError, "milliseconds must be a number.");
    return Qnil;
  }
  if (repetitions != Qnil && TYPE(repetitions) != T_FIXNUM) {
    rb_raise(rb_eTypeError, "repetitions must be a number or `nil`.");
    return Qnil;
  }

  size_t milli = FIX2UINT(milliseconds);
  size_t repeat = (repetitions == Qnil) ? 0 : FIX2UINT(repetitions);
  // requires a block to be passed
  rb_need_block();
  Registry.add(block);
  if (facil_run_every(milli, repeat, iodine_run_task, (void *)block,
                      (void (*)(void *))Registry.remove) == -1) {
    perror("ERROR: Iodine couldn't initialize timer");
    return Qnil;
  }
  return block;
}

.running?Boolean

Returns:

  • (Boolean)


326
327
328
329
# File 'ext/iodine/iodine.c', line 326

static VALUE iodine_is_running(VALUE self) {
  return (facil_is_running() ? Qtrue : Qfalse);
  (void)self;
}

.startObject

Starts the Iodine event loop. This will hang the thread until an interrupt (‘^C`) signal is received.

Returns the Iodine module.



296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'ext/iodine/iodine.c', line 296

static VALUE iodine_start(VALUE self) {
  /* re-register the Rack::Push namespace to point at Iodine */
  if (rb_const_defined(rb_cObject, rb_intern("Rack"))) {
    VALUE rack = rb_const_get(rb_cObject, rb_intern("Rack"));
    if (rack != Qnil) {
      if (rb_const_defined(rack, rb_intern("PubSub"))) {
        rb_const_remove(rack, rb_intern("PubSub"));
      }
      rb_const_set(rack, rb_intern("PubSub"), Iodine);
    }
  }
  /* for the special Iodine::Rack object and backwards compatibility */
  if (iodine_review_rack_app()) {
    fprintf(stderr, "ERROR: (iodine) cann't start Iodine::Rack.\n");
    return Qnil;
  }

  VALUE rb_th_i = rb_iv_get(Iodine, "@threads");
  VALUE rb_pr_i = rb_iv_get(Iodine, "@processes");

  iodine_start_settings_s s = {
      .threads = ((TYPE(rb_th_i) == T_FIXNUM) ? FIX2INT(rb_th_i) : 0),
      .processes = ((TYPE(rb_pr_i) == T_FIXNUM) ? FIX2INT(rb_pr_i) : 0)};

  RubyCaller.set_gvl_state(1);
  RubyCaller.leave_gvl(srv_start_no_gvl, (void *)&s);

  return self;
}

.subscribe(*args) ⇒ Object

Subscribes to a Pub/Sub channel.

The method accepts 1-2 arguments and an optional block. These are all valid ways to call the method:

subscribe("my_stream") {|from, msg| p msg }
subscribe("my_stream", match: :redis) {|from, msg| p msg }
subscribe(to: "my_stream")  {|from, msg| p msg }
subscribe to: "my_stream", match: :redis, handler: MyProc

The first argument must be either a String or a Hash.

The second, optional, argument must be a Hash (if given).

The options Hash supports the following possible keys (other keys are ignored, all keys are Symbols):

:match

The channel / subject name matching type to be used. Valid value is: ‘:redis`. Future versions hope to support `:nats` and `:rabbit` patern matching as well.

:to

The channel / subject to subscribe to.

Returns an Iodine::PubSub::Subscription object that answers to:

close

closes the connection.

to_s

returns the subscription’s target (stream / channel / subject).

(str) :: returns true if the string is an exact match for the target (even if the target itself is a pattern).



768
769
770
771
772
# File 'ext/iodine/iodine_pubsub.c', line 768

static VALUE iodine_subscribe_global(int argc, VALUE *argv, VALUE self) {
  // clang-format on
  return iodine_subscribe(argc, argv, NULL, IODINE_PUBSUB_GLOBAL);
  (void)self;
}

.threadsObject

Get/Set the number of threads used in the thread pool (a static thread pool). Can be 1 (single working thread, the main thread will sleep) and can be 0 (the main thread will be used as the only active thread).



130
131
132
# File 'lib/iodine.rb', line 130

def self.threads
  @threads
end

.threads=(count) ⇒ Object

Get/Set the number of threads used in the thread pool (a static thread pool). Can be 1 (single working thread, the main thread will sleep) and can be 0 (the main thread will be used as the only active thread).



135
136
137
# File 'lib/iodine.rb', line 135

def self.threads=(count)
  @threads = count.to_i
end

.warmup(app) ⇒ Object

Runs the warmup sequence. warmup attempts to get every “autoloaded” (lazy loaded) file to load now instead of waiting for “first access”. This allows multi-threaded safety and better memory utilization during forking.

However, ‘warmup` might cause undefined behavior and should be avoided when using gems that initiate network / database connections or gems that spawn threads (i.e., ActiveRecord / ActiveCable).

Use warmup when either processes or threads are set to more then 1 and gems don’t spawn threads or initialize network connections.



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/iodine.rb', line 155

def self.warmup app
  # load anything marked with `autoload`, since autoload isn't thread safe nor fork friendly.
  Iodine.run do
    Module.constants.each do |n|
      begin
        Object.const_get(n)
      rescue Exception => _e
      end
    end
    ::Rack::Builder.new(app) do |r|
      r.warmup do |a|
        client = ::Rack::MockRequest.new(a)
        client.get('/')
      end
    end
  end
end