Class: Fluent::Plugin::AmazonSNSOutput
- Inherits:
- Output
- Inheritance hierarchy: Object → Output → Fluent::Plugin::AmazonSNSOutput
- Defined in:
- lib/fluent/plugin/out_amazon_sns.rb
Constant Summary
- DEFAULT_BUFFER_TYPE =
"memory"
Instance Method Summary
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary? ⇒ Boolean
- #get_topics ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/fluent/plugin/out_amazon_sns.rb', line 32
#
# Applies the buffer/inject compatibility conversion, runs the parent
# configuration, then selects how the target SNS topic name is derived for
# each event. The strategy is stored in @topic_generator as a lambda taking
# (tag, record). Precedence: @topic_name, then @topic_map_key, then
# @topic_map_tag.
#
# @param conf the plugin configuration element handed over by Fluentd
# @raise [Fluent::ConfigError] when none of the three topic options is set
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  @topic_generator =
    if @topic_name
      ->(_tag, _record) { @topic_name }
    elsif @topic_map_key
      ->(_tag, record) { record[@topic_map_key] }
    elsif @topic_map_tag
      # The prefix is literal text, so it is escaped: otherwise a "." inside
      # it would act as a regex wildcard and strip tags that do not actually
      # start with the prefix. A nil prefix becomes "", which leaves only the
      # optional leading dot to be removed. The pattern is built once here
      # instead of on every event.
      prefix = /\A#{Regexp.escape(@remove_tag_prefix.to_s)}\.?/
      ->(tag, _record) { tag.sub(prefix, '') }
    else
      raise Fluent::ConfigError, "no one way specified to decide target"
    end
end
#format(tag, time, record) ⇒ Object
66 67 68 69 |
# File 'lib/fluent/plugin/out_amazon_sns.rb', line 66
#
# Serializes a single event for the buffer: the record is first passed
# through inject_values_to_record, then packed together with its tag and
# time as a three-element msgpack array.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the msgpack encoding of [tag, time, record]
def format(tag, time, record)
  injected = inject_values_to_record(tag, time, record)
  [tag, time, injected].to_msgpack
end
#formatted_to_msgpack_binary? ⇒ Boolean
71 72 73 |
# File 'lib/fluent/plugin/out_amazon_sns.rb', line 71
#
# Always true: #format emits msgpack, and #write relies on that by reading
# the chunk back with msgpack_each.
#
# @return [Boolean] true
def formatted_to_msgpack_binary?
  true
end
#get_topics ⇒ Object
93 94 95 96 97 98 |
# File 'lib/fluent/plugin/out_amazon_sns.rb', line 93
#
# Builds a lookup table of the topics exposed by @sns, keyed by the last
# colon-separated segment of each topic's ARN.
#
# @return [Hash] segment after the last ":" of the ARN => topic object
def get_topics
  @sns.topics.each_with_object({}) do |topic, by_name|
    by_name[topic.arn.split(':').last] = topic
  end
end
#multi_workers_ready? ⇒ Boolean
75 76 77 |
# File 'lib/fluent/plugin/out_amazon_sns.rb', line 75
#
# Always true. Fluentd consults this hook to decide whether the plugin may
# be used when several workers are configured.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end
#shutdown ⇒ Object
62 63 64 |
# File 'lib/fluent/plugin/out_amazon_sns.rb', line 62
#
# Performs no plugin-specific teardown; it only delegates to the parent
# class implementation.
def shutdown
  super
end
#start ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/fluent/plugin/out_amazon_sns.rb', line 48
#
# Runs the parent start-up, then creates the SNS client and resource and
# caches the topic lookup table.
#
# The client options are read from the @aws_access_key_id,
# @aws_secret_access_key, @aws_region and @aws_proxy_uri instance variables.
# Every key is passed through even when its value is nil.
#
# Sets @sns (Aws::SNS::Resource) and @topics (result of #get_topics).
def start
  super

  options = {
    access_key_id: @aws_access_key_id,
    secret_access_key: @aws_secret_access_key,
    region: @aws_region,
    http_proxy: @aws_proxy_uri
  }

  sns_client = Aws::SNS::Client.new(options)
  @sns = Aws::SNS::Resource.new(client: sns_client)
  # Topics are listed once at start-up; #write looks them up in this table.
  @topics = get_topics
end
#write(chunk) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/fluent/plugin/out_amazon_sns.rb', line 79
#
# Publishes every event in the chunk to its SNS topic.
#
# For each event the record gets a "time" entry (local time built from the
# event time). The subject is taken from the record under @subject_key (the
# key is removed from the record), falling back to @subject and then to
# 'Fluent-Notification'. The topic name comes from @topic_generator. The
# record is sent as JSON. Events whose topic is not in @topics are logged as
# errors and skipped.
#
# @param chunk buffer chunk that yields (tag, time, record) via msgpack_each
def write(chunk)
  chunk.msgpack_each do |tag, time, record|
    record["time"] = Time.at(time).localtime
    subject = record.delete(@subject_key) || @subject || 'Fluent-Notification'

    topic_name = @topic_generator.call(tag, record)
    # SNS doesn't allow "." in topic names. to_s keeps a non-String value
    # taken from the record (e.g. an Integer) from raising NoMethodError and
    # failing the whole chunk.
    topic_name = topic_name.to_s.tr('.', '-') if topic_name

    topic = @topics[topic_name]
    if topic
      topic.publish(message: record.to_json, subject: subject)
    else
      # Plugin-scoped logger rather than the global $log.
      log.error "Could not find topic '#{topic_name}' on SNS"
    end
  end
end