Class: Fluent::SQSInput
- Inherits: Input
  - Object
    - Input
      - Fluent::SQSInput
- Defined in:
- lib/fluent/plugin/in_sqs.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #initialize ⇒ SQSInput (constructor): A new instance of SQSInput.
- #run_periodic ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ SQSInput
Returns a new instance of SQSInput.
12 13 14 |
# File 'lib/fluent/plugin/in_sqs.rb', line 12
# Builds the plugin instance; all construction work is delegated to the
# superclass initializer.
def initialize
  super
end
Instance Method Details
#configure(conf) ⇒ Object
25 26 27 28 |
# File 'lib/fluent/plugin/in_sqs.rb', line 25
# Applies the plugin configuration by handing +conf+ to the superclass
# implementation; no additional validation is performed here.
#
# @param conf the configuration object forwarded unchanged to +super+
def configure(conf)
  super
end
#run_periodic ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/fluent/plugin/in_sqs.rb', line 51
# Polling loop that runs until @finished becomes truthy.
#
# Each pass sleeps for @receive_interval seconds, then asks @queue for up to
# @max_number_of_messages messages (passing @wait_time_seconds as
# :wait_time_seconds). Every received message is turned into a record of
# string fields and emitted under @tag, stamped with the current epoch second.
#
# Any StandardError raised while receiving or emitting is logged (message,
# class and backtrace) and the loop carries on, so a single failed poll does
# not end the loop.
def run_periodic
  until @finished
    begin
      sleep @receive_interval
      @queue.receive_message(
        :limit => @max_number_of_messages,
        :wait_time_seconds => @wait_time_seconds
      ) do |message|
        # Every field is stringified so the emitted record holds plain strings.
        record = {
          'body'      => message.body.to_s,
          'handle'    => message.handle.to_s,
          'id'        => message.id.to_s,
          'md5'       => message.md5.to_s,
          'url'       => message.queue.url.to_s,
          'sender_id' => message.sender_id.to_s
        }
        router.emit(@tag, Time.now.to_i, record)
      end
    rescue => e
      $log.error "failed to emit or receive", :error => e.to_s, :error_class => e.class.to_s
      $log.warn_backtrace e.backtrace
    end
  end
end
#shutdown ⇒ Object
44 45 46 47 48 49 |
# File 'lib/fluent/plugin/in_sqs.rb', line 44
# Stops the plugin: runs the superclass shutdown, sets @finished so the
# polling loop exits after its current pass, then waits for the polling
# thread to end.
def shutdown
  super
  @finished = true
  # @thread is only assigned at the end of #start; if start never ran or
  # raised before creating the thread there is nothing to join.
  @thread.join if @thread
end
#start ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/fluent/plugin/in_sqs.rb', line 30
# Starts the plugin: runs the superclass start, configures AWS with
# @aws_key_id / @aws_sec_key, looks up the queue for @sqs_url through an SQS
# client built with @sqs_endpoint, clears @finished, and launches a thread
# running #run_periodic.
def start
  super

  credentials = {
    :access_key_id => @aws_key_id,
    :secret_access_key => @aws_sec_key
  }
  AWS.config(credentials)

  sqs = AWS::SQS.new(:sqs_endpoint => @sqs_endpoint)
  @queue = sqs.queues[@sqs_url]

  @finished = false
  @thread = Thread.new { run_periodic }
end