Class: Fluent::S3Input

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_s3.rb,
lib/fluent/plugin/s3_extractor_lzo.rb,
lib/fluent/plugin/s3_extractor_lzma2.rb,
lib/fluent/plugin/s3_extractor_gzip_command.rb

Defined Under Namespace

Classes: Extractor, GzipCommandExtractor, GzipExtractor, JsonExtractor, LZMA2Extractor, LZOExtractor, TextExtractor

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ S3Input

Returns a new instance of S3Input.



8
9
10
11
12
13
14
15
16
17
# File 'lib/fluent/plugin/in_s3.rb', line 8

# Builds the input plugin and loads the libraries it relies on.
#
# The requires live inside the constructor rather than at file scope, so
# the libraries are only loaded once an instance is actually created.
def initialize
  super

  # Same libraries, same load order as before.
  %w[aws-sdk-resources zlib time tempfile cgi/util].each do |library|
    require library
  end

  # No extractor yet; #configure assigns one.
  @extractor = nil
end

Instance Attribute Details

#bucket ⇒ Object (readonly)

Returns the value of attribute bucket.



94
95
96
# File 'lib/fluent/plugin/in_s3.rb', line 94

# Reader for the `@bucket` instance variable.
#
# `@bucket` is assigned in #start from `@s3.bucket(@s3_bucket)`; nothing in
# the visible code sets it earlier, so this presumably returns nil until
# #start has run.
def bucket
  @bucket
end

Instance Method Details

#configure(conf) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/fluent/plugin/in_s3.rb', line 96

# Validates the plugin configuration and prepares the extractor and parser.
#
# @param conf the configuration element; it is passed on to `super`, to the
#   extractor and to the parser
# @raise [ConfigError] when `@sqs.queue_name` is not set
def configure(conf)
  super

  raise ConfigError, "sqs/queue_name is required" unless @sqs.queue_name

  # Look up whatever is registered for `@store_as` and instantiate it.
  extractor_class = EXTRACTOR_REGISTRY.lookup(@store_as)
  @extractor = extractor_class.new(log: log)
  @extractor.configure(conf)

  # The parser is chosen by `@format` and receives the same configuration.
  @parser = Plugin.new_parser(@format)
  @parser.configure(conf)
end

#desc(description) ⇒ Object



22
23
# File 'lib/fluent/plugin/in_s3.rb', line 22

# Accepts a description and deliberately does nothing with it.
#
# NOTE(review): presumably a stand-in so that `desc` calls work where the
# host framework does not provide one — confirm against the full file.
#
# @param description ignored
# @return [nil]
def desc(description)
  nil
end

#shutdown ⇒ Object



135
136
137
138
139
# File 'lib/fluent/plugin/in_s3.rb', line 135

# Stops the plugin: sets `@running` to false, waits for the worker thread
# to finish and then defers to the parent class.
#
# `@thread` is only assigned on the last line of #start, so it is still nil
# when #start was never called or raised part-way through. The join is
# skipped in that case instead of failing with NoMethodError, which also
# guarantees that `super` is still reached.
#
# @return whatever the parent class's `shutdown` returns
def shutdown
  @running = false
  @thread.join if @thread
  super
end

#start ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/fluent/plugin/in_s3.rb', line 110

# Connects to S3 and SQS, then launches the background worker thread.
#
# Steps, in order: build the S3 client and resolve `@s3_bucket`, abort if
# the bucket does not exist, optionally run `check_apikeys`, resolve the
# URL of the SQS queue named by `@sqs.queue_name`, create a queue poller
# for it, and finally execute `run` on a new thread.
#
# @raise [RuntimeError] when the bucket does not exist
def start
  super

  s3_client = create_s3_client
  log.debug("Succeeded to create S3 client")

  @s3 = Aws::S3::Resource.new(client: s3_client)
  @bucket = @s3.bucket(@s3_bucket)
  unless @bucket.exists?
    raise "#{@bucket.name} is not found."
  end

  # Credential check is opt-in via `@check_apikey_on_start`.
  check_apikeys if @check_apikey_on_start

  sqs_client = create_sqs_client
  log.debug("Succeeded to create SQS client")

  queue_url = sqs_client.get_queue_url(queue_name: @sqs.queue_name).queue_url
  log.debug("Succeeded to get SQS queue URL")

  @poller = Aws::SQS::QueuePoller.new(queue_url, client: sqs_client)

  # The flag is raised before the thread exists, so it is already true
  # by the time `run` begins.
  @running = true
  @thread = Thread.new(&method(:run))
end