Class: Fluent::SQSInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_sqs.rb

Instance Method Summary collapse

Constructor Details

#initializeSQSInput

Returns a new instance of SQSInput.



12
13
14
# File 'lib/fluent/plugin/in_sqs.rb', line 12

def initialize
  super
end

Instance Method Details

#configure(conf) ⇒ Object



25
26
27
28
# File 'lib/fluent/plugin/in_sqs.rb', line 25

def configure(conf)
  super

end

#run_periodicObject



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/fluent/plugin/in_sqs.rb', line 51

def run_periodic
  until @finished
    begin
      sleep @receive_interval
      @queue.receive_message(
        :limit => @max_number_of_messages,
        :wait_time_seconds => @wait_time_seconds
      ) do |message|
        record = {}
        record['body'] = message.body.to_s
        record['handle'] = message.handle.to_s
        record['id'] = message.id.to_s
        record['md5'] = message.md5.to_s
        record['url'] = message.queue.url.to_s
        record['sender_id'] = message.sender_id.to_s

        router.emit(@tag, Time.now.to_i, record)
      end
    rescue
      $log.error "failed to emit or receive", :error => $!.to_s, :error_class => $!.class.to_s
      $log.warn_backtrace $!.backtrace
    end
  end
end

#shutdownObject



44
45
46
47
48
49
# File 'lib/fluent/plugin/in_sqs.rb', line 44

def shutdown
  super

  @finished = true
  @thread.join
end

#startObject



30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/fluent/plugin/in_sqs.rb', line 30

def start
  super

  AWS.config(
    :access_key_id => @aws_key_id,
    :secret_access_key => @aws_sec_key
    )

  @queue = AWS::SQS.new(:sqs_endpoint => @sqs_endpoint).queues[@sqs_url]

  @finished = false
  @thread = Thread.new(&method(:run_periodic))
end