Class: Fluent::S3Input
- Inherits:
-
Input
- Object
- Input
- Fluent::S3Input
show all
- Defined in:
- lib/fluent/plugin/in_s3.rb,
lib/fluent/plugin/s3_extractor_lzo.rb,
lib/fluent/plugin/s3_extractor_lzma2.rb,
lib/fluent/plugin/s3_extractor_gzip_command.rb
Defined Under Namespace
Classes: Extractor, GzipCommandExtractor, GzipExtractor, JsonExtractor, LZMA2Extractor, LZOExtractor, TextExtractor
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
Returns a new instance of S3Input.
7
8
9
10
11
12
13
14
15
|
# File 'lib/fluent/plugin/in_s3.rb', line 7
# Builds a new plugin instance.
#
# Runs the parent constructor, then loads the libraries listed below at
# construction time (rather than at file load), and finally resets the
# extractor slot; the extractor itself is assigned later in #configure.
def initialize
  super
  # Loaded in this exact order, one after another.
  %w[aws-sdk-resources zlib time tempfile].each { |library| require library }
  @extractor = nil
end
|
Instance Attribute Details
#bucket ⇒ Object
Returns the value of attribute bucket.
92
93
94
|
# File 'lib/fluent/plugin/in_s3.rb', line 92
# Reader for the +@bucket+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@bucket+;
#   +nil+ when nothing has been assigned yet
def bucket
  @bucket
end
|
Instance Method Details
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/fluent/plugin/in_s3.rb', line 94
# Validates the configuration and prepares the extractor and the parser.
#
# @param conf the configuration object; handed to the parent class, to the
#   extractor and to the parser
# @raise [ConfigError] when +@sqs.queue_name+ is not set
def configure(conf)
  super
  raise ConfigError, "sqs/queue_name is required" unless @sqs.queue_name

  # The extractor class is chosen by the +@store_as+ setting.
  extractor_class = EXTRACTOR_REGISTRY.lookup(@store_as)
  @extractor = extractor_class.new(log: log)
  @extractor.configure(conf)

  # The parser is chosen by the +@format+ setting.
  @parser = Plugin.new_parser(@format)
  @parser.configure(conf)
end
|
#desc(description) ⇒ Object
20
21
|
# File 'lib/fluent/plugin/in_s3.rb', line 20
# Accepts a description and deliberately does nothing with it.
#
# NOTE(review): presumably a fallback so that +desc+ calls are harmless when
# the host framework does not provide one -- confirm against the class body.
#
# @param description ignored
# @return [nil]
def desc(description)
  nil
end
|
#shutdown ⇒ Object
129
130
131
132
133
|
# File 'lib/fluent/plugin/in_s3.rb', line 129
# Stops the plugin.
#
# Sets +@running+ to false, waits for the background thread created in
# #start to finish, and then lets the parent class run its own shutdown.
#
# The join is skipped when no thread exists (for example when #start raised
# before spawning it); otherwise calling +join+ on +nil+ would raise
# NoMethodError and the parent shutdown would never run.
#
# @return whatever the parent class's +shutdown+ returns
def shutdown
  @running = false
  @thread.join if @thread
  super
end
|
#start ⇒ Object
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
# File 'lib/fluent/plugin/in_s3.rb', line 108
# Starts the plugin.
#
# After the parent class's +start+, this method:
# 1. builds the S3 resource and resolves the configured bucket, failing if
#    the bucket does not exist;
# 2. optionally calls +check_apikeys+ when +@check_apikey_on_start+ is set;
# 3. looks up the SQS queue URL by name and builds a queue poller for it;
# 4. marks the plugin as running and spawns a thread executing +run+.
#
# @raise [RuntimeError] when the configured bucket does not exist
def start
  super

  s3 = create_s3_client
  @s3 = Aws::S3::Resource.new(client: s3)
  @bucket = @s3.bucket(@s3_bucket)
  unless @bucket.exists?
    raise "#{@bucket.name} is not found."
  end

  check_apikeys if @check_apikey_on_start

  sqs = create_sqs_client
  queue_url = sqs.get_queue_url(queue_name: @sqs.queue_name).queue_url
  @poller = Aws::SQS::QueuePoller.new(queue_url, client: sqs)

  # Flag is set before the thread is spawned.
  @running = true
  @thread = Thread.new(&method(:run))
end
|