Class: HTTPX::Channel::HTTP1

Inherits:
Object
  • Object
show all
Includes:
HTTPX::Callbacks, Loggable
Defined in:
lib/httpx/channel/http1.rb

Direct Known Subclasses

Plugins::Proxy::HTTP::ProxyParser

Constant Summary collapse

CRLF =
"\r\n"

Constants included from Loggable

Loggable::COLORS

Instance Method Summary collapse

Methods included from Loggable

#log

Methods included from HTTPX::Callbacks

#emit, #on, #once

Constructor Details

#initialize(buffer, options) ⇒ HTTP1

Returns a new instance of HTTP1.



12
13
14
15
16
17
18
19
20
21
22
# File 'lib/httpx/channel/http1.rb', line 12

# Sets up the state of an HTTP/1 channel.
#
# @param buffer [Object] stored as +@buffer+; it is not used further in this method
# @param options [Object] whatever +Options.new+ accepts
def initialize(buffer, options)
  @options = Options.new(options)
  @max_concurrent_requests = @options.max_concurrent_requests
  # self is handed to the parser — presumably as the target of the on_* callbacks
  @parser = HTTP::Parser.new(self)
  @parser.header_value_type = :arrays
  @buffer = buffer
  @version = [1, 1]
  # requests waiting for a free slot, and requests currently tracked
  @pending = []
  @requests = []
  @has_response = false
  # read by #on_headers_complete and #on_message_complete; set explicitly so the
  # flag never starts out as an uninitialized instance variable
  @parser_trailers = false
end

Instance Method Details

#<<(data) ⇒ Object



40
41
42
43
# File 'lib/httpx/channel/http1.rb', line 40

# Feeds +data+ to the parser and, once a response has been flagged as ready,
# hands over to #dispatch.
#
# @param data [Object] payload forwarded to the parser — presumably raw bytes read from the socket
# @return [Object, nil] the result of #dispatch, or nil while no response is ready
def <<(data)
  @parser << data
  return unless @has_response

  dispatch
end

#close ⇒ Object



29
30
31
32
# File 'lib/httpx/channel/http1.rb', line 29

# Resets the parsing state via #reset, then emits the +:close+ event.
#
# @return [Object] whatever +emit+ returns
def close
  reset
  emit :close
end

#consume ⇒ Object



62
63
64
65
66
# File 'lib/httpx/channel/http1.rb', line 62

# Calls +handle+ for every request currently tracked in +@requests+, in order.
#
# @return [Array] the +@requests+ array itself
def consume
  @requests.each { |in_flight| handle(in_flight) }
end

#dispatch ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/httpx/channel/http1.rb', line 131

# Completes the exchange for the request at the head of +@requests+.
#
# If that request still +expects?+ something, it is passed to +handle+ again
# and nothing else happens. Otherwise the request is removed, its response is
# emitted, the parser state is reset and the next pending request (if any) is
# sent.
#
# @return [Object, nil] the +handle+ result, the result of +emit(:reset)+ when
#   the connection is being closed, or nil
def dispatch
  current = @requests.first
  return handle(current) if current.expects?

  @requests.shift
  answer = current.response
  emit(:response, current, answer)

  if @parser.upgrade?
    # protocol upgrade: pass the parser's upgrade data to the response, then
    # unwind to whoever catches :called
    answer << @parser.upgrade_data
    throw(:called)
  end

  reset
  unless @pending.empty?
    queued = @pending.shift
    send(queued)
  end

  # the server announced it will close the connection
  if answer.headers["connection"] == "close"
    disable_concurrency
    emit(:reset)
  end
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


34
35
36
37
38
# File 'lib/httpx/channel/http1.rb', line 34

# Tells whether no tracked request is still waiting for a response.
#
# True when +@requests+ is empty or every request in it already has a
# (possibly partial) response attached.
#
# @return [Boolean]
def empty?
  @requests.none? { |request| request.response.nil? }
end

#handle_error(ex) ⇒ Object



150
151
152
153
154
# File 'lib/httpx/channel/http1.rb', line 150

# Emits an +:error+ event for every request in +@requests+, passing the request
# and the given error.
#
# @param ex [Object] the error forwarded to each +:error+ event
# @return [Array] the +@requests+ array itself
def handle_error(ex)
  @requests.each { |failed| emit(:error, failed, ex) }
end

#on_body(chunk) ⇒ Object



97
98
99
100
101
102
103
104
105
# File 'lib/httpx/channel/http1.rb', line 97

# Parser callback: appends a body chunk to the response of the request at the
# head of +@requests+.
#
# @param chunk [String] body chunk (its +bytesize+ and +inspect+ are logged)
# @return [Object] whatever the response's +<<+ returns
def on_body(chunk)
  log(color: :green) { "-> DATA: #{chunk.bytesize} bytes..." }
  log(level: 2, color: :green) { "-> #{chunk.inspect}" }

  # NOTE: dispatching from this callback is commented out in the original code:
  # dispatch if response.complete?
  @requests.first.response << chunk
end

#on_headers_complete(h) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/httpx/channel/http1.rb', line 76

# Parser callback: builds the response object for the request at the head of
# +@requests+ once its headers have been parsed.
#
# While +@parser_trailers+ is set, the headers are trailers and are delegated
# to #on_trailer_headers_complete instead.
#
# @param h [Object] parsed headers, passed to +@options.headers_class.new+
# @return [Object, nil]
def on_headers_complete(h)
  return on_trailer_headers_complete(h) if @parser_trailers

  # Wait for fix: https://github.com/tmm1/http_parser.rb/issues/52
  # callback is called 2 times when chunked
  request = @requests.first
  return if request.response

  log(level: 2) { "headers received" }
  headers = @options.headers_class.new(h)
  http_version = @parser.http_version.join(".")
  # The response answers the head request. It must be built with that same
  # request (not @requests.last), otherwise it references the wrong request
  # whenever more than one request is in flight.
  response = @options.response_class.new(request,
                                         @parser.status_code,
                                         http_version,
                                         headers, @options)
  log(color: :yellow) { "-> HEADLINE: #{response.status} HTTP/#{http_version}" }
  log(color: :yellow) { response.headers.each.map { |f, v| "-> HEADER: #{f}: #{v}" }.join("\n") }

  request.response = response

  # a response may already be complete after its headers alone
  @has_response = true if response.complete?
end

#on_message_begin ⇒ Object

HTTP Parser callbacks

must be public methods, or else they won’t be reachable



72
73
74
# File 'lib/httpx/channel/http1.rb', line 72

# Parser callback: only logs (at level 2) that parsing of a message has started.
#
# @return [Object] whatever +log+ returns
def on_message_begin
  log(level: 2) do
    "parsing begins"
  end
end

#on_message_completeObject



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/httpx/channel/http1.rb', line 107

# Parser callback: runs when the parser has finished a message.
#
# If the response announces a "trailer" header and trailers have not been
# processed yet, the parser is re-armed to read them; otherwise the response is
# flagged as ready.
#
# @return [Object] true when the response is flagged as ready, otherwise the
#   result of feeding the parser
def on_message_complete
  log(level: 2) { "parsing complete" }
  head = @requests.first
  answer = head.response

  if @parser_trailers || !answer.headers.key?("trailer")
    @has_response = true
  else
    @parser_trailers = true
    # The parser can't accept further headers at this point, so it is reset and
    # artificially moved back to the "receiving headers" state by feeding it a
    # bogus headline.
    @parser.reset!
    @parser << "#{head.verb.to_s.upcase} #{head.path} HTTP/#{answer.version}#{CRLF}"
  end
end

#on_trailer_headers_complete(h) ⇒ Object



125
126
127
128
129
# File 'lib/httpx/channel/http1.rb', line 125

# Merges trailer headers into the response of the request at the head of
# +@requests+. Called by #on_headers_complete while +@parser_trailers+ is set.
#
# @param h [Object] trailer headers handed to the response's +merge_headers+
# @return [Object] whatever +merge_headers+ returns
def on_trailer_headers_complete(h)
  @requests.first.response.merge_headers(h)
end

#reenqueue! ⇒ Object



54
55
56
57
58
59
60
# File 'lib/httpx/channel/http1.rb', line 54

# Empties +@requests+ and passes every request that was in it to #send again,
# in the original order.
#
# @return [Array] the requests that were re-sent
def reenqueue!
  in_flight = @requests.shift(@requests.size)
  in_flight.each { |request| send(request) }
end

#reset ⇒ Object



24
25
26
27
# File 'lib/httpx/channel/http1.rb', line 24

# Returns the channel to its "no response in progress" state: the parser is
# reset and the per-response flags are cleared.
#
# @return [false]
def reset
  @parser.reset!
  @has_response = false
  # Without clearing the trailer flag, a response that carried trailers would
  # leave it set and the next response's headers would be treated as trailers
  # by #on_headers_complete.
  @parser_trailers = false
end

#send(request) ⇒ Object



45
46
47
48
49
50
51
52
# File 'lib/httpx/channel/http1.rb', line 45

# Sends a request over this channel, or parks it in +@pending+ when
# +@max_concurrent_requests+ requests are already being tracked.
#
# Extra keyword arguments are accepted and ignored.
#
# @param request [Object] the request to send
# @return [Object, nil] the result of +handle+, or nil when the request was parked
def send(request, **)
  if @requests.size < @max_concurrent_requests
    # a request that is already tracked is handled again without being re-added
    @requests << request unless @requests.include?(request)
    handle(request)
  else
    @pending << request
    nil
  end
end