Class: HTTPX::Channel::HTTP1
Constant Summary
collapse
- CRLF =
"\r\n"
Instance Method Summary
collapse
Methods included from Loggable
#log
Methods included from Callbacks
#emit, #on, #once
Constructor Details
#initialize(buffer, options) ⇒ HTTP1
Returns a new instance of HTTP1.
12
13
14
15
16
17
18
19
20
21
22
23
|
# File 'lib/httpx/channel/http1.rb', line 12
# Sets up the parser, the request queues and the bookkeeping flags.
#
# @param buffer  [Object] stored as-is in @buffer
# @param options [Object] anything Options.new accepts
def initialize(buffer, options)
  @options = Options.new(options)
  @max_concurrent_requests = @options.max_concurrent_requests
  # Read from the normalized @options, like the line above, so this does not
  # depend on the raw argument itself responding to #max_retries.
  @retries = @options.max_retries
  # The channel registers itself as the parser's callback object.
  @parser = HTTP::Parser.new(self)
  # NOTE(review): per the http_parser.rb API this makes the parser deliver
  # header values as arrays — confirm against the parser version in use.
  @parser.header_value_type = :arrays
  @buffer = buffer
  @version = [1, 1]
  @pending = []   # requests waiting for a free slot (see #send)
  @requests = []  # requests currently in flight
  @has_response = false
end
|
Instance Method Details
#<<(data) ⇒ Object
41
42
43
44
|
# File 'lib/httpx/channel/http1.rb', line 41
# Feeds raw bytes to the parser; once the parser callbacks have flagged a
# finished response (@has_response), hands it over via #dispatch.
#
# @param data [String] bytes appended to the parser
# @return [Object, nil] whatever #dispatch returns, or nil when nothing was dispatched
def <<(data)
  @parser << data
  return unless @has_response

  dispatch
end
|
#close ⇒ Object
30
31
32
33
|
# File 'lib/httpx/channel/http1.rb', line 30
# Clears the parsing state through #reset, then emits the :close event.
#
# @return [Object] whatever emit(:close) returns
def close
  reset
  emit(:close)
end
|
#consume ⇒ Object
63
64
65
66
67
|
# File 'lib/httpx/channel/http1.rb', line 63
# Runs #handle once for every in-flight request, in queue order.
#
# @return [Array] the @requests array
def consume
  @requests.each { |in_flight| handle(in_flight) }
end
|
#dispatch ⇒ Object
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
# File 'lib/httpx/channel/http1.rb', line 132
# Hands the response of the oldest in-flight request to listeners and
# prepares the channel for whatever comes next.
#
# @return [Object, nil] the result of #handle or of emit(:complete); nil when
#   the response does not ask for the connection to be closed
def dispatch
  request = @requests.first
  # NOTE(review): #expects? is defined on the request, outside this file view;
  # presumably the request is still waiting on an interim step — confirm.
  return handle(request) if request.expects?

  @requests.shift
  response = request.response
  emit(:response, request, response)

  if @parser.upgrade?
    # Pass the bytes the parser captured for the upgrade on to the response,
    # then unwind to whoever set up the matching catch(:called).
    response << @parser.upgrade_data
    throw(:called)
  end

  close
  # This is the channel's own #send(request): promote one queued request.
  send(@pending.shift) unless @pending.empty?
  return unless response.headers["connection"] == "close"

  unless @requests.empty?
    # The peer is closing the connection: move the remaining in-flight
    # requests back to :idle and stop issuing more than one at a time.
    @requests.each { |r| r.transition(:idle) }
    @max_concurrent_requests = 1
  end
  emit(:complete)
end
|
#empty? ⇒ Boolean
35
36
37
38
39
|
# File 'lib/httpx/channel/http1.rb', line 35
# True when no in-flight request is still waiting for its response
# (which includes the case of having no in-flight requests at all).
#
# @return [Boolean]
def empty?
  @requests.none? { |request| request.response.nil? }
end
|
#on_body(chunk) ⇒ Object
98
99
100
101
102
103
104
105
106
|
# File 'lib/httpx/channel/http1.rb', line 98
# Parser callback: appends a body chunk to the response of the oldest
# in-flight request and dispatches that response once it reports complete.
#
# @param chunk [String] piece of the response body
# @return [Object, nil] the result of #dispatch, or nil while the response is incomplete
def on_body(chunk)
  log { "-> DATA: #{chunk.bytesize} bytes..." }
  log(2) { "-> #{chunk.inspect}" }

  current = @requests.first.response
  current << chunk
  return unless current.complete?

  dispatch
end
|
#on_headers_complete(h) ⇒ Object
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
# File 'lib/httpx/channel/http1.rb', line 77
# Parser callback: builds the response object for the oldest in-flight
# request once its header block has been parsed.
#
# NOTE(review): the method name and the header-related identifiers were
# missing from the rendered listing; they are restored here from context and
# the http_parser.rb callback API — confirm against lib/httpx/channel/http1.rb.
#
# @param h [Object] parsed header fields, passed to the configured headers class
# @return [Object, nil]
def on_headers_complete(h)
  # While trailers are being parsed (see #on_message_complete) this callback
  # receives the trailer fields instead of a new header block.
  return on_trailers(h) if @parser_trailers

  request = @requests.first
  # Do not build a second response if this request already has one.
  return if request.response

  log(2) { "headers received" }
  headers = @options.headers_class.new(h)
  # Bind the response to the request being answered (the head of the queue),
  # not to the most recently queued one, so several in-flight requests each
  # get their own response.
  response = @options.response_class.new(request,
                                         @parser.status_code,
                                         @parser.http_version.join("."),
                                         headers, @options)
  log { "-> HEADLINE: #{response.status} HTTP/#{@parser.http_version.join(".")}" }
  log { response.headers.each.map { |f, v| "-> HEADER: #{f}: #{v}" }.join("\n") }

  request.response = response
  # A response can already be complete after its headers; flag it so that
  # #<< dispatches it.
  @has_response = true if response.complete?
end
|
#on_message_begin ⇒ Object
HTTP Parser callbacks
must be public methods, or else they won’t be reachable
73
74
75
|
# File 'lib/httpx/channel/http1.rb', line 73
# Parser callback fired when a message starts; it only writes a level-2 log
# line. The parser callbacks have to stay public, otherwise the parser
# cannot reach them.
def on_message_begin
  log(2) { "parsing begins" }
end
|
#on_message_complete ⇒ Object
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/httpx/channel/http1.rb', line 108
# Parser callback fired when a message has been fully parsed.
#
# If the response announces trailers (a "trailer" header) and they have not
# been parsed yet, the parser is reset and primed with a request line;
# otherwise the response is flagged as ready for #dispatch.
#
# NOTE(review): the `headers` accessor was missing from the rendered listing
# and is restored here from context — confirm against the original file.
def on_message_complete
  log(2) { "parsing complete" }
  request = @requests.first
  response = request.response

  if !@parser_trailers && response.headers.key?("trailer")
    @parser_trailers = true
    @parser.reset!
    # NOTE(review): presumably this start line lets the parser treat the
    # trailer fields that follow as a header block — confirm.
    @parser << "#{request.verb.to_s.upcase} #{request.path} HTTP/#{response.version}#{CRLF}"
  else
    @has_response = true
  end
end
|
#on_trailers(h) ⇒ Object
126
127
128
129
130
|
# File 'lib/httpx/channel/http1.rb', line 126
# Merges trailer fields into the response of the oldest in-flight request.
#
# NOTE(review): the method name and the `merge_headers` call were missing from
# the rendered listing; they are restored here from context — confirm against
# the original file.
#
# @param h [Object] parsed trailer fields
# @return [Object] whatever response.merge_headers returns
def on_trailers(h)
  response = @requests.first.response
  response.merge_headers(h)
end
|
#reenqueue! ⇒ Object
55
56
57
58
59
60
61
|
# File 'lib/httpx/channel/http1.rb', line 55
# Empties the in-flight queue and pushes every request it held back
# through #send, in the original order.
#
# @return [Array] the requests that were re-sent
def reenqueue!
  # shift(n) removes the first n entries in place and returns them, so the
  # queue is already empty when #send runs.
  drained = @requests.shift(@requests.size)
  drained.each { |request| send(request) }
end
|
#reset ⇒ Object
25
26
27
28
|
# File 'lib/httpx/channel/http1.rb', line 25
# Returns the channel to a clean parsing state: resets the parser and
# clears the "response ready" flag.
#
# @return [false]
def reset
  @parser.reset!
  @has_response = false
end
|
#send(request) ⇒ Object
46
47
48
49
50
51
52
53
|
# File 'lib/httpx/channel/http1.rb', line 46
# Puts a request in flight, or parks it in @pending when the in-flight
# queue has reached @max_concurrent_requests.
#
# @param request [Object] the request to send; any keyword arguments are accepted and ignored
# @return [Object, nil] the result of #handle, or nil when the request was parked
def send(request, **)
  if @requests.size < @max_concurrent_requests
    # A request that is already in flight is handled again but not queued twice.
    @requests << request unless @requests.include?(request)
    handle(request)
  else
    @pending << request
    nil
  end
end
|