Class: Fluent::Plugin::S3Input
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::S3Input
show all
- Defined in:
- lib/fluent/plugin/in_s3.rb,
lib/fluent/plugin/s3_extractor_lzo.rb,
lib/fluent/plugin/s3_extractor_lzma2.rb,
lib/fluent/plugin/s3_extractor_gzip_command.rb
Defined Under Namespace
Classes: Extractor, GzipCommandExtractor, GzipExtractor, JsonExtractor, LZMA2Extractor, LZOExtractor, TextExtractor
Constant Summary
collapse
- DEFAULT_PARSE_TYPE = "none"
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
18
19
20
21
|
# File 'lib/fluent/plugin/in_s3.rb', line 18
# Sets up the plugin instance with no extractor selected yet.
#
# NOTE(review): this listing showed a bare `= nil` (the assignment target was
# lost); `@extractor` is restored here to match the extractor classes this
# plugin defines - confirm against lib/fluent/plugin/in_s3.rb.
def initialize
  super
  @extractor = nil
end
|
Instance Attribute Details
#bucket ⇒ Object
Returns the value of attribute bucket.
116
117
118
|
# File 'lib/fluent/plugin/in_s3.rb', line 116
# Reader for the +@bucket+ instance variable.
#
# +@bucket+ is assigned in #start from +@s3.bucket(@s3_bucket)+; before that
# assignment this returns nil.
#
# @return [Object, nil] the current value of +@bucket+
def bucket
@bucket
end
|
Instance Method Details
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
# File 'lib/fluent/plugin/in_s3.rb', line 118
# Validates the endpoint/queue settings, then builds the extractor and parser.
#
# @param conf [Object] configuration element; its first "parse" sub-element
#   (if any) is handed to +parser_create+, and the whole element is passed to
#   the extractor's +configure+.
# @raise [Fluent::ConfigError] if +@s3_endpoint+ or +@sqs.endpoint+ ends with
#   'amazonaws.com' without containing 'fips' or 'gov', or if
#   +@sqs.queue_name+ is not set.
# @return [Object] the parser assigned to +@parser+
def configure(conf)
  super

  # An endpoint ending in amazonaws.com is rejected unless it contains
  # 'fips' or 'gov'; the endpoint options are meant for compatible services.
  if @s3_endpoint&.end_with?('amazonaws.com') && %w[fips gov].none? { |e| @s3_endpoint.include?(e) }
    raise Fluent::ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
  end
  if @sqs.endpoint&.end_with?('amazonaws.com') && %w[fips gov].none? { |e| @sqs.endpoint.include?(e) }
    raise Fluent::ConfigError, "sqs/endpoint parameter is not supported for SQS, use s3_region instead. This parameter is for SQS compatible services"
  end

  parser_config = conf.elements("parse").first
  raise Fluent::ConfigError, "sqs/queue_name is required" unless @sqs.queue_name

  Aws.use_bundled_cert! if @use_bundled_cert

  # NOTE(review): the receivers on the next two statements were missing from
  # this listing (the lines began with a bare `=` and `.`). They are restored
  # as `@extractor` / `EXTRACTOR_REGISTRY` - confirm against
  # lib/fluent/plugin/in_s3.rb.
  @extractor = EXTRACTOR_REGISTRY.lookup(@store_as).new(log: log)
  @extractor.configure(conf)

  @parser = parser_create(conf: parser_config, default_type: DEFAULT_PARSE_TYPE)
end
|
#shutdown ⇒ Object
166
167
168
169
|
# File 'lib/fluent/plugin/in_s3.rb', line 166
# Clears the +@running+ flag and then delegates to the parent +shutdown+.
#
# +@running+ is set to true in #start just before the :in_s3 thread is
# created; presumably the +run+ loop checks it to know when to stop - confirm
# against +run+, which is not shown here.
def shutdown
# Flip the flag before calling super so it is already false while the parent
# implementation runs.
@running = false
super
end
|
#start ⇒ Object
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
# File 'lib/fluent/plugin/in_s3.rb', line 142
# Creates the S3 and SQS clients, resolves the queue URL, builds the queue
# poller and launches the :in_s3 thread that executes +run+.
#
# @raise [RuntimeError] if the bucket named by +@s3_bucket+ does not exist
def start
  super

  # --- S3 side: client, resource, bucket handle ---
  s3_api = create_s3_client
  log.debug("Succeeded to create S3 client")
  @s3 = Aws::S3::Resource.new(client: s3_api)
  @bucket = @s3.bucket(@s3_bucket)
  unless @bucket.exists?
    raise "#{@bucket.name} is not found."
  end
  check_apikeys if @check_apikey_on_start

  # --- SQS side: client, queue URL, poller ---
  sqs_api = create_sqs_client
  log.debug("Succeeded to create SQS client")
  queue_url = sqs_api.get_queue_url(
    queue_name: @sqs.queue_name,
    queue_owner_aws_account_id: @sqs.queue_owner_aws_account_id
  ).queue_url
  log.debug("Succeeded to get SQS queue URL")
  @poller = Aws::SQS::QueuePoller.new(queue_url, client: sqs_api)

  # Mark the plugin as running before the worker thread is created.
  @running = true
  thread_create(:in_s3, &method(:run))
end
|