Class: HTTP2Adapter::StreamAdapter
- Inherits:
-
Object
- Object
- HTTP2Adapter::StreamAdapter
- Defined in:
- lib/polyphony/http/client/http2.rb
Overview
Virtualizes adapter over HTTP2 stream
Instance Method Summary collapse
- #body ⇒ Object
- #each_chunk ⇒ Object
-
#initialize(connection) ⇒ StreamAdapter
constructor
A new instance of StreamAdapter.
- #next_body_chunk ⇒ Object
- #on_close(_stream) ⇒ Object
- #on_data(chunk) ⇒ Object
- #on_headers(headers) ⇒ Object
- #prepare_headers(ctx) ⇒ Object
- #protocol ⇒ Object
- #request(ctx) ⇒ Object
- #send_request(ctx, stream) ⇒ Object
- #setup_stream ⇒ Object
- #wait_for_headers ⇒ Object
- #wait_for_response(_ctx, _stream) ⇒ Object
Constructor Details
#initialize(connection) ⇒ StreamAdapter
Returns a new instance of StreamAdapter.
44 45 46 |
# File 'lib/polyphony/http/client/http2.rb', line 44 def initialize(connection) @connection = connection end |
Instance Method Details
#body ⇒ Object
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/polyphony/http/client/http2.rb', line 140 def body @waiting_done_fiber = Fiber.current suspend @buffered_chunks.join # body = +'' # while !@done # p :body_suspend_pre # chunk = suspend # p :body_suspend_post # body << chunk # end # puts "" # body rescue Exception => e p e puts e.backtrace.join("\n") end |
#each_chunk ⇒ Object
158 159 160 161 162 163 164 165 166 |
# File 'lib/polyphony/http/client/http2.rb', line 158 def each_chunk yield @buffered_chunks.shift until @buffered_chunks.empty? @waiting_chunk_fiber = Fiber.current until @done chunk = suspend yield chunk end end |
#next_body_chunk ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/polyphony/http/client/http2.rb', line 168 def next_body_chunk return yield @buffered_chunks.shift unless @buffered_chunks.empty? @waiting_chunk_fuber = Fiber.current until @done chunk = suspend return yield chunk end nil end |
#on_close(_stream) ⇒ Object
93 94 95 96 |
# File 'lib/polyphony/http/client/http2.rb', line 93 def on_close(_stream) @done = true @waiting_done_fiber&.schedule end |
#on_data(chunk) ⇒ Object
85 86 87 88 89 90 91 |
# File 'lib/polyphony/http/client/http2.rb', line 85 def on_data(chunk) if @waiting_chunk_fiber @waiting_chunk_fiber&.schedule chunk else @buffered_chunks << chunk end end |
#on_headers(headers) ⇒ Object
77 78 79 80 81 82 83 |
# File 'lib/polyphony/http/client/http2.rb', line 77 def on_headers(headers) if @waiting_headers_fiber @waiting_headers_fiber.schedule headers.to_h else @headers = headers.to_h end end |
#prepare_headers(ctx) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/polyphony/http/client/http2.rb', line 128 def prepare_headers(ctx) headers = { ':method' => ctx[:method].to_s, ':scheme' => ctx[:uri].scheme, ':authority' => [ctx[:uri].host, ctx[:uri].port].join(':'), ':path' => ctx[:uri].request_uri, 'User-Agent' => 'curl/7.54.0' } headers.merge!(ctx[:opts][:headers]) if ctx[:opts][:headers] headers end |
#protocol ⇒ Object
124 125 126 |
# File 'lib/polyphony/http/client/http2.rb', line 124 def protocol :http2 end |
#request(ctx) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/polyphony/http/client/http2.rb', line 48 def request(ctx) stream = setup_stream # (ctx, stream) send_request(ctx, stream) stream.on(:headers, &method(:on_headers)) stream.on(:data, &method(:on_data)) stream.on(:close, &method(:on_close)) # stream.on(:active) { puts "* active" } # stream.on(:half_close) { puts "* half_close" } wait_for_response(ctx, stream) rescue Exception => e p e puts e.backtrace.join("\n") # ensure # stream.close end |
#send_request(ctx, stream) ⇒ Object
67 68 69 70 71 72 73 74 75 |
# File 'lib/polyphony/http/client/http2.rb', line 67 def send_request(ctx, stream) headers = prepare_headers(ctx) if ctx[:opts][:payload] stream.headers(headers, end_stream: false) stream.data(ctx[:opts][:payload], end_stream: true) else stream.headers(headers, end_stream: true) end end |
#setup_stream ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/polyphony/http/client/http2.rb', line 98 def setup_stream stream = @connection.allocate_stream @headers = nil @done = nil @buffered_chunks = [] @waiting_headers_fiber = nil @waiting_chunk_fiber = nil @waiting_done_fiber = nil stream end |
#wait_for_headers ⇒ Object
117 118 119 120 121 122 |
# File 'lib/polyphony/http/client/http2.rb', line 117 def wait_for_headers return @headers if @headers @waiting_headers_fiber = Fiber.current suspend end |
#wait_for_response(_ctx, _stream) ⇒ Object
112 113 114 115 |
# File 'lib/polyphony/http/client/http2.rb', line 112 def wait_for_response(_ctx, _stream) headers = wait_for_headers Response.new(self, headers[':status'].to_i, headers) end |