Class: Fluent::Plugin::S3Input
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::S3Input
- Defined in:
- lib/fluent/plugin/in_s3.rb,
lib/fluent/plugin/s3_extractor_lzo.rb,
lib/fluent/plugin/s3_extractor_lzma2.rb,
lib/fluent/plugin/s3_extractor_gzip_command.rb
Defined Under Namespace
Classes: Extractor, GzipCommandExtractor, GzipExtractor, JsonExtractor, LZMA2Extractor, LZOExtractor, TextExtractor
Constant Summary collapse
- DEFAULT_PARSE_TYPE =
"none"
Instance Attribute Summary collapse
-
#bucket ⇒ Object
readonly
Returns the value of attribute bucket.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#initialize ⇒ S3Input
constructor
A new instance of S3Input.
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ S3Input
Returns a new instance of S3Input.
18 19 20 21 |
# File 'lib/fluent/plugin/in_s3.rb', line 18 def initialize super @extractor = nil end |
Instance Attribute Details
#bucket ⇒ Object (readonly)
Returns the value of attribute bucket.
90 91 92 |
# File 'lib/fluent/plugin/in_s3.rb', line 90 def bucket @bucket end |
Instance Method Details
#configure(conf) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/fluent/plugin/in_s3.rb', line 92 def configure(conf) super parser_config = conf.elements("parse").first unless @sqs.queue_name raise Fluent::ConfigError, "sqs/queue_name is required" end Aws.use_bundled_cert! if @use_bundled_cert @extractor = EXTRACTOR_REGISTRY.lookup(@store_as).new(log: log) @extractor.configure(conf) @parser = parser_create(conf: parser_config, default_type: DEFAULT_PARSE_TYPE) end |
#shutdown ⇒ Object
132 133 134 135 |
# File 'lib/fluent/plugin/in_s3.rb', line 132 def shutdown @running = false super end |
#start ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/fluent/plugin/in_s3.rb', line 108 def start super s3_client = create_s3_client log.debug("Succeeded to create S3 client") @s3 = Aws::S3::Resource.new(client: s3_client) @bucket = @s3.bucket(@s3_bucket) raise "#{@bucket.name} is not found." unless @bucket.exists? check_apikeys if @check_apikey_on_start sqs_client = create_sqs_client log.debug("Succeeded to create SQS client") response = sqs_client.get_queue_url(queue_name: @sqs.queue_name) sqs_queue_url = response.queue_url log.debug("Succeeded to get SQS queue URL") @poller = Aws::SQS::QueuePoller.new(sqs_queue_url, client: sqs_client) @running = true thread_create(:in_s3, &method(:run)) end |