Class: Arf::Wire::BaseConnection
- Inherits: Object
- Inheritance chain: Object → Arf::Wire::BaseConnection
- Defined in:
- lib/arf/wire/base_connection.rb
Direct Known Subclasses
Instance Method Summary
- #arf_close_after_writing? ⇒ Boolean
- #cancel_streams(code) ⇒ Object
- #close ⇒ Object
- #close_connection ⇒ Object
- #close_connection_after_writing ⇒ Object
- #dispatch(frame) ⇒ Object
- #dispatch_frame(type, terminate: false) ⇒ Object
- #fetch_stream(id) ⇒ Object
- #flush_write_buffer(io) ⇒ Object
- #go_away!(code, extra_data: nil, terminate: false) ⇒ Object
- #handle_frame(raw_frame) ⇒ Object
- #handle_ping(fr) ⇒ Object
- #initialize ⇒ BaseConnection (constructor): A new instance of BaseConnection.
- #ping ⇒ Object
- #protocol_error! ⇒ Object
- #recv(data) ⇒ Object
- #registered! ⇒ Object
- #registered? ⇒ Boolean
- #send_data(data) ⇒ Object
- #wait_configuration ⇒ Object
- #wait_pong ⇒ Object
Constructor Details
#initialize ⇒ BaseConnection
Returns a new instance of BaseConnection.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/arf/wire/base_connection.rb', line 6
# Sets up the per-connection state: outgoing write queue, frame reader,
# stream table and the synchronisation primitives guarding them.
#
# @return [BaseConnection] a new instance of BaseConnection
def initialize
  # Outgoing data: queued chunks plus the chunk currently being written
  # (see #send_data / #flush_write_buffer), guarded by @_write_lock.
  @_write_lock = Monitor.new
  @_write_buffer = Queue.new
  @_write_head = nil

  # Incoming framing state.
  @compression = :none
  @reader = FrameReader.new

  # Stream bookkeeping; @streams is accessed under @streams_monitor.
  @last_stream_id = 0
  @streams = {}
  @streams_monitor = Monitor.new

  # Configuration / ping signalling (see #wait_configuration, #wait_pong).
  @configured = false
  @configuration_ready_lock = Mutex.new
  @configuration_ready_signal = Thread::ConditionVariable.new
  @pong_signal = WaitSignal.new

  # Flipped by #registered!.
  @registered = false
end
Instance Method Details
#arf_close_after_writing? ⇒ Boolean
98 99 100 |
# File 'lib/arf/wire/base_connection.rb', line 98
# Tells whether #close_connection_after_writing has been requested.
#
# @return [Boolean] the stored flag, or false when it was never set
def arf_close_after_writing?
  return false unless @arf_close_after_writing

  @arf_close_after_writing
end
#cancel_streams(code) ⇒ Object
75 76 77 78 79 80 81 82 83 |
# File 'lib/arf/wire/base_connection.rb', line 75
# Resets every known stream with the given code while holding
# @streams_monitor. Streams that raise ClosedStreamError or
# StreamResetError are skipped; any other error propagates.
#
# @param code [Object] value passed to each stream's #reset
# @return [Hash] the stream table (result of Hash#each_value)
def cancel_streams(code)
  @streams_monitor.synchronize do
    @streams.each_value do |stream|
      begin
        stream.reset(code)
      rescue ClosedStreamError, StreamResetError
        # Nothing left to reset on this stream; move on to the next one.
      end
    end
  end
end
#close ⇒ Object
104 105 106 |
# File 'lib/arf/wire/base_connection.rb', line 104
# Closes the connection; delegates to #close_connection.
#
# @return [Object] whatever #close_connection returns
def close = close_connection
#close_connection ⇒ Object
102 |
# File 'lib/arf/wire/base_connection.rb', line 102
# Asks the Reactor to detach this connection.
#
# @return [Object] whatever Reactor.detach_client returns
def close_connection
  Reactor.detach_client(self)
end
#close_connection_after_writing ⇒ Object
108 109 110 |
# File 'lib/arf/wire/base_connection.rb', line 108
# Records that the connection should be closed after writing; the flag is
# exposed through #arf_close_after_writing?.
#
# @return [true]
def close_connection_after_writing
  @arf_close_after_writing = true
end
#dispatch(frame) ⇒ Object
139 140 141 |
# File 'lib/arf/wire/base_connection.rb', line 139
# Serialises a frame using the connection's current compression setting and
# queues the resulting bytes through #send_data.
#
# @param frame [#to_frame] object convertible to a wire frame
# @return [Object] whatever #send_data returns
def dispatch(frame)
  wire_frame = frame.to_frame
  payload = wire_frame.bytes(@compression)
  send_data(payload)
end
#dispatch_frame(type, terminate: false) ⇒ Object
134 135 136 137 |
# File 'lib/arf/wire/base_connection.rb', line 134
# Instantiates +type+ (forwarding the given block to its constructor),
# dispatches it, and optionally marks the connection to be closed after
# writing.
#
# @param type [Class] frame class to instantiate
# @param terminate [Boolean] when truthy, call #close_connection_after_writing
# @return [Object, nil] result of #close_connection_after_writing when
#   terminating, nil otherwise
def dispatch_frame(type, terminate: false, &)
  frame = type.new(&)
  dispatch(frame)
  return unless terminate

  close_connection_after_writing
end
#fetch_stream(id) ⇒ Object
96 |
# File 'lib/arf/wire/base_connection.rb', line 96
# Looks up a stream by id while holding @streams_monitor.
#
# @param id [Object] key into the stream table
# @return [Object, nil] the stored stream, or nil when the id is unknown
def fetch_stream(id)
  @streams_monitor.synchronize do
    @streams[id]
  end
end
#flush_write_buffer(io) ⇒ Object
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/arf/wire/base_connection.rb', line 153
# Writes as much queued data as +io+ accepts without blocking, holding
# @_write_lock for the whole operation.
#
# @param io [#write_nonblock] destination; called with exception: false
# @return [Boolean] true once both the in-flight chunk and the queue are
#   drained; false when +io+ reported :wait_writable or accepted only part
#   of the current chunk (the unwritten remainder is kept in @_write_head)
def flush_write_buffer(io)
  @_write_lock.synchronize do
    loop do
      # Pick the next chunk only when no partially written chunk is pending.
      if @_write_head.nil?
        return true if @_write_buffer.empty?

        @_write_head = @_write_buffer.pop
      end

      written = io.write_nonblock(@_write_head, exception: false)
      return false if written == :wait_writable

      if written == @_write_head.bytesize
        # Whole chunk went out; continue with the next queued chunk.
        @_write_head = nil
      else
        # Partial write: keep the remaining bytes for the next call.
        @_write_head = @_write_head.byteslice(written, @_write_head.bytesize)
        return false
      end
    end
  end
end
#go_away!(code, extra_data: nil, terminate: false) ⇒ Object
145 146 147 148 149 150 151 |
# File 'lib/arf/wire/base_connection.rb', line 145
# Dispatches a GoAwayFrame carrying the last stream id seen by this
# connection and the given error code.
#
# @param code [Object] value assigned to the frame's error_code
# @param extra_data [Object, nil] assigned to additional_data only when truthy
# @param terminate [Boolean] forwarded to #dispatch_frame
# @return [Object] whatever #dispatch_frame returns
def go_away!(code, extra_data: nil, terminate: false)
  dispatch_frame(GoAwayFrame, terminate: terminate) do |frame|
    frame.last_stream_id = @last_stream_id
    frame.error_code = code
    next unless extra_data

    frame.additional_data = extra_data
  end
end
#handle_frame(raw_frame) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/arf/wire/base_connection.rb', line 53
# Specialises a raw frame using the connection's compression setting and
# routes it to the handler matching its class. Frames of any other class,
# as well as UnknownFrameKindError / InvalidFrameLengthError raised anywhere
# in this method, result in #protocol_error!.
#
# @param raw_frame [#specialize] frame as produced by the frame reader
# @return [Object] result of the selected handler or of #protocol_error!
def handle_frame(raw_frame)
  specialized = raw_frame.specialize(@compression)

  case specialized
  when ConfigurationFrame then handle_configuration(specialized)
  when PingFrame          then handle_ping(specialized)
  when GoAwayFrame        then handle_go_away(specialized)
  when MakeStreamFrame    then handle_make_stream(specialized)
  when ResetStreamFrame   then handle_reset_stream(specialized)
  when DataFrame          then handle_data(specialized)
  else protocol_error!
  end
rescue UnknownFrameKindError, InvalidFrameLengthError
  protocol_error!
end
#handle_ping(fr) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/arf/wire/base_connection.rb', line 120
# Handles an incoming ping frame. Before the connection is configured this
# is a protocol error. An ack wakes everyone waiting on @pong_signal; any
# other ping is answered with an ack PingFrame echoing the payload.
#
# @param fr [#ack?, #payload] the received ping frame
# @return [Object, nil] nil for acks; otherwise the result of
#   #protocol_error! or #dispatch_frame
def handle_ping(fr)
  return protocol_error! unless @configured

  if fr.ack?
    @pong_signal.broadcast
    nil
  else
    dispatch_frame(PingFrame) do |pong|
      pong.ack!
      pong.payload = fr.payload
    end
  end
end
#ping ⇒ Object
85 86 87 88 89 90 91 92 |
# File 'lib/arf/wire/base_connection.rb', line 85
# Waits until the connection is configured, then dispatches a PingFrame
# with an 8-byte random payload while holding @streams_monitor.
#
# @return [Object] whatever #dispatch_frame returns
def ping
  wait_configuration

  @streams_monitor.synchronize do
    dispatch_frame(PingFrame) { |frame| frame.payload = SecureRandom.bytes(8) }
  end
end
#protocol_error! ⇒ Object
143 |
# File 'lib/arf/wire/base_connection.rb', line 143
# Sends a GoAway with ERROR_CODE_PROTOCOL_ERROR and requests termination.
#
# @return [Object] whatever #go_away! returns
def protocol_error!
  go_away!(ERROR_CODE_PROTOCOL_ERROR, terminate: true)
end
#recv(data) ⇒ Object
44 45 46 47 48 49 50 51 |
# File 'lib/arf/wire/base_connection.rb', line 44
# Feeds received data to the frame reader one byte at a time and handles
# each value the reader returns as a frame.
#
# @param data [#each_byte] bytes received from the peer
# @return [Object] the result of data.each_byte
def recv(data)
  data.each_byte do |byte|
    frame = @reader.feed(byte)
    # A falsy result from the reader is skipped.
    handle_frame(frame) if frame
  end
end
#registered! ⇒ Object
26 27 28 29 30 31 |
# File 'lib/arf/wire/base_connection.rb', line 26
# Marks the connection as registered and, if data was queued beforehand,
# tells the reactor that writes are pending. The buffer check and the
# notification happen under @_write_lock.
#
# @return [Object, nil] result of Arf::Reactor.client_writes_pending, or nil
#   when the write buffer is empty
def registered!
  @registered = true

  @_write_lock.synchronize do
    next if @_write_buffer.empty?

    Arf::Reactor.client_writes_pending(self)
  end
end
#registered? ⇒ Boolean
24 |
# File 'lib/arf/wire/base_connection.rb', line 24
# Reports the registration flag set by #registered!.
#
# @return [Boolean] current value of @registered
def registered?
  @registered
end
#send_data(data) ⇒ Object
112 113 114 115 116 117 118 |
# File 'lib/arf/wire/base_connection.rb', line 112
# Queues data for writing and, when the connection is already registered,
# notifies the Reactor that writes are pending.
#
# @param data [#bytesize] chunk to enqueue
# @return [Integer] data.bytesize
def send_data(data)
  @_write_lock.synchronize { @_write_buffer.push(data) }

  Reactor.client_writes_pending(self) if registered?

  data.bytesize
end
#wait_configuration ⇒ Object
33 34 35 36 37 38 39 40 41 42 |
# File 'lib/arf/wire/base_connection.rb', line 33
# Blocks the calling thread until the connection is marked as configured.
#
# The flag is re-checked while holding @configuration_ready_lock before
# every wait. Waiting unconditionally first (as this method used to do)
# loses the wakeup when the broadcast fires between the unlocked fast-path
# check and the wait, leaving the caller asleep forever. Looping on the
# predicate also tolerates spurious wakeups.
#
# NOTE(review): this is only fully race-free if the code that sets
# @configured and signals @configuration_ready_signal does so while holding
# @configuration_ready_lock; that code is not shown here, so confirm.
#
# @return [nil]
def wait_configuration
  # Fast path: nothing to wait for.
  return if @configured

  @configuration_ready_lock.synchronize do
    @configuration_ready_signal.wait(@configuration_ready_lock) until @configured
  end
  nil
end
#wait_pong ⇒ Object
94 |
# File 'lib/arf/wire/base_connection.rb', line 94
# Waits on @pong_signal, which #handle_ping broadcasts when a ping ack
# arrives.
#
# @return [Object] whatever @pong_signal.wait returns
def wait_pong
  @pong_signal.wait
end