Class: HTTP2Adapter::StreamAdapter

Inherits:
Object
  • Object
show all
Defined in:
lib/polyphony/http/client/http2.rb

Overview

Virtualizes adapter over HTTP2 stream

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ StreamAdapter

Returns a new instance of StreamAdapter.



44
45
46
# File 'lib/polyphony/http/client/http2.rb', line 44

def initialize(connection)
  @connection = connection
end

Instance Method Details

#bodyObject



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/polyphony/http/client/http2.rb', line 140

def body
  @waiting_done_fiber = Fiber.current
  suspend
  @buffered_chunks.join
  # body = +''
  # while !@done
  #   p :body_suspend_pre
  #   chunk = suspend
  #   p :body_suspend_post
  #   body << chunk
  # end
  # puts ""
  # body
rescue Exception => e
  p e
  puts e.backtrace.join("\n")
end

#each_chunkObject



158
159
160
161
162
163
164
165
166
# File 'lib/polyphony/http/client/http2.rb', line 158

def each_chunk
  yield @buffered_chunks.shift until @buffered_chunks.empty?

  @waiting_chunk_fiber = Fiber.current
  until @done
    chunk = suspend
    yield chunk
  end
end

#next_body_chunkObject



168
169
170
171
172
173
174
175
176
177
178
# File 'lib/polyphony/http/client/http2.rb', line 168

def next_body_chunk
  return yield @buffered_chunks.shift unless @buffered_chunks.empty?

  @waiting_chunk_fuber = Fiber.current
  until @done
    chunk = suspend
    return yield chunk
  end

  nil
end

#on_close(_stream) ⇒ Object



93
94
95
96
# File 'lib/polyphony/http/client/http2.rb', line 93

def on_close(_stream)
  @done = true
  @waiting_done_fiber&.schedule
end

#on_data(chunk) ⇒ Object



85
86
87
88
89
90
91
# File 'lib/polyphony/http/client/http2.rb', line 85

def on_data(chunk)
  if @waiting_chunk_fiber
    @waiting_chunk_fiber&.schedule chunk
  else
    @buffered_chunks << chunk
  end
end

#on_headers(headers) ⇒ Object



77
78
79
80
81
82
83
# File 'lib/polyphony/http/client/http2.rb', line 77

def on_headers(headers)
  if @waiting_headers_fiber
    @waiting_headers_fiber.schedule headers.to_h
  else
    @headers = headers.to_h
  end
end

#prepare_headers(ctx) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
# File 'lib/polyphony/http/client/http2.rb', line 128

def prepare_headers(ctx)
  headers = {
    ':method'    => ctx[:method].to_s,
    ':scheme'    => ctx[:uri].scheme,
    ':authority' => [ctx[:uri].host, ctx[:uri].port].join(':'),
    ':path'      => ctx[:uri].request_uri,
    'User-Agent' => 'curl/7.54.0'
  }
  headers.merge!(ctx[:opts][:headers]) if ctx[:opts][:headers]
  headers
end

#protocolObject



124
125
126
# File 'lib/polyphony/http/client/http2.rb', line 124

def protocol
  :http2
end

#request(ctx) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/polyphony/http/client/http2.rb', line 48

def request(ctx)
  stream = setup_stream # (ctx, stream)
  send_request(ctx, stream)

  stream.on(:headers, &method(:on_headers))
  stream.on(:data, &method(:on_data))
  stream.on(:close, &method(:on_close))

  # stream.on(:active) { puts "* active" }
  # stream.on(:half_close) { puts "* half_close" }

  wait_for_response(ctx, stream)
rescue Exception => e
  p e
  puts e.backtrace.join("\n")
  # ensure
  # stream.close
end

#send_request(ctx, stream) ⇒ Object



67
68
69
70
71
72
73
74
75
# File 'lib/polyphony/http/client/http2.rb', line 67

def send_request(ctx, stream)
  headers = prepare_headers(ctx)
  if ctx[:opts][:payload]
    stream.headers(headers, end_stream: false)
    stream.data(ctx[:opts][:payload], end_stream: true)
  else
    stream.headers(headers, end_stream: true)
  end
end

#setup_streamObject



98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/polyphony/http/client/http2.rb', line 98

def setup_stream
  stream = @connection.allocate_stream

  @headers = nil
  @done = nil
  @buffered_chunks = []

  @waiting_headers_fiber = nil
  @waiting_chunk_fiber = nil
  @waiting_done_fiber = nil

  stream
end

#wait_for_headersObject



117
118
119
120
121
122
# File 'lib/polyphony/http/client/http2.rb', line 117

def wait_for_headers
  return @headers if @headers

  @waiting_headers_fiber = Fiber.current
  suspend
end

#wait_for_response(_ctx, _stream) ⇒ Object



112
113
114
115
# File 'lib/polyphony/http/client/http2.rb', line 112

def wait_for_response(_ctx, _stream)
  headers = wait_for_headers
  Response.new(self, headers[':status'].to_i, headers)
end