Class: SqsPoller

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/inputs/sqs/poller.rb

Constant Summary collapse

DEFAULT_OPTIONS =

queue poller options we want to set explicitly

{
    # We request a single message per poll so that error handling stays
    # per-file: if one file cannot be downloaded correctly, only that message
    # is affected.
    # (We throw :skip_delete if the download size isn't correct, so the event
    # can be processed again later. Make sure to set a reasonable
    # "DefaultVisibilityTimeout" for your queue so that there is enough time
    # to process the log files!)
    max_number_of_messages: 1,
    visibility_timeout: 10,
    # Long polling; nil means we use the queue's own setting.
    # A good value is 10 seconds, to balance between a quick logstash
    # shutdown and fewer API calls.
    wait_time_seconds: nil,
    #attribute_names: ["All"], # Receive all available built-in message attributes.
    #message_attribute_names: ["All"], # Receive any custom message attributes.
    skip_delete: false,
}
BACKOFF_SLEEP_TIME =

only needed in “run_with_backoff”:

1
BACKOFF_FACTOR =
2
MAX_TIME_BEFORE_GIVING_UP =
60
EVENT_SOURCE =

only needed in “preprocess”:

'aws:s3'
EVENT_TYPE =
'ObjectCreated'

Instance Method Summary collapse

Constructor Details

#initialize(logger, stop_semaphore, poller_options = {}, client_options = {}, aws_options_hash) ⇒ SqsPoller

initialization and setup happens once, outside the threads:



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/logstash/inputs/sqs/poller.rb', line 38

# Stores the collaborators and settings, resolves the queue URL and builds
# the queue poller. Runs once, outside the worker threads.
#
# @param logger logger used for info/error output
# @param stop_semaphore stored in @stopped
# @param poller_options [Hash] overrides merged on top of DEFAULT_OPTIONS
# @param client_options [Hash] read for :sqs_queue, :from_sns,
#   :max_processing_time, :sqs_delete_on_failure and
#   :queue_owner_aws_account_id
# @param aws_options_hash options handed to Aws::SQS::Client.new
# @raise [LogStash::ConfigurationError] when an Aws::SQS::Errors::ServiceError
#   occurs while connecting
def initialize(logger, stop_semaphore, poller_options = {}, client_options = {}, aws_options_hash)
  @logger = logger
  @stopped = stop_semaphore

  # settings taken from the client options
  @queue = client_options[:sqs_queue]
  @from_sns = client_options[:from_sns]
  @max_processing_time = client_options[:max_processing_time]
  @sqs_delete_on_failure = client_options[:sqs_delete_on_failure]

  # caller-supplied poller options win over our defaults
  @options = DEFAULT_OPTIONS.merge(poller_options)

  @logger.info("Registering SQS input", :queue => @queue)
  client = Aws::SQS::Client.new(aws_options_hash)

  # @queue may already be a URL; otherwise look the URL up by queue name
  resolved_url =
    if uri?(@queue)
      @queue
    else
      lookup = {
        queue_name: @queue,
        queue_owner_aws_account_id: client_options[:queue_owner_aws_account_id]
      }
      client.get_queue_url(lookup).queue_url
    end

  @poller = Aws::SQS::QueuePoller.new(resolved_url, :client => client)
  @logger.info("[#{Thread.current[:name]}] connected to queue.", :queue_url => resolved_url)
rescue Aws::SQS::Errors::ServiceError => e
  @logger.error("Cannot establish connection to Amazon SQS", :error => e)
  raise LogStash::ConfigurationError, "Verify the SQS queue name and your credentials"
end

Instance Method Details

#run ⇒ Object

this is called by every worker thread:



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/logstash/inputs/sqs/poller.rb', line 69

# Polls the queue and yields every record that +preprocess+ produces for a
# received message to the caller's block. Called by every worker thread.
#
# While a message is being processed, a helper ("extender") thread repeatedly
# raises the message's visibility timeout. The requested visibility starts at
# twice the configured :visibility_timeout for EACH message and grows by
# roughly one timeout period per extension. Once it reaches
# @max_processing_time the extender logs an error, deletes the message if
# @sqs_delete_on_failure is set, and kills the polling thread.
#
# A message whose processing raised, or whose +preprocess+ call did not
# return a truthy value, is left in the queue via `throw :skip_delete`.
#
# @yield [record] each record produced by +preprocess+ for the current message
def run() # not (&block) - pass explicitly (use yield below)
  # per-thread timer to extend visibility if necessary
  extender = nil
  # wake up slightly (5%) before the current visibility timeout would expire
  message_backoff = (@options[:visibility_timeout] * 95).to_f / 100.0
  initial_visibility = 2 * @options[:visibility_timeout]

  # "shutdown handler":
  @poller.before_request do |_|
    if stop?
      # kill visibility extender thread if active
      extender.kill if extender
      extender = nil
      @logger.warn('issuing :stop_polling on "stop?" signal', :queue => @queue)
      # this can take up to "Receive Message Wait Time" (of the sqs queue) seconds to be recognized
      throw :stop_polling
    end
  end

  run_with_backoff do
    message_count = 0 #PROFILING
    @poller.poll(@options) do |message|
      message_count += 1 #PROFILING
      # Every message starts with a fresh visibility budget. Sharing one
      # counter across messages would let it grow for the lifetime of the
      # worker, so that later messages would hit @max_processing_time
      # immediately.
      new_visibility = initial_visibility
      # auto-increase the timeout if processing takes too long:
      poller_thread = Thread.current
      extender = Thread.new do
        while new_visibility < @max_processing_time
          sleep message_backoff
          begin
            @poller.change_message_visibility_timeout(message, new_visibility)
            @logger.warn("[#{Thread.current[:name]}] Extended visibility for a long running message", :visibility => new_visibility) if new_visibility > 600.0
            new_visibility += message_backoff
          rescue Aws::SQS::Errors::InvalidParameterValue => e
            # the counter is not advanced, so the same value is retried after the next sleep
            @logger.debug("Extending visibility failed for message", :error => e)
          else
            @logger.debug("[#{Thread.current[:name]}] Extended visibility for message", :visibility => new_visibility) #PROFILING
          end
        end
        @logger.error("[#{Thread.current[:name]}] Maximum visibility reached! We will delete this message from queue!")
        @poller.delete_message(message) if @sqs_delete_on_failure
        poller_thread.kill
      end
      extender[:name] = "#{Thread.current[:name]}/extender" #PROFILING
      failed = false
      begin
        # nil when :skip_delete was thrown inside preprocess, otherwise preprocess' return value
        message_completed = catch(:skip_delete) do
          preprocess(message) do |record|
            extender[:name] = "#{Thread.current[:name]}/extender/#{record[:key]}" #PROFILING
            yield(record)
          end
        end
      rescue Exception => e
        # NOTE(review): rescuing Exception (not StandardError) looks like a
        # deliberate best-effort so that one bad message never ends the
        # worker - confirm before narrowing.
        @logger.warn("Error in poller loop", :error => e)
        @logger.warn("Backtrace:\n\t#{e.backtrace.join("\n\t")}")
        failed = true
      end
      unless message_completed
        @logger.debug("[#{Thread.current[:name]}] uncompleted message at the end of poller loop. We´ll throw skip_delete.", :message_count => message_count)
        # wake the extender from its sleep before it is disposed of below
        extender.run if extender
      end
      # at this time the extender has either fired or is obsolete
      extender.kill if extender
      extender = nil
      throw :skip_delete if failed || !message_completed
      #@logger.info("[#{Thread.current[:name]}] completed message.", :message => message_count)
    end
  end
end