Module: Hatetepe::Client

Extended by:
VerbMethods
Includes:
VerbMethods, Connection
Defined in:
lib/hatetepe/client.rb

Defined Under Namespace

Modules: VerbMethods Classes: Job

Constant Summary collapse

CONFIG_DEFAULTS =

The default configuration.

{
  :timeout         => 5,
  :connect_timeout => 5
}

Instance Attribute Summary collapse

Attributes included from Connection

#processing_enabled

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Connection

#close_connection, #closed?, #closed_by_connect_timeout?, #closed_by_remote?, #closed_by_self?, #closed_by_timeout?, #comm_inactivity_timeout=, #connected?, #connection_completed, #pending_connect_timeout=, #remote_address, #remote_port, #sockaddr

Instance Attribute Details

#appObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The pipe of middleware and request transmission/response reception.



39
40
41
# File 'lib/hatetepe/client.rb', line 39

def app
  @app
end

#configObject (readonly)

The configuration for this Client instance.



34
35
36
# File 'lib/hatetepe/client.rb', line 34

def config
  @config
end

Class Method Details

.request(verb, uri, headers = {}, body = []) ⇒ Object



244
245
246
247
248
# File 'lib/hatetepe/client.rb', line 244

def self.request(verb, uri, headers = {}, body = [])
  uri    = URI(uri)
  client = start(host: uri.host, port: uri.port, ssl: uri.scheme == 'https')
  client.request(verb, uri, headers, body)
end

.start(config) ⇒ Hatetepe::Client

Starts a new Client.

Parameters:

  • config (Hash)

    The :host and :port the Client should connect to.

Returns:



239
240
241
# File 'lib/hatetepe/client.rb', line 239

def self.start(config)
  EM.connect(config[:host], config[:port], self, config)
end

Instance Method Details

#<<(request) ⇒ Object

Sends a request and waits for the response without blocking.

Transmission and reception are performed within a separate Fiber. #succeed and #fail will be called on the request passing the response, depending on whether the response indicates success (100-399) or failure (400-599).

The request will #fail with a nil response if the connection was closed for whatever reason.

TODO find out if there are more cases where the response body

should automatically be closed.


114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/hatetepe/client.rb', line 114

def <<(request)
  Fiber.new do
    EM::Synchrony.sync(@ssl_handshake_completed) if config[:ssl]

    response = @app.call(request)

    if response && (request.verb == "HEAD" || response.status == 204)
      response.body.close_write
    end

    if !response
      request.fail
    elsif response.failure?
      request.fail(response)
    else
      request.succeed(response)
    end
  end.resume
end

#initialize(config) ⇒ Object

Initializes a new Client instance.

Parameters:

  • config (Hash)

    Configuration values that overwrite the defaults.



47
48
49
50
# File 'lib/hatetepe/client.rb', line 47

def initialize(config)
  @config = CONFIG_DEFAULTS.merge(config)
  @ssl_handshake_completed = EM::DefaultDeferrable.new
end

#post_initObject

Initializes the parser, request queue, and middleware pipe.

See Also:

  • EM::Connection#post_init


57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/hatetepe/client.rb', line 57

def post_init
  @builder, @parser   =  Hatetepe::Builder.new, Hatetepe::Parser.new
  @builder.on_write   << method(:send_data)
  # @builder.on_write {|data| p "|--> #{data}" }
  @parser.on_response << method(:receive_response)

  @queue = []

  @app = proc {|request| send_request(request) }

  self.comm_inactivity_timeout = config[:timeout]
  self.pending_connect_timeout = config[:connect_timeout]

  start_tls if config[:ssl]
end

#receive_data(data) ⇒ Object

Feeds response data into the parser.

Parameters:

  • data (String)

    The received data that’s gonna be fed into the parser.

See Also:

  • EM::Connection#receive_data


85
86
87
88
# File 'lib/hatetepe/client.rb', line 85

def receive_data(data)
  # p "|<-- #{data}"
  @parser << data
end

#receive_response(response) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Relates an incoming response to the corresponding request.

Supports the response bit of HTTP pipelining by relating responses to requests in the order the requests were sent.

TODO: raise a more meaningful error.

Parameters:

Raises:

  • (RuntimeError)

    There is no request that’s waiting for a response.



304
305
306
307
308
309
310
311
312
313
# File 'lib/hatetepe/client.rb', line 304

def receive_response(response)
  query = proc {|j| j.response.nil? }

  if job = @queue.find(&query)
    job.response = response
    job.fiber.resume
  else
    raise "Received response but didn't expect one: #{response.status}"
  end
end

#request(verb, uri, headers = {}, body = []) ⇒ Hatetepe::Response?

Builds a Request, sends it, and blocks while waiting for the response.

Parameters:

  • verb (Symbol, String)

    The HTTP method verb, e.g. :get or “PUT”.

  • uri (String, URI)

    The request URI.

  • headers (Hash) (defaults to: {})

    (optional) The request headers.

  • body (#each) (defaults to: [])

    (optional) A request body object whose #each method yields objects that respond to #to_s.

Returns:



149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/hatetepe/client.rb', line 149

def request(verb, uri, headers = {}, body = [])
  uri = URI(uri)
  uri.scheme ||= @config[:ssl] ? 'http' : 'https'
  uri.host ||= @config[:host]
  uri.port ||= @config[:port]

  headers['Host'] ||= "#{uri.host}:#{uri.port}"

  request =  Hatetepe::Request.new(verb, URI(uri.to_s), headers, body)
  self    << request
  EM::Synchrony.sync(request)
end

#request!(verb, uri, headers = {}, body = []) ⇒ Hatetepe

Like #request, but raises errors for 4xx and 5xx responses.

Parameters:

  • verb (Symbol, String)

    The HTTP method verb, e.g. :get or “PUT”.

  • uri (String, URI)

    The request URI.

  • headers (Hash) (defaults to: {})

    (optional) The request headers.

  • body (#each) (defaults to: [])

    (optional) A request body object whose #each method yields objects that respond to #to_s.

Returns:

Raises:



182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/hatetepe/client.rb', line 182

def request!(verb, uri, headers = {}, body = [])
  response = request(verb, uri, headers, body)

  if response.nil?
    raise Hatetepe::RequestError
  elsif response.status >= 500
    raise Hatetepe::ServerError
  elsif response.status >= 400
    raise Hatetepe::ClientError
  end

  response
end

#send_request(request) ⇒ Hatetepe::Response?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Feeds the request into the builder and blocks while waiting for the response to arrive.

Supports the request bit of HTTP pipelining by waiting until the previous request has been sent.

Parameters:

Returns:

  • (Hatetepe::Response, nil)

    The received response or nil if the connection has been closed before receiving a response.



264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/hatetepe/client.rb', line 264

def send_request(request)
  previous =  @queue.last
  current  =  Job.new(Fiber.current, request, false)
  @queue   << current

  # wait for the previous request to be sent
  while previous && !previous.sent
    return if Fiber.yield == :kill
  end

  # send the request
  self.comm_inactivity_timeout = 0
  @builder.request(request.to_a)
  current.sent = true
  self.comm_inactivity_timeout = config[:timeout]

  # wait for the response
  while !current.response
    return if Fiber.yield == :kill
  end

  # clean up and return response
  @queue.delete(current)
  current.response
end

#ssl_handshake_completedObject



73
74
75
# File 'lib/hatetepe/client.rb', line 73

def ssl_handshake_completed
  EM::Synchrony.next_tick { @ssl_handshake_completed.succeed }
end

#stopObject

Gracefully stops the client.

Waits for all requests to finish and then stops the client.



201
202
203
204
# File 'lib/hatetepe/client.rb', line 201

def stop
  wait
  stop!
end

#stop!Object

Immediately stops the client by closing the connection.

This will lead to EventMachine’s event loop calling #unbind, which fail all outstanding requests.

See Also:



214
215
216
# File 'lib/hatetepe/client.rb', line 214

def stop!
  close_connection
end

#unbind(reason) ⇒ Object

Aborts all outstanding requests.

See Also:

  • EM::Connection#unbind


95
96
97
98
# File 'lib/hatetepe/client.rb', line 95

def unbind(reason)
  super
  @queue.each {|job| job.fiber.resume(:kill) }
end

#waitObject

Blocks until the last request has finished receiving its response.

Returns immediately if there are no outstanding requests.



223
224
225
226
227
228
# File 'lib/hatetepe/client.rb', line 223

def wait
  if job = @queue.last
    EM::Synchrony.sync(job.request)
    EM::Synchrony.sync(job.response.body) if job.response
  end
end