Class: LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter
- Inherits:
-
Object
- Object
- LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter
- Defined in:
- lib/logstash/outputs/amazon_es/http_client/manticore_adapter.rb
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#manticore ⇒ Object
readonly
Returns the value of attribute manticore.
Instance Method Summary collapse
- #client ⇒ Object
- #close ⇒ Object
- #format_url(url, path_and_query = nil) ⇒ Object
- #host_unreachable_exceptions ⇒ Object
-
#initialize(logger, options = {}) ⇒ ManticoreAdapter
constructor
A new instance of ManticoreAdapter.
-
#manticore_proxy_hash(proxy_uri) ⇒ Object
Transform the proxy option to a hash.
-
#perform_request(url, method, path, params = {}, body = nil) ⇒ Response
Performs the request through the Manticore client after signing its headers with the AWS V4 signer.
Constructor Details
#initialize(logger, options = {}) ⇒ ManticoreAdapter
Returns a new instance of ManticoreAdapter.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/logstash/outputs/amazon_es/http_client/manticore_adapter.rb', line 21
#
# Builds the adapter: records connection settings, resolves AWS credentials
# and creates the underlying Manticore client.
#
# @param logger [Object] stored and exposed through the +logger+ reader
# @param options [Hash, nil] client settings. Keys read here: :ssl, :headers,
#   :port (default 9200), :protocol (default 'http'), :region (default
#   'us-east-1'), :aws_access_key_id, :aws_secret_access_key, :session_token,
#   :profile (default 'default'), :instance_profile_credentials_retries
#   (default 0), :instance_profile_credentials_timeout (default 1), :proxy.
#   The hash is copied first, so the caller's hash is not modified.
def initialize(logger, options={})
  @logger = logger
  # Copy before mutating; tolerate an explicit nil
  options = (options || {}).clone
  options[:ssl] = options[:ssl] || {}

  # We manage our own retries directly, so let's disable them here
  options[:automatic_retries] = 0
  # We definitely don't need cookies
  options[:cookies] = false

  # Caller-supplied headers win over DEFAULT_HEADERS on conflicting keys
  @client_params = {:headers => DEFAULT_HEADERS.merge(options[:headers] || {})}

  @port = options[:port] || 9200
  @protocol = options[:protocol] || 'http'
  @region = options[:region] || 'us-east-1'

  aws_access_key_id = options[:aws_access_key_id] || nil
  aws_secret_access_key = options[:aws_secret_access_key] || nil
  session_token = options[:session_token] || nil
  profile = options[:profile] || 'default'
  instance_cred_retries = options[:instance_profile_credentials_retries] || 0
  instance_cred_timeout = options[:instance_profile_credentials_timeout] || 1

  credential_config = CredentialConfig.new(aws_access_key_id, aws_secret_access_key, session_token,
                                           profile, instance_cred_retries, instance_cred_timeout, @region)
  @credentials = Aws::CredentialProviderChain.new(credential_config).resolve

  # Manticore needs the proxy as a hash (see #manticore_proxy_hash)
  if options[:proxy]
    options[:proxy] = manticore_proxy_hash(options[:proxy])
  end

  @manticore = ::Manticore::Client.new(options)
end
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
19 20 21 |
# File 'lib/logstash/outputs/amazon_es/http_client/manticore_adapter.rb', line 19
#
# Read-only accessor.
#
# @return [Object] the value of the @logger instance variable
def logger
  @logger
end
#manticore ⇒ Object (readonly)
Returns the value of attribute manticore.
19 20 21 |
# File 'lib/logstash/outputs/amazon_es/http_client/manticore_adapter.rb', line 19
#
# Read-only accessor.
#
# @return [Object] the value of the @manticore instance variable
def manticore
  @manticore
end
Instance Method Details
#client ⇒ Object
64 65 66 |
# File 'lib/logstash/outputs/amazon_es/http_client/manticore_adapter.rb', line 64
#
# Returns the same object as the +manticore+ reader (the @manticore
# instance variable) under a different name.
#
# @return [Object] the value of @manticore
def client
  @manticore
end
#close ⇒ Object
158 159 160 |
# File 'lib/logstash/outputs/amazon_es/http_client/manticore_adapter.rb', line 158
#
# Closes the underlying client by delegating to +@manticore.close+.
#
# @return [Object] whatever +@manticore.close+ returns
def close
  @manticore.close
end
#format_url(url, path_and_query = nil) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/logstash/outputs/amazon_es/http_client/manticore_adapter.rb', line 132
#
# Builds the request URL from a base URL plus an optional path and query.
# Works on a copy of +url+; the argument itself is not modified.
#
# @param url [Object] URI-like base URL responding to #clone, #user=,
#   #password=, #query, #query=, #path, #path= and #to_s
# @param path_and_query [String, nil] path, optionally followed by
#   "?query", to append to the base URL
# @return [String] the credential-free URL when +path_and_query+ is nil
# @return [Object] the modified copy of +url+ otherwise. Note the two
#   branches return different types; callers should use #to_s on the result.
def format_url(url, path_and_query=nil)
  request_uri = url.clone

  # We excise auth info from the URL in case manticore itself tries to stick
  # sensitive data in a thrown exception or log data
  request_uri.user = nil
  request_uri.password = nil

  return request_uri.to_s if path_and_query.nil?

  parsed_path_and_query = java.net.URI.new(path_and_query)

  # Keep both query strings, skipping nil and ""
  new_query_parts = [request_uri.query, parsed_path_and_query.query].select do |part|
    part && !part.empty?
  end
  request_uri.query = new_query_parts.join("&") unless new_query_parts.empty?

  # Join the paths and collapse any run of slashes into a single one
  request_uri.path = "#{request_uri.path}/#{parsed_path_and_query.path}".gsub(/\/{2,}/, "/")

  request_uri
end
#host_unreachable_exceptions ⇒ Object
162 163 164 |
# File 'lib/logstash/outputs/amazon_es/http_client/manticore_adapter.rb', line 162
#
# Lists the Manticore exception classes grouped under the name
# "host unreachable". All are referenced from the top-level namespace.
#
# @return [Array<Class>] the five exception classes, in a fixed order
def host_unreachable_exceptions
  [
    ::Manticore::Timeout,
    ::Manticore::SocketException,
    ::Manticore::ClientProtocolException,
    ::Manticore::ResolutionFailure,
    ::Manticore::SocketTimeout
  ]
end
#manticore_proxy_hash(proxy_uri) ⇒ Object
Transforms the proxy option into a hash. Manticore’s support for non-hash proxy options is broken; this was fixed in https://github.com/cheald/manticore/commit/34a00cee57a56148629ed0a47c329181e7319af5, but that fix has not yet been released.
56 57 58 59 60 61 62 |
# File 'lib/logstash/outputs/amazon_es/http_client/manticore_adapter.rb', line 56
#
# Converts a proxy URI into an options hash.
#
# @param proxy_uri [Object] URI-like object responding to #host, #scheme,
#   #port, #user, #password and #path
# @return [Hash] always contains :host; contains :scheme, :port, :user,
#   :password and :path only when the corresponding value is neither nil
#   nor an empty String
def manticore_proxy_hash(proxy_uri)
  [:scheme, :port, :user, :password, :path].each_with_object(:host => proxy_uri.host) do |opt, acc|
    value = proxy_uri.send(opt)
    acc[opt] = value unless value.nil? || (value.is_a?(String) && value.empty?)
  end
end
#perform_request(url, method, path, params = {}, body = nil) ⇒ Response
Performs the request through the Manticore client after signing its headers with the AWS V4 signer.
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/logstash/outputs/amazon_es/http_client/manticore_adapter.rb', line 75
#
# Sends one HTTP request through the Manticore client. The request headers
# are signed with the AWS V4 signer (service 'es', region @region) before
# sending, and the call blocks until the response has been received.
#
# @param url [Object] URI-like base URL; its user/password, when present,
#   are passed to Manticore as eager basic-auth credentials
# @param method [Symbol, String] HTTP verb; its downcased form is the
#   Manticore client method that gets called
# @param path [String, nil] path (optionally with a query) handed to
#   #format_url and used as the path of the signed URL
# @param params [Hash, nil] request options; merged with the client-wide
#   params, which win on conflicting keys
# @param body [String, nil] request body, sent only when truthy
# @return [Object] the completed Manticore response
# @raise [::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError]
#   when the status code is below 200, or above 299 and not 404
def perform_request(url, method, path, params={}, body=nil)
  # Perform 2-level deep merge on the params, so if the passed params and client params will both have hashes stored on a key they
  # will be merged as well, instead of choosing just one of the values
  params = (params || {}).merge(@client_params) do |_key, oldval, newval|
    (oldval.is_a?(Hash) && newval.is_a?(Hash)) ? oldval.merge(newval) : newval
  end
  # Copy the headers so adding the signature below cannot alter @client_params
  params[:headers] = params[:headers].clone
  params[:body] = body if body

  if url.user
    params[:auth] = {
      :user => CGI.unescape(url.user),
      # We have to unescape the password here since manticore won't do it
      # for us unless it's part of the URL
      :password => CGI.unescape(url.password),
      :eager => true
    }
  end

  request_uri = format_url(url, path)

  # The URL handed to the signer is rebuilt from the request host, the
  # configured port and the raw path.
  uri_builder = @protocol == "https" ? URI::HTTPS : URI::HTTP
  signing_url = uri_builder.build({:host => URI(request_uri.to_s).host, :port => @port.to_s, :path => path})

  key = Seahorse::Client::Http::Request.new({
    :endpoint => signing_url,
    :http_method => method.to_s.upcase,
    :headers => params[:headers],
    :body => params[:body]
  })

  aws_signer = Aws::Signers::V4.new(@credentials, 'es', @region)
  signed_key = aws_signer.sign(key)
  params[:headers] = params[:headers].merge(signed_key.headers)

  resp = @manticore.send(method.downcase, request_uri.to_s, params)

  # Manticore returns lazy responses by default
  # We want to block for our usage, this will wait for the response
  # to finish
  resp.call

  # 404s are excluded because they are valid codes in the case of
  # template installation. We might need a better story around this later
  # but for our current purposes this is correct
  if resp.code < 200 || (resp.code > 299 && resp.code != 404)
    raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(resp.code, request_uri, body, resp.body)
  end

  resp
end