Class: Scalyr::Common::Client::ClientSession
- Inherits:
-
Object
- Object
- Scalyr::Common::Client::ClientSession
- Defined in:
- lib/scalyr/common/client.rb
Overview
Encapsulates the connection between the agent and the Scalyr server, shielding callers from the implementation details (which may create a new connection for every post or use a persistent connection).
Instance Method Summary collapse
- #close ⇒ Object
-
#get_new_latency_stats ⇒ Object
Convenience method to create a fresh set of quantile estimators, one per tracked metric.
-
#get_stats ⇒ Object
Get a clone of current statistics hash and calculate percentiles.
-
#initialize(logger, add_events_uri, compression_type, compression_level, ssl_verify_peer, ssl_ca_bundle_path, ssl_verify_depth, append_builtin_cert, record_stats_for_status, flush_quantile_estimates_on_status_send) ⇒ ClientSession
constructor
A new instance of ClientSession.
-
#post_add_events(body, is_status, body_serialization_duration = 0) ⇒ Object
Upload data to Scalyr.
Constructor Details
#initialize(logger, add_events_uri, compression_type, compression_level, ssl_verify_peer, ssl_ca_bundle_path, ssl_verify_depth, append_builtin_cert, record_stats_for_status, flush_quantile_estimates_on_status_send) ⇒ ClientSession
Returns a new instance of ClientSession.
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/scalyr/common/client.rb', line 54
#
# Stores the session configuration, initialises the request statistics and
# creates the persistent HTTP client, configuring TLS peer verification on it.
#
# @param logger [Object] stored as @logger (other methods of this class call #debug on it)
# @param add_events_uri [Object] stored as @add_events_uri; typically the /addEvents URI
# @param compression_type [Object] stored and exposed in the stats hash
# @param compression_level [Object] stored and exposed in the stats hash
# @param ssl_verify_peer [Boolean] when truthy the client uses VERIFY_PEER with a
#   generated CA file, otherwise VERIFY_NONE
# @param ssl_ca_bundle_path [String, nil] path of a CA bundle copied into the generated
#   CA file when it names an existing file; nil or a missing file is skipped
# @param ssl_verify_depth [Object] assigned to the HTTP client's verify_depth
# @param append_builtin_cert [Boolean] when truthy the built-in certificate is appended
#   to the generated CA file
# @param record_stats_for_status [Boolean] stored for use when recording request stats
# @param flush_quantile_estimates_on_status_send [Boolean] stored for use by #get_stats
# @return [ClientSession]
def initialize(logger, add_events_uri, compression_type, compression_level,
               ssl_verify_peer, ssl_ca_bundle_path, ssl_verify_depth, append_builtin_cert,
               record_stats_for_status, flush_quantile_estimates_on_status_send)
  @logger = logger
  @add_events_uri = add_events_uri  # typically /addEvents
  @compression_type = compression_type
  @compression_level = compression_level
  @ssl_verify_peer = ssl_verify_peer
  @ssl_ca_bundle_path = ssl_ca_bundle_path
  @append_builtin_cert = append_builtin_cert
  @ssl_verify_depth = ssl_verify_depth
  @record_stats_for_status = record_stats_for_status
  @flush_quantile_estimates_on_status_send = flush_quantile_estimates_on_status_send

  # A cert to use by default to avoid issues caused by the OpenSSL library not validating certs according to standard
  # NOTE(review): the validity dates embedded in this certificate appear to put its
  # expiry in September 2024 - confirm it is still the intended built-in certificate.
  @cert_string = "" \
    "-----BEGIN CERTIFICATE-----\n" \
    "MIIG6zCCBNOgAwIBAgIJAM5aknNWtN6oMA0GCSqGSIb3DQEBCwUAMIGpMQswCQYD\n" \
    "VQQGEwJVUzETMBEGA1UECBMKQ2FsaWZvcm5pYTEXMBUGA1UEBxMOUG9ydG9sYSBW\n" \
    "YWxsZXkxEzARBgNVBAoTClNjYWx5ciBJbmMxFTATBgNVBAsTDFNjYWx5ciBBZ2Vu\n" \
    "dDEdMBsGA1UEAxMUU2NhbHlyIEFnZW50IENBIFJvb3QxITAfBgkqhkiG9w0BCQEW\n" \
    "EmNvbnRhY3RAc2NhbHlyLmNvbTAeFw0xNDA5MDkyMTUyMDVaFw0yNDA5MDYyMTUy\n" \
    "MDVaMIGpMQswCQYDVQQGEwJVUzETMBEGA1UECBMKQ2FsaWZvcm5pYTEXMBUGA1UE\n" \
    "BxMOUG9ydG9sYSBWYWxsZXkxEzARBgNVBAoTClNjYWx5ciBJbmMxFTATBgNVBAsT\n" \
    "DFNjYWx5ciBBZ2VudDEdMBsGA1UEAxMUU2NhbHlyIEFnZW50IENBIFJvb3QxITAf\n" \
    "BgkqhkiG9w0BCQEWEmNvbnRhY3RAc2NhbHlyLmNvbTCCAiIwDQYJKoZIhvcNAQEB\n" \
    "BQADggIPADCCAgoCggIBALdNamcMNVxkIB6qVWmNCi1jeyeqOX00rYAWDlyBHff7\n" \
    "vU833Evuixgrf0HxrOQNiPsOK66ehG6LfJd2UIBDEHBCXRo+aeFQLrCLIVXiqJ2W\n" \
    "Tvl7dUU9d7zfw/XXif3lMQTiyQAWYTyjfugDczEScEUk93EWFfW47j9PTGh96yKm\n" \
    "nVbfOxD4XbN0ykdo85cs7M/NOHQj4q34l77XGXrit+nb1cL3wS9ZzJG8s40J2+Dp\n" \
    "LUA8KBQuvim6hfqrjaDX0bXVvc52a7TSh/zb58gkLbiqvBuPo5P8PBLHCx8bJtZu\n" \
    "fjWRdjaftgw7CcsdIuMhbm3823WI/A+/p4s1B5KOPqOYRkgG8FBqFIRTecKAV5wC\n" \
    "Z2ruTytoOUBWItrheyJhm+99X1I2y/6mdecBdk7j3+8U+nCsGHkH5Jwjl2BH9tfT\n" \
    "RUhVTCQs25XLNm41kZo7xK464xZsJKHXj9jr5gLIdF6CgzU2uYsQHKcw1pAVITLe\n" \
    "bfGEob8AcL0E7+1hurRjyYxtxZpsZeGMwI0/BStT+fLEAOJ1byGUgSUbhi9lJ8Hc\n" \
    "+NZDfaCaCZKRxjePCqeWjZUUdVoH3fNSi2GuNLqtOFzxlkP5tBErnXufE6XZAtEQ\n" \
    "lv/9qxa4ZLsvhbt+6qQryIAHL4aReh/VReER438ARdwG2QDK+vRfhNpke69em5Kb\n" \
    "AgMBAAGjggESMIIBDjAdBgNVHQ4EFgQUENX6MjnzqTJdTQMAEakSdXV/I80wgd4G\n" \
    "A1UdIwSB1jCB04AUENX6MjnzqTJdTQMAEakSdXV/I82hga+kgawwgakxCzAJBgNV\n" \
    "BAYTAlVTMRMwEQYDVQQIEwpDYWxpZm9ybmlhMRcwFQYDVQQHEw5Qb3J0b2xhIFZh\n" \
    "bGxleTETMBEGA1UEChMKU2NhbHlyIEluYzEVMBMGA1UECxMMU2NhbHlyIEFnZW50\n" \
    "MR0wGwYDVQQDExRTY2FseXIgQWdlbnQgQ0EgUm9vdDEhMB8GCSqGSIb3DQEJARYS\n" \
    "Y29udGFjdEBzY2FseXIuY29tggkAzlqSc1a03qgwDAYDVR0TBAUwAwEB/zANBgkq\n" \
    "hkiG9w0BAQsFAAOCAgEAmmgm1AeO7wfuR36HHgpZCKZxboRFwc2FzKaHNSg2FQ0G\n" \
    "MuOP6HZUQWsaXLe0Kc8emJKrIwrn6x2jMm19Bjbps2bPW6ao6UE/6fb5Z7CX82IX\n" \
    "pKlDDH6OfYjDplBzoqf5PkPgxZNyiZ7nyNUWz+P2vesLFVynmej2MvLIZVnEJ2Wp\n" \
    "xzyHMKQo92DP8yNEudoK8QQpoLcuNcAli9blt8+NIV9RSDrI9CvArLNpZJMlS1Vx\n" \
    "gdzEU3wEQYWc36j3XCsp7ZDvgTm6FpyHS5ccMpXR1E62tVINGX9r+97ZHyxjqurb\n" \
    "606y1FzV/5Mf/aihPYSSreq63UVqdsaQfyS77Q4tpJofq875w8nd2Vs3guDs2T0h\n" \
    "1bOlV3e2HfglWsHKwNguQZo2nfMUp11IYfV/HOKWNQkbrPhuayXMi3i2wCZe9JNt\n" \
    "P9uZ2OjzsVu2QFcSlvZF6y02/bjbNATRfj/J/SHNFyCDu6bXhtAu0yZzFLiOZxjD\n" \
    "LwzunBMoWcJj+P2Vx3OhbE9FMyMeKdOWdTgiI1GLEkfJi6s7d/tk1ayLmbBTRD/e\n" \
    "XkjSeLBss6mA1INuE1+gKVA4MABsUiLqGZ8xCPN16CyPcTqL2TJFo1IOqivMxKDh\n" \
    "H4Z/mHoGi5SRnye+Wo+jyiQiWjJQ5LrlQPbHmuO0tLs9lM1t9nhzLifzga5F4+o=\n" \
    "-----END CERTIFICATE-----"

  # Request statistics are accumulated across multiple threads and must be accessed through a mutex
  @stats_lock = Mutex.new
  @latency_stats = get_new_latency_stats
  @stats = {
    :total_requests_sent => 0,  # The total number of RPC requests sent.
    :total_requests_failed => 0,  # The total number of RPC requests that failed.
    :total_request_bytes_sent => 0,  # The total number of bytes sent over the network.
    :total_compressed_request_bytes_sent => 0,  # The total number of compressed bytes sent over the network.
    :total_response_bytes_received => 0,  # The total number of bytes received.
    :total_request_latency_secs => 0,  # The total number of secs spent waiting for responses (so average latency
                                       # can be calculated by dividing this number by :total_requests_sent).
                                       # This includes connection establishment time.
    :total_serialization_duration_secs => 0,  # The total duration (in seconds) it took to serialize (JSON dumps) all the request bodies.
                                              # You can calculate avg serialization duration by dividing this value by total_requests_sent.
    :total_compression_duration_secs => 0,  # The total duration (in seconds) it took to compress all the request bodies.
                                            # You can calculate avg compression duration by dividing this value by total_requests_sent.
    :compression_type => @compression_type,
    :compression_level => @compression_level,
  }

  @http = Net::HTTP::Persistent.new

  # verify peers to prevent potential MITM attacks
  if @ssl_verify_peer
    @ca_cert = Tempfile.new("ca_cert")
    # A nil bundle path is a supported configuration (post_add_events checks for it),
    # and File.file?(nil) raises TypeError, so guard before probing the filesystem.
    if @ssl_ca_bundle_path && File.file?(@ssl_ca_bundle_path)
      @ca_cert.write(File.read(@ssl_ca_bundle_path))
    end
    # Append through the Tempfile's own handle rather than reopening the path with Kernel#open.
    @ca_cert.puts @cert_string if @append_builtin_cert
    @ca_cert.flush
    @http.ca_file = @ca_cert.path
    @http.verify_mode = OpenSSL::SSL::VERIFY_PEER
    @http.verify_depth = @ssl_verify_depth
  else
    @http.verify_mode = OpenSSL::SSL::VERIFY_NONE
  end
end
Instance Method Details
#close ⇒ Object
247 248 249 |
# File 'lib/scalyr/common/client.rb', line 247
#
# Shuts down the underlying persistent HTTP client held in @http.
#
# NOTE(review): the CA-cert Tempfile created by the constructor is not closed or
# removed here - confirm that relying on Tempfile's own cleanup is intended.
#
# @return [Object] whatever the HTTP client's #shutdown returns
def close
  persistent_client = @http
  persistent_client.shutdown
end
#get_new_latency_stats ⇒ Object
Convenience method to create a fresh set of quantile estimators, one per tracked metric
154 155 156 157 158 159 160 161 162 |
# File 'lib/scalyr/common/client.rb', line 154
#
# Builds a new hash holding one freshly created Quantile::Estimator per tracked metric:
#
#   :serialization_duration_secs - duration (in seconds) it took to serialize (JSON dumps) the request bodies
#   :compression_duration_secs   - duration (in seconds) it took to compress the request bodies
#   :request_latency_secs        - secs spent waiting for responses, including connection establishment time
#   :bytes_sent                  - number of bytes sent over the network (batch size with a bit more overhead)
#
# @return [Hash{Symbol => Quantile::Estimator}]
def get_new_latency_stats
  tracked_metrics = [
    :serialization_duration_secs,
    :compression_duration_secs,
    :request_latency_secs,
    :bytes_sent,
  ]
  tracked_metrics.each_with_object({}) do |metric, estimators|
    estimators[metric] = Quantile::Estimator.new
  end
end
#get_stats ⇒ Object
Get a clone of current statistics hash and calculate percentiles
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/scalyr/common/client.rb', line 165
#
# Returns a clone of the current statistics hash, extended with the p50/p90/p99
# values queried from each quantile estimator.
#
# The whole snapshot is taken while holding @stats_lock: post_add_events updates
# @stats and the estimators under that same lock, so reading (and replacing the
# estimators) outside of it could observe or discard concurrent updates.
#
# When @flush_quantile_estimates_on_status_send is truthy the estimators are
# replaced with fresh ones after they have been queried.
#
# @return [Hash] shallow copy of @stats plus the `<metric>_p50/_p90/_p99` keys
def get_stats
  percentiles = { "p50" => 0.5, "p90" => 0.9, "p99" => 0.99 }
  # Output key prefix => estimator key. Note that the request latency keys drop
  # the "_secs" suffix while the other metrics keep their full name.
  metrics = {
    "request_latency" => :request_latency_secs,
    "serialization_duration_secs" => :serialization_duration_secs,
    "compression_duration_secs" => :compression_duration_secs,
    "bytes_sent" => :bytes_sent,
  }

  @stats_lock.synchronize do
    current_stats = @stats.clone

    metrics.each do |prefix, estimator_key|
      estimator = @latency_stats[estimator_key]
      percentiles.each do |label, quantile|
        current_stats[:"#{prefix}_#{label}"] = estimator.query(quantile)
      end
    end

    if @flush_quantile_estimates_on_status_send
      @logger.debug "Recreating / reseting quantile estimator classes for plugin metrics"
      @latency_stats = get_new_latency_stats
    end

    current_stats
  end
end
#post_add_events(body, is_status, body_serialization_duration = 0) ⇒ Object
Upload data to Scalyr. Assumes that the body size complies with Scalyr limits
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/scalyr/common/client.rb', line 191
#
# Upload data to Scalyr. Assumes that the body size complies with Scalyr limits.
#
# Request statistics are recorded in an ensure block, so both successful and failed
# requests are counted (unless this is a status request and
# @record_stats_for_status is falsy).
#
# @param body [String] serialized request body; its bytesize is used for the stats
# @param is_status [Boolean] whether this request carries a status message
# @param body_serialization_duration [Numeric] seconds spent serializing the body,
#   recorded in the stats
# @return [Integer] on success, the bytesize of the response body (the value of the
#   last statement of the request section)
# @raise [ClientError] when the persistent HTTP library gives up on the request, or
#   when the CA certificate file had to be re-created after an SSL error
# @raise [OpenSSL::SSL::SSLError] any other SSL error is re-raised unchanged
def post_add_events(body, is_status, body_serialization_duration = 0)
  post, compression_duration = prepare_post_object @add_events_uri.path, body
  fail_count = 1  # putative assume failure
  # Monotonic clock: wall-clock adjustments must not distort the measured latency.
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  uncompressed_bytes_sent = 0
  compressed_bytes_sent = 0
  bytes_received = 0
  begin
    response = @http.request(@add_events_uri, post)
    handle_response(response)

    fail_count -= 1  # success means we negate the putative failure
    uncompressed_bytes_sent = (body.bytesize + @add_events_uri.path.bytesize)
    compressed_bytes_sent = (post.body.bytesize + @add_events_uri.path.bytesize)
    bytes_received = response.body.bytesize  # echee: double check
    # echee TODO add more statistics

  rescue OpenSSL::SSL::SSLError
    if @ssl_verify_peer && @ssl_ca_bundle_path.nil? && !File.file?(@ca_cert.path)
      # The generated CA file has disappeared: write the built-in certificate to a
      # new temporary file and point the HTTP client at it before reporting the error.
      @ca_cert = Tempfile.new("ca_cert")
      @ca_cert.write(@cert_string)
      @ca_cert.flush
      @http.ca_file = @ca_cert.path
      raise ClientError.new("Packaged certificate appears to have been deleted, writing a new one.", @add_events_uri)
    else
      raise
    end

  rescue Net::HTTP::Persistent::Error => e
    # The underlying persistent-connection library automatically retries when there are network-related errors.
    # Eventually, it will give up and raise this generic error, at which time, we convert it to a ClientError
    raise ClientError.new(e.message, @add_events_uri)

  ensure
    if @record_stats_for_status || !is_status
      # Measured before taking the lock so that lock contention is not counted as request latency.
      request_latency = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
      @stats_lock.synchronize do
        @stats[:total_requests_sent] += 1
        @stats[:total_requests_failed] += fail_count
        @stats[:total_request_bytes_sent] += uncompressed_bytes_sent
        @stats[:total_compressed_request_bytes_sent] += compressed_bytes_sent
        @stats[:total_response_bytes_received] += bytes_received
        @stats[:total_serialization_duration_secs] += body_serialization_duration
        @stats[:total_compression_duration_secs] += compression_duration
        @stats[:total_request_latency_secs] += request_latency
        @latency_stats[:request_latency_secs].observe(request_latency)
        @latency_stats[:serialization_duration_secs].observe(body_serialization_duration)
        @latency_stats[:compression_duration_secs].observe(compression_duration)
        @latency_stats[:bytes_sent].observe(uncompressed_bytes_sent)
      end
    end
  end
end