Class: RightScale::NonBlockingClient
- Defined in:
- lib/right_agent/clients/non_blocking_client.rb
Overview
Interface to HTTP using EM::HttpRequest. This interface uses non-blocking i/o so that HTTP requests are synchronous to the caller, but the underlying thread yields to other activity when blocked on i/o.
Instance Attribute Summary collapse
-
#connections ⇒ Object
readonly
Hash of active connections with request path as key and hash value containing :host, :connection, and :expires_at.
-
#health_check_proc ⇒ Object
readonly
Fully configured health check procedure for use with this client.
Instance Method Summary collapse
-
#close(reason) ⇒ TrueClass
Close all persistent connections.
-
#initialize(options) ⇒ NonBlockingClient
constructor
Initialize client.
-
#options(verb, path, params, request_headers, options) ⇒ Array
Construct options for HTTP request.
-
#poll(connection, request_options, stop_at) ⇒ Array
Make long-polling request. Note that the underlying thread is not blocked by the HTTP i/o, but this call itself is blocking.
-
#request(verb, path, host, connect_options, request_options) ⇒ Array
Make HTTP request. Note that the underlying thread is not blocked by the HTTP i/o, but this call itself is blocking.
Constructor Details
#initialize(options) ⇒ NonBlockingClient
Initialize client
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/right_agent/clients/non_blocking_client.rb', line 43
#
# Initialize client: set up the connection table, pick up proxy settings from
# the environment, and build the health check proc.
#
# @param options [Hash] client settings; read each time the health check proc runs
# @option options [String] :health_check_path path appended to the host's own path
#   for health checks; defaults to BalancedHttpClient::DEFAULT_HEALTH_CHECK_PATH
# @option options [Object] :api_version value sent in the "X-API-Version" header of
#   health check requests; the header is omitted when this is not set
def initialize(options)
  # Defer requiring this gem until now so that right_agent can be used with ruby 1.8.7
  require 'em-http-request'

  @connections = {}

  # Initialize use of proxy if defined; the first environment variable that is set wins
  proxy_var = BalancedHttpClient::PROXY_ENVIRONMENT_VARIABLES.detect { |name| ENV.has_key?(name) }
  if proxy_var
    proxy = ENV[proxy_var]
    # Assume http when the value carries no scheme so that URI.parse finds host and port
    proxy_uri = proxy.match(/^[[:alpha:]]+:\/\//) ? URI.parse(proxy) : URI.parse("http://" + proxy)
    @proxy = {:host => proxy_uri.host, :port => proxy_uri.port}
    @proxy[:authorization] = [proxy_uri.user, proxy_uri.password] if proxy_uri.user
  end

  # Create health check proc for use by request balancer
  # Strip user and password from host name since health-check does not require authorization
  @health_check_proc = Proc.new do |host|
    uri = URI.parse(host)
    uri.user = uri.password = nil
    uri.path = uri.path + (options[:health_check_path] || BalancedHttpClient::DEFAULT_HEALTH_CHECK_PATH)
    connect_options = {
      :connect_timeout => BalancedHttpClient::DEFAULT_OPEN_TIMEOUT,
      :inactivity_timeout => BalancedHttpClient::HEALTH_CHECK_TIMEOUT }
    connect_options[:proxy] = @proxy if @proxy
    request_options = {:path => uri.path}
    request_options[:head] = {"X-API-Version" => options[:api_version]} if options[:api_version]
    # The full path now travels in request_options, so pass request a bare host
    uri.path = ""
    request(:get, "", uri.to_s, connect_options, request_options)
  end
end
Instance Attribute Details
#connections ⇒ Object (readonly)
Hash of active connections with request path as key and hash value containing :host, :connection, and :expires_at
36 37 38 |
# File 'lib/right_agent/clients/non_blocking_client.rb', line 36
#
# Hash of active connections with request path as key and hash value
# containing :host, :connection, and :expires_at
#
# @return [Hash] the connection table itself, not a copy
def connections
  @connections
end
#health_check_proc ⇒ Object (readonly)
Fully configured health check procedure for use with this client
32 33 34 |
# File 'lib/right_agent/clients/non_blocking_client.rb', line 32
#
# Fully configured health check procedure for use with this client
#
# @return [Proc] the stored health check proc
def health_check_proc
  @health_check_proc
end
Instance Method Details
#close(reason) ⇒ TrueClass
Close all persistent connections
165 166 167 168 169 |
# File 'lib/right_agent/clients/non_blocking_client.rb', line 165
#
# Close all persistent connections
#
# @param reason [Object] passed through to each connection's close
#
# @return [TrueClass] always true
def close(reason)
  @connections.each_value { |c| c[:connection].close(reason) }
  # Start over with an empty table so that closed connections are never reused
  @connections = {}
  true
end
#options(verb, path, params, request_headers, options) ⇒ Array
Construct options for HTTP request
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/right_agent/clients/non_blocking_client.rb', line 85
#
# Construct options for HTTP request
#
# @param verb [Symbol] HTTP verb; :get and :delete put params in the query string,
#   any other verb sends them as a JSON body; :poll additionally enables keepalive
# @param path [String] request path
# @param params [Hash, NilClass] request parameters; ignored unless a non-empty Hash
# @param request_headers [Hash] headers for the request; modified in place with
#   :content_type for verbs other than :get and :delete
# @param options [Hash] timeout settings
# @option options [Numeric] :open_timeout connect timeout; defaults to
#   BalancedHttpClient::DEFAULT_OPEN_TIMEOUT
# @option options [Numeric] :request_timeout inactivity timeout; defaults to
#   BalancedHttpClient::DEFAULT_REQUEST_TIMEOUT
# @option options [Numeric] :poll_timeout inactivity timeout used instead of
#   :request_timeout when verb is :poll
#
# @return [Array] connect options hash and request options hash, in that order
def options(verb, path, params, request_headers, options)
  # Only a :poll request honors :poll_timeout; for any other verb this is false
  poll_timeout = verb == :poll && options[:poll_timeout]
  connect_options = {
    :connect_timeout => options[:open_timeout] || BalancedHttpClient::DEFAULT_OPEN_TIMEOUT,
    :inactivity_timeout => poll_timeout || options[:request_timeout] || BalancedHttpClient::DEFAULT_REQUEST_TIMEOUT }
  connect_options[:proxy] = @proxy if @proxy

  request_body, request_path = if [:get, :delete].include?(verb)
    # Doing own formatting because :query option on EM::HttpRequest does not reliably
    # URL encode, e.g., messes up on arrays in hashes
    [nil, (params.is_a?(Hash) && params.any?) ? path + "?#{BalancedHttpClient.format(params)}" : path]
  else
    request_headers[:content_type] = "application/json"
    [(params.is_a?(Hash) && params.any?) ? JSON.dump(params) : nil, path]
  end
  request_options = {:path => request_path, :body => request_body, :head => request_headers}
  request_options[:keepalive] = true if verb == :poll
  [connect_options, request_options]
end
#poll(connection, request_options, stop_at) ⇒ Array
Make long-polling request. Note that the underlying thread is not blocked by the HTTP i/o, but this call itself is blocking.
149 150 151 152 153 154 155 156 157 158 |
# File 'lib/right_agent/clients/non_blocking_client.rb', line 149
#
# Make long-polling request
# Note that the underlying thread is not blocked by the HTTP i/o, but this call itself is blocking
#
# @param connection [Hash] connection entry with :host, :connection, and :expires_at;
#   :expires_at is updated once the poll completes
# @param request_options [Hash] request options; :path is modified in place by
#   prefixing it with the path portion of the connection's host, and
#   :head must be a hash since its :accept value is read
# @param stop_at [Object] passed through unchanged to poll_again
#
# @return [Array] result, response code, response body, and response headers
def poll(connection, request_options, stop_at)
  uri = URI.parse(connection[:host])
  request_options[:path] = uri.path + request_options[:path]
  poll_again(Fiber.current, connection[:connection], request_options, stop_at)
  # Suspend this fiber; whoever resumes it supplies code, body, and headers
  code, body, headers = Fiber.yield
  headers = beautify_headers(headers) if headers
  result = BalancedHttpClient.response(code, body, headers, request_options[:head][:accept])
  connection[:expires_at] = Time.now + BalancedHttpClient::CONNECTION_REUSE_TIMEOUT
  [result, code, body, headers]
end
#request(verb, path, host, connect_options, request_options) ⇒ Array
Make HTTP request. Note that the underlying thread is not blocked by the HTTP i/o, but this call itself is blocking.
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/right_agent/clients/non_blocking_client.rb', line 117
#
# Make HTTP request
# Note that the underlying thread is not blocked by the HTTP i/o, but this call itself is blocking
#
# @param verb [Symbol] name of the method sent to the EM::HttpRequest connection
# @param path [String] key under which a keepalive connection is stored in the
#   connection table
# @param host [String] host URI; any path it carries is moved onto the request path
# @param connect_options [Hash] options passed to EM::HttpRequest.new
# @param request_options [Hash] options passed with the request; :path is modified
#   in place, :keepalive controls whether the connection is remembered, and
#   :head must be a hash since its :accept value is read
#
# @return [Array] result, response code, response body, and response headers
def request(verb, path, host, connect_options, request_options)
  # Finish forming path by stripping path, if any, from host
  uri = URI.parse(host)
  request_options[:path] = uri.path + request_options[:path]
  uri.path = ""

  # Make request and then yield fiber until it completes
  fiber = Fiber.current
  connection = EM::HttpRequest.new(uri.to_s, connect_options)
  # Store connection now so that close will get called if terminating or reconnecting
  c = @connections[path] = {:host => host, :connection => connection, :expires_at => Time.now} if request_options[:keepalive]
  http = connection.send(verb, request_options)
  # On failure forget the connection and resume with whatever handle_error returns
  http.errback { @connections.delete(path); fiber.resume(*handle_error(verb, http.error)) }
  http.callback { fiber.resume(http.response_header.status, http.response, http.response_header) }
  response_code, response_body, response_headers = Fiber.yield
  response_headers = beautify_headers(response_headers) if response_headers
  result = BalancedHttpClient.response(response_code, response_body, response_headers, request_options[:head][:accept])
  c[:expires_at] = Time.now + BalancedHttpClient::CONNECTION_REUSE_TIMEOUT if request_options[:keepalive]
  [result, response_code, response_body, response_headers]
end