Class: Fluent::Plugin::S3Input

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_s3.rb,
lib/fluent/plugin/s3_extractor_lzo.rb,
lib/fluent/plugin/s3_extractor_lzma2.rb,
lib/fluent/plugin/s3_extractor_gzip_command.rb

Defined Under Namespace

Classes: Extractor, GzipCommandExtractor, GzipExtractor, JsonExtractor, LZMA2Extractor, LZOExtractor, TextExtractor

Constant Summary collapse

DEFAULT_PARSE_TYPE =
"none"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ S3Input

Returns a new instance of S3Input.



18
19
20
21
# File 'lib/fluent/plugin/in_s3.rb', line 18

# Creates the input plugin instance.
#
# Delegates to the parent initializer, then starts out with no extractor
# assigned; #configure later replaces @extractor with a real instance.
def initialize
  super
  @extractor = nil
end

Instance Attribute Details

#bucket ⇒ Object (readonly)

Returns the value of attribute bucket.



90
91
92
# File 'lib/fluent/plugin/in_s3.rb', line 90

# Reader for the @bucket instance variable.
#
# #start assigns @bucket from `@s3.bucket(@s3_bucket)`.
#
# @return [Object] the current value of @bucket (nil if it has not been
#   assigned yet)
def bucket
  @bucket
end

Instance Method Details

#configure(conf) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/fluent/plugin/in_s3.rb', line 92

# Validates the plugin configuration and builds the extractor and parser.
#
# @param conf [Object] plugin configuration; it must respond to +elements+
#   and is also handed to the extractor's +configure+
# @raise [Fluent::ConfigError] if +@sqs.queue_name+ is not set
def configure(conf)
  super

  parse_section = conf.elements("parse").first
  raise Fluent::ConfigError, "sqs/queue_name is required" unless @sqs.queue_name

  # Only switch the AWS SDK to its bundled certificates when asked to.
  Aws.use_bundled_cert! if @use_bundled_cert

  # The extractor class is looked up by the configured @store_as value.
  extractor_class = EXTRACTOR_REGISTRY.lookup(@store_as)
  @extractor = extractor_class.new(log: log)
  @extractor.configure(conf)

  # parse_section is nil when the configuration has no "parse" element;
  # DEFAULT_PARSE_TYPE is passed along as the default parser type.
  @parser = parser_create(conf: parse_section, default_type: DEFAULT_PARSE_TYPE)
end

#shutdown ⇒ Object



132
133
134
135
# File 'lib/fluent/plugin/in_s3.rb', line 132

# Stops the plugin.
#
# Clears the @running flag (set to true by #start) before handing off to
# the parent class's shutdown.
# NOTE(review): presumably the worker thread created in #start checks
# @running to decide when to stop — confirm against #run.
def shutdown
  @running = false
  super
end

#start ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/fluent/plugin/in_s3.rb', line 108

# Starts the plugin: sets up the S3 bucket handle and the SQS queue poller,
# then launches the :in_s3 thread that executes #run.
#
# @raise [RuntimeError] if the configured bucket does not exist
def start
  super

  s3_client = create_s3_client
  log.debug("Succeeded to create S3 client")
  @s3 = Aws::S3::Resource.new(client: s3_client)
  @bucket = @s3.bucket(@s3_bucket)

  # Fail before any SQS work if the bucket is missing.
  unless @bucket.exists?
    raise "#{@bucket.name} is not found."
  end

  check_apikeys if @check_apikey_on_start

  sqs_client = create_sqs_client
  log.debug("Succeeded to create SQS client")
  # The poller needs the queue URL, so resolve it from the configured name.
  queue_url = sqs_client.get_queue_url(queue_name: @sqs.queue_name).queue_url
  log.debug("Succeeded to get SQS queue URL")

  @poller = Aws::SQS::QueuePoller.new(queue_url, client: sqs_client)

  # Set the flag before the thread is created; #shutdown clears it again.
  @running = true
  thread_create(:in_s3, &method(:run))
end