Class: H2::Client
- Inherits:
-
Object
- Object
- H2::Client
- Extended by:
- Celluloid::ClassMethods, Concurrent::ClassMethods
- Includes:
- Blockable, Celluloid, Concurrent, ExceptionlessIO, On
- Defined in:
- lib/h2/client.rb,
lib/h2/client/celluloid.rb,
lib/h2/client/concurrent.rb,
lib/h2/client/tcp_socket.rb
Defined Under Namespace
Modules: Celluloid, Concurrent, ExceptionlessIO Classes: ReadGate, TCPSocket
Constant Summary collapse
- PARSER_EVENTS =
[ :close, :frame, :frame_sent, :goaway, :promise ]
- ALPN_PROTOCOLS =
include FrameDebugger
['h2']
- DEFAULT_MAXLEN =
4096
- RE_IP_ADDR =
Regexp.union Resolv::IPv4::Regex, Resolv::IPv6::Regex
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#last_stream ⇒ Object
Returns the value of attribute last_stream.
-
#reader ⇒ Object
readonly
Returns the value of attribute reader.
-
#scheme ⇒ Object
readonly
Returns the value of attribute scheme.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
-
#streams ⇒ Object
readonly
Returns the value of attribute streams.
Instance Method Summary collapse
-
#_read(maxlen = DEFAULT_MAXLEN) ⇒ Object
underyling read loop implementation, handling returned
Symbol
values and shovelling data into the client parser. -
#add_params(params, path) ⇒ Object
add query string parameters the given request path
String
. -
#add_stream(method:, path:, stream:, &block) ⇒ Object
creates a new stream and adds it to the @streams
Hash
keyed at both the methodSymbol
and request path as well as the ID of the stream. -
#bind_events ⇒ Object
binds all connection events to their respective on_ handlers.
-
#build_headers(method:, path:, headers:) ⇒ Object
builds headers
Hash
with appropriate ordering. -
#close ⇒ Object
close the connection.
-
#closed? ⇒ Boolean
True if the connection is closed.
-
#connect ⇒ Object
initiate the connection.
- #connected? ⇒ Boolean
-
#create_ssl_context ⇒ Object
builds a new SSLContext suitable for use in ‘h2’ connections.
- #eof? ⇒ Boolean
-
#goaway(block: false) ⇒ Object
send a goaway frame and optionally wait for the connection to be closed.
-
#goaway! ⇒ Object
send a goaway frame and wait until the connection is closed.
-
#initialize(host: nil, port: 443, url: nil, lazy: true, tls: {}) {|_self| ... } ⇒ H2::Client
constructor
create a new h2 client.
-
#on_close ⇒ Object
close callback for parser: calls custom handler, then closes connection.
-
#on_frame(bytes) ⇒ Object
frame callback for parser: writes bytes to the @socket, and slicing appropriately for given return values.
-
#on_frame_sent(frame) ⇒ Object
frame_sent callback for parser: used to wait for initial settings frame to be sent by the client (post-connection-preface) before the read thread responds to server settings frame with ack.
-
#on_goaway(*args) ⇒ Object
goaway callback for parser: calls custom handler, then closes connection.
-
#on_promise(promise) ⇒ Object
push promise callback for parser: creates new
Stream
with appropriate parent, binds close event, calls custom handler. -
#read(maxlen = DEFAULT_MAXLEN) ⇒ Object
creates a new
Thread
to read the given number of bytes each loop from the current @socket. -
#read_from_socket(maxlen) ⇒ Object
fake exceptionless IO for reading on older ruby versions.
-
#request(method:, path:, headers: {}, params: {}, body: nil) {|H2::Stream| ... } ⇒ H2::Stream
initiate a
Stream
by making a request with the given HTTP method. -
#selector ⇒ Object
maintain a ivar for the
Array
to send toIO.select
. - #set_ssl_context_protocols(ctx) ⇒ Object
-
#stringify_headers(hash) ⇒ Object
mutates the given hash into
String
keys and values. -
#tls_socket(socket) ⇒ Object
build, configure, and return TLS socket.
-
#write_to_socket(bytes) ⇒ Object
fake exceptionless IO for writing on older ruby versions.
Methods included from Celluloid::ClassMethods
Methods included from Concurrent::ClassMethods
Methods included from On
Methods included from Blockable
#block!, #init_blocking, #unblock!
Constructor Details
#initialize(host: nil, port: 443, url: nil, lazy: true, tls: {}) {|_self| ... } ⇒ H2::Client
create a new h2 client
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/h2/client.rb', line 38 def initialize host: nil, port: 443, url: nil, lazy: true, tls: {} raise ArgumentError if url.nil? && (host.nil? || port.nil?) if url url = URI.parse url unless URI === url @host = url.host @port = url.port @scheme = url.scheme tls = false if 'http' == @scheme else @host = host @port = port @scheme = tls ? 'https' : 'http' end @tls = tls @streams = {} @client = HTTP2::Client.new @read_gate = ReadGate.new init_blocking yield self if block_given? bind_events connect unless lazy end |
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
25 26 27 |
# File 'lib/h2/client.rb', line 25 def client @client end |
#last_stream ⇒ Object
Returns the value of attribute last_stream.
24 25 26 |
# File 'lib/h2/client.rb', line 24 def last_stream @last_stream end |
#reader ⇒ Object (readonly)
Returns the value of attribute reader.
25 26 27 |
# File 'lib/h2/client.rb', line 25 def reader @reader end |
#scheme ⇒ Object (readonly)
Returns the value of attribute scheme.
25 26 27 |
# File 'lib/h2/client.rb', line 25 def scheme @scheme end |
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
25 26 27 |
# File 'lib/h2/client.rb', line 25 def socket @socket end |
#streams ⇒ Object (readonly)
Returns the value of attribute streams.
25 26 27 |
# File 'lib/h2/client.rb', line 25 def streams @streams end |
Instance Method Details
#_read(maxlen = DEFAULT_MAXLEN) ⇒ Object
underyling read loop implementation, handling returned Symbol
values and shovelling data into the client parser
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/h2/client.rb', line 238 def _read maxlen = DEFAULT_MAXLEN begin data = nil loop do data = read_from_socket maxlen case data when :wait_readable IO.select selector when NilClass break else begin @client << data rescue HTTP2::Error::ProtocolError => pe STDERR.puts "protocol error: #{pe.}" STDERR.puts pe.backtrace.map {|l| "\t" + l} end end end rescue IOError, Errno::EBADF close ensure unblock! end end |
#add_params(params, path) ⇒ Object
add query string parameters the given request path String
196 197 198 199 200 |
# File 'lib/h2/client.rb', line 196 def add_params params, path appendage = path.index('?') ? '&' : '?' path << appendage path << URI.encode_www_form(params) end |
#add_stream(method:, path:, stream:, &block) ⇒ Object
creates a new stream and adds it to the @streams Hash
keyed at both the method Symbol
and request path as well as the ID of the stream.
185 186 187 188 189 190 191 192 |
# File 'lib/h2/client.rb', line 185 def add_stream method:, path:, stream:, &block @streams[method] ||= {} @streams[method][path] ||= [] stream = Stream.new client: self, stream: stream, &block unless Stream === stream @streams[method][path] << stream @streams[stream.id] = stream stream end |
#bind_events ⇒ Object
binds all connection events to their respective on_ handlers
115 116 117 118 119 |
# File 'lib/h2/client.rb', line 115 def bind_events PARSER_EVENTS.each do |e| @client.on(e){|*a| __send__ "on_#{e}", *a} end end |
#build_headers(method:, path:, headers:) ⇒ Object
builds headers Hash
with appropriate ordering
172 173 174 175 176 177 178 179 180 |
# File 'lib/h2/client.rb', line 172 def build_headers method:, path:, headers: h = { AUTHORITY_KEY => [@host, @port.to_s].join(':'), METHOD_KEY => method.to_s.upcase, PATH_KEY => path, SCHEME_KEY => @scheme }.merge USER_AGENT h.merge! stringify_headers(headers) end |
#close ⇒ Object
close the connection
85 86 87 88 |
# File 'lib/h2/client.rb', line 85 def close unblock! socket.close unless closed? end |
#closed? ⇒ Boolean
Returns true if the connection is closed.
79 80 81 |
# File 'lib/h2/client.rb', line 79 def closed? connected? && socket.closed? end |
#connect ⇒ Object
initiate the connection
67 68 69 70 71 |
# File 'lib/h2/client.rb', line 67 def connect @socket = TCPSocket.new(@host, @port) @socket = tls_socket socket if @tls read end |
#connected? ⇒ Boolean
73 74 75 |
# File 'lib/h2/client.rb', line 73 def connected? !!socket end |
#create_ssl_context ⇒ Object
builds a new SSLContext suitable for use in ‘h2’ connections
373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 |
# File 'lib/h2/client.rb', line 373 def create_ssl_context ctx = OpenSSL::SSL::SSLContext.new ctx.ca_file = @tls[:ca_file] if @tls[:ca_file] ctx.ca_path = @tls[:ca_path] if @tls[:ca_path] ctx.ciphers = @tls[:ciphers] || OpenSSL::SSL::SSLContext::DEFAULT_PARAMS[:ciphers] ctx. = @tls[:options] || OpenSSL::SSL::SSLContext::DEFAULT_PARAMS[:options] ctx.ssl_version = :TLSv1_2 ctx.verify_mode = @tls[:verify_mode] || ( OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT ) # https://github.com/jruby/jruby-openssl/issues/99 set_ssl_context_protocols ctx unless H2.jruby? ctx end |
#eof? ⇒ Boolean
90 91 92 |
# File 'lib/h2/client.rb', line 90 def eof? socket.eof? end |
#goaway(block: false) ⇒ Object
send a goaway frame and optionally wait for the connection to be closed
107 108 109 110 111 |
# File 'lib/h2/client.rb', line 107 def goaway block: false return false if closed? @client.goaway block! if block end |
#goaway! ⇒ Object
send a goaway frame and wait until the connection is closed
96 97 98 |
# File 'lib/h2/client.rb', line 96 def goaway! goaway block: true end |
#on_close ⇒ Object
close callback for parser: calls custom handler, then closes connection
280 281 282 283 |
# File 'lib/h2/client.rb', line 280 def on_close on :close close end |
#on_frame(bytes) ⇒ Object
frame callback for parser: writes bytes to the @socket, and slicing appropriately for given return values
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 |
# File 'lib/h2/client.rb', line 290 def on_frame bytes on :frame, bytes if ::H2::Client::TCPSocket === socket total = bytes.bytesize loop do n = write_to_socket bytes if n == :wait_writable IO.select nil, socket.selector elsif n < total bytes = bytes.byteslice n, total else break end end else socket.write bytes end socket.flush end |
#on_frame_sent(frame) ⇒ Object
frame_sent callback for parser: used to wait for initial settings frame to be sent by the client (post-connection-preface) before the read thread responds to server settings frame with ack
315 316 317 318 319 320 |
# File 'lib/h2/client.rb', line 315 def on_frame_sent frame if @read_gate.first && frame[:type] == :settings @read_gate.first = false @read_gate.unblock! end end |
#on_goaway(*args) ⇒ Object
goaway callback for parser: calls custom handler, then closes connection
334 335 336 337 |
# File 'lib/h2/client.rb', line 334 def on_goaway *args on :goaway, *args close end |
#on_promise(promise) ⇒ Object
push promise callback for parser: creates new Stream
with appropriate parent, binds close event, calls custom handler
342 343 344 345 346 347 348 349 350 351 352 353 354 355 |
# File 'lib/h2/client.rb', line 342 def on_promise promise push_promise = Stream.new client: self, parent: @streams[promise.parent.id], push: true, stream: promise do |p| p.on :close do method = p.headers[METHOD_KEY].downcase.to_sym rescue :error path = p.headers[PATH_KEY] add_stream method: method, path: path, stream: p end end on :promise, push_promise end |
#read(maxlen = DEFAULT_MAXLEN) ⇒ Object
creates a new Thread
to read the given number of bytes each loop from the current @socket
NOTE: initial client frames (settings, etc) should be sent first, since
this is a separate thread, take care to block until this happens
NOTE: this is the override point for celluloid actor pool or concurrent
ruby threadpool support
221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/h2/client.rb', line 221 def read maxlen = DEFAULT_MAXLEN main = Thread.current @reader = Thread.new do @read_gate.block! begin _read maxlen rescue => e main.raise e end end end |
#read_from_socket(maxlen) ⇒ Object
fake exceptionless IO for reading on older ruby versions
270 271 272 273 274 |
# File 'lib/h2/client.rb', line 270 def read_from_socket maxlen socket.read_nonblock maxlen rescue IO::WaitReadable :wait_readable end |
#request(method:, path:, headers: {}, params: {}, body: nil) {|H2::Stream| ... } ⇒ H2::Stream
initiate a Stream
by making a request with the given HTTP method
143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/h2/client.rb', line 143 def request method:, path:, headers: {}, params: {}, body: nil, &block connect unless connected? s = @client.new_stream add_params params, path unless params.empty? stream = add_stream method: method, path: path, stream: s, &block h = build_headers method: method, path: path, headers: headers s.headers h, end_stream: body.nil? s.data body if body stream end |
#selector ⇒ Object
maintain a ivar for the Array
to send to IO.select
206 207 208 |
# File 'lib/h2/client.rb', line 206 def selector @selector ||= [socket] end |
#set_ssl_context_protocols(ctx) ⇒ Object
392 393 394 |
# File 'lib/h2/client.rb', line 392 def set_ssl_context_protocols ctx ctx.alpn_protocols = ALPN_PROTOCOLS end |
#stringify_headers(hash) ⇒ Object
mutates the given hash into String
keys and values
159 160 161 162 163 164 165 |
# File 'lib/h2/client.rb', line 159 def stringify_headers hash hash.keys.each do |key| hash[key] = hash[key].to_s unless String === hash[key] hash[key.to_s] = hash.delete key unless String === key end hash end |
#tls_socket(socket) ⇒ Object
build, configure, and return TLS socket
363 364 365 366 367 368 369 |
# File 'lib/h2/client.rb', line 363 def tls_socket socket socket = OpenSSL::SSL::SSLSocket.new socket, create_ssl_context socket.sync_close = true socket.hostname = @host unless RE_IP_ADDR.match(@host) socket.connect socket end |
#write_to_socket(bytes) ⇒ Object
fake exceptionless IO for writing on older ruby versions
326 327 328 329 330 |
# File 'lib/h2/client.rb', line 326 def write_to_socket bytes socket.write_nonblock bytes rescue IO::WaitWritable :wait_writable end |