Class: HTTPX::Connection

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Callbacks, Loggable, Registry
Defined in:
lib/httpx/connection.rb

Overview

The Connection can be watched for IO events.

It contains the io object to read/write from, and knows what to do when it can.

It defers connecting until absolutely necessary. Connection should be triggered from the IO selector (until then, any request will be queued).

A connection boots up its parser after connection is established. All pending requests will be redirected there after connection.

A connection can be prevented from closing by the parser, that is, if there are pending requests. This will signal that the connection was prematurely closed, due to one of a number of possible conditions:

  • Remote peer closed the connection (“Connection: close”);

  • Remote peer doesn’t support pipelining;

A connection may also route requests for a host other than the one the io was connected to, provided that the IP, the port and the scheme are the same. This allows the same socket to be shared to send HTTP/2 requests to different hosts.

Defined Under Namespace

Classes: HTTP1, HTTP2

Constant Summary collapse

BUFFER_SIZE =
1 << 14

Constants included from Loggable

Loggable::COLORS

Constants included from Registry

Registry::Error

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Callbacks

#emit, #on, #once

Methods included from Loggable

#log, #log_exception

Methods included from Registry

extended, included

Constructor Details

#initialize(type, uri, options) ⇒ Connection

Returns a new instance of Connection.



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/httpx/connection.rb', line 51

# Prepares a connection for the origin of +uri+: normalises the options, allocates
# the read/write buffers and the pending-request queue, and registers the error callback.
#
# When the options carry an already open io, the io wrapper is built immediately and
# the parser is booted; otherwise the connection starts in the :idle state.
#
# @param type [Object] key used to look up the io class through IO.registry
# @param uri [#origin] uri whose origin this connection will serve
# @param options [Object] connection options, normalised through Options.new
def initialize(type, uri, options)
  @type = type
  @origins = [uri.origin]
  @origin = URI(uri.origin)
  @options = Options.new(options)
  @window_size = @options.window_size
  @read_buffer = Buffer.new(BUFFER_SIZE)
  @write_buffer = Buffer.new(BUFFER_SIZE)
  @pending = []
  on(:error, &method(:on_error))
  if @options.io
    # if there's an already open IO, get its
    # peer address, and force-initiate the parser
    transition(:already_open)
    @io = IO.registry(@type).new(@origin, nil, @options)
    parser
  else
    transition(:idle)
  end

  @inflight = 0
  # read from the normalised @options like every other setting above; the raw
  # +options+ argument is only guaranteed to be something Options.new accepts
  @keep_alive_timeout = @options.timeout.keep_alive_timeout
  @keep_alive_timer = nil
end

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



47
48
49
# File 'lib/httpx/connection.rb', line 47

# Reader for @options, which the constructor assigns from Options.new(options).
#
# @return [Object] the options this connection was built with
def options
  @options
end

#origin ⇒ Object (readonly)

Returns the value of attribute origin.



47
48
49
# File 'lib/httpx/connection.rb', line 47

# Reader for @origin, which the constructor builds with URI(uri.origin).
#
# @return [URI] the primary origin this connection was created for
def origin
  @origin
end

#pending ⇒ Object (readonly)

Returns the value of attribute pending.



47
48
49
# File 'lib/httpx/connection.rb', line 47

# Reader for @pending, the array #send appends requests to when they cannot be
# handed to the parser yet.
#
# @return [Array] requests queued on this connection
def pending
  @pending
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



47
48
49
# File 'lib/httpx/connection.rb', line 47

# Reader for @state. The methods of this class compare it against
# :idle, :open, :closing and :closed.
#
# @return [Symbol, nil] the current connection state
def state
  @state
end

#timers=(value) ⇒ Object (writeonly)

Sets the attribute timers

Parameters:

  • value

    the value to set the attribute timers to.



49
50
51
# File 'lib/httpx/connection.rb', line 49

# Stores +value+ in @timers.
# NOTE(review): presumably the timer group the keep-alive timer is scheduled on;
# nothing visible here reads @timers, so confirm against the caller.
#
# @param value [Object] the value to keep in @timers
def timers=(value)
  @timers = value
end

Instance Method Details

#addresses ⇒ Object



82
83
84
# File 'lib/httpx/connection.rb', line 82

# Addresses reported by the io object.
#
# @return [Object, nil] whatever @io.addresses returns, or the (falsy) value of
#   @io itself while no io object has been set up
def addresses
  # nothing to ask until the io object exists
  return @io unless @io

  @io.addresses
end

#addresses=(addrs) ⇒ Object

This is a semi-private method, to be used by the resolver to initiate the io object.



78
79
80
# File 'lib/httpx/connection.rb', line 78

# Builds the io object for this connection from the given addresses.
# An io object that already exists is kept as is, and +addrs+ is then ignored.
#
# @param addrs [Object] addresses handed to the io class constructor
# @return [Object] the io object in use after the call
def addresses=(addrs)
  return @io if @io

  io_class = IO.registry(@type)
  @io = io_class.new(@origin, addrs, @options)
end

#call ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/httpx/connection.rb', line 197

# Runs one processing step for the connection, depending on its state:
# :open consumes; :closing consumes, then moves to :closed and emits :close;
# any other state does nothing.
#
# @return [nil]
def call
  # the state is sampled once, before consuming gets a chance to change it
  current = @state
  return if current == :closed

  if current == :closing
    consume
    transition(:closed)
    emit(:close)
  elsif current == :open
    consume
  end
  nil
end

#close ⇒ Object



211
212
213
214
215
216
217
# File 'lib/httpx/connection.rb', line 211

# Asks the parser (when there is one) to close, then cancels and forgets the
# keep-alive timer (when there is one).
#
# @return [Object, nil] the removed timer, or nil when no timer was set
def close
  current_parser = @parser
  current_parser.close if current_parser

  # read only after the parser was closed, exactly when the guard used to run
  timer = @keep_alive_timer
  return unless timer

  timer.cancel
  remove_instance_variable(:@keep_alive_timer)
end

#coalescable?(connection) ⇒ Boolean

Coalescable connections also need to be mergeable; internally, #mergeable? is called before #coalescable?.

Returns:

  • (Boolean)


113
114
115
116
117
118
119
120
121
# File 'lib/httpx/connection.rb', line 113

# Tells whether requests for +connection+'s origin may travel over this connection.
#
# For an "h2" io where both origins are https, the decision is left to the io's
# hostname verification of the other origin's host; in every other case the
# two origins must be equal.
#
# @param connection [#origin] the connection whose origin is being considered
# @return [Boolean, Object] result of the hostname verification or of the origin comparison
def coalescable?(connection)
  h2_over_tls = @io.protocol == "h2" &&
                @origin.scheme == "https" &&
                connection.origin.scheme == "https"
  return @origin == connection.origin unless h2_over_tls

  @io.verify_hostname(connection.origin.host)
end

#connecting? ⇒ Boolean

Returns:

  • (Boolean)


169
170
171
# File 'lib/httpx/connection.rb', line 169

# Tells whether the connection is still in the :idle state, i.e. the state the
# constructor starts in when no already open io was given.
#
# @return [Boolean]
def connecting?
  @state == :idle
end

#create_idle ⇒ Object



123
124
125
# File 'lib/httpx/connection.rb', line 123

# Builds a fresh connection of the same class, with the same type, origin and options.
#
# @return [Object] the new instance
def create_idle
  connection_class = self.class
  connection_class.new(@type, @origin, @options)
end

#inflight? ⇒ Boolean

Returns:

  • (Boolean)


173
174
175
# File 'lib/httpx/connection.rb', line 173

# Tells whether the connection still has work in transit: there must be a parser,
# the parser must not be empty, and the write buffer must not be empty either.
#
# NOTE(review): a request that was completely flushed leaves the write buffer
# empty, so this answers false for it even while the parser is not empty —
# confirm that requiring both conditions is intended.
#
# @return [Boolean, nil] falsy value of @parser when there is no parser
def inflight?
  return @parser unless @parser
  return false if @parser.empty?

  !@write_buffer.empty?
end

#interests ⇒ Object



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/httpx/connection.rb', line 177

# Reports which IO events the connection wants to be woken up for.
#
# While the connection is still connecting, a connect attempt is made first;
# if that did not finish, the io object decides. Afterwards a full write buffer
# asks for writability, and otherwise the parser (if any) decides.
#
# @return [Symbol, Object, nil] nil when there is nothing to wait for
def interests
  if connecting?
    connect

    # the attempt above did not complete: let the io say what it waits for
    return @io.interests if connecting?
  end

  if @write_buffer.full?
    # a full write buffer has to be drained before anything else
    :w
  elsif @parser
    @parser.interests
  end
end

#match?(uri, options) ⇒ Boolean

Returns:

  • (Boolean)


86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/httpx/connection.rb', line 86

# Tells whether a request for +uri+ with +options+ can be sent on this connection.
#
# A closing, closed or exhausted connection never matches. Otherwise the uri has to
# be one of the known origins (or an alternative service of them), and the options
# have to be equal to the connection's.
#
# @param uri [#origin, #host] the request uri
# @param options [Object] the request options, compared against @options
# @return [Boolean, nil] truthy when the connection can be reused
def match?(uri, options)
  return false if @state == :closing || @state == :closed

  return false if exhausted?

  (
    (
      @origins.include?(uri.origin) &&
      # if there is more than one origin to match, it means that this connection
      # was the result of coalescing. To prevent blind trust in the case where the
      # origin came from an ORIGIN frame, we're going to verify the hostname with the
      # SSL certificate
      #
      # @origin is a URI while uri.origin is a string, and a URI never equals a
      # string, so both sides are compared in their string form.
      (@origins.size == 1 || @origin.to_s == uri.origin.to_s || (@io && @io.verify_hostname(uri.host)))
    ) || match_altsvcs?(uri)
  ) && @options == options
end

#match_altsvcs?(uri) ⇒ Boolean

Checks if this connection is an alternative service of uri.

Returns:

  • (Boolean)


161
162
163
164
165
166
167
# File 'lib/httpx/connection.rb', line 161

# Tells whether this connection can serve +uri+ as an alternative service:
# either +uri+ reports an alt-svc match with one of the connection's origins, or
# one of the alt-svc entries cached for the primary origin matches +uri+'s origin.
#
# @param uri [#altsvc_match?, #origin] the request uri
# @return [Boolean]
def match_altsvcs?(uri)
  return true if @origins.any? { |known_origin| uri.altsvc_match?(known_origin) }

  AltSvc.cached_altsvc(@origin).any? do |entry|
    entry["origin"].altsvc_match?(uri.origin)
  end
end

#merge(connection) ⇒ Object



127
128
129
130
131
132
# File 'lib/httpx/connection.rb', line 127

# Takes over +connection+'s origins and re-sends its queued requests through
# this connection.
#
# Each request is yielded by +connection+'s #purge_pending and passed to #send;
# the value #send returns is what the block hands back to #purge_pending.
#
# @param connection [Object] the connection being merged into this one
# @return [Object] whatever connection.purge_pending returns
def merge(connection)
  merged_origins = connection.instance_variable_get(:@origins)
  @origins = @origins + merged_origins
  connection.purge_pending { |request| send(request) }
end

#mergeable?(connection) ⇒ Boolean

Returns:

  • (Boolean)


103
104
105
106
107
108
109
# File 'lib/httpx/connection.rb', line 103

# Tells whether +connection+ could be merged into this one: this connection must
# have an io, be neither closing, closed nor exhausted, share at least one
# address with +connection+, and have equal options.
#
# @param connection [#addresses, #options] the candidate connection
# @return [Boolean]
def mergeable?(connection)
  return false if !@io || %i[closing closed].include?(@state)
  return false if exhausted?

  shared_addresses = @io.addresses & connection.addresses
  return false if shared_addresses.empty?

  @options == connection.options
end

#purge_pending ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/httpx/connection.rb', line 145

# Yields every request waiting in the parser's pending queue (when there is a
# parser) and then every request waiting in the connection's own queue; a request
# is removed from its queue when the block returns a truthy value.
#
# @inflight is reduced by the number of requests actually removed from the
# parser's queue, so requests the block decides to keep are still counted.
#
# @yieldparam request [Object] a queued request
# @yieldreturn [Boolean] truthy to remove the request from its queue
# @return [Array<Array>] the queues that were walked, parser queue first
def purge_pending
  pendings = []
  if @parser
    parser_pending = @parser.pending
    queued = parser_pending.size
    parser_pending.reject! { |request| yield request }
    # only what left the parser stops being in flight
    @inflight -= queued - parser_pending.size
    pendings << parser_pending
  end
  @pending.reject! { |request| yield request }
  pendings << @pending
end

#reset ⇒ Object



219
220
221
222
223
# File 'lib/httpx/connection.rb', line 219

# Forces the connection through :closing and :closed, then emits :close.
#
# @return [Object] whatever emit(:close) returns
def reset
  %i[closing closed].each { |next_state| transition(next_state) }
  emit(:close)
end

#send(request) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/httpx/connection.rb', line 225

# Hands +request+ to the parser, or queues it when that is not possible yet.
#
# The request is queued in @pending when there is no parser or the write buffer
# is full. It is also queued, and a ping is issued, when the keep-alive timer
# reports a negative time left. Otherwise the timer (if any) is paused, the
# in-flight counter is incremented and the parser receives the request.
#
# @param request [Object] the request to send
# @return [Object, nil] @pending when queued for lack of parser/buffer room,
#   nil on the ping path, otherwise whatever the parser's #send returns
def send(request)
  # no parser yet, or no room left to write: keep the request for later
  return @pending << request unless @parser && !@write_buffer.full?

  request.headers["alt-used"] = @origin.authority if match_altsvcs?(request.uri)

  if @keep_alive_timer
    # when pushing a request into an existing connection, the keep alive timeout
    # may already have elapsed; in that case the request waits in the queue and
    # the peer is pinged for availability before any request is shovelled.
    if @keep_alive_timer.fires_in.negative?
      @pending << request
      parser.ping
      return
    end

    @keep_alive_timer.pause
  end

  @inflight += 1
  parser.send(request)
end

#timeout ⇒ Object



247
248
249
250
251
252
253
# File 'lib/httpx/connection.rb', line 247

# Timeout that currently applies to the connection.
#
# An explicitly assigned @timeout always wins (even when it is nil). Otherwise the
# connect timeout applies while the state is :idle, and the operation timeout in
# every other state.
#
# @return [Object] the applicable timeout value
def timeout
  if defined?(@timeout)
    @timeout
  elsif @state == :idle
    @options.timeout.connect_timeout
  else
    @options.timeout.operation_timeout
  end
end

#to_io ⇒ Object



193
194
195
# File 'lib/httpx/connection.rb', line 193

# Delegates to the io object's #to_io. There is no guard here, so this raises
# NoMethodError while @io has not been set.
#
# @return [Object] whatever @io.to_io returns
def to_io
  @io.to_io
end

#unmerge(connection) ⇒ Object



134
135
136
137
138
139
140
141
142
143
# File 'lib/httpx/connection.rb', line 134

# Undoes a merge: drops +connection+'s origins from this connection and hands the
# queued requests that belong to +connection+'s origin back to it.
#
# A request is moved when its uri origin equals +connection+'s origin. The request
# origin is a string while the connection origin is a URI, and a string never
# equals a URI, so both sides are compared in their string form.
#
# @param connection [#origin, #send] the connection being split off
# @return [Object] whatever #purge_pending returns
def unmerge(connection)
  @origins -= connection.instance_variable_get(:@origins)
  peer_origin = connection.origin.to_s
  purge_pending do |request|
    request.uri.origin.to_s == peer_origin && begin
      # reset the request and re-queue it on its own connection; returning true
      # tells purge_pending to drop it from this connection's queues
      request.transition(:idle)
      connection.send(request)
      true
    end
  end
end