Class: Fluent::GoogleCloudOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::GoogleCloudOutput
- Includes:
- self::ConfigConstants, self::InternalConstants, self::ServiceConstants
- Defined in:
- lib/fluent/plugin/out_google_cloud.rb
Overview
fluentd output plugin for the Stackdriver Logging API
Defined Under Namespace
Modules: ConfigConstants, CredentialsInfo, InternalConstants, Platform, ServiceConstants
Constant Summary collapse
- PLUGIN_NAME =
'Fluentd Google Cloud Logging plugin'.freeze
- PLUGIN_VERSION =
begin # Extract plugin version from file path. match_data = __FILE__.match( %r{fluent-plugin-google-cloud-(?<version>[0-9a-zA-Z\.]*)/}) if match_data match_data['version'] else # Extract plugin version by finding the spec this file was loaded from. dependency = Gem::Dependency.new('fluent-plugin-google-cloud') all_specs, = Gem::SpecFetcher.fetcher.spec_for_dependency(dependency) matching_version, = all_specs.grep( proc { |spec,| __FILE__.include?(spec.full_gem_path) }) do |spec,| spec.version.to_s end matching_version end end.freeze
- LOGGING_SCOPE =
Name of the Google Cloud Logging write scope.
'https://www.googleapis.com/auth/logging.write'.freeze
- METADATA_SERVICE_ADDR =
Address of the metadata service.
'169.254.169.254'.freeze
Instance Attribute Summary collapse
-
#common_labels ⇒ Object
readonly
Returns the value of attribute common_labels.
-
#project_id ⇒ Object
readonly
Expose attr_readers to make testing of metadata more direct than only testing it indirectly through metadata sent with logs.
-
#resource ⇒ Object
readonly
Returns the value of attribute resource.
-
#vm_id ⇒ Object
readonly
Returns the value of attribute vm_id.
-
#zone ⇒ Object
readonly
Returns the value of attribute zone.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#initialize ⇒ GoogleCloudOutput
constructor
A new instance of GoogleCloudOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ GoogleCloudOutput
Returns a new instance of GoogleCloudOutput.
444 445 446 447 448 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 444
#
# Builds a new GoogleCloudOutput. Runs the superclass initializer first and
# then points @log at the process-wide logger held in $log.
#
# @return [GoogleCloudOutput] a new instance of GoogleCloudOutput
def initialize
  super
  # use the global logger
  @log = $log # rubocop:disable Style/GlobalVars
end
Instance Attribute Details
#common_labels ⇒ Object (readonly)
Returns the value of attribute common_labels.
442 443 444 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 442
#
# Read-only accessor.
#
# @return [Object] the value of attribute common_labels (@common_labels)
def common_labels
  @common_labels
end
#project_id ⇒ Object (readonly)
Expose attr_readers to make testing of metadata more direct than only testing it indirectly through metadata sent with logs.
438 439 440 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 438
#
# Read-only accessor. Exposed to make testing of metadata more direct than
# only testing it indirectly through metadata sent with logs.
#
# @return [Object] the value of attribute project_id (@project_id)
def project_id
  @project_id
end
#resource ⇒ Object (readonly)
Returns the value of attribute resource.
441 442 443 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 441
#
# Read-only accessor.
#
# @return [Object] the value of attribute resource (@resource)
def resource
  @resource
end
#vm_id ⇒ Object (readonly)
Returns the value of attribute vm_id.
440 441 442 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 440
#
# Read-only accessor.
#
# @return [Object] the value of attribute vm_id (@vm_id)
def vm_id
  @vm_id
end
#zone ⇒ Object (readonly)
Returns the value of attribute zone.
439 440 441 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 439
#
# Read-only accessor.
#
# @return [Object] the value of attribute zone (@zone)
def zone
  @zone
end
Instance Method Details
#configure(conf) ⇒ Object
450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 450
#
# Validates the plugin configuration and prepares the instance state that
# later calls rely on.
#
# In order, the visible steps are:
# * warn when logging_api_url is customized but gRPC is not enabled;
# * resolve @metadata_agent_url when the metadata agent is enabled;
# * register monitoring counters when @enable_monitoring is set;
# * reject the legacy authentication parameters;
# * detect the platform and determine the agent-level monitored resource and
#   common labels, which are then frozen;
# * select the gRPC or REST variants of the log-entry constructor and the
#   request writer.
#
# @param conf the configuration object; forwarded unchanged to the superclass
#   by the bare `super`
# @raise [Fluent::ConfigError] if any of auth_method, private_key_email,
#   private_key_path or private_key_passphrase is set
def configure(conf)
  super

  # TODO(qingling128): Remove this warning after the support is added. Also
  # remove the comment in the description of this configuration.
  unless @logging_api_url == DEFAULT_LOGGING_API_URL || @use_grpc
    @log.warn 'Detected customized logging_api_url while use_grpc is not' \
              ' enabled. Customized logging_api_url for the non-gRPC path' \
              ' is not supported. The logging_api_url option will be' \
              ' ignored.'
  end

  # 1. If @metadata_agent_url is customized (aka not nil), use that.
  # 2. Otherwise check the presence of the environment variable
  #    STACKDRIVER_METADATA_AGENT_URL and use that if set.
  # 3. Fall back to the default if neither is set.
  if @enable_metadata_agent
    # Convert to string to capture empty string.
    @metadata_agent_url ||=
      if ENV[METADATA_AGENT_URL_ENV_VAR].to_s.empty?
        DEFAULT_METADATA_AGENT_URL
      else
        ENV[METADATA_AGENT_URL_ENV_VAR]
      end
  end

  # If monitoring is enabled, register metrics in the default registry
  # and store metric objects for future use.
  if @enable_monitoring
    registry = Monitoring::MonitoringRegistryFactory.create @monitoring_type
    @successful_requests_count = registry.counter(
      :stackdriver_successful_requests_count,
      'A number of successful requests to the Stackdriver Logging API')
    @failed_requests_count = registry.counter(
      :stackdriver_failed_requests_count,
      'A number of failed requests to the Stackdriver Logging API,'\
      ' broken down by the error code')
    @ingested_entries_count = registry.counter(
      :stackdriver_ingested_entries_count,
      'A number of log entries ingested by Stackdriver Logging')
    @dropped_entries_count = registry.counter(
      :stackdriver_dropped_entries_count,
      'A number of log entries dropped by the Stackdriver output plugin')
    @retried_entries_count = registry.counter(
      :stackdriver_retried_entries_count,
      'The number of log entries that failed to be ingested by the'\
      ' Stackdriver output plugin due to a transient error and were'\
      ' retried')
    # The status value that counts as success depends on the transport.
    @ok_code = @use_grpc ? GRPC::Core::StatusCodes::OK : 200
  end

  # Alert on old authentication configuration.
  unless @auth_method.nil? && @private_key_email.nil? &&
         @private_key_path.nil? && @private_key_passphrase.nil?
    extra = []
    extra << 'auth_method' unless @auth_method.nil?
    extra << 'private_key_email' unless @private_key_email.nil?
    extra << 'private_key_path' unless @private_key_path.nil?
    extra << 'private_key_passphrase' unless @private_key_passphrase.nil?

    raise Fluent::ConfigError,
          "#{PLUGIN_NAME} no longer supports auth_method.\n" \
          "Please remove configuration parameters: #{extra.join(' ')}"
  end

  set_regexp_patterns

  @platform = detect_platform

  # Treat an empty setting of the credentials file path environment variable
  # as unset. This way the googleauth lib could fetch the credentials
  # following the fallback path.
  ENV.delete(CREDENTIALS_PATH_ENV_VAR) if ENV[CREDENTIALS_PATH_ENV_VAR] == ''

  # Set required variables: @project_id, @vm_id, @vm_name and @zone.
  # NOTE(review): the statement under this comment was missing from the
  # extracted listing; the helper name is restored from memory of the
  # upstream plugin - confirm against the real source file.
  set_required_metadata_variables

  # Retrieve monitored resource.
  # Fail over to retrieve monitored resource via the legacy path if we fail
  # to get it from Metadata Agent.
  @resource ||= determine_agent_level_monitored_resource_via_legacy

  # Set regexp that we should match tags against later on. Using a list
  # instead of a map to ensure order. For example, tags will be matched
  # against Cloud Functions first, then GKE.
  @tag_regexp_list = []
  if @resource.type == GKE_CONSTANTS[:resource_type]
    # We only support Cloud Functions logs for GKE right now.
    # NOTE(review): the callee name was stripped from both metadata lookups
    # below in the extracted listing; `fetch_gce_metadata` is restored from
    # memory of the upstream plugin - confirm against the real source file.
    if fetch_gce_metadata('instance/attributes/').split.include?('gcf_region')
      # Fetch this info and store it to avoid recurring
      # metadata server calls.
      @gcf_region = fetch_gce_metadata('instance/attributes/gcf_region')
      @tag_regexp_list << [
        CLOUDFUNCTIONS_CONSTANTS[:resource_type],
        @compiled_cloudfunctions_tag_regexp
      ]
    end
    @tag_regexp_list << [
      GKE_CONSTANTS[:resource_type], @compiled_kubernetes_tag_regexp
    ]
  end

  # Determine the common labels that should be added to all log entries
  # processed by this logging agent.
  @common_labels = determine_agent_level_common_labels

  # The resource and labels are now set up; ensure they can't be modified
  # without first duping them.
  @resource.freeze
  @resource.labels.freeze
  @common_labels.freeze

  # Bind the transport-specific implementations once, so later calls can go
  # through @construct_log_entry / @write_request without re-checking
  # @use_grpc.
  if @use_grpc
    @construct_log_entry = method(:construct_log_entry_in_grpc_format)
    @write_request = method(:write_request_via_grpc)
  else
    @construct_log_entry = method(:construct_log_entry_in_rest_format)
    @write_request = method(:write_request_via_rest)
  end

  if [Platform::GCE, Platform::EC2].include?(@platform)
    # Log an informational message containing the Logs viewer URL
    @log.info 'Logs viewer address: https://console.cloud.google.com/logs/',
              "viewer?project=#{@project_id}&resource=#{@resource.type}/",
              "instance_id/#{@vm_id}"
  end
end
#shutdown ⇒ Object
587 588 589 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 587
#
# Shuts the plugin down. This override adds nothing of its own; it only
# delegates to the superclass implementation.
def shutdown
  super
end
#start ⇒ Object
580 581 582 583 584 585 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 580
#
# Starts the plugin: runs the superclass start, initializes the API client
# via init_api_client, and resets the @successful_call and
# @timenanos_warning flags to false.
def start
  super
  init_api_client
  # NOTE(review): presumably these two flags track whether a request has
  # succeeded yet and whether a timeNanos warning was already logged; this
  # listing does not show where they are read - confirm.
  @successful_call = false
  @timenanos_warning = false
end
#write(chunk) ⇒ Object
591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 591
#
# Converts a buffered chunk into log entries and hands them to the request
# writer selected in #configure (@write_request).
#
# Records are grouped by (tag, local_resource_id). For each group the
# group-level monitored resource and labels are determined, and every record
# becomes one entry built through @construct_log_entry. When
# @split_logs_by_tag is set, one request is issued per group; otherwise all
# entries are merged into a single request and each entry carries its own
# log_name.
#
# @param chunk the buffered chunk; passed as-is to
#   group_log_entries_by_tag_and_local_resource_id
def write(chunk)
  grouped_entries = group_log_entries_by_tag_and_local_resource_id(chunk)

  requests_to_send = []
  grouped_entries.each do |(tag, local_resource_id), arr|
    entries = []
    group_level_resource, group_level_common_labels =
      determine_group_level_monitored_resource_and_labels(
        tag, local_resource_id)

    arr.each do |time, record|
      entry_level_resource, entry_level_common_labels =
        determine_entry_level_monitored_resource_and_labels(
          group_level_resource, group_level_common_labels, record)

      is_json = false
      if @detect_json
        # Save the following fields if available, then clear them out to
        # allow for determining whether we should parse the log or message
        # field.
        preserved_keys = [
          'time',
          'severity',
          @trace_key,
          @span_id_key,
          @insert_id_key
        ]

        # If the log is json, we want to export it as a structured log
        # unless there is additional metadata that would be lost.
        record_json = nil
        if (record.keys - preserved_keys).length == 1
          %w(log message msg).each do |field|
            if record.key?(field)
              record_json = parse_json_or_nil(record[field])
            end
          end
        end
        unless record_json.nil?
          # Propagate these if necessary. Note that we don't want to
          # override these keys in the JSON we've just parsed.
          preserved_keys.each do |key|
            record_json[key] ||= record[key] if record.key?(key)
          end

          record = record_json
          is_json = true
        end
      end

      # NOTE(review): the callee name was stripped from this call in the
      # extracted listing (leaving a syntax error); `compute_timestamp` is
      # restored from memory of the upstream plugin - confirm against the
      # real source file.
      ts_secs, ts_nanos = compute_timestamp(
        entry_level_resource.type, record, time)
      severity = compute_severity(
        entry_level_resource.type, record, entry_level_common_labels)

      entry = @construct_log_entry.call(entry_level_common_labels,
                                        entry_level_resource,
                                        severity,
                                        ts_secs,
                                        ts_nanos)

      # The trace, span id and insert id keys are removed from the record
      # and, when present, set on the entry instead.
      trace = record.delete(@trace_key)
      entry.trace = compute_trace(trace) if trace

      span_id = record.delete(@span_id_key)
      entry.span_id = span_id if span_id

      insert_id = record.delete(@insert_id_key)
      entry.insert_id = insert_id if insert_id

      set_log_entry_fields(record, entry)
      set_payload(entry_level_resource.type, record, entry, is_json)

      entries.push(entry)
    end
    # Don't send an empty request if we rejected all the entries.
    next if entries.empty?

    # The call with arguments resolves to the log_name method even though a
    # local variable of the same name is being assigned here.
    log_name = "projects/#{@project_id}/logs/#{log_name(
      tag, group_level_resource)}"

    requests_to_send << {
      entries: entries,
      log_name: log_name,
      resource: group_level_resource,
      labels: group_level_common_labels
    }
  end

  if @split_logs_by_tag
    requests_to_send.each do |request|
      @write_request.call(request)
    end
  else
    # Combine all requests into one. The request level "log_name" will be
    # ported to the entry level. The request level "resource" and "labels"
    # are ignored as they should have been folded into the entry level
    # "resource" and "labels" already anyway.
    combined_entries = []
    requests_to_send.each do |request|
      request[:entries].each do |entry|
        # Modify entries in-place as they are not needed later on.
        entry.log_name = request[:log_name]
      end
      combined_entries.concat(request[:entries])
    end
    @write_request.call(entries: combined_entries) unless
      combined_entries.empty?
  end
end