Class: HTTPX::Channel

Inherits:
Object
Extended by:
Forwardable
Includes:
Callbacks, Loggable, Registry
Defined in:
lib/httpx/channel.rb

Overview

The Channel entity can be watched for IO events.

It holds the IO object it reads from and writes to, and knows what to do once that IO becomes readable or writable.

It defers connecting until absolutely necessary. Connection should be triggered from the IO selector (until then, any request will be queued).

A channel boots up its parser after connection is established. All pending requests will be redirected there after connection.

The parser can prevent a channel from closing while requests are still pending. Pending requests at close time signal that the channel was closed prematurely, for one of several possible reasons:

  • Remote peer closed the connection (“Connection: close”);

  • Remote peer doesn’t support pipelining.

A channel may also route requests for a host other than the one its IO was connected to, provided that the IP address, port, and scheme are the same. This allows one socket to be shared for HTTP/2 requests to different hosts. TODO: For this to succeed, the certificates sent by the servers to the client must be identical (or match both hosts).
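
The queue-then-flush behaviour described in this overview can be sketched without the library. `ToyChannel` and `ToyParser` below are hypothetical stand-ins, not HTTPX classes; they only mimic buffering requests while the channel is idle and handing them to a parser once the connection opens.

```ruby
# Hypothetical stand-ins for illustration; these are not HTTPX classes.
class ToyParser
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send(request)
    @sent << request
  end
end

class ToyChannel
  attr_reader :state, :pending, :parser

  def initialize
    @state = :idle
    @pending = []
    @parser = nil
  end

  # Queue while there is no parser yet; forward once one exists.
  def send(request)
    if @parser
      @parser.send(request)
    else
      @pending << request
    end
  end

  # Stands in for the IO selector triggering the connection.
  def open!
    return unless @state == :idle

    @state = :open
    @parser = ToyParser.new
    # Drain queued requests into the parser, preserving order.
    @parser.send(@pending.shift) until @pending.empty?
  end
end

channel = ToyChannel.new
channel.send("GET /a") # queued: the channel is still idle
channel.send("GET /b") # queued
channel.open!          # connection established, queue is flushed
channel.send("GET /c") # goes straight to the parser
p channel.parser.sent  # => ["GET /a", "GET /b", "GET /c"]
```

The real class additionally checks whether the write buffer is full before forwarding, which this sketch leaves out.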

Direct Known Subclasses

ProxyChannel

Defined Under Namespace

Classes: HTTP1, HTTP2

Constant Summary

BUFFER_SIZE = 1 << 14

Constants included from Loggable

Loggable::COLORS

Constants included from Registry

Registry::Error

Instance Method Summary

Methods included from Callbacks

#emit, #on, #once

Methods included from Loggable

#log

Methods included from Registry

extended, included

Constructor Details

#initialize(type, uri, options) ⇒ Channel

Returns a new instance of Channel.



# File 'lib/httpx/channel.rb', line 70

def initialize(type, uri, options)
  @type = type
  @uri = uri
  @origins = [@uri.origin]
  @options = Options.new(options)
  @window_size = @options.window_size
  @read_buffer = Buffer.new(BUFFER_SIZE)
  @write_buffer = Buffer.new(BUFFER_SIZE)
  @pending = []
  on(:error) { |ex| on_error(ex) }
  transition(:idle)
end

Instance Attribute Details

#pending ⇒ Object (readonly)

Returns the value of attribute pending.



# File 'lib/httpx/channel.rb', line 68

def pending
  @pending
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



# File 'lib/httpx/channel.rb', line 68

def state
  @state
end

#uri ⇒ Object (readonly)

Returns the value of attribute uri.



# File 'lib/httpx/channel.rb', line 68

def uri
  @uri
end

Class Method Details

.by(uri, options) ⇒ Object



# File 'lib/httpx/channel.rb', line 46

def by(uri, options)
  type = options.transport || begin
    case uri.scheme
    when "http"
      "tcp"
    when "https"
      "ssl"
    when "h2"
      options = options.merge(ssl: { alpn_protocols: %w[h2] })
      "ssl"
    else
      raise UnsupportedSchemeError, "#{uri}: #{uri.scheme}: unsupported URI scheme"
    end
  end
  new(type, uri, options)
end
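
As a rough illustration of the scheme-to-transport mapping in `.by`, the sketch below reproduces the same decision as a standalone function. `transport_for` is a hypothetical helper, not part of HTTPX, and it raises `ArgumentError` where the library raises its own `UnsupportedSchemeError`. It assumes the ALPN protocol list is an array of strings, which is what OpenSSL's `alpn_protocols=` expects.

```ruby
require "uri"

# Hypothetical helper mirroring the mapping in .by; not part of HTTPX.
# Returns a [transport, ssl_options] pair.
def transport_for(uri, transport: nil)
  # An explicitly configured transport always wins over the scheme.
  return [transport, {}] if transport

  case uri.scheme
  when "http"  then ["tcp", {}]
  when "https" then ["ssl", {}]
  when "h2"    then ["ssl", { alpn_protocols: %w[h2] }] # force HTTP/2 via ALPN
  else
    raise ArgumentError, "#{uri}: #{uri.scheme}: unsupported URI scheme"
  end
end

p transport_for(URI("http://example.com"))
p transport_for(URI("h2://example.com"))
p transport_for(URI("ftp://example.com"), transport: "unix")
```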

Instance Method Details

#addresses=(addrs) ⇒ Object



# File 'lib/httpx/channel.rb', line 83

def addresses=(addrs)
  @io = IO.registry(@type).new(@uri, addrs, @options)
end

#call ⇒ Object



# File 'lib/httpx/channel.rb', line 189

def call
  @timeout = @timeout_threshold
  case @state
  when :closed
    return
  when :closing
    dwrite
    transition(:closed)
    emit(:close)
  when :open
    consume
  end
  nil
end

#close ⇒ Object



# File 'lib/httpx/channel.rb', line 167

def close
  @parser.close if @parser
  transition(:closing)
end

#coalescable?(channel) ⇒ Boolean

Coalescable channels must also be mergeable; internally, #mergeable? is called before #coalescable?.

Returns:

  • (Boolean)


# File 'lib/httpx/channel.rb', line 94

def coalescable?(channel)
  if @io.protocol == "h2" && @uri.scheme == "https"
    @io.verify_hostname(channel.uri.host)
  else
    @uri.origin == channel.uri.origin
  end
end

#connecting? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/httpx/channel.rb', line 144

def connecting?
  @state == :idle
end

#handle_timeout_error(e) ⇒ Object



# File 'lib/httpx/channel.rb', line 209

def handle_timeout_error(e)
  return emit(:error, e) unless @timeout
  @timeout -= e.timeout
  return unless @timeout <= 0
  if connecting?
    emit(:error, e.to_connection_error)
  else
    emit(:error, e)
  end
end
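
The method above treats `@timeout` as a budget: each timeout error subtracts the time it waited, and the error is only emitted once the budget is used up. A minimal sketch of that accounting follows, using the hypothetical `ToyTimeoutError` and `TimeoutBudget` classes (neither exists in HTTPX) and collecting errors in an array instead of emitting them.

```ruby
# Hypothetical stand-in for a timeout error that carries the time waited.
class ToyTimeoutError < StandardError
  attr_reader :timeout

  def initialize(timeout)
    @timeout = timeout
    super("timed out after #{timeout} seconds")
  end
end

# Hypothetical class isolating the budget logic; not part of HTTPX.
class TimeoutBudget
  attr_reader :errors

  def initialize(threshold)
    @remaining = threshold # nil means "no budget configured"
    @errors = []
  end

  def handle(error)
    # Without a budget, every timeout is reported immediately.
    return @errors << error unless @remaining

    @remaining -= error.timeout
    # Stay quiet while some budget is left.
    return unless @remaining <= 0

    @errors << error
  end
end

budget = TimeoutBudget.new(3)
budget.handle(ToyTimeoutError.new(2)) # 1 second of budget left, nothing reported
p budget.errors.size                  # => 0
budget.handle(ToyTimeoutError.new(2)) # budget exhausted, error reported
p budget.errors.size                  # => 1
```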

#interests ⇒ Object



# File 'lib/httpx/channel.rb', line 148

def interests
  return :w if @state == :idle
  readable = !@read_buffer.full?
  writable = !@write_buffer.empty?
  if readable
    writable ? :rw : :r
  else
    writable ? :w : :r
  end
end
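
The decision made by `#interests` depends only on the state and on two buffer conditions, so it can be written as a pure function and tabulated. `interests_for` below is a hypothetical helper that mirrors the method body; its keyword arguments stand in for `@state`, `@read_buffer.full?` and `@write_buffer.empty?`.

```ruby
# Hypothetical pure function mirroring #interests; not part of HTTPX.
def interests_for(state:, read_full:, write_empty:)
  # An idle channel is still connecting, so it waits for writability.
  return :w if state == :idle

  readable = !read_full   # room left to read into
  writable = !write_empty # something queued to write
  if readable
    writable ? :rw : :r
  else
    writable ? :w : :r
  end
end

# Print the decision table for an open channel.
[true, false].product([true, false]).each do |read_full, write_empty|
  result = interests_for(state: :open, read_full: read_full, write_empty: write_empty)
  puts "read_full=#{read_full} write_empty=#{write_empty} -> #{result.inspect}"
end
```

Note that a full read buffer combined with an empty write buffer still yields `:r`, exactly as in the method above.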

#match?(uri) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/httpx/channel.rb', line 129

def match?(uri)
  return false if @state == :closing

  @origins.include?(uri.origin) || match_altsvcs?(uri)
end

#match_altsvcs?(uri) ⇒ Boolean

Checks whether this channel is an alternative service of uri.

Returns:

  • (Boolean)


# File 'lib/httpx/channel.rb', line 137

def match_altsvcs?(uri)
  AltSvc.cached_altsvc(@uri.origin).any? do |altsvc|
    origin = altsvc["origin"]
    origin.altsvc_match?(uri.origin)
  end
end

#merge(channel) ⇒ Object



# File 'lib/httpx/channel.rb', line 102

def merge(channel)
  @origins += channel.instance_variable_get(:@origins)
  pending = channel.instance_variable_get(:@pending)
  pending.each do |req, args|
    send(req, **args)
  end
end

#mergeable?(addresses) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/httpx/channel.rb', line 87

def mergeable?(addresses)
  return false if @state == :closing || !@io
  !(@io.addresses & addresses).empty?
end
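
The core of `#mergeable?` is an array intersection: two channels can share a socket only if their resolved address lists overlap. The sketch below shows that check on its own. `share_address?` is a hypothetical helper, and normalizing through `IPAddr` is an assumption made here so that different spellings of the same address compare equal; the method above intersects the address lists as given.

```ruby
require "ipaddr"

# Hypothetical helper illustrating the overlap check; not part of HTTPX.
def share_address?(ours, theirs)
  # Normalize each textual address so equivalent spellings match.
  normalize = ->(addrs) { addrs.map { |addr| IPAddr.new(addr).to_s } }
  !(normalize.call(ours) & normalize.call(theirs)).empty?
end

ours = %w[93.184.216.34 2001:db8::1]

p share_address?(ours, %w[2001:0db8:0:0:0:0:0:1]) # same IPv6 address, long form
p share_address?(ours, %w[10.0.0.2])              # no address in common
```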

#purge_pending ⇒ Object



# File 'lib/httpx/channel.rb', line 121

def purge_pending
  [@parser.pending, @pending].each do |pending|
    pending.reject! do |request, *args|
      yield(request, args)
    end
  end
end

#reset ⇒ Object



# File 'lib/httpx/channel.rb', line 172

def reset
  transition(:closing)
  transition(:closed)
  emit(:close)
end

#send(request, **args) ⇒ Object



# File 'lib/httpx/channel.rb', line 178

def send(request, **args)
  if @error_response
    emit(:response, request, @error_response)
  elsif @parser && !@write_buffer.full?
    request.headers["alt-used"] = @uri.authority if match_altsvcs?(request.uri)
    parser.send(request, **args)
  else
    @pending << [request, args]
  end
end

#to_io ⇒ Object



# File 'lib/httpx/channel.rb', line 159

def to_io
  case @state
  when :idle
    transition(:open)
  end
  @io.to_io
end

#unmerge(channel) ⇒ Object



# File 'lib/httpx/channel.rb', line 110

def unmerge(channel)
  @origins -= channel.instance_variable_get(:@origins)
  purge_pending do |request, args|
    request.uri == channel.uri && begin
      request.transition(:idle)
      channel.send(request, *args)
      true
    end
  end
end

#upgrade_parser(protocol) ⇒ Object



# File 'lib/httpx/channel.rb', line 204

def upgrade_parser(protocol)
  @parser.reset if @parser
  @parser = build_parser(protocol)
end