Class: ZMachine::Reactor

Inherits:
Object
  • Object
show all
Defined in:
lib/zmachine/reactor.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Reactor

Returns a new instance of Reactor.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/zmachine/reactor.rb', line 24

# Sets up an idle reactor: empty timer, channel and shutdown-hook
# bookkeeping, an empty next-tick queue and a reusable read buffer.
# The reactor is marked as not running.
def initialize
  # Timer bookkeeping: deadlines map to signatures, signatures to callbacks.
  @timers          = TreeMap.new
  @timer_callbacks = {}
  @next_signature  = 0

  # Channel bookkeeping.
  @channels         = []
  @new_channels     = []
  @unbound_channels = []

  # Lifecycle state.
  @shutdown_hooks  = []
  @next_tick_queue = ConcurrentLinkedQueue.new
  @running         = false

  # don't use a direct buffer. Ruby doesn't seem to like them.
  read_buffer_bytes = 32 * 1024
  @read_buffer = ByteBuffer.allocate(read_buffer_bytes)
end

Instance Method Details

#add_shutdown_hook(&block) ⇒ Object



39
40
41
# File 'lib/zmachine/reactor.rb', line 39

# Registers a callable to be invoked when the reactor shuts down.
#
# The hook may be given either as a positional callable or as a block,
# mirroring #error_handler and #next_tick. #run pops hooks off the end of
# the list, so they are called in reverse order of registration.
#
# A call with neither a callable nor a block is ignored, so that no nil
# entry ends up in the list (nil would fail when the hooks are called).
#
# @param callback [#call, nil] hook to register
# @yield used as the hook when +callback+ is nil
# @return [Array] the list of registered shutdown hooks
def add_shutdown_hook(callback = nil, &block)
  hook = callback || block
  @shutdown_hooks << hook if hook
  @shutdown_hooks
end

#add_timer(*args, &block) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/zmachine/reactor.rb', line 43

# Registers a timer callback that becomes due +interval+ seconds from now.
#
# The deadline is the current java.util.Date time (milliseconds) plus the
# interval converted to whole milliseconds. Signatures sharing the same
# deadline are kept together in one list under that key.
#
# @param args [Array] the interval in seconds, optionally followed by a callable
# @yield used as the callback when no callable is passed in +args+
# @return [Object, nil] the timer signature (accepted by #cancel_timer),
#   or nil when no callback was supplied
def add_timer(*args, &block)
  interval, handler = args
  handler ||= block
  return unless handler

  signature = next_signature
  delay_ms = (interval.to_f * 1000).to_i
  deadline = java.util.Date.new.time + delay_ms

  bucket = @timers.get(deadline)
  if bucket
    bucket << signature
  else
    @timers.put(deadline, [signature])
  end

  @timer_callbacks[signature] = handler
  signature
end

#cancel_timer(timer_or_sig) ⇒ Object



61
62
63
64
65
66
67
# File 'lib/zmachine/reactor.rb', line 61

# Cancels a timer.
#
# An object that responds to +cancel+ is asked to cancel itself. Anything
# else is treated as a timer signature: if it is known, its callback entry
# is replaced with +false+. Unknown signatures are left untouched.
#
# @param timer_or_sig [#cancel, Object] a timer object or a signature from #add_timer
# @return [Object, false, nil] the result of +cancel+, +false+ when a known
#   signature was disabled, or nil when the signature is unknown
def cancel_timer(timer_or_sig)
  return timer_or_sig.cancel if timer_or_sig.respond_to?(:cancel)

  @timer_callbacks[timer_or_sig] = false if @timer_callbacks.key?(timer_or_sig)
end

#connect(server, port_or_type = nil, handler = nil, *args, &block) ⇒ Object



69
70
71
72
73
74
75
# File 'lib/zmachine/reactor.rb', line 69

# Opens an outgoing connection, dispatching on the form of +server+.
#
# A nil +server+ or one containing a "scheme://" prefix is handed to
# _connect_zmq; anything else goes to _connect_tcp. All arguments and the
# block are forwarded unchanged.
#
# @param server [String, nil] endpoint URL or host
# @param port_or_type [Object, nil] forwarded as-is (presumably a TCP port or a ZMQ socket type; confirm)
# @param handler [Object, nil] forwarded as-is
# @param args [Array] extra arguments forwarded as-is
# @return [Object] whatever the selected _connect_* method returns
def connect(server, port_or_type = nil, handler = nil, *args, &block)
  zmq_endpoint = server.nil? || server =~ %r{\w+://}
  return _connect_zmq(server, port_or_type, handler, *args, &block) if zmq_endpoint

  _connect_tcp(server, port_or_type, handler, *args, &block)
end

#connection_count ⇒ Object



77
78
79
# File 'lib/zmachine/reactor.rb', line 77

# Number of entries currently held in the reactor's channel list.
#
# @return [Integer]
def connection_count
  @channels.length
end

#error_handler(callback = nil, &block) ⇒ Object



81
82
83
# File 'lib/zmachine/reactor.rb', line 81

# Stores the error handler for this reactor.
#
# The positional callable wins over the block. Calling this with neither
# resets the stored handler to nil.
#
# @param callback [#call, nil] handler to store
# @yield used as the handler when +callback+ is nil
# @return [Object, nil] the handler that was stored
def error_handler(callback = nil, &block)
  handler = callback || block
  @error_handler = handler
end

#next_tick(callback = nil, &block) ⇒ Object



85
86
87
88
# File 'lib/zmachine/reactor.rb', line 85

# Appends a callable to the next-tick queue and, if the reactor is running,
# calls signal_loopbreak.
#
# NOTE(review): +running?+ is not defined in the visible part of this file
# (only +reactor_running?+ is); confirm it exists and means the same thing.
#
# @param callback [#call, nil] work to enqueue
# @yield used as the work item when +callback+ is nil
# @return [Object, nil] the result of signal_loopbreak, or nil when not running
def next_tick(callback = nil, &block)
  task = callback || block
  @next_tick_queue << task
  return unless running?

  signal_loopbreak
end

#reactor_running? ⇒ Boolean

Returns:

  • (Boolean)


125
126
127
# File 'lib/zmachine/reactor.rb', line 125

# Reports whether the reactor is currently running.
#
# @return [Boolean] the value of the running flag, or false when it is unset
def reactor_running?
  return false unless @running

  @running
end

#run(callback = nil, shutdown_hook = nil, &block) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/zmachine/reactor.rb', line 90

# Runs the event loop on the calling thread until @run_reactor is cleared
# (see #stop_event_loop), then tears the reactor down.
#
# Each loop iteration runs deferred callbacks, runs timers, removes unbound
# channels, checks I/O, adds new channels and processes I/O, in that order.
#
# Teardown always happens, even when the loop raises: the selector is
# closed, all channels are moved to the unbound list and removed, and the
# shutdown hooks are called last-registered-first. After that, and even if
# a shutdown hook raises, the next-tick queue is replaced, the running flag
# is cleared and the ZMachine context is destroyed.
#
# @param callback [#call, nil] scheduled through a zero-delay timer before the loop starts
# @param shutdown_hook [#call, nil] extra hook to call during teardown
# @yield used as the callback when +callback+ is nil
# @return [nil]
def run(callback = nil, shutdown_hook = nil, &block)
  @callback = callback || block

  # Store the hook directly so that any object responding to #call is accepted.
  @shutdown_hooks << shutdown_hook if shutdown_hook

  begin
    @running = true

    add_timer(0, @callback) if @callback

    @selector = Selector.open
    @run_reactor = true

    while @run_reactor
      run_deferred_callbacks
      break unless @run_reactor
      run_timers
      break unless @run_reactor
      remove_unbound_channels
      check_io
      add_new_channels
      process_io
    end
  ensure
    begin
      # Best effort: the selector may be nil if Selector.open failed.
      @selector.close rescue nil
      @selector = nil
      @unbound_channels += @channels
      remove_unbound_channels
      @shutdown_hooks.pop.call until @shutdown_hooks.empty?
    ensure
      # Must run even when a shutdown hook raises; otherwise the reactor
      # would keep reporting itself as running.
      @next_tick_queue = ConcurrentLinkedQueue.new
      @running = false
      ZMachine.context.destroy
    end
  end
end

#start_server(server, port_or_type = nil, handler = nil, *args, &block) ⇒ Object



129
130
131
132
133
134
135
# File 'lib/zmachine/reactor.rb', line 129

# Starts a listening endpoint, dispatching on the form of +server+.
#
# A +server+ containing a "scheme://" prefix is handed to _bind_zmq;
# anything else goes to _bind_tcp. All arguments and the block are
# forwarded unchanged.
#
# @param server [String] endpoint URL or host
# @param port_or_type [Object, nil] forwarded as-is (presumably a TCP port or a ZMQ socket type; confirm)
# @param handler [Object, nil] forwarded as-is
# @param args [Array] extra arguments forwarded as-is
# @return [Object] whatever the selected _bind_* method returns
def start_server(server, port_or_type = nil, handler = nil, *args, &block)
  zmq_endpoint = server =~ %r{\w+://}
  return _bind_zmq(server, port_or_type, handler, *args, &block) if zmq_endpoint

  _bind_tcp(server, port_or_type, handler, *args, &block)
end

#stop_event_loop ⇒ Object



137
138
139
140
# File 'lib/zmachine/reactor.rb', line 137

# Asks the event loop to stop.
#
# Clears the @run_reactor flag that #run checks on each iteration, then
# calls signal_loopbreak (defined elsewhere; presumably it wakes a loop
# that is blocked waiting for I/O -- confirm).
#
# @return [Object] whatever signal_loopbreak returns
def stop_event_loop
  # Clear the flag first so the loop sees it once signal_loopbreak has run.
  @run_reactor = false
  signal_loopbreak
end

#stop_server(channel) ⇒ Object



142
143
144
# File 'lib/zmachine/reactor.rb', line 142

# Stops a server by closing its channel.
#
# @param channel [#close] the channel to close (presumably the value
#   returned by #start_server; confirm against callers)
# @return [Object] whatever +channel.close+ returns
def stop_server(channel)
  channel.close
end