33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
# File 'lib/fluent/plugin/in_sqs_poll.rb', line 33
# Polls the SQS queue at @sqs_url and emits one event per received message.
#
# The AWS region is taken from the second dot-separated component of
# @sqs_url and written into the global Aws.config. When both @aws_access_key
# and @aws_secret_key are set, static credentials are written into Aws.config
# as well.
#
# Each emitted record carries the message body, receipt handle, message id,
# body MD5 and the 'ApproximateReceiveCount' attribute. Polling stops (via
# `throw :stop_polling`) once @terminate is truthy, checked before every
# request and after every processed batch.
#
# Errors derived from StandardError raised while emitting a message are
# logged and do not stop the loop. Other exceptions (Interrupt, SystemExit,
# NoMemoryError, ...) are deliberately left to propagate so that process
# shutdown is not swallowed.
def poll
  region = @sqs_url.split('.')[1]
  Aws.config.update(region: region)
  if @aws_access_key && @aws_secret_key
    Aws.config.update(
      credentials: Aws::Credentials.new(@aws_access_key, @aws_secret_key)
    )
  end

  poller = Aws::SQS::QueuePoller.new(@sqs_url)
  poller.before_request do |_stats|
    throw :stop_polling if @terminate
  end

  poller.poll(max_number_of_messages: @max_number_of_messages) do |messages|
    # Per the AWS SDK docs, QueuePoller yields a single message (not an array)
    # when max_number_of_messages is 1. Array() is avoided on purpose: it
    # would call to_a on a Struct-like message and explode it into its fields.
    batch = messages.is_a?(Array) ? messages : [messages]

    batch.each do |msg|
      begin
        router.emit(@tag, Time.now.to_i,
                    {
                      'body' => msg.body,
                      'handle' => msg.receipt_handle,
                      'id' => msg.message_id,
                      'md5' => msg.md5_of_body,
                      'sqs_receive_count' => msg.attributes['ApproximateReceiveCount'],
                    })
      rescue StandardError => e
        # Best effort: a failure on one message must not abort the batch.
        $log.error("SQS exception", error: e.to_s, error_class: e.class.to_s)
        $log.warn_backtrace(e.backtrace)
      end
    end

    throw :stop_polling if @terminate
  end
end
|