Class: SimpleSqs::Worker
- Inherits:
-
Object
- Object
- SimpleSqs::Worker
- Defined in:
- lib/simple_sqs/worker.rb
Constant Summary collapse
- VISIBILITY_TIMEOUT =
Allow another worker to receive the message after this many seconds. Suggested value: 25%-50% longer than your average job duration.
ENV.fetch('SIMPLE_SQS_VISABILITY_TIMEOUT', 15).to_i.freeze
- DELETE_AFTER_MAX_RETRY =
Set to false if you’re using a redrive policy on your queue.
(ENV.fetch('DELETE_AFTER_MAX_RETRY', 'true').downcase == 'true').freeze
- MAX_RETRY =
If DELETE_AFTER_MAX_RETRY is enabled, delete the message after this many retries.
ENV.fetch('MAX_SQS_MESSAGE_RETRY', 5).to_i.freeze
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#processor ⇒ Object
readonly
Returns the value of attribute processor.
Instance Method Summary collapse
-
#initialize(queue_url:) ⇒ Worker
constructor
A new instance of Worker.
- #receive_and_process ⇒ Object
- #start ⇒ Object
- #transaction? ⇒ Boolean
Constructor Details
#initialize(queue_url:) ⇒ Worker
Returns a new instance of Worker.
14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/simple_sqs/worker.rb', line 14
# Sets up a worker for a single queue.
#
# Credentials and region are read from SIMPLE_SQS_PUBLIC_KEY,
# SIMPLE_SQS_SECRET_KEY and SIMPLE_SQS_REGION; ENV.fetch raises KeyError when
# any of them is unset. Transactions are flagged on unless
# SIMPLE_SQS_NO_AR_TRANSACTION is present in the environment.
#
# @param queue_url [Object] queue URL handed to the poller
#   (presumably a String - confirm against callers)
def initialize(queue_url:)
  @queue_url = queue_url
  # Enabled by default; the mere presence of the variable switches it off.
  @transaction = ENV['SIMPLE_SQS_NO_AR_TRANSACTION'].nil?

  @client = Aws::SQS::Client.new(
    access_key_id: ENV.fetch('SIMPLE_SQS_PUBLIC_KEY'),
    secret_access_key: ENV.fetch('SIMPLE_SQS_SECRET_KEY'),
    region: ENV.fetch('SIMPLE_SQS_REGION')
  )
  @processor = SimpleSqs::Processor.new
  @poller = Aws::SQS::QueuePoller.new(@queue_url, { client: @client })
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
2 3 4 |
# File 'lib/simple_sqs/worker.rb', line 2
# Read-only accessor for the @client instance variable.
#
# @return [Object] whatever is currently stored in @client
def client
  @client
end
#processor ⇒ Object (readonly)
Returns the value of attribute processor.
2 3 4 |
# File 'lib/simple_sqs/worker.rb', line 2
# Read-only accessor for the @processor instance variable.
#
# @return [Object] whatever is currently stored in @processor
def processor
  @processor
end
Instance Method Details
#receive_and_process ⇒ Object
35 36 37 38 39 40 41 42 43 44 |
# File 'lib/simple_sqs/worker.rb', line 35
# Registers a before-request hook on the poller and then starts polling.
#
# The hook installs INT and TERM signal handlers that call `stop_polling`.
# Polling is started with VISIBILITY_TIMEOUT, and `process` is invoked every
# time the poller yields to the block.
def receive_and_process
  @poller.before_request do |_stats|
    on_signal = ->(*_args) { stop_polling }
    trap 'INT', on_signal
    trap 'TERM', on_signal
  end

  # NOTE(review): this listing takes no block argument and calls `process`
  # without one, so the yielded message is not forwarded - confirm against the
  # real source file.
  @poller.poll(visibility_timeout: VISIBILITY_TIMEOUT) { process }
end
#start ⇒ Object
30 31 32 33 |
# File 'lib/simple_sqs/worker.rb', line 30
# Entry point: logs a start-up message at info level, then hands over to
# `receive_and_process`.
#
# @return [Object] whatever `receive_and_process` returns
def start
  logger.info('Starting SQS polling')
  receive_and_process
end
#transaction? ⇒ Boolean
26 27 28 |
# File 'lib/simple_sqs/worker.rb', line 26
# Reports the transaction flag held in @transaction.
#
# @return [Boolean] the stored flag (the constructor sets it to true unless
#   SIMPLE_SQS_NO_AR_TRANSACTION is present in the environment)
def transaction?
  @transaction
end