Class: RightScale::NonBlockingClient

Inherits:
Object
  • Object
show all
Defined in:
lib/right_agent/clients/non_blocking_client.rb

Overview

Interface to HTTP using EM::HttpRequest. This interface uses non-blocking i/o so that HTTP requests are synchronous to the caller, but the underlying thread yields to other activity when blocked on i/o.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ NonBlockingClient

Initialize client

Parameters:

  • options (Hash)

    a customizable set of options

Options Hash (options):

  • :api_version (String)

    for X-API-Version header

  • :health_check_path (String)

    in URI for health check resource; defaults to BalancedHttpClient::DEFAULT_HEALTH_CHECK_PATH



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/right_agent/clients/non_blocking_client.rb', line 43

# Initialize client
#
# @param [Hash] options for the client
#
# @option options [String] :api_version for X-API-Version header
# @option options [String] :health_check_path in URI for health check resource;
#   defaults to BalancedHttpClient::DEFAULT_HEALTH_CHECK_PATH
def initialize(options)
  # Load the gem lazily, not at file load time, so that right_agent stays usable with ruby 1.8.7
  require 'em-http-request'

  @connections = {}

  # Configure proxy from the first proxy environment variable that is set, if any
  proxy_variable = BalancedHttpClient::PROXY_ENVIRONMENT_VARIABLES.detect { |name| ENV.has_key?(name) }
  if proxy_variable
    proxy_setting = ENV[proxy_variable]
    # Assume http when the setting does not carry a scheme
    proxy_setting = "http://" + proxy_setting unless proxy_setting.match(/^[[:alpha:]]+:\/\//)
    proxy_uri = URI.parse(proxy_setting)
    @proxy = {:host => proxy_uri.host, :port => proxy_uri.port}
    @proxy[:authorization] = [proxy_uri.user, proxy_uri.password] if proxy_uri.user
  end

  # Health check proc for use by request balancer
  # User and password are removed from the host since health-check does not require authorization
  @health_check_proc = Proc.new do |host|
    connect_options = {
      :connect_timeout => BalancedHttpClient::DEFAULT_OPEN_TIMEOUT,
      :inactivity_timeout => BalancedHttpClient::HEALTH_CHECK_TIMEOUT }
    connect_options[:proxy] = @proxy if @proxy

    target = URI.parse(host)
    target.user = target.password = nil
    target.path += (options[:health_check_path] || BalancedHttpClient::DEFAULT_HEALTH_CHECK_PATH)
    request_options = {:path => target.path}
    request_options[:head] = {"X-API-Version" => options[:api_version]} if options[:api_version]

    # The full path now lives in request_options, so hand request a host without any path
    target.path = ""
    request(:get, "", target.to_s, connect_options, request_options)
  end
end

Instance Attribute Details

#connections ⇒ Object (readonly)

Hash of active connections with request path as key and hash value containing :host, :connection, and :expires_at



36
37
38
# File 'lib/right_agent/clients/non_blocking_client.rb', line 36

# Hash of active connections with request path as key and hash value
# containing :host, :connection, and :expires_at
#
# @return [Hash] value of the @connections instance variable
def connections
  @connections
end

#health_check_proc ⇒ Object (readonly)

Fully configured health check procedure for use with this client



32
33
34
# File 'lib/right_agent/clients/non_blocking_client.rb', line 32

# Fully configured health check procedure for use with this client
#
# @return [Proc] value of the @health_check_proc instance variable
def health_check_proc
  @health_check_proc
end

Instance Method Details

#close(reason) ⇒ TrueClass

Close all persistent connections

Parameters:

  • reason (String)

    for closing

Returns:

  • (TrueClass)

    always true



165
166
167
168
169
# File 'lib/right_agent/clients/non_blocking_client.rb', line 165

# Close all persistent connections
#
# @param [String] reason for closing; passed to each connection's close
#
# @return [TrueClass] always true
def close(reason)
  @connections.each_pair do |_path, entry|
    entry[:connection].close(reason)
  end
  # Start over with a fresh hash rather than emptying the existing one in place
  @connections = {}
  true
end

#options(verb, path, params, request_headers, options) ⇒ Array

Construct options for HTTP request

Parameters:

  • verb (Symbol)

    for HTTP REST request

  • path (String)

    in URI for desired resource

  • params (Hash)

    for HTTP request

  • request_headers (Hash)

    to be applied to request

  • options (Hash)

    a customizable set of options

Options Hash (options):

  • :open_timeout (Numeric)

    maximum wait for connection; defaults to DEFAULT_OPEN_TIMEOUT

  • :request_timeout (Numeric)

    maximum wait for response; defaults to DEFAULT_REQUEST_TIMEOUT

  • :poll_timeout (Numeric)

    maximum wait for individual poll; defaults to :request_timeout

Returns:

  • (Array)

    connect and request option hashes



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/right_agent/clients/non_blocking_client.rb', line 85

# Construct options for HTTP request
#
# @param [Symbol] verb for HTTP REST request
# @param [String] path in URI for desired resource
# @param [Hash] params for HTTP request
# @param [Hash] request_headers to be applied to request; :content_type is set on it
#   for any verb other than :get and :delete
# @param [Hash] options for request
#
# @option options [Numeric] :open_timeout maximum wait for connection; defaults to DEFAULT_OPEN_TIMEOUT
# @option options [Numeric] :request_timeout maximum wait for response; defaults to DEFAULT_REQUEST_TIMEOUT
# @option options [Numeric] :poll_timeout maximum wait for individual poll; defaults to :request_timeout
#
# @return [Array] connect and request option hashes
def options(verb, path, params, request_headers, options)
  polling = verb == :poll
  connect_options = {
    :connect_timeout => options[:open_timeout] || BalancedHttpClient::DEFAULT_OPEN_TIMEOUT,
    :inactivity_timeout => (polling && options[:poll_timeout]) ||
                           options[:request_timeout] ||
                           BalancedHttpClient::DEFAULT_REQUEST_TIMEOUT }
  connect_options[:proxy] = @proxy if @proxy

  has_params = params.is_a?(Hash) && params.any?
  request_options = {:path => path, :body => nil, :head => request_headers}
  if [:get, :delete].include?(verb)
    # Query string is formatted here because the :query option on EM::HttpRequest
    # does not reliably URL encode, e.g., messes up on arrays in hashes
    request_options[:path] = path + "?#{BalancedHttpClient.format(params)}" if has_params
  else
    request_headers[:content_type] = "application/json"
    request_options[:body] = JSON.dump(params) if has_params
  end
  request_options[:keepalive] = true if polling
  [connect_options, request_options]
end

#poll(connection, request_options, stop_at) ⇒ Array

Make long-polling request. Note that the underlying thread is not blocked by the HTTP i/o, but this call itself is blocking.

Parameters:

  • connection (Hash)

    to server from previous request with keys :host, :connection, and :expires_at, with the :expires_at being adjusted on return

  • request_options (Hash)

    for HTTP request

  • stop_at (Time)

    time for polling

Returns:

  • (Array)

    result to be returned followed by response code, body, and headers

Raises:



149
150
151
152
153
154
155
156
157
158
# File 'lib/right_agent/clients/non_blocking_client.rb', line 149

# Make long-polling request
# Note that the underlying thread is not blocked by the HTTP i/o, but this call itself is blocking
#
# @param [Hash] connection to server from previous request with keys :host, :connection,
#   and :expires_at, with the :expires_at being adjusted on return
# @param [Hash] request_options for HTTP request; its :path is rewritten in place
# @param [Time] stop_at time for polling
#
# @return [Array] result to be returned followed by response code, body, and headers
def poll(connection, request_options, stop_at)
  # Prefix the request path with any path component of the connection's host
  host_path = URI.parse(connection[:host]).path
  request_options[:path] = host_path + request_options[:path]

  # Hand this fiber to poll_again, then suspend until the fiber is resumed with the response
  poll_again(Fiber.current, connection[:connection], request_options, stop_at)
  response_code, response_body, response_headers = Fiber.yield
  response_headers = beautify_headers(response_headers) if response_headers

  result = BalancedHttpClient.response(response_code, response_body, response_headers,
                                       request_options[:head][:accept])
  connection[:expires_at] = Time.now + BalancedHttpClient::CONNECTION_REUSE_TIMEOUT
  [result, response_code, response_body, response_headers]
end

#request(verb, path, host, connect_options, request_options) ⇒ Array

Make HTTP request. Note that the underlying thread is not blocked by the HTTP i/o, but this call itself is blocking.

Parameters:

  • verb (Symbol)

    for HTTP REST request

  • path (String)

    in URI for desired resource

  • host (String)

    name of server

  • connect_options (Hash)

    for HTTP connection

  • request_options (Hash)

    for HTTP request

Returns:

  • (Array)

    result to be returned followed by response code, body, and headers

Raises:



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/right_agent/clients/non_blocking_client.rb', line 117

# Make HTTP request
# Note that the underlying thread is not blocked by the HTTP i/o, but this call itself is blocking
#
# @param [Symbol] verb for HTTP REST request
# @param [String] path in URI for desired resource; used as the @connections key
#   when request_options[:keepalive] is set
# @param [String] host name of server
# @param [Hash] connect_options for HTTP connection
# @param [Hash] request_options for HTTP request; its :path is rewritten in place,
#   and :head is optional
#
# @return [Array] result to be returned followed by response code, body, and headers
def request(verb, path, host, connect_options, request_options)
  # Finish forming path by stripping path, if any, from host
  uri = URI.parse(host)
  request_options[:path] = uri.path + request_options[:path]
  uri.path = ""

  # Make request and then yield fiber until it completes
  fiber = Fiber.current
  connection = EM::HttpRequest.new(uri.to_s, connect_options)
  # Store connection now so that close will get called if terminating or reconnecting
  c = @connections[path] = {:host => host, :connection => connection, :expires_at => Time.now} if request_options[:keepalive]
  http = connection.send(verb, request_options)
  http.errback { @connections.delete(path); fiber.resume(*handle_error(verb, http.error)) }
  http.callback { fiber.resume(http.response_header.status, http.response, http.response_header) }
  response_code, response_body, response_headers = Fiber.yield
  response_headers = beautify_headers(response_headers) if response_headers
  # Callers may omit :head entirely (the health check does so when no :api_version
  # is configured), so read :accept without assuming the headers hash exists
  accept = (request_options[:head] || {})[:accept]
  result = BalancedHttpClient.response(response_code, response_body, response_headers, accept)
  c[:expires_at] = Time.now + BalancedHttpClient::CONNECTION_REUSE_TIMEOUT if request_options[:keepalive]
  [result, response_code, response_body, response_headers]
end