Class: LogStash::Outputs::Scalyr
- Inherits:
  LogStash::Outputs::Base
  - Object
  - LogStash::Outputs::Base
  - LogStash::Outputs::Scalyr
- Defined in:
  lib/logstash/outputs/scalyr.rb
Overview
Implements the Scalyr output plugin
Instance Method Summary
-
#add_client_timestamp_to_body(body) ⇒ Object
Helper method that adds a client_timestamp to a batch addEvents request body.
-
#build_multi_event_request_array(logstash_events) ⇒ Object
Builds an array of multi-event requests from LogStash events. Each array element is a request that groups multiple events (to be posted to Scalyr’s addEvents endpoint).
- #close ⇒ Object
-
#create_multi_event_request(scalyr_events, current_threads, current_logs) ⇒ Object
A request comprises multiple Scalyr Events.
-
#dlq_enabled? ⇒ Boolean
Helper method to check if the dead-letter queue is enabled.
-
#get_new_metrics ⇒ Object
Convenience method to create a fresh quantile estimator.
-
#get_sleep_sec(current_interval) ⇒ Object
Helper method that gets the next sleep time for exponential backoff, capped at a defined maximum.
-
#get_stats ⇒ Object
Retrieve batch and other event level metric values.
- #multi_receive(events) ⇒ Object
- #register ⇒ Object
-
#send_status ⇒ Object
Sends a status update to Scalyr by posting a log entry under the special logfile ‘scalyr_logstash.log’. Instead of creating a separate thread, let this method be invoked once at startup and then at most once every 5 minutes.
-
#should_sample? ⇒ Boolean
Returns true if we should sample and record metrics for a specific event based on the sampling rate and random value.
-
#should_transmit_status? ⇒ Boolean
Returns true if it is time to transmit status.
-
#sleep_for(sleep_interval) ⇒ Object
Helper method that performs synchronous sleep for a certain time interval.
Instance Method Details
#add_client_timestamp_to_body(body) ⇒ Object
Helper method that adds a client_timestamp to a batch addEvents request body
# File 'lib/logstash/outputs/scalyr.rb', line 583

def add_client_timestamp_to_body(body)
  current_time_millis = DateTime.now.strftime('%Q').to_i
  # echee TODO scalyr_agent code suggests this should be "client_time", not "client_timestamp"
  # however, I cannot find any documentation anywhere. Is it even used?
  body[:client_timestamp] = current_time_millis.to_s
end
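As an illustration (not plugin code), this is the timestamp format the method above produces: `DateTime#strftime('%Q')` yields milliseconds since the Unix epoch, stored as a string under the `:client_timestamp` key.

```ruby
require 'date'

# Mirror of what add_client_timestamp_to_body does to a request body hash.
body = {}
current_time_millis = DateTime.now.strftime('%Q').to_i
body[:client_timestamp] = current_time_millis.to_s
```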
#build_multi_event_request_array(logstash_events) ⇒ Object
Builds an array of multi-event requests from LogStash events. Each array element is a request that groups multiple events (to be posted to Scalyr’s addEvents endpoint).
This function also performs data transformations to support special fields and, optionally, flatten JSON values.
Special fields are those that have special semantics to Scalyr, i.e. ‘message’ contains the main log message, while ‘serverHost’ and ‘logfile’ have dedicated search boxes to facilitate filtering. All Logstash event key/values will be marshalled into a Scalyr addEvents ‘attrs’ key/value unless they are identified as alternate names for special fields. The special fields (‘message’, ‘serverHost’, ‘logfile’) may be remapped from other fields (configured by setting ‘message_field’, ‘serverhost_field’, ‘logfile_field’).
Values that are nested JSON may be optionally flattened (See README.md for some examples).
Certain fields are removed (e.g. @timestamp and @version).
Tags are either propagated as a comma-separated string, or optionally transposed into key/values where the keys are tag names and the values are 1 (this is configurable).
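The flattening and tag-transposition behaviour described above can be sketched in isolation. This helper is illustrative only: the plugin delegates flattening to Scalyr::Common::Util.flatten, and the `tag_` prefix and value of 1 shown here are assumed defaults.

```ruby
# Illustrative stand-in for nested-value flattening with a delimiter.
def flatten_hash(hash, prefix = nil, delimiter = '_')
  hash.each_with_object({}) do |(key, value), flat|
    full_key = prefix ? "#{prefix}#{delimiter}#{key}" : key.to_s
    if value.is_a?(Hash)
      flat.merge!(flatten_hash(value, full_key, delimiter))
    else
      flat[full_key] = value
    end
  end
end

record = { 'message' => 'hi', 'nested' => { 'a' => 1, 'b' => 2 }, 'tags' => ['prod', 'web'] }

attrs = flatten_hash(record.reject { |k, _| k == 'tags' })
# Transpose tags into attributes whose value is 1 ("tag_" prefix is assumed).
record['tags'].each { |tag| attrs["tag_#{tag}"] = 1 }
```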
# File 'lib/logstash/outputs/scalyr.rb', line 354

def build_multi_event_request_array(logstash_events)
  if logstash_events.nil? or logstash_events.empty?
    return []
  end

  multi_event_request_array = Array.new
  total_bytes = 0
  # Set of unique scalyr threads for this chunk
  current_threads = Hash.new
  # Create a Scalyr event object for each record in the chunk
  scalyr_events = Array.new

  thread_ids = Hash.new
  next_id = 1  # incrementing thread id for the session

  # per-logfile attributes
  logs = Hash.new
  logs_ids = Hash.new
  next_log_id = 1

  logstash_events.each {|l_event|
    record = l_event.to_hash

    # Create optional threads hash if serverHost is non-nil
    # echee: TODO I don't think threads are necessary. Too much info?
    # they seem to be a second level of granularity within a logfile
    serverHost = record.fetch(@serverhost_field, nil)

    if serverHost
      # get thread id or add a new one if we haven't seen this serverHost before
      if thread_ids.key? serverHost
        thread_id = thread_ids[serverHost]
      else
        thread_id = next_id
        thread_ids[serverHost] = thread_id
        next_id += 1
      end
      # then update the map of threads for this chunk
      current_threads[serverHost] = thread_id
    end

    rename = lambda do |renamed_field, standard_field|
      if standard_field != renamed_field
        if record.key? renamed_field
          if record.key? standard_field
            @logger.warn "Overwriting log record field '#{standard_field}'. You are seeing this warning because in " +
              "your LogStash config file you have configured the '#{renamed_field}' field to be converted to the " +
              "'#{standard_field}' field, but the event already contains a field called '#{standard_field}' and " +
              "this is now being overwritten."
          end
          record[standard_field] = record[renamed_field]
          record.delete(renamed_field)
        end
      end
    end

    # Rename user-specified message field -> 'message'
    rename.call(@message_field, 'message')

    # Fix message encoding
    if @message_encoding and !record['message'].to_s.empty?
      if @replace_invalid_utf8 and @message_encoding == Encoding::UTF_8
        record["message"] = record["message"].encode("UTF-8", :invalid => :replace, :undef => :replace, :replace => "<?>").force_encoding('UTF-8')
      else
        record["message"].force_encoding(@message_encoding)
      end
    end

    # Rename user-specified serverHost field -> 'serverHost'
    rename.call(@serverhost_field, 'serverHost')

    # Rename user-specified logfile field -> 'logfile'
    rename.call(@logfile_field, 'logfile')

    # Set a default parser if none is present in the event
    if record['parser'].to_s.empty?
      record['parser'] = "logstashParser"
    end

    # Set logfile field if empty and serverHost is supplied
    if record['logfile'].to_s.empty? and serverHost
      record['logfile'] = "/logstash/#{serverHost}"
    end

    # Set a default if no serverHost value is present.
    if serverHost.nil?
      record['serverHost'] = "Logstash"
    end

    log_identifier = nil
    add_log = false
    if serverHost
      log_identifier = serverHost + record['logfile']
    end
    if log_identifier and not logs.key? log_identifier
      add_log = true
      logs[log_identifier] = {
        'id' => next_log_id,
        'attrs' => Hash.new
      }
      if not record['serverHost'].to_s.empty?
        logs[log_identifier]['attrs']['serverHost'] = record['serverHost']
        record.delete('serverHost')
      end
      if not record['logfile'].to_s.empty?
        logs[log_identifier]['attrs']['logfile'] = record['logfile']
        record.delete('logfile')
      end
      if @log_constants
        @log_constants.each {|log_constant|
          if record.key? log_constant
            logs[log_identifier]['attrs'][log_constant] = record[log_constant]
            record.delete(log_constant)
          end
        }
      end
      logs_ids[log_identifier] = next_log_id
      next_log_id += 1
    end

    # Delete unwanted fields from record
    record.delete('@version')
    record.delete('@timestamp')

    # flatten tags
    if @flatten_tags and record.key? 'tags'
      record['tags'].each do |tag|
        record["#{@flat_tag_prefix}#{tag}"] = @flat_tag_value
      end
      record.delete('tags')
    end

    # Record per-event level metrics (flatten duration, event attributes count). Doing this for every single
    # event would be somewhat expensive so we use sampling.
    should_sample_event_metrics = should_sample?

    # flatten record
    if @flatten_nested_values
      start_time = Time.now.to_f
      record = Scalyr::Common::Util.flatten(record, delimiter=@flatten_nested_values_delimiter)
      end_time = Time.now.to_f
      flatten_nested_values_duration = end_time - start_time
    end

    if should_sample_event_metrics
      @plugin_metrics[:event_attributes_count].observe(record.count)

      if @flatten_nested_values
        @plugin_metrics[:flatten_values_duration_secs].observe(flatten_nested_values_duration)
      end
    end

    # Use LogStash event.timestamp as the 'ts' Scalyr timestamp. Note that this may be overwritten by input
    # filters so may not necessarily reflect the actual originating timestamp.
    scalyr_event = {
      :ts => (l_event.timestamp.time.to_f * (10**9)).round,
      :attrs => record
    }

    # optionally set thread
    if serverHost
      scalyr_event[:thread] = thread_id.to_s
      scalyr_event[:log] = logs_ids[log_identifier]
    end

    # get json string of event to keep track of how many bytes we are sending
    begin
      event_json = scalyr_event.to_json
      log_json = nil
      if add_log
        log_json = logs[log_identifier].to_json
      end
    rescue JSON::GeneratorError, Encoding::UndefinedConversionError => e
      @logger.warn "#{e.class}: #{e.message}"

      # Send the faulty event to a label @ERROR block and allow to handle it there (output to exceptions file for ex)
      # TODO
      # atime = Fluent::EventTime.new( sec, nsec )
      # router.emit_error_event(serverHost, time, record, e)

      scalyr_event[:attrs].each do |key, value|
        @logger.debug "\t#{key} (#{value.encoding.name}): '#{value}'"
        scalyr_event[:attrs][key] = value.encode(
          "UTF-8", :invalid => :replace, :undef => :replace, :replace => "<?>"
        ).force_encoding('UTF-8')
      end
      event_json = scalyr_event.to_json
    end

    # generate new request if json size of events in the array exceed maximum request buffer size
    append_event = true
    add_bytes = event_json.bytesize
    if log_json
      add_bytes = add_bytes + log_json.bytesize
    end
    if total_bytes + add_bytes > @max_request_buffer
      # make sure we always have at least one event
      if scalyr_events.size == 0
        scalyr_events << scalyr_event
        append_event = false
      end
      multi_event_request = self.create_multi_event_request(scalyr_events, current_threads, logs)
      multi_event_request_array << multi_event_request

      total_bytes = 0
      current_threads = Hash.new
      logs = Hash.new
      logs_ids = Hash.new
      scalyr_events = Array.new
    end

    # if we haven't consumed the current event already
    # add it to the end of our array and keep track of the json bytesize
    if append_event
      scalyr_events << scalyr_event
      total_bytes += add_bytes
    end
  }

  # create a final request with any left over events
  multi_event_request = self.create_multi_event_request(scalyr_events, current_threads, logs)
  multi_event_request_array << multi_event_request

  multi_event_request_array
end
#close ⇒ Object
# File 'lib/logstash/outputs/scalyr.rb', line 132

def close
  @running = false
  @client_session.close if @client_session
end
#create_multi_event_request(scalyr_events, current_threads, current_logs) ⇒ Object
A request comprises multiple Scalyr Events. This function creates a request hash for final upload to Scalyr (from an array of events and an optional hash of current threads). Note: the request body field will be JSON-encoded.
# File 'lib/logstash/outputs/scalyr.rb', line 594

def create_multi_event_request(scalyr_events, current_threads, current_logs)
  body = {
    :session => @session_id,
    :token => @api_write_token,
    :events => scalyr_events,
  }
  add_client_timestamp_to_body body

  # build the scalyr thread JSON object
  if current_threads
    threads = Array.new
    current_threads.each do |thread_name, id|
      threads << { :id => id.to_s, :name => "LogStash: #{thread_name}" }
    end
    body[:threads] = threads
  end

  # build the scalyr thread logs object
  if current_logs
    logs = Array.new
    current_logs.each do |identifier, log|
      logs << log
    end
    body[:logs] = logs
  end

  # add serverAttributes
  body[:sessionInfo] = @server_attributes if @server_attributes

  # We time serialization to get some insight on how long it takes to serialize the request body
  start_time = Time.now.to_f
  serialized_body = body.to_json
  end_time = Time.now.to_f
  serialization_duration = end_time - start_time
  {
    :body => serialized_body, :record_count => scalyr_events.size, :serialization_duration => serialization_duration
  }
end
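For reference, a request body produced by this method has roughly the following shape. This is a sketch with made-up values; only the field names mirror those assembled by create_multi_event_request above.

```ruby
require 'json'

# Illustrative addEvents request body (values are fabricated).
body = {
  :session => 'example-session-uuid',
  :token => 'example-write-token',
  :events => [
    { :ts => 1600000000000000000, :attrs => { 'message' => 'hello', 'parser' => 'logstashParser' } }
  ],
  :threads => [{ :id => '1', :name => 'LogStash: my-host' }],
  :logs => [{ 'id' => 1, 'attrs' => { 'serverHost' => 'my-host', 'logfile' => '/logstash/my-host' } }],
  :sessionInfo => { 'serverHost' => 'my-host', 'monitor' => 'pluginLogstash' }
}
serialized_body = body.to_json
```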
#dlq_enabled? ⇒ Boolean
Helper method to check if the dead-letter queue is enabled
# File 'lib/logstash/outputs/scalyr.rb', line 750

def dlq_enabled?
  # echee TODO submit to DLQ
  respond_to?(:execution_context) && execution_context.respond_to?(:dlq_writer) &&
    !execution_context.dlq_writer.inner_writer.is_a?(::LogStash::Util::DummyDeadLetterQueueWriter)
end
#get_new_metrics ⇒ Object
Convenience method to create a fresh quantile estimator
# File 'lib/logstash/outputs/scalyr.rb', line 235

def get_new_metrics
  return {
    :multi_receive_duration_secs => Quantile::Estimator.new,
    :multi_receive_event_count => Quantile::Estimator.new,
    :event_attributes_count => Quantile::Estimator.new,
    :flatten_values_duration_secs => Quantile::Estimator.new
  }
end
#get_sleep_sec(current_interval) ⇒ Object
Helper method that gets the next sleep time for exponential backoff, capped at a defined maximum
# File 'lib/logstash/outputs/scalyr.rb', line 743

def get_sleep_sec(current_interval)
  doubled = current_interval * 2
  doubled > @retry_max_interval ? @retry_max_interval : doubled
end
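A standalone sketch of the doubling-with-cap behaviour computed above; the cap of 64 seconds stands in for the plugin's `retry_max_interval` setting.

```ruby
RETRY_MAX_INTERVAL = 64  # assumed cap, for illustration only

# Same logic as get_sleep_sec, without the plugin instance state.
def get_sleep_sec(current_interval)
  doubled = current_interval * 2
  doubled > RETRY_MAX_INTERVAL ? RETRY_MAX_INTERVAL : doubled
end

intervals = []
current = 1
7.times do
  current = get_sleep_sec(current)
  intervals << current
end
# intervals doubles until it hits the cap, then stays there
```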
#get_stats ⇒ Object
Retrieve batch and other event level metric values
# File 'lib/logstash/outputs/scalyr.rb', line 636

def get_stats
  current_stats = @multi_receive_statistics.clone

  current_stats[:multi_receive_duration_p50] = @plugin_metrics[:multi_receive_duration_secs].query(0.5)
  current_stats[:multi_receive_duration_p90] = @plugin_metrics[:multi_receive_duration_secs].query(0.9)
  current_stats[:multi_receive_duration_p99] = @plugin_metrics[:multi_receive_duration_secs].query(0.99)
  current_stats[:multi_receive_event_count_p50] = @plugin_metrics[:multi_receive_event_count].query(0.5)
  current_stats[:multi_receive_event_count_p90] = @plugin_metrics[:multi_receive_event_count].query(0.9)
  current_stats[:multi_receive_event_count_p99] = @plugin_metrics[:multi_receive_event_count].query(0.99)
  current_stats[:event_attributes_count_p50] = @plugin_metrics[:event_attributes_count].query(0.5)
  current_stats[:event_attributes_count_p90] = @plugin_metrics[:event_attributes_count].query(0.9)
  current_stats[:event_attributes_count_p99] = @plugin_metrics[:event_attributes_count].query(0.99)

  if @flatten_nested_values
    # We only return those metrics in case flattening is enabled
    current_stats[:flatten_values_duration_secs_p50] = @plugin_metrics[:flatten_values_duration_secs].query(0.5)
    current_stats[:flatten_values_duration_secs_p90] = @plugin_metrics[:flatten_values_duration_secs].query(0.9)
    current_stats[:flatten_values_duration_secs_p99] = @plugin_metrics[:flatten_values_duration_secs].query(0.99)
  end

  if @flush_quantile_estimates_on_status_send
    @logger.debug "Recreating / resetting quantile estimator classes for plugin metrics"
    @plugin_metrics = get_new_metrics
  end

  current_stats
end
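Quantile::Estimator comes from the 'quantile' gem and computes streaming quantiles. As a dependency-free sketch, the p50/p90/p99 queries above correspond to nearest-rank percentiles over the observed values:

```ruby
# Nearest-rank percentiles over a set of observed values (illustrative; the
# plugin uses a streaming estimator instead of keeping every sample).
observations = (1..100).map { |i| i / 100.0 }
percentile = ->(pct) { observations.sort[(pct * observations.size / 100.0).ceil - 1] }

p50 = percentile.call(50)
p90 = percentile.call(90)
p99 = percentile.call(99)
```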
#multi_receive(events) ⇒ Object
# File 'lib/logstash/outputs/scalyr.rb', line 255

def multi_receive(events)
  start_time = Time.now.to_f

  multi_event_request_array = build_multi_event_request_array(events)
  # Loop over all array of multi-event requests, sending each multi-event to Scalyr
  sleep_interval = @retry_initial_interval
  batch_num = 1
  total_batches = multi_event_request_array.length unless multi_event_request_array.nil?

  result = []
  records_count = events.to_a.length

  while !multi_event_request_array.to_a.empty?
    begin
      multi_event_request = multi_event_request_array.pop
      # For some reason a retry on the multi_receive may result in the request array containing `nil` elements, we
      # ignore these.
      if !multi_event_request.nil?
        @client_session.post_add_events(multi_event_request[:body], false, multi_event_request[:serialization_duration])
        sleep_interval = 0
        result.push(multi_event_request)
      end
    rescue OpenSSL::SSL::SSLError => e
      # cannot rely on exception message, so we always log the following warning
      @logger.error "SSL certificate verification failed. " +
        "Please make sure your certificate bundle is configured correctly and points to a valid file. " +
        "You can configure this with the ssl_ca_bundle_path configuration option. " +
        "The current value of ssl_ca_bundle_path is '#{@ssl_ca_bundle_path}'"
      @logger.error e.message
      @logger.error "Discarding buffer chunk without retrying."
    rescue Scalyr::Common::Client::ServerError, Scalyr::Common::Client::ClientError => e
      sleep_interval = sleep_for(sleep_interval)
      message = "Error uploading to Scalyr (will backoff-retry)"
      exc_data = {
        :url => e.url.to_s,
        :message => e.message,
        :batch_num => batch_num,
        :total_batches => total_batches,
        :record_count => multi_event_request[:record_count],
        :payload_size => multi_event_request[:body].bytesize,
        :will_retry_in_seconds => sleep_interval,
      }
      exc_data[:code] = e.response_code if e.code
      exc_data[:body] = e.response_body if @logger.debug? and e.body
      exc_data[:payload] = "\tSample payload: #{multi_event_request[:body][0,1024]}..." if @logger.debug?

      if e.is_commonly_retried?
        # well-known retriable errors should be debug
        @logger.debug(message, exc_data)
      else
        # all other failed uploads should be errors
        @logger.error(message, exc_data)
      end
      retry if @running
    rescue => e
      # Any unexpected errors should be fully logged
      @logger.error(
        "Unexpected error occurred while uploading to Scalyr (will backoff-retry)",
        :error_message => e.message,
        :error_class => e.class.name,
        :backtrace => e.backtrace
      )
      @logger.debug("Failed multi_event_request", :multi_event_request => multi_event_request)
      sleep_interval = sleep_for(sleep_interval)
      retry if @running
    end
  end

  if records_count > 0
    @multi_receive_statistics[:total_multi_receive_secs] += (Time.now.to_f - start_time)
    @plugin_metrics[:multi_receive_duration_secs].observe(Time.now.to_f - start_time)
    @plugin_metrics[:multi_receive_event_count].observe(records_count)
  end

  send_status
  return result
end
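The retry pattern above (pop a request, back off on retriable errors, reset the interval after a success) can be reduced to a small standalone sketch. The error class, intervals, and simulated failures here are made up for illustration.

```ruby
class RetriableError < StandardError; end  # stand-in for the Scalyr client error classes

def send_with_retry(requests, max_interval: 8)
  sent = []
  attempts = 0
  sleep_interval = 1
  until requests.empty?
    request = requests.pop
    begin
      attempts += 1
      # Simulate a transient failure on the first two attempts.
      raise RetriableError, 'transient upload failure' if attempts < 3
      sent << request
      sleep_interval = 0  # reset backoff after a success, as multi_receive does
    rescue RetriableError
      # Exponential backoff, capped; push the request back for another try.
      sleep_interval = [[sleep_interval, 1].max * 2, max_interval].min
      requests.push(request)
    end
  end
  [sent, attempts]
end

sent, attempts = send_with_retry([:status_batch])
```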
#register ⇒ Object
# File 'lib/logstash/outputs/scalyr.rb', line 138

def register
  @prng = Random.new

  if @event_metrics_sample_rate < 0 or @event_metrics_sample_rate > 1
    raise LogStash::ConfigurationError, "Minimum possible value for 'event_metrics_sample_rate' is 0 (dont sample any events) and maximum is 1 (sample every event)"
  end

  @node_hostname = Socket.gethostname

  if @log_constants and not @log_constants.all? { |x| x.is_a? String }
    raise LogStash::ConfigurationError, "All elements of 'log_constants' must be strings."
  end

  if @max_request_buffer > 6000000
    @logger.warn "Maximum request buffer > 6 MB. This may result in requests being rejected by Scalyr."
  end

  @dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil

  @message_encoding = nil
  if @force_message_encoding.to_s != ''
    begin
      @message_encoding = Encoding.find(@force_message_encoding)
      @logger.debug "Forcing message encoding to '#{@force_message_encoding}'"
    rescue ArgumentError
      @logger.warn "Encoding '#{@force_message_encoding}' not found. Ignoring."
    end
  end

  # evaluate any statements in string value of the server_attributes object
  if @server_attributes
    new_attributes = {}
    @server_attributes.each do |key, value|
      if value.is_a?( String )
        m = /^\#{(.*)}$/.match( value )
        if m
          new_attributes[key] = eval( m[1] )
        else
          new_attributes[key] = value
        end
      end
    end
    @server_attributes = new_attributes
  end

  # See if we should use the hostname as the server_attributes.serverHost
  if @use_hostname_for_serverhost
    if @server_attributes.nil?
      @server_attributes = {}
    end

    # only set serverHost if it doesn't currently exist in server_attributes
    # Note: Use strings rather than symbols for the key, because keys coming
    # from the config file will be strings
    unless @server_attributes.key? 'serverHost'
      @server_attributes['serverHost'] = @node_hostname
    end
  end

  # Add monitor server attribute to identify this as coming from a plugin
  @server_attributes['monitor'] = 'pluginLogstash'

  @scalyr_server << '/' unless @scalyr_server.end_with?('/')

  @add_events_uri = URI(@scalyr_server) + "addEvents"

  @logger.info "Scalyr LogStash Plugin ID - #{self.id}"

  @session_id = SecureRandom.uuid
  @last_status_transmit_time_lock = Mutex.new
  @last_status_transmit_time = nil
  @last_status_ = false

  # Plugin level (either per batch or event level metrics). Other request
  # level metrics are handled by the HTTP Client class.
  @multi_receive_statistics = {
    :total_multi_receive_secs => 0
  }
  @plugin_metrics = get_new_metrics

  # create a client session for uploading to Scalyr
  @running = true
  @client_session = Scalyr::Common::Client::ClientSession.new(
    @logger, @add_events_uri,
    @compression_type, @compression_level,
    @ssl_verify_peer, @ssl_ca_bundle_path, @ssl_verify_depth,
    @append_builtin_cert, @record_stats_for_status, @flush_quantile_estimates_on_status_send
  )

  @logger.info("Started Scalyr output plugin", :class => self.class.name)

  # Finally, send a status line to Scalyr
  send_status
end
#send_status ⇒ Object
Sends a status update to Scalyr by posting a log entry under the special logfile ‘scalyr_logstash.log’. Instead of creating a separate thread, let this method be invoked once at startup and then at most once every 5 minutes. (If no events are received, no status update will be sent even if 5 minutes have elapsed.) Finally, note that there could be multiple instances of this plugin (one per worker), in which case each worker thread sends its own status updates. This is intentional so that we know how much data each worker thread is uploading to Scalyr over time.
# File 'lib/logstash/outputs/scalyr.rb', line 673

def send_status
  status_event = {
    :ts => (Time.now.to_f * (10**9)).round,
    :attrs => {
      'logfile' => "scalyr_logstash.log",
      'plugin_id' => self.id,
    }
  }

  if !@last_status_transmit_time
    status_event[:attrs]['message'] = "Started Scalyr LogStash output plugin."
    status_event[:attrs]['serverHost'] = @node_hostname
  else
    cur_time = Time.now()
    return if (cur_time.to_i - @last_status_transmit_time.to_i) < @status_report_interval
    # echee TODO: get instance stats from session and create a status log line
    msg = 'plugin_status: '
    cnt = 0
    @client_session.get_stats.each do |k, v|
      val = v.instance_of?(Float) ? sprintf("%.3f", v) : v
      val = val.nil? ? 0 : val
      msg << ' ' if cnt > 0
      msg << "#{k.to_s}=#{val}"
      cnt += 1
    end
    get_stats.each do |k, v|
      val = v.instance_of?(Float) ? sprintf("%.3f", v) : v
      val = val.nil? ? 0 : val
      msg << ' ' if cnt > 0
      msg << "#{k.to_s}=#{val}"
      cnt += 1
    end
    status_event[:attrs]['message'] = msg
    status_event[:attrs]['serverHost'] = @node_hostname
    status_event[:attrs]['parser'] = @status_parser
  end
  multi_event_request = create_multi_event_request([status_event], nil, nil)
  @client_session.post_add_events(multi_event_request[:body], true, 0)
  @last_status_transmit_time = Time.now()
  status_event
end
#should_sample? ⇒ Boolean
Returns true if we should sample and record metrics for a specific event based on the sampling rate and random value
# File 'lib/logstash/outputs/scalyr.rb', line 718

def should_sample?
  return @prng.rand(0.0..1.0) < @event_metrics_sample_rate
end
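Sketch of what the rate-based sampling means in practice; the 0.1 sample rate and PRNG seed here are arbitrary illustration values (the plugin seeds `Random.new` without an argument).

```ruby
# With a sample rate of 0.1, roughly 10% of events are chosen for metrics.
# A seeded PRNG makes this run reproducible.
prng = Random.new(1234)
event_metrics_sample_rate = 0.1
sampled = 10_000.times.count { prng.rand(0.0..1.0) < event_metrics_sample_rate }
# sampled will be close to 1_000
```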
#should_transmit_status? ⇒ Boolean
Returns true if it is time to transmit status
# File 'lib/logstash/outputs/scalyr.rb', line 724

def should_transmit_status?
  @last_status_transmit_time_lock.synchronize do
    saved_last_time = @last_status_transmit_time
    if Time.now.to_i - saved_last_time.to_i > @status_report_interval
      @last_status_transmit_time = Float::INFINITY
      return saved_last_time
    end
  end
end
#sleep_for(sleep_interval) ⇒ Object
Helper method that performs synchronous sleep for a certain time interval
# File 'lib/logstash/outputs/scalyr.rb', line 736

def sleep_for(sleep_interval)
  Stud.stoppable_sleep(sleep_interval) { !@running }
  get_sleep_sec(sleep_interval)
end