Class: Fluent::S3Input

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_s3.rb,
lib/fluent/plugin/s3_extractor_lzo.rb,
lib/fluent/plugin/s3_extractor_lzma2.rb,
lib/fluent/plugin/s3_extractor_gzip_command.rb

Defined Under Namespace

Classes: Extractor, GzipCommandExtractor, GzipExtractor, JsonExtractor, LZMA2Extractor, LZOExtractor, TextExtractor

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ S3Input

Returns a new instance of S3Input.



7
8
9
10
11
12
13
14
15
# File 'lib/fluent/plugin/in_s3.rb', line 7

# Builds the plugin instance: runs the parent initializer, lazily loads the
# libraries this input depends on, and starts with no extractor selected.
def initialize
  super

  # Loaded here rather than at file load time; the order matches the
  # original sequence of require calls.
  %w[aws-sdk-resources zlib time tempfile].each { |feature| require feature }

  @extractor = nil
end

Instance Attribute Details

#bucket ⇒ Object (readonly)

Returns the value of attribute bucket.



92
93
94
# File 'lib/fluent/plugin/in_s3.rb', line 92

# Read-only accessor for the @bucket instance variable.
#
# @bucket is assigned in #start from @s3.bucket(@s3_bucket); reading it before
# any assignment yields nil.
#
# @return [Object] the current value of @bucket
def bucket
  @bucket
end

Instance Method Details

#configure(conf) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/fluent/plugin/in_s3.rb', line 94

# Applies the configuration: runs the parent configure, validates the SQS
# settings, then builds and configures the extractor and the parser.
#
# @param conf [Object] configuration passed through to the parent class, the
#   extractor and the parser
# @return [Object] the result of configuring the parser
# @raise [ConfigError] when @sqs.queue_name is nil or false
def configure(conf)
  super

  raise ConfigError, "sqs/queue_name is required" unless @sqs.queue_name

  # The extractor class is chosen by the @store_as setting.
  extractor_class = EXTRACTOR_REGISTRY.lookup(@store_as)
  @extractor = extractor_class.new(log: log)
  @extractor.configure(conf)

  # Assign before configuring so @parser is set even if configure raises.
  @parser = Plugin.new_parser(@format)
  @parser.configure(conf)
end

#desc(description) ⇒ Object



20
21
# File 'lib/fluent/plugin/in_s3.rb', line 20

# Accepts a description and does nothing with it; the body is empty, so the
# call returns nil.
#
# NOTE(review): presumably a no-op fallback for the `desc` configuration DSL
# on hosts that do not provide one — confirm against the surrounding file.
#
# @param description [Object] ignored
# @return [nil]
def desc(description)
end

#shutdown ⇒ Object



129
130
131
132
133
# File 'lib/fluent/plugin/in_s3.rb', line 129

# Stops the background thread created by #start and then runs the parent
# class's shutdown.
#
# @return [Object] whatever the parent class's #shutdown returns
def shutdown
  # Cleared before joining; presumably the worker started in #start checks
  # this flag to decide when to stop — confirm against #run.
  @running = false

  # #start can raise (for example when the bucket does not exist) before the
  # thread is created. Joining unconditionally would then fail on nil and
  # skip the parent shutdown below.
  @thread.join if @thread

  super
end

#start ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/fluent/plugin/in_s3.rb', line 108

# Starts the input: runs the parent start, resolves the S3 bucket, resolves
# the SQS queue URL, builds the queue poller and launches the worker thread.
#
# @return [Thread] the thread running #run
# @raise [RuntimeError] when the configured bucket reports that it does not
#   exist
def start
  super

  client = create_s3_client
  @s3 = Aws::S3::Resource.new(client: client)
  @bucket = @s3.bucket(@s3_bucket)
  raise "#{@bucket.name} is not found." unless @bucket.exists?

  check_apikeys if @check_apikey_on_start

  # The poller is built from a queue URL, so look it up from the queue name.
  sqs_client = create_sqs_client
  queue_url = sqs_client.get_queue_url(queue_name: @sqs.queue_name).queue_url
  @poller = Aws::SQS::QueuePoller.new(queue_url, client: sqs_client)

  # Set the flag before the thread starts so it is already true when #run begins.
  @running = true
  @thread = Thread.new(&method(:run))
end