Class: Fluent::GoogleCloudOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::GoogleCloudOutput
- Includes:
- selfself::ConfigConstants, selfself::InternalConstants, selfself::ServiceConstants
- Defined in:
- lib/fluent/plugin/out_google_cloud.rb
Overview
fluentd output plugin for the Stackdriver Logging API
Defined Under Namespace
Modules: ConfigConstants, CredentialsInfo, InternalConstants, Platform, ServiceConstants
Constant Summary collapse
- PLUGIN_NAME =
'Fluentd Google Cloud Logging plugin'.freeze
- PLUGIN_VERSION =
'0.6.16'.freeze
- LOGGING_SCOPE =
Name of the the Google cloud logging write scope.
'https://www.googleapis.com/auth/logging.write'.freeze
- METADATA_SERVICE_ADDR =
Address of the metadata service.
'169.254.169.254'.freeze
Instance Attribute Summary collapse
-
#common_labels ⇒ Object
readonly
Returns the value of attribute common_labels.
-
#project_id ⇒ Object
readonly
Expose attr_readers to make testing of metadata more direct than only testing it indirectly through metadata sent with logs.
-
#resource ⇒ Object
readonly
Returns the value of attribute resource.
-
#vm_id ⇒ Object
readonly
Returns the value of attribute vm_id.
-
#zone ⇒ Object
readonly
Returns the value of attribute zone.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#initialize ⇒ GoogleCloudOutput
constructor
A new instance of GoogleCloudOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ GoogleCloudOutput
Returns a new instance of GoogleCloudOutput.
378 379 380 381 382 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 378 def initialize super # use the global logger @log = $log # rubocop:disable Style/GlobalVars end |
Instance Attribute Details
#common_labels ⇒ Object (readonly)
Returns the value of attribute common_labels.
376 377 378 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 376 def common_labels @common_labels end |
#project_id ⇒ Object (readonly)
Expose attr_readers to make testing of metadata more direct than only testing it indirectly through metadata sent with logs.
372 373 374 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 372 def project_id @project_id end |
#resource ⇒ Object (readonly)
Returns the value of attribute resource.
375 376 377 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 375 def resource @resource end |
#vm_id ⇒ Object (readonly)
Returns the value of attribute vm_id.
374 375 376 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 374 def vm_id @vm_id end |
#zone ⇒ Object (readonly)
Returns the value of attribute zone.
373 374 375 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 373 def zone @zone end |
Instance Method Details
#configure(conf) ⇒ Object
384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 384 def configure(conf) super # TODO(qingling128): Remove this warning after the support is added. Also # remove the comment in the description of this configuration. unless @logging_api_url == DEFAULT_LOGGING_API_URL || @use_grpc @log.warn 'Detected customized logging_api_url while use_grpc is not' \ ' enabled. Customized logging_api_url for the non-gRPC path' \ ' is not supported. The logging_api_url option will be' \ ' ignored.' end # 1. If @metadata_agent_url is customized (aka not nil), use that. # 2. Otherwise check the presence of the environment variable # STACKDRIVER_METADATA_AGENT_URL and use that if set. # 3. Fall back to the default if neither is set. if # Convert to string to capture empty string. ||= if ENV[METADATA_AGENT_URL_ENV_VAR].to_s.empty? DEFAULT_METADATA_AGENT_URL else ENV[METADATA_AGENT_URL_ENV_VAR] end end # If monitoring is enabled, register metrics in the default registry # and store metric objects for future use. if @enable_monitoring registry = Monitoring::MonitoringRegistryFactory.create @monitoring_type @successful_requests_count = registry.counter( :stackdriver_successful_requests_count, 'A number of successful requests to the Stackdriver Logging API') @failed_requests_count = registry.counter( :stackdriver_failed_requests_count, 'A number of failed requests to the Stackdriver Logging API,'\ ' broken down by the error code') @ingested_entries_count = registry.counter( :stackdriver_ingested_entries_count, 'A number of log entries ingested by Stackdriver Logging') @dropped_entries_count = registry.counter( :stackdriver_dropped_entries_count, 'A number of log entries dropped by the Stackdriver output plugin') @retried_entries_count = registry.counter( :stackdriver_retried_entries_count, 'The number of log entries that failed to be ingested by the'\ ' Stackdriver output plugin due to a transient error and were'\ ' retried') @ok_code = @use_grpc ? GRPC::Core::StatusCodes::OK : 200 end # Alert on old authentication configuration. unless @auth_method.nil? && @private_key_email.nil? && @private_key_path.nil? && @private_key_passphrase.nil? extra = [] extra << 'auth_method' unless @auth_method.nil? extra << 'private_key_email' unless @private_key_email.nil? extra << 'private_key_path' unless @private_key_path.nil? extra << 'private_key_passphrase' unless @private_key_passphrase.nil? raise Fluent::ConfigError, "#{PLUGIN_NAME} no longer supports auth_method.\n" \ "Please remove configuration parameters: #{extra.join(' ')}" end set_regexp_patterns @platform = detect_platform # Set required variables: @project_id, @vm_id, @vm_name and @zone. # Retrieve monitored resource. # Fail over to retrieve monitored resource via the legacy path if we fail # to get it from Metadata Agent. @resource ||= determine_agent_level_monitored_resource_via_legacy # Set regexp that we should match tags against later on. Using a list # instead of a map to ensure order. For example, tags will be matched # against Cloud Functions first, then GKE. @tag_regexp_list = [] if @resource.type == GKE_CONSTANTS[:resource_type] # We only support Cloud Functions logs for GKE right now. if ('instance/attributes/' ).split.include?('gcf_region') # Fetch this info and store it to avoid recurring # metadata server calls. @gcf_region = ('instance/attributes/gcf_region') @tag_regexp_list << [ CLOUDFUNCTIONS_CONSTANTS[:resource_type], @compiled_cloudfunctions_tag_regexp ] end @tag_regexp_list << [ GKE_CONSTANTS[:resource_type], @compiled_kubernetes_tag_regexp ] end # Determine the common labels that should be added to all log entries # processed by this logging agent. @common_labels = determine_agent_level_common_labels # The resource and labels are now set up; ensure they can't be modified # without first duping them. @resource.freeze @resource.labels.freeze @common_labels.freeze if @use_grpc @construct_log_entry = method(:construct_log_entry_in_grpc_format) @write_request = method(:write_request_via_grpc) else @construct_log_entry = method(:construct_log_entry_in_rest_format) @write_request = method(:write_request_via_rest) end # Log an informational message containing the Logs viewer URL @log.info 'Logs viewer address: https://console.cloud.google.com/logs/', "viewer?project=#{@project_id}&resource=#{@resource.type}/", "instance_id/#{@vm_id}" end |
#shutdown ⇒ Object
513 514 515 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 513 def shutdown super end |
#start ⇒ Object
506 507 508 509 510 511 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 506 def start super init_api_client @successful_call = false @timenanos_warning = false end |
#write(chunk) ⇒ Object
517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 517 def write(chunk) grouped_entries = group_log_entries_by_tag_and_local_resource_id(chunk) requests_to_send = [] grouped_entries.each do |(tag, local_resource_id), arr| entries = [] group_level_resource, group_level_common_labels = determine_group_level_monitored_resource_and_labels( tag, local_resource_id) arr.each do |time, record| entry_level_resource, entry_level_common_labels = determine_entry_level_monitored_resource_and_labels( group_level_resource, group_level_common_labels, record) is_json = false if @detect_json # Save the timestamp and severity if available, then clear it out to # allow for determining whether we should parse the log or message # field. = record.delete('time') severity = record.delete('severity') # If the log is json, we want to export it as a structured log # unless there is additional metadata that would be lost. record_json = nil if record.length == 1 %w(log message msg).each do |field| if record.key?(field) record_json = parse_json_or_nil(record[field]) end end end unless record_json.nil? record = record_json is_json = true end # Restore timestamp and severity if necessary. Note that we don't # want to override these keys in the JSON we've just parsed. record['time'] ||= if record['severity'] ||= severity if severity end ts_secs, ts_nanos = ( entry_level_resource.type, record, time) severity = compute_severity( entry_level_resource.type, record, entry_level_common_labels) entry = @construct_log_entry.call(entry_level_common_labels, entry_level_resource, severity, ts_secs, ts_nanos) # Get fully-qualified trace id for LogEntry "trace" field. fq_trace_id = record.delete(@trace_key) entry.trace = fq_trace_id if fq_trace_id span_id = record.delete(@span_id_key) entry.span_id = span_id if span_id set_log_entry_fields(record, entry) set_payload(entry_level_resource.type, record, entry, is_json) entries.push(entry) end # Don't send an empty request if we rejected all the entries. next if entries.empty? log_name = "projects/#{@project_id}/logs/#{log_name( tag, group_level_resource)}" requests_to_send << { entries: entries, log_name: log_name, resource: group_level_resource, labels: group_level_common_labels } end if @split_logs_by_tag requests_to_send.each do |request| @write_request.call(request) end else # Combine all requests into one. The request level "log_name" will be # ported to the entry level. The request level "resource" and "labels" # are ignored as they should have been folded into the entry level # "resource" and "labels" already anyway. combined_entries = [] requests_to_send.each do |request| request[:entries].each do |entry| # Modify entries in-place as they are not needed later on. entry.log_name = request[:log_name] end combined_entries.concat(request[:entries]) end @write_request.call(entries: combined_entries) end end |