Class: SurroGate::Selector
- Inherits:
-
Object
- Object
- SurroGate::Selector
- Defined in:
- lib/surro-gate/selector.rb,
ext/surro-gate/selector_ext.c
Instance Method Summary collapse
- #each_ready ⇒ Object
-
#initialize(logger) ⇒ Selector
constructor
A new instance of Selector.
- #pop(sockets) ⇒ Object
- #push(left, right) ⇒ Object
- #select(timeout) ⇒ Object
Constructor Details
#initialize(logger) ⇒ Selector
Returns a new instance of Selector.
91 92 93 94 95 96 97 98 99 100 |
# File 'ext/surro-gate/selector_ext.c', line 91 def initialize(logger) @logger = logger @pairing = Concurrent::Array.new @sockets = Concurrent::Array.new @reads = Concurrent::Array.new @writes = Concurrent::Array.new @mutex = Mutex.new end |
Instance Method Details
#each_ready ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'ext/surro-gate/selector_ext.c', line 197 def each_ready @pairing.each do |pair| next unless pair.ready? yield(pair.left, pair.right) pair.instance_variable_set(:@rd_rdy, false) pair.instance_variable_set(:@wr_rdy, false) @reads.push(pair.left) @writes.push(pair.right) end end |
#pop(sockets) ⇒ Object
134 135 136 137 138 139 140 141 142 |
# File 'ext/surro-gate/selector_ext.c', line 134 def pop(*sockets) [@sockets, @reads, @writes].each do |arr| arr.delete_if { |sock| sockets.include?(sock) } end @pairing.delete_if { |pair| pairing_compare(pair, sockets) } nil end |
#push(left, right) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'ext/surro-gate/selector_ext.c', line 101 def push(left, right) raise TypeError unless left.is_a?(IO) && right.is_a?(IO) raise ArgumentError if @pairing.detect { |pair| [left, right].include?(pair.left) || [left, right].include?(pair.right) } left_to_right = SurroGate::Pair.new(left, right) right_to_left = SurroGate::Pair.new(right, left) # The method can be called from a different thread @mutex.synchronize do @pairing.push(left_to_right, right_to_left) @sockets.push(left, right) @reads.push(left, right) @writes.push(left, right) end true end |
#select(timeout) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'ext/surro-gate/selector_ext.c', line 148 def select(timeout) begin read, write, error = @mutex.synchronize { IO.select(@reads, @writes, @sockets, timeout * 0.001) } rescue IOError # One of the sockets is closed, Pair#ready? will catch it end error.to_a.each do ltr = find_pairing(sock, :left) rtl = find_pairing(sock, :right) [ltr, rtl].each do |pair| %i[@rd_rdy @wr_rdy].each do |ivar| pair.instance_variable_set(ivar, true) end end end read.to_a.each do |sock| @reads.delete(sock) find_pairing(sock, :left).instance_variable_set(:@rd_rdy, true) end write.to_a.each do |sock| @writes.delete(sock) find_pairing(sock, :right).instance_variable_set(:@wr_rdy, true) end read.to_a.length + write.to_a.length end |