Class: Fluent::Plugin::SQSInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_sqs.rb

Instance Method Summary collapse

Instance Method Details

#clientObject



47
48
49
# File 'lib/fluent/plugin/in_sqs.rb', line 47

def client
  @client ||= Aws::SQS::Client.new(stub_responses: @stub_responses)
end

#configure(conf) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/fluent/plugin/in_sqs.rb', line 23

def configure(conf)
  super

  if @tag == nil
    raise Fluent::ConfigError, "tag configuration key is mandatory"
  end

  if @sqs_url == nil
    raise Fluent::ConfigError, "sqs_url configuration key is mandatory"
  end

  Aws.config = {
    access_key_id: @aws_key_id,
    secret_access_key: @aws_sec_key,
    region: @region
  }
end

#parse_message(message) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/fluent/plugin/in_sqs.rb', line 76

def parse_message(message)
  record = {
    'body' => message.body.to_s,
    'receipt_handle' => message.receipt_handle.to_s,
    'message_id' => message.message_id.to_s,
    'md5_of_body' => message.md5_of_body.to_s,
    'queue_url' => message.queue_url.to_s
  }

  if @attribute_name_to_extract.to_s.strip.length > 0
    record[@attribute_name_to_extract] = message.attributes[@attribute_name_to_extract].to_s
  end
  
  record
end

#queueObject



51
52
53
# File 'lib/fluent/plugin/in_sqs.rb', line 51

def queue
  @queue ||= Aws::SQS::Resource.new(client: client).queue(@sqs_url)
end

#runObject



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/fluent/plugin/in_sqs.rb', line 59

def run
  queue.receive_messages(
    max_number_of_messages: @max_number_of_messages,
    wait_time_seconds: @wait_time_seconds,
    visibility_timeout: @visibility_timeout
  ).each do |message|
    record = parse_message(message)

    message.delete if @delete_message

    router.emit(@tag, Fluent::Engine.now, record)
  end
rescue => ex
  log.error 'failed to emit or receive', error: ex.to_s, error_class: ex.class
  log.warn_backtrace ex.backtrace
end

#shutdownObject



55
56
57
# File 'lib/fluent/plugin/in_sqs.rb', line 55

def shutdown
  super
end

#startObject



41
42
43
44
45
# File 'lib/fluent/plugin/in_sqs.rb', line 41

def start
  super

  timer_execute(:in_sqs_run_periodic_timer, @receive_interval, &method(:run))
end