Class: HTTPX::Channel::HTTP1

Inherits:
Object
  • Object
show all
Includes:
HTTPX::Callbacks, Loggable
Defined in:
lib/httpx/channel/http1.rb

Direct Known Subclasses

Plugins::Proxy::HTTP::ProxyParser

Constant Summary collapse

CRLF =
"\r\n"

Instance Method Summary collapse

Methods included from Loggable

#log

Methods included from HTTPX::Callbacks

#emit, #on, #once

Constructor Details

#initialize(buffer, options) ⇒ HTTP1

Returns a new instance of HTTP1.



12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/httpx/channel/http1.rb', line 12

def initialize(buffer, options)
  @options = Options.new(options)
  @max_concurrent_requests = @options.max_concurrent_requests
  @retries = options.max_retries
  @parser = HTTP::Parser.new(self)
  @parser.header_value_type = :arrays
  @buffer = buffer
  @version = [1, 1]
  @pending = []
  @requests = []
  @has_response = false
end

Instance Method Details

#<<(data) ⇒ Object



41
42
43
44
# File 'lib/httpx/channel/http1.rb', line 41

def <<(data)
  @parser << data
  dispatch if @has_response
end

#closeObject



30
31
32
33
# File 'lib/httpx/channel/http1.rb', line 30

def close
  reset
  emit(:close)
end

#consumeObject



63
64
65
66
67
# File 'lib/httpx/channel/http1.rb', line 63

def consume
  @requests.each do |request|
    handle(request)
  end
end

#dispatchObject



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/httpx/channel/http1.rb', line 132

def dispatch
  request = @requests.first
  return handle(request) if request.expects?

  @requests.shift
  response = request.response
  emit(:response, request, response)

  if @parser.upgrade?
    response << @parser.upgrade_data
    throw(:called)
  end
  close
  send(@pending.shift) unless @pending.empty?
  return unless response.headers["connection"] == "close"
  unless @requests.empty?
    @requests.map { |r| r.transition(:idle) }
    # server doesn't handle pipelining, and probably
    # doesn't support keep-alive. Fallback to send only
    # 1 keep alive request.
    @max_concurrent_requests = 1
  end
  emit(:complete)
end

#empty?Boolean

Returns:

  • (Boolean)


35
36
37
38
39
# File 'lib/httpx/channel/http1.rb', line 35

def empty?
  # this means that for every request there's an available
  # partial response, so there are no in-flight requests waiting.
  @requests.empty? || @requests.all? { |request| !request.response.nil? }
end

#on_body(chunk) ⇒ Object



98
99
100
101
102
103
104
105
106
# File 'lib/httpx/channel/http1.rb', line 98

def on_body(chunk)
  log { "-> DATA: #{chunk.bytesize} bytes..." }
  log(2) { "-> #{chunk.inspect}" }
  response = @requests.first.response

  response << chunk

  dispatch if response.complete?
end

#on_headers_complete(h) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/httpx/channel/http1.rb', line 77

def on_headers_complete(h)
  return on_trailer_headers_complete(h) if @parser_trailers
  # Wait for fix: https://github.com/tmm1/http_parser.rb/issues/52
  # callback is called 2 times when chunked
  request = @requests.first
  return if request.response

  log(2) { "headers received" }
  headers = @options.headers_class.new(h)
  response = @options.response_class.new(@requests.last,
                                         @parser.status_code,
                                         @parser.http_version.join("."),
                                         headers, @options)
  log { "-> HEADLINE: #{response.status} HTTP/#{@parser.http_version.join(".")}" }
  log { response.headers.each.map { |f, v| "-> HEADER: #{f}: #{v}" }.join("\n") }

  request.response = response

  @has_response = true if response.complete?
end

#on_message_beginObject

HTTP Parser callbacks

must be public methods, or else they won’t be reachable



73
74
75
# File 'lib/httpx/channel/http1.rb', line 73

def on_message_begin
  log(2) { "parsing begins" }
end

#on_message_completeObject



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/httpx/channel/http1.rb', line 108

def on_message_complete
  log(2) { "parsing complete" }
  request = @requests.first
  response = request.response

  if !@parser_trailers && response.headers.key?("trailer")
    @parser_trailers = true
    # this is needed, because the parser can't accept further headers.
    # we need to reset it and artificially move it to receive headers state,
    # hence the bogus headline
    #
    @parser.reset!
    @parser << "#{request.verb.to_s.upcase} #{request.path} HTTP/#{response.version}#{CRLF}"
  else
    @has_response = true
  end
end

#on_trailer_headers_complete(h) ⇒ Object



126
127
128
129
130
# File 'lib/httpx/channel/http1.rb', line 126

def on_trailer_headers_complete(h)
  response = @requests.first.response

  response.merge_headers(h)
end

#reenqueue!Object



55
56
57
58
59
60
61
# File 'lib/httpx/channel/http1.rb', line 55

def reenqueue!
  requests = @requests.dup
  @requests.clear
  requests.each do |request|
    send(request)
  end
end

#resetObject



25
26
27
28
# File 'lib/httpx/channel/http1.rb', line 25

def reset
  @parser.reset!
  @has_response = false
end

#send(request) ⇒ Object



46
47
48
49
50
51
52
53
# File 'lib/httpx/channel/http1.rb', line 46

def send(request, **)
  if @requests.size >= @max_concurrent_requests
    @pending << request
    return
  end
  @requests << request unless @requests.include?(request)
  handle(request)
end