Module: Hatetepe::Client
Defined Under Namespace
Modules: VerbMethods Classes: Job
Constant Summary collapse
- CONFIG_DEFAULTS =
The default configuration.
{ :timeout => 5, :connect_timeout => 5 }
Instance Attribute Summary collapse
-
#app ⇒ Object
readonly
private
The pipe of middleware and request transmission/response reception.
-
#config ⇒ Object
readonly
The configuration for this Client instance.
Attributes included from Connection
Class Method Summary collapse
- .request(verb, uri, headers = {}, body = []) ⇒ Object
-
.start(config) ⇒ Hatetepe::Client
Starts a new Client.
Instance Method Summary collapse
-
#<<(request) ⇒ Object
Sends a request and waits for the response without blocking.
-
#initialize(config) ⇒ Object
Initializes a new Client instance.
-
#post_init ⇒ Object
Initializes the parser, request queue, and middleware pipe.
-
#receive_data(data) ⇒ Object
Feeds response data into the parser.
-
#receive_response(response) ⇒ Object
private
Relates an incoming response to the corresponding request.
-
#request(verb, uri, headers = {}, body = []) ⇒ Hatetepe::Response?
Builds a
Request
, sends it, and blocks while waiting for the response. -
#request!(verb, uri, headers = {}, body = []) ⇒ Hatetepe
Like
#request
, but raises errors for 4xx and 5xx responses. -
#send_request(request) ⇒ Hatetepe::Response?
private
Feeds the request into the builder and blocks while waiting for the response to arrive.
- #ssl_handshake_completed ⇒ Object
-
#stop ⇒ Object
Gracefully stops the client.
-
#stop! ⇒ Object
Immediately stops the client by closing the connection.
-
#unbind(reason) ⇒ Object
Aborts all outstanding requests.
-
#wait ⇒ Object
Blocks until the last request has finished receiving its response.
Methods included from Connection
#close_connection, #closed?, #closed_by_connect_timeout?, #closed_by_remote?, #closed_by_self?, #closed_by_timeout?, #comm_inactivity_timeout=, #connected?, #connection_completed, #pending_connect_timeout=, #remote_address, #remote_port, #sockaddr
Instance Attribute Details
#app ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The pipe of middleware and request transmission/response reception.
39 40 41 |
# File 'lib/hatetepe/client.rb', line 39 def app @app end |
#config ⇒ Object (readonly)
The configuration for this Client instance.
34 35 36 |
# File 'lib/hatetepe/client.rb', line 34 def config @config end |
Class Method Details
.request(verb, uri, headers = {}, body = []) ⇒ Object
244 245 246 247 248 |
# File 'lib/hatetepe/client.rb', line 244 def self.request(verb, uri, headers = {}, body = []) uri = URI(uri) client = start(host: uri.host, port: uri.port, ssl: uri.scheme == 'https') client.request(verb, uri, headers, body) end |
.start(config) ⇒ Hatetepe::Client
Starts a new Client.
239 240 241 |
# File 'lib/hatetepe/client.rb', line 239 def self.start(config) EM.connect(config[:host], config[:port], self, config) end |
Instance Method Details
#<<(request) ⇒ Object
Sends a request and waits for the response without blocking.
Transmission and reception are performed within a separate Fiber
. #succeed
and #fail
will be called on the request
passing the response, depending on whether the response indicates success (100-399) or failure (400-599).
The request will #fail
with a nil
response if the connection was closed for whatever reason.
TODO find out if there are more cases where the response body
should automatically be closed.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/hatetepe/client.rb', line 114 def <<(request) Fiber.new do EM::Synchrony.sync(@ssl_handshake_completed) if config[:ssl] response = @app.call(request) if response && (request.verb == "HEAD" || response.status == 204) response.body.close_write end if !response request.fail elsif response.failure? request.fail(response) else request.succeed(response) end end.resume end |
#initialize(config) ⇒ Object
Initializes a new Client instance.
47 48 49 50 |
# File 'lib/hatetepe/client.rb', line 47 def initialize(config) @config = CONFIG_DEFAULTS.merge(config) @ssl_handshake_completed = EM::DefaultDeferrable.new end |
#post_init ⇒ Object
Initializes the parser, request queue, and middleware pipe.
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/hatetepe/client.rb', line 57 def post_init @builder, @parser = Hatetepe::Builder.new, Hatetepe::Parser.new @builder.on_write << method(:send_data) # @builder.on_write {|data| p "|--> #{data}" } @parser.on_response << method(:receive_response) @queue = [] @app = proc {|request| send_request(request) } self.comm_inactivity_timeout = config[:timeout] self.pending_connect_timeout = config[:connect_timeout] start_tls if config[:ssl] end |
#receive_data(data) ⇒ Object
Feeds response data into the parser.
85 86 87 88 |
# File 'lib/hatetepe/client.rb', line 85 def receive_data(data) # p "|<-- #{data}" @parser << data end |
#receive_response(response) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Relates an incoming response to the corresponding request.
Supports the response bit of HTTP pipelining by relating responses to requests in the order the requests were sent.
TODO: raise a more meaningful error.
304 305 306 307 308 309 310 311 312 313 |
# File 'lib/hatetepe/client.rb', line 304 def receive_response(response) query = proc {|j| j.response.nil? } if job = @queue.find(&query) job.response = response job.fiber.resume else raise "Received response but didn't expect one: #{response.status}" end end |
#request(verb, uri, headers = {}, body = []) ⇒ Hatetepe::Response?
Builds a Request
, sends it, and blocks while waiting for the response.
149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/hatetepe/client.rb', line 149 def request(verb, uri, headers = {}, body = []) uri = URI(uri) uri.scheme ||= @config[:ssl] ? 'http' : 'https' uri.host ||= @config[:host] uri.port ||= @config[:port] headers['Host'] ||= "#{uri.host}:#{uri.port}" request = Hatetepe::Request.new(verb, URI(uri.to_s), headers, body) self << request EM::Synchrony.sync(request) end |
#request!(verb, uri, headers = {}, body = []) ⇒ Hatetepe
Like #request
, but raises errors for 4xx and 5xx responses.
182 183 184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/hatetepe/client.rb', line 182 def request!(verb, uri, headers = {}, body = []) response = request(verb, uri, headers, body) if response.nil? raise Hatetepe::RequestError elsif response.status >= 500 raise Hatetepe::ServerError elsif response.status >= 400 raise Hatetepe::ClientError end response end |
#send_request(request) ⇒ Hatetepe::Response?
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Feeds the request into the builder and blocks while waiting for the response to arrive.
Supports the request bit of HTTP pipelining by waiting until the previous request has been sent.
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 |
# File 'lib/hatetepe/client.rb', line 264 def send_request(request) previous = @queue.last current = Job.new(Fiber.current, request, false) @queue << current # wait for the previous request to be sent while previous && !previous.sent return if Fiber.yield == :kill end # send the request self.comm_inactivity_timeout = 0 @builder.request(request.to_a) current.sent = true self.comm_inactivity_timeout = config[:timeout] # wait for the response while !current.response return if Fiber.yield == :kill end # clean up and return response @queue.delete(current) current.response end |
#ssl_handshake_completed ⇒ Object
73 74 75 |
# File 'lib/hatetepe/client.rb', line 73 def ssl_handshake_completed EM::Synchrony.next_tick { @ssl_handshake_completed.succeed } end |
#stop ⇒ Object
Gracefully stops the client.
Waits for all requests to finish and then stops the client.
201 202 203 204 |
# File 'lib/hatetepe/client.rb', line 201 def stop wait stop! end |
#stop! ⇒ Object
Immediately stops the client by closing the connection.
This will lead to EventMachine’s event loop calling #unbind, which fail all outstanding requests.
214 215 216 |
# File 'lib/hatetepe/client.rb', line 214 def stop! close_connection end |
#unbind(reason) ⇒ Object
Aborts all outstanding requests.
95 96 97 98 |
# File 'lib/hatetepe/client.rb', line 95 def unbind(reason) super @queue.each {|job| job.fiber.resume(:kill) } end |
#wait ⇒ Object
Blocks until the last request has finished receiving its response.
Returns immediately if there are no outstanding requests.
223 224 225 226 227 228 |
# File 'lib/hatetepe/client.rb', line 223 def wait if job = @queue.last EM::Synchrony.sync(job.request) EM::Synchrony.sync(job.response.body) if job.response end end |