Class: Fluent::AmazonSNSOutput
- Inherits: BufferedOutput
  - Object
    - BufferedOutput
      - Fluent::AmazonSNSOutput
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_amazon_sns.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #get_topics ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/fluent/plugin/out_amazon_sns.rb', line 26

# Applies the plugin configuration.
#
# After delegating to the parent implementation, this selects how the target
# topic name is derived for each event and hands the AWS settings to
# AWS.config.
#
# The topic strategy is chosen from the first of these that is set:
# 1. @topic_name    - always use that fixed name
# 2. @topic_map_key - read the name from that key of the record
# 3. @topic_map_tag - use the event tag with @remove_tag_prefix stripped
#
# @param conf the configuration object, passed through to the parent class
# @raise [Fluent::ConfigError] when none of the three topic options is set
def configure(conf)
  super

  @topic_generator =
    if @topic_name
      ->(_tag, _record) { @topic_name }
    elsif @topic_map_key
      ->(_tag, record) { record[@topic_map_key] }
    elsif @topic_map_tag
      # NOTE(review): @remove_tag_prefix is interpolated into the pattern
      # unescaped, so regexp metacharacters in it are interpreted as such.
      ->(tag, _record) { tag.gsub(/^#{@remove_tag_prefix}(\.)?/, '') }
    else
      raise Fluent::ConfigError, "no one way specified to decide target"
    end

  # Collect the @aws_* instance variables into the hash given to AWS.config.
  aws_keys = [:access_key_id, :secret_access_key, :region, :proxy_uri]
  options = aws_keys.each_with_object({}) do |key, opts|
    opts[key] = instance_variable_get("@aws_#{key}")
  end
  AWS.config(options)
end
#format(tag, time, record) ⇒ Object
58 59 60 |
# File 'lib/fluent/plugin/out_amazon_sns.rb', line 58

# Serializes a single event for buffering by calling to_msgpack on the
# [tag, time, record] array.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the result of Array#to_msgpack for the three values
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
#get_topics ⇒ Object
76 77 78 79 80 81 |
# File 'lib/fluent/plugin/out_amazon_sns.rb', line 76

# Builds a lookup table of the topics exposed by @sns.
#
# @return [Hash] each topic from @sns.topics, keyed by its name
def get_topics
  @sns.topics.each_with_object({}) do |topic, by_name|
    by_name[topic.name] = topic
  end
end
#shutdown ⇒ Object
54 55 56 |
# File 'lib/fluent/plugin/out_amazon_sns.rb', line 54

# Stops the plugin. No plugin-specific teardown is performed here; the call
# is delegated entirely to the parent implementation.
def shutdown
  super
end
#start ⇒ Object
48 49 50 51 52 |
# File 'lib/fluent/plugin/out_amazon_sns.rb', line 48

# Starts the plugin: runs the parent start-up, creates the AWS::SNS client
# and stores the result of get_topics in @topics.
def start
  super

  @sns = AWS::SNS.new
  @topics = get_topics
end
#write(chunk) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/fluent/plugin/out_amazon_sns.rb', line 62

# Publishes every event in the chunk to its SNS topic.
#
# For each event the record's "time" entry is overwritten with the event
# time as a local Time, the subject is taken from the record (the
# @subject_key entry is removed from the record), falling back to @subject
# and then to 'Fluent-Notification', and the record is published as JSON to
# the entry of @topics matching the generated topic name. Events whose topic
# is not in @topics are reported through $log.error and skipped.
#
# @param chunk an object whose msgpack_each yields tag, time and record
def write(chunk)
  chunk.msgpack_each do |tag, time, record|
    record["time"] = Time.at(time).localtime
    subject = record.delete(@subject_key) || @subject || 'Fluent-Notification'

    topic = @topic_generator.call(tag, record)
    topic = topic.tr('.', '-') if topic # SNS doesn't allow .

    if @topics[topic]
      @topics[topic].publish(record.to_json, subject: subject)
    else
      $log.error "Could not find topic '#{topic}' on SNS"
    end
  end
end