Class: SurroGate::Selector
- Inherits:
-
Object
- Object
- SurroGate::Selector
- Defined in:
- lib/surro-gate/selector.rb,
ext/surro-gate/selector_ext.c
Instance Method Summary
- #each_ready ⇒ Object
-
#initialize(logger) ⇒ Selector
constructor
A new instance of Selector.
- #pop(left, right) ⇒ Object
- #push(left, right) ⇒ Object
- #select(timeout) ⇒ Object
Constructor Details
#initialize(logger) ⇒ Selector
Returns a new instance of Selector.
77 78 79 80 81 82 83 84 85 86 |
# File 'ext/surro-gate/selector_ext.c', line 77
#
# Builds an empty selector: no pairs and no sockets registered yet.
#
# @param logger [Object] kept in @logger; this method does nothing else with it
def initialize(logger)
  @logger = logger
  @mutex = Mutex.new

  # The pair list and the three socket lists each get their own empty
  # Concurrent::Array.
  %i[@pairing @sockets @reads @writes].each do |ivar|
    instance_variable_set(ivar, Concurrent::Array.new)
  end
end
Instance Method Details
#each_ready ⇒ Object
162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'ext/surro-gate/selector_ext.c', line 162
#
# Walks every registered pair and, for each one whose #ready? is truthy,
# yields its two sockets and then re-arms the pair.
#
# @yieldparam left [Object] the ready pair's #left
# @yieldparam right [Object] the ready pair's #right
# @return [Object] whatever @pairing.each returns
def each_ready
  @pairing.each do |pair|
    if pair.ready?
      yield(pair.left, pair.right)

      # Clear both readiness flags on the pair now that the caller has seen it.
      %i[@rd_rdy @wr_rdy].each { |flag| pair.instance_variable_set(flag, false) }

      # Put the pair's sockets back on the read and write lists.
      @reads.push(pair.left)
      @writes.push(pair.right)
    end
  end
end
#pop(left, right) ⇒ Object
106 107 108 109 110 111 112 113 114 |
# File 'ext/surro-gate/selector_ext.c', line 106
#
# Unregisters the given sockets: removes them from the socket, read and
# write lists, and drops every pair for which pairing_compare(pair, sockets)
# is truthy.
#
# @param sockets [Array<Object>] the sockets to remove
# @return [nil]
def pop(*sockets)
  listed = proc { |sock| sockets.include?(sock) }

  @sockets.delete_if(&listed)
  @reads.delete_if(&listed)
  @writes.delete_if(&listed)

  @pairing.delete_if { |pair| pairing_compare(pair, sockets) }
  nil
end
#push(left, right) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'ext/surro-gate/selector_ext.c', line 84
#
# Registers two sockets as a bidirectional pairing: one Pair for each
# direction, and both sockets on the socket, read and write lists.
#
# @param left [IO]
# @param right [IO]
# @return [true]
# @raise [TypeError] unless both arguments are IO instances
# @raise [ArgumentError] if either socket already appears in an existing pair
def push(left, right)
  candidates = [left, right]
  raise TypeError unless candidates.all? { |sock| sock.is_a?(IO) }

  already_paired = @pairing.any? do |pair|
    candidates.include?(pair.left) || candidates.include?(pair.right)
  end
  raise ArgumentError if already_paired

  forward = SurroGate::Pair.new(left, right)
  backward = SurroGate::Pair.new(right, left)

  # The method can be called from a different thread
  @mutex.synchronize do
    @pairing.push(forward, backward)
    [@sockets, @reads, @writes].each { |list| list.push(left, right) }
  end

  true
end
#select(timeout) ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'ext/surro-gate/selector_ext.c', line 119
#
# Runs IO.select over the registered sockets and records the outcome on the
# matching pairs by setting their @rd_rdy / @wr_rdy flags.
#
# @param timeout [Numeric] multiplied by 0.001 before being handed to
#   IO.select, i.e. expressed in milliseconds
# @return [Integer] number of sockets reported readable plus the number
#   reported writable (0 on timeout or when IO.select raised IOError)
def select(timeout)
  begin
    read, write, error = @mutex.synchronize do
      IO.select(@reads, @writes, @sockets, timeout * 0.001)
    end
  rescue IOError
    # One of the sockets is closed, Pair#ready? will catch it
  end

  # A socket in the error set flags both pairs it belongs to as fully ready.
  # The block parameter is required: without it `sock` is undefined and the
  # first errored socket raises NameError.
  error.to_a.each do |sock|
    ltr = find_pairing(sock, :left)
    rtl = find_pairing(sock, :right)

    [ltr, rtl].each do |pair|
      %i[@rd_rdy @wr_rdy].each do |ivar|
        pair.instance_variable_set(ivar, true)
      end
    end
  end

  # Readable sockets leave the read list and mark the pair they are the
  # left side of.
  read.to_a.each do |sock|
    @reads.delete(sock)
    find_pairing(sock, :left).instance_variable_set(:@rd_rdy, true)
  end

  # Writable sockets leave the write list and mark the pair they are the
  # right side of.
  write.to_a.each do |sock|
    @writes.delete(sock)
    find_pairing(sock, :right).instance_variable_set(:@wr_rdy, true)
  end

  # read/write are nil after a timeout or a rescued IOError; to_a maps that to [].
  read.to_a.length + write.to_a.length
end