Class: Fluent::Plugin::SQSInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::SQSInput
- Defined in:
- lib/fluent/plugin/in_sqs.rb
Instance Method Summary collapse
- #client ⇒ Object
- #configure(conf) ⇒ Object
- #parse_message(message) ⇒ Object
- #queue ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Instance Method Details
#client ⇒ Object
47 48 49 |
# File 'lib/fluent/plugin/in_sqs.rb', line 47 def client @client ||= Aws::SQS::Client.new(stub_responses: @stub_responses) end |
#configure(conf) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/fluent/plugin/in_sqs.rb', line 23 def configure(conf) super if @tag == nil raise Fluent::ConfigError, "tag configuration key is mandatory" end if @sqs_url == nil raise Fluent::ConfigError, "sqs_url configuration key is mandatory" end Aws.config = { access_key_id: @aws_key_id, secret_access_key: @aws_sec_key, region: @region } end |
#parse_message(message) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/fluent/plugin/in_sqs.rb', line 76 def () record = { 'body' => .body.to_s, 'receipt_handle' => .receipt_handle.to_s, 'message_id' => ..to_s, 'md5_of_body' => .md5_of_body.to_s, 'queue_url' => .queue_url.to_s } if @attribute_name_to_extract.to_s.strip.length > 0 record[@attribute_name_to_extract] = .attributes[@attribute_name_to_extract].to_s end record end |
#queue ⇒ Object
51 52 53 |
# File 'lib/fluent/plugin/in_sqs.rb', line 51 def queue @queue ||= Aws::SQS::Resource.new(client: client).queue(@sqs_url) end |
#run ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/fluent/plugin/in_sqs.rb', line 59 def run queue.( max_number_of_messages: @max_number_of_messages, wait_time_seconds: @wait_time_seconds, visibility_timeout: @visibility_timeout ).each do || record = () .delete if @delete_message router.emit(@tag, Fluent::Engine.now, record) end rescue => ex log.error 'failed to emit or receive', error: ex.to_s, error_class: ex.class log.warn_backtrace ex.backtrace end |
#shutdown ⇒ Object
55 56 57 |
# File 'lib/fluent/plugin/in_sqs.rb', line 55 def shutdown super end |
#start ⇒ Object
41 42 43 44 45 |
# File 'lib/fluent/plugin/in_sqs.rb', line 41 def start super timer_execute(:in_sqs_run_periodic_timer, @receive_interval, &method(:run)) end |