Class: Fluent::Plugin::CloudwatchIngestInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_cloudwatch_ingest.rb

Defined Under Namespace

Classes: State

Instance Method Summary collapse

Constructor Details

#initializeCloudwatchIngestInput

Returns a new instance of CloudwatchIngestInput.



56
57
58
59
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 56

def initialize
  super
  log.info('Starting fluentd-plugin-cloudwatch-ingest')
end

Instance Method Details

#configure(conf) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 61

def configure(conf)
  super
  compat_parameters_convert(conf, :parser)
  parser_config = conf.elements('parse').first
  unless parser_config
    raise Fluent::ConfigError, '<parse> section is required.'
  end
  unless parser_config['expression']
    raise Fluent::ConfigError, 'parse/expression is required.'
  end
  unless parser_config['event_time']
    raise Fluent::ConfigError, 'parse/event_time is required.'
  end

  @parser = parser_create(conf: parser_config)
  log.info('Configured fluentd-plugin-cloudwatch-ingest')
end

#shutdownObject



104
105
106
107
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 104

def shutdown
  @finished = true
  @thread.join
end

#startObject



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 79

def start
  super
  log.info('Started fluentd-plugin-cloudwatch-ingest')

  # Get a handle to Cloudwatch
  aws_options = {}
  Aws.config[:region] = @region
  Aws.config[:logger] = log if @aws_logging
  log.info("Working in region #{@region}")

  if @sts_enabled
    aws_options[:credentials] = Aws::AssumeRoleCredentials.new(
      role_arn: @sts_arn,
      role_session_name: @sts_session_name
    )

    log.info("Using STS for authentication with source account ARN: #{@sts_arn}, session name: #{@sts_session_name}") # rubocop:disable all
  else
    log.info('Using local instance IAM role for authentication')
  end
  @aws = Aws::CloudWatchLogs::Client.new(aws_options)
  @finished = false
  @thread = Thread.new(&method(:run))
end