Class: Fluent::SQSInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_sqs.rb

Instance Method Summary collapse

Instance Method Details

#client ⇒ Object



39
40
41
# File 'lib/fluent/plugin/in_sqs.rb', line 39

# Returns the SQS client used by this plugin, building it on first use.
#
# The client is constructed with the +stub_responses+ option taken from
# @stub_responses and cached in @client, so later calls reuse the same object.
#
# @return [Aws::SQS::Client] the memoized client
def client
  return @client if @client

  @client = Aws::SQS::Client.new(stub_responses: @stub_responses)
end

#configure(conf) ⇒ Object



22
23
24
25
26
27
28
29
30
# File 'lib/fluent/plugin/in_sqs.rb', line 22

# Applies the plugin configuration and publishes the AWS credentials and
# region to the SDK-wide configuration.
#
# The three settings are merged into the existing Aws.config hash instead of
# replacing it, so unrelated options already stored there (for example by
# another plugin running in the same process) are preserved.
#
# @param conf [Object] configuration passed through to the parent class
# @return [Hash] the updated Aws.config
def configure(conf)
  super

  Aws.config.update(
    access_key_id: @aws_key_id,
    secret_access_key: @aws_sec_key,
    region: @region
  )
end

#queue ⇒ Object



43
44
45
# File 'lib/fluent/plugin/in_sqs.rb', line 43

# Returns the queue object for the URL held in @sqs_url, looking it up on
# first use.
#
# An Aws::SQS::Resource is built around the plugin's client, asked for the
# queue, and the result is cached in @queue for later calls.
#
# @return [Object] whatever Aws::SQS::Resource#queue returns for @sqs_url
def queue
  return @queue if @queue

  resource = Aws::SQS::Resource.new(client: client)
  @queue = resource.queue(@sqs_url)
end

#run ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/fluent/plugin/in_sqs.rb', line 61

# Receives one batch of messages from the queue and emits each as an event.
#
# Each message is parsed, emitted under @tag with the current time, and only
# then deleted (when @delete_message is set). Deleting after a successful emit
# means a failing emit leaves the message on the queue instead of losing it.
#
# Any StandardError raised while receiving, parsing, emitting or deleting is
# logged and swallowed, so the caller is not interrupted. The remaining
# messages of that batch are skipped.
#
# @return [Object] result of the iteration, or of the logger call on failure
def run
  messages = queue.receive_messages(
    max_number_of_messages: @max_number_of_messages,
    wait_time_seconds: @wait_time_seconds,
    visibility_timeout: @visibility_timeout
  )

  messages.each do |message|
    record = parse_message(message)

    router.emit(@tag, Time.now.to_i, record)

    # Delete only once the event has been handed to the router.
    message.delete if @delete_message
  end
rescue StandardError => e
  # Bind the exception explicitly: $ERROR_INFO is nil unless 'English' is loaded.
  $log.error 'failed to emit or receive', error: e.to_s, error_class: e.class.to_s
  $log.warn_backtrace e.backtrace
end

#run_periodic ⇒ Object



54
55
56
57
58
59
# File 'lib/fluent/plugin/in_sqs.rb', line 54

# Polling loop: sleeps for @receive_interval and then calls #run, repeating
# until @finished becomes truthy.
#
# The flag is checked before each sleep, so nothing runs if @finished is
# already set on entry.
#
# @return [nil]
def run_periodic
  loop do
    break if @finished

    sleep(@receive_interval)
    run
  end
end

#shutdown ⇒ Object



47
48
49
50
51
52
# File 'lib/fluent/plugin/in_sqs.rb', line 47

# Stops the polling thread.
#
# After delegating to the parent class, sets @finished so the polling loop
# exits, then waits for the worker thread to end. The join is skipped when no
# thread was ever created (shutdown without a completed start), so this does
# not raise NoMethodError on nil.
#
# @return [Object, nil] result of joining @thread, or nil when there is none
def shutdown
  super

  @finished = true
  @thread.join if @thread
end

#start ⇒ Object



32
33
34
35
36
37
# File 'lib/fluent/plugin/in_sqs.rb', line 32

# Starts the plugin's background polling.
#
# After delegating to the parent class, clears the @finished flag and spawns
# a thread that executes #run_periodic. The thread is kept in @thread.
#
# @return [Thread] the newly created worker thread
def start
  super

  @finished = false
  @thread = Thread.new { run_periodic }
end