Class: Fluent::ForwardAWSOutput

Inherits:
TimeSlicedOutput
  • Object
show all
Includes:
HandleTagNameMixin
Defined in:
lib/fluent/plugin/out_forward_aws.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ForwardAWSOutput

Returns a new instance of ForwardAWSOutput.



35
36
37
38
39
40
41
42
# File 'lib/fluent/plugin/out_forward_aws.rb', line 35

# Builds the plugin instance and loads the libraries its methods rely on.
#
# The requires are issued from the constructor, so the libraries are loaded
# when an instance is created rather than when this file is loaded.
def initialize
  super
  require 'aws-sdk'
  require 'zlib'
  require 'tempfile'
  # #write wraps the temp file path in a Pathname; load it explicitly instead
  # of relying on another library to have pulled it in.
  require 'pathname'
  require 'json'
  require 'securerandom'
end

Instance Method Details

#configure(conf) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/fluent/plugin/out_forward_aws.rb', line 44

# Validates the plugin settings and, unless disabled, verifies AWS access.
#
# After the parent class has processed +conf+, this method:
# 1. rejects a missing or blank channel,
# 2. requires every AWS credential / endpoint / resource setting,
# 3. unless @aws_s3_skiptest is set, writes a small test object to the bucket,
# 4. unless @aws_sns_skiptest is set, publishes a "ping" notification.
#
# @param conf the configuration handed over by the caller (passed to super)
# @raise [Fluent::ConfigError] when a setting is missing/invalid or when one
#   of the AWS connectivity checks fails
def configure(conf)
  super

  # `regex =~ nil` is simply falsy, so a nil channel has to be rejected
  # explicitly. The message is single-quoted so that `\w` is kept literally
  # (inside double quotes the backslash is dropped).
  if @channel.nil? || /^\s*$/ =~ @channel
    raise Fluent::ConfigError.new('channel is invalid. Exp=[\w]+')
  end

  # Settings that must be present; checked in this order.
  %w[
    aws_access_key_id
    aws_secret_access_key
    aws_s3_endpoint
    aws_s3_bucketname
    aws_sns_endpoint
    aws_sns_topic_arn
  ].each do |name|
    next if instance_variable_get("@#{name}")
    raise Fluent::ConfigError.new("#{name} is required")
  end

  unless @aws_s3_skiptest
    init_aws_s3_bucket
    begin
      @bucket.objects[@aws_s3_testobjectname].write("TEST", :content_type => 'text/plain')
    rescue => e
      # Keep the permission hint, but surface the underlying failure as well.
      raise Fluent::ConfigError.new(
        "Cannot put object to S3. Need s3:PutObject permission for resource " \
        "arn:aws:s3:::#{@aws_s3_bucketname}/* (#{e.class}: #{e.message})"
      )
    end
  end

  unless @aws_sns_skiptest
    init_aws_sns_topic
    begin
      notification = { "type" => "ping" }
      topic = @sns.topics[@aws_sns_topic_arn]
      topic.publish(JSON.pretty_generate(notification), :subject => @aws_sns_emailsubject)
    rescue => e
      # Keep the permission hint, but surface the underlying failure as well.
      raise Fluent::ConfigError.new(
        "Cannot post notification to SNS. Need sns:Publish permission for resource " \
        "#{@aws_sns_topic_arn} (#{e.class}: #{e.message})"
      )
    end
  end
end

#format(tag, time, record) ⇒ Object



100
101
102
# File 'lib/fluent/plugin/out_forward_aws.rb', line 100

# Serializes a single event by packing tag, time and record together as one
# three-element array and calling to_msgpack on it.
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#format_stream(tag, es) ⇒ Object

Workaround for HandleTagNameMixin bug github.com/fluent/fluentd/pull/109



8
9
10
11
12
13
14
15
16
# File 'lib/fluent/plugin/out_forward_aws.rb', line 8

# Workaround for HandleTagNameMixin bug github.com/fluent/fluentd/pull/109
#
# Formats every event of the stream and concatenates the results.
#
# @param tag the tag shared by all events in +es+
# @param es  an event stream yielding (time, record) pairs from #each
# @return [String] the formatted events, appended in iteration order
def format_stream(tag, es)
  buffer = ''
  es.each do |time, record|
    # Each event gets its own copy of the tag, so whatever filter_record
    # does to it cannot leak into the caller's tag or into later events.
    event_tag = String.new(tag)
    filter_record(event_tag, time, record)
    buffer << format(event_tag, time, record)
  end
  buffer
end

#shutdown ⇒ Object



95
96
97
98
# File 'lib/fluent/plugin/out_forward_aws.rb', line 95

# Shuts the plugin down by delegating to the parent class; no
# plugin-specific teardown is performed here.
def shutdown
  super
  # NOTE(review): the original marker here was just "destroy" — presumably a
  # placeholder for cleanup that was never implemented; confirm.
end

#start ⇒ Object



89
90
91
92
93
# File 'lib/fluent/plugin/out_forward_aws.rb', line 89

# Starts the plugin: runs the parent class's start first, then initializes
# the S3 bucket handle followed by the SNS topic handle.
def start
  super
  init_aws_s3_bucket
  init_aws_sns_topic
end

#write(chunk) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/fluent/plugin/out_forward_aws.rb', line 104

# Uploads one buffer chunk to S3 as a gzip file and announces it via SNS.
#
# The chunk is gzip-compressed into a temporary file, uploaded to
# "<channel>/<chunk key>/<uuid>.msgpack.gz" in @bucket, and then a JSON
# notification describing the object is published to the SNS topic.
# The temporary file is always closed and removed, on success and on failure.
#
# @param chunk buffer chunk; must respond to #key and #write_to(io)
# @return whatever the SNS topic's publish call returns
# @raise any error raised while compressing, uploading or publishing
def write(chunk)
  # Add UUID to avoid name conflict
  # Current version supports only msgpack + gz, but add suffix for future extensions
  format_name = "msgpack"
  compression = "gzip"
  compression_suffix = "gz"
  s3path = "#{@channel}/#{chunk.key}/#{SecureRandom.uuid}.#{format_name}.#{compression_suffix}"

  # Create temp gzip file; binary mode keeps the compressed bytes untouched.
  tmp_file = Tempfile.new("forward-aws-")
  tmp_file.binmode
  writer = Zlib::GzipWriter.new(tmp_file)
  begin
    chunk.write_to(writer)
    # Closing the writer finishes the gzip stream so the file is complete
    # before it is uploaded.
    writer.close
    $log.debug "Upload log object to S3 bucket #{@aws_s3_bucketname} path #{s3path}"
    @bucket.objects[s3path].write(Pathname.new(tmp_file.path), :content_type => 'application/x-gzip')
    notification = {
      "type"        => "out",
      "channel"     => @channel,
      "bucketname"  => @aws_s3_bucketname,
      "path"        => s3path,
      "format"      => format_name,
      "compression" => compression
    }
    topic = @sns.topics[@aws_sns_topic_arn]
    $log.debug "Posting notification #{notification}"
    topic.publish(JSON.pretty_generate(notification), :subject => @aws_sns_emailsubject)
  ensure
    # Best-effort cleanup. Only I/O-type failures are ignored so that a
    # programming error (such as a misspelled variable) is not hidden.
    begin
      writer.close unless writer.closed?
    rescue IOError, SystemCallError, Zlib::Error
      nil
    end
    begin
      tmp_file.close(true) # close and unlink the temporary file
    rescue IOError, SystemCallError => e
      $log.warn "Failed to remove temporary file #{tmp_file.path}: #{e.message}"
    end
  end
end