Class: SimpleSqs::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/simple_sqs/worker.rb

Constant Summary collapse

VISIBILITY_TIMEOUT =

Allow another worker to get the message after this long. Suggest 25%-50% more than your average job

ENV.fetch('SIMPLE_SQS_VISABILITY_TIMEOUT', 15).to_i.freeze
DELETE_AFTER_MAX_RETRY =

Set to false if you’re using a redrive policy on your queue.

(ENV.fetch('DELETE_AFTER_MAX_RETRY', 'true').downcase == 'true').freeze
MAX_RETRY =

If DELETE_AFTER_MAX_RETRY enabled, delete after this many retrys

ENV.fetch('MAX_SQS_MESSAGE_RETRY', 5).to_i.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_url:) ⇒ Worker

Returns a new instance of Worker.



14
15
16
17
18
19
20
21
22
23
24
# File 'lib/simple_sqs/worker.rb', line 14

def initialize queue_url:
  @queue_url = queue_url
  @client = Aws::SQS::Client.new(
    access_key_id: ENV.fetch('SIMPLE_SQS_PUBLIC_KEY'),
    secret_access_key: ENV.fetch('SIMPLE_SQS_SECRET_KEY'),
    region: ENV.fetch('SIMPLE_SQS_REGION')
  )
  @processor = SimpleSqs::Processor.new
  @poller = Aws::SQS::QueuePoller.new(@queue_url, {client: @client})
  @transaction = !ENV.key?('SIMPLE_SQS_NO_AR_TRANSACTION')
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



2
3
4
# File 'lib/simple_sqs/worker.rb', line 2

def client
  @client
end

#processorObject (readonly)

Returns the value of attribute processor.



2
3
4
# File 'lib/simple_sqs/worker.rb', line 2

def processor
  @processor
end

Instance Method Details

#receive_and_processObject



35
36
37
38
39
40
41
42
43
44
# File 'lib/simple_sqs/worker.rb', line 35

def receive_and_process
  @poller.before_request do |stats|
    trap "INT", -> (*args) { stop_polling }
    trap "TERM", -> (*args) { stop_polling }
  end

  @poller.poll(visibility_timeout: VISIBILITY_TIMEOUT) do |message|
    process(message)
  end
end

#startObject



30
31
32
33
# File 'lib/simple_sqs/worker.rb', line 30

def start
  logger.info 'Starting SQS polling'
  receive_and_process
end

#transaction?Boolean

Returns:

  • (Boolean)


26
27
28
# File 'lib/simple_sqs/worker.rb', line 26

def transaction?
  @transaction
end