Class: ZMachine::Reactor
- Inherits:
-
Object
- Object
- ZMachine::Reactor
- Defined in:
- lib/zmachine/reactor.rb
Instance Method Summary collapse
- #add_shutdown_hook(&block) ⇒ Object
- #add_timer(*args, &block) ⇒ Object
- #cancel_timer(timer_or_sig) ⇒ Object
- #connect(server, port_or_type = nil, handler = nil, *args, &block) ⇒ Object
- #connection_count ⇒ Object
- #error_handler(callback = nil, &block) ⇒ Object
-
#initialize ⇒ Reactor
constructor
A new instance of Reactor.
- #next_tick(callback = nil, &block) ⇒ Object
- #reactor_running? ⇒ Boolean
- #run(callback = nil, shutdown_hook = nil, &block) ⇒ Object
- #start_server(server, port_or_type = nil, handler = nil, *args, &block) ⇒ Object
- #stop_event_loop ⇒ Object
- #stop_server(channel) ⇒ Object
Constructor Details
#initialize ⇒ Reactor
Returns a new instance of Reactor.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/zmachine/reactor.rb', line 24 def initialize @timers = TreeMap.new @timer_callbacks = {} @channels = [] @new_channels = [] @unbound_channels = [] @next_signature = 0 @shutdown_hooks = [] @next_tick_queue = ConcurrentLinkedQueue.new @running = false # don't use a direct buffer. Ruby doesn't seem to like them. @read_buffer = ByteBuffer.allocate(32*1024) end |
Instance Method Details
#add_shutdown_hook(&block) ⇒ Object
39 40 41 |
# File 'lib/zmachine/reactor.rb', line 39 def add_shutdown_hook(&block) @shutdown_hooks << block end |
#add_timer(*args, &block) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/zmachine/reactor.rb', line 43 def add_timer(*args, &block) interval = args.shift callback = args.shift || block return unless callback signature = next_signature deadline = java.util.Date.new.time + (interval.to_f * 1000).to_i if @timers.contains_key(deadline) @timers.get(deadline) << signature else @timers.put(deadline, [signature]) end @timer_callbacks[signature] = callback signature end |
#cancel_timer(timer_or_sig) ⇒ Object
61 62 63 64 65 66 67 |
# File 'lib/zmachine/reactor.rb', line 61 def cancel_timer(timer_or_sig) if timer_or_sig.respond_to?(:cancel) timer_or_sig.cancel else @timer_callbacks[timer_or_sig] = false if @timer_callbacks.has_key?(timer_or_sig) end end |
#connect(server, port_or_type = nil, handler = nil, *args, &block) ⇒ Object
69 70 71 72 73 74 75 |
# File 'lib/zmachine/reactor.rb', line 69 def connect(server, port_or_type=nil, handler=nil, *args, &block) if server.nil? or server =~ %r{\w+://} _connect_zmq(server, port_or_type, handler, *args, &block) else _connect_tcp(server, port_or_type, handler, *args, &block) end end |
#connection_count ⇒ Object
77 78 79 |
# File 'lib/zmachine/reactor.rb', line 77 def connection_count @channels.size end |
#error_handler(callback = nil, &block) ⇒ Object
81 82 83 |
# File 'lib/zmachine/reactor.rb', line 81 def error_handler(callback = nil, &block) @error_handler = callback || block end |
#next_tick(callback = nil, &block) ⇒ Object
85 86 87 88 |
# File 'lib/zmachine/reactor.rb', line 85 def next_tick(callback=nil, &block) @next_tick_queue << (callback || block) signal_loopbreak if running? end |
#reactor_running? ⇒ Boolean
125 126 127 |
# File 'lib/zmachine/reactor.rb', line 125 def reactor_running? @running || false end |
#run(callback = nil, shutdown_hook = nil, &block) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/zmachine/reactor.rb', line 90 def run(callback=nil, shutdown_hook=nil, &block) @callback = callback || block add_shutdown_hook(shutdown_hook) if shutdown_hook begin @running = true add_timer(0, @callback) if @callback @selector = Selector.open @run_reactor = true while @run_reactor run_deferred_callbacks break unless @run_reactor run_timers break unless @run_reactor remove_unbound_channels check_io add_new_channels process_io end ensure @selector.close rescue nil @selector = nil @unbound_channels += @channels remove_unbound_channels @shutdown_hooks.pop.call until @shutdown_hooks.empty? @next_tick_queue = ConcurrentLinkedQueue.new @running = false ZMachine.context.destroy end end |
#start_server(server, port_or_type = nil, handler = nil, *args, &block) ⇒ Object
129 130 131 132 133 134 135 |
# File 'lib/zmachine/reactor.rb', line 129 def start_server(server, port_or_type=nil, handler=nil, *args, &block) if server =~ %r{\w+://} _bind_zmq(server, port_or_type, handler, *args, &block) else _bind_tcp(server, port_or_type, handler, *args, &block) end end |
#stop_event_loop ⇒ Object
137 138 139 140 |
# File 'lib/zmachine/reactor.rb', line 137 def stop_event_loop @run_reactor = false signal_loopbreak end |
#stop_server(channel) ⇒ Object
142 143 144 |
# File 'lib/zmachine/reactor.rb', line 142 def stop_server(channel) channel.close end |