Module: Rainbows::EventMachine

Includes:
Base
Defined in:
lib/rainbows/event_machine.rb

Overview

Implements a basic single-threaded event model with EventMachine. It is capable of handling thousands of simultaneous client connections, but with only a single-threaded app dispatch. It is suited for slow clients, and can work with slow applications via asynchronous libraries such as async_sinatra, Cramp, and rack-fiber_pool.

It does not require your Rack application to be thread-safe, reentrancy is only required for the DevFdResponse body generator.

Compatibility: Whatever EventMachine ~> 0.12.10 and Unicorn both support, currently Ruby 1.8/1.9.

This model is compatible with users of “async.callback” in the Rack environment such as async_sinatra.

For a complete asynchronous framework, Cramp is fully supported when using this concurrency model.

This model is fully-compatible with rack-fiber_pool which allows each request to run inside its own Fiber after all request processing is complete.

Merb (and other frameworks/apps) supporting deferred? execution as documented at Rainbows::EventMachine::TryDefer

This model does not implement as streaming “rack.input” which allows the Rack application to process data as it arrives. This means “rack.input” will be fully buffered in memory or to a temporary file before the application is entered.

RubyGem Requirements

  • event_machine 0.12.10

Instance Method Summary collapse

Methods included from Base

included, #init_worker_process, #process_client, #reopen_worker_logs, #sig_receiver

Instance Method Details

#defers_finished?Boolean

Returns:

  • (Boolean)


68
69
70
71
# File 'lib/rainbows/event_machine.rb', line 68

def defers_finished?
  # EventMachine 1.0.0+ has defers_finished?
  EM.respond_to?(:defers_finished?) ? EM.defers_finished? : true
end

#em_client_classObject

Cramp (and possibly others) can subclass Rainbows::EventMachine::Client and provide the :em_client_class option. We /don’t/ want to load Rainbows::EventMachine::Client in the master process since we need reloadability.



57
58
59
60
61
62
63
64
65
66
# File 'lib/rainbows/event_machine.rb', line 57

def em_client_class
  case klass = Rainbows::O[:em_client_class]
  when Proc
    klass.call # e.g.: proc { Cramp::WebSocket::Rainbows }
  when Symbol, String
    eval(klass.to_s) # Object.const_get won't resolve multi-level paths
  else # @use should be either :EventMachine or :NeverBlock
    Rainbows.const_get(@use).const_get(:Client)
  end
end

#worker_loop(worker) ⇒ Object

runs inside each forked worker, this sits around and waits for connections and doesn’t die until the parent dies (or is given a INT, QUIT, or TERM signal)



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/rainbows/event_machine.rb', line 76

def worker_loop(worker) # :nodoc:
  init_worker_process(worker)
  server = Rainbows.server
  server.app.respond_to?(:deferred?) and
    server.app = TryDefer.new(server.app)

  # enable them both, should be non-fatal if not supported
  EM.epoll
  EM.kqueue
  logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}"
  client_class = em_client_class
  max = worker_connections + LISTENERS.size
  Rainbows::EventMachine::Server.const_set(:MAX, max)
  Rainbows::EventMachine::Server.const_set(:CL, client_class)
  Rainbows::EventMachine::Client.const_set(:APP, Rainbows.server.app)
  EM.run {
    conns = EM.instance_variable_get(:@conns) or
      raise RuntimeError, "EM @conns instance variable not accessible!"
    Rainbows::EventMachine::Server.const_set(:CUR, conns)
    Rainbows.at_quit do
      EM.next_tick do
        LISTENERS.clear
        conns.each_value do |c|
          case c
          when client_class
            c.quit
          when Rainbows::EventMachine::Server
            c.detach
          end
        end
      end
    end
    EM.add_periodic_timer(1) do
      if ! Rainbows.tick && conns.empty? && defers_finished? &&
          EM.reactor_running?
        EM.stop
      end
    end
    LISTENERS.map! do |s|
      EM.watch(s, Rainbows::EventMachine::Server) do |c|
        c.notify_readable = true
      end
    end
  }
  EM.reactor_thread.join if EM.reactor_running?
end