Class: Sqewer::Connection
- Inherits:
- Object
- Defined in:
- lib/sqewer/connection.rb
Overview
Adapter that handles communication with a specific queue. In the future this could be switched to a Google PubSub queue, or to AMQP, or to any other queue with guaranteed re-delivery without ACK. The required queue semantics are very simple:
- no message should be deleted if the receiving client has not deleted it explicitly
- any execution that ends with an exception should cause the message to be re-enqueued
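The two requirements above can be illustrated with a small in-memory stand-in. This is a minimal sketch, not part of Sqewer: `SketchQueue` and `process_once` are hypothetical names invented for the illustration, and a real backing queue would enforce redelivery through its own visibility or acknowledgement rules.

```ruby
# Hypothetical in-memory stand-in for a queue with the semantics listed above.
# Not part of Sqewer; all names here are illustrative.
class SketchQueue
  def initialize
    @messages = {} # receipt handle => body
    @next_id = 0
  end

  def send_message(body)
    @messages["handle-#{@next_id += 1}"] = body
  end

  # Receiving never removes anything; messages stay until deleted explicitly.
  def receive_messages
    @messages.to_a
  end

  def delete_message(handle)
    @messages.delete(handle)
  end

  def size
    @messages.size
  end
end

# Processes every pending message once. A message is deleted only when the
# block returns normally; an exception leaves it in the queue for redelivery.
def process_once(queue)
  queue.receive_messages.each do |handle, body|
    begin
      yield body
      queue.delete_message(handle)
    rescue StandardError
      # Intentionally no delete: the message will be seen again.
    end
  end
end

queue = SketchQueue.new
queue.send_message("ok")
queue.send_message("boom")
process_once(queue) { |body| raise "failed" if body == "boom" }
remaining = queue.receive_messages.map { |_handle, body| body }
```

After one pass, only the message whose handler raised is still in the queue, which is the behaviour the adapter relies on.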
Direct Known Subclasses
Defined Under Namespace
Classes: DeleteBuffer, Message, MessageBuffer, SendBuffer
Constant Summary
- DEFAULT_TIMEOUT_SECONDS = 5
- BATCH_RECEIVE_SIZE = 10
- MAX_RANDOM_FAILURES_PER_CALL = 10
- MAX_RANDOM_RECEIVE_FAILURES = 100 (sure to hit the max_elapsed_time of 900 seconds)
- NotOurFaultAwsError = Class.new(StandardError)
Class Method Summary
- .default ⇒ Object
  Returns the default adapter, connected to the queue set via the SQS_QUEUE_URL environment variable.
Instance Method Summary
- #delete_message(message_identifier) ⇒ void
  Deletes a message after it has been successfully decoded and processed.
- #delete_multiple_messages {|#delete_message| ... } ⇒ void
  Deletes multiple messages after they have all been successfully decoded and processed.
- #initialize(queue_url) ⇒ Connection (constructor)
  Initializes a new adapter, with access to the SQS queue at the given URL.
- #receive_messages ⇒ Array<Message>
  Receives at most 10 messages from the queue and returns them as an array of Message objects.
- #send_message(message_body, **kwargs_for_send) ⇒ void
  Sends a message to the backing queue.
- #send_multiple_messages {|#send_message| ... } ⇒ void
  Sends multiple messages.
Constructor Details
#initialize(queue_url) ⇒ Connection
Initializes a new adapter, with access to the SQS queue at the given URL.
# File 'lib/sqewer/connection.rb', line 45
def initialize(queue_url)
  require 'aws-sdk-sqs'
  @queue_url = queue_url
end
Class Method Details
.default ⇒ Object
Returns the default adapter, connected to the queue set via the SQS_QUEUE_URL environment variable. Switches to a SQLite-backed local queue (Sqewer::LocalConnection) if the SQS_QUEUE_URL uses the 'sqlite3://' scheme. Raises an error if SQS_QUEUE_URL is not set.
# File 'lib/sqewer/connection.rb', line 30
def self.default
  url_str = ENV.fetch('SQS_QUEUE_URL')
  uri = URI(url_str)
  if uri.scheme == 'sqlite3'
    Sqewer::LocalConnection.new(uri.to_s)
  else
    new(uri.to_s)
  end
rescue KeyError => e
  raise "SQS_QUEUE_URL not set in the environment. This is the queue URL Sqewer uses by default."
end
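The scheme check in .default can be tried in isolation. The following is a minimal sketch that only mirrors the dispatch decision without loading Sqewer or the AWS SDK; `adapter_kind_for` is a hypothetical helper, and both URLs (account ID, queue name, file path) are made-up examples.

```ruby
require 'uri'

# Returns a symbol naming the adapter that a given queue URL would select.
# Hypothetical helper: it mirrors the scheme check in .default but does not
# construct a real connection.
def adapter_kind_for(url_str)
  URI(url_str).scheme == 'sqlite3' ? :local : :sqs
end

# Both URLs are illustrative placeholders, not real queues.
sqs_kind = adapter_kind_for('https://sqs.eu-west-1.amazonaws.com/123456789012/example-queue')
local_kind = adapter_kind_for('sqlite3://tmp/example-queue.sqlite3')
```

Only the URL scheme decides which adapter class is used; the rest of the URL is passed through unchanged via `uri.to_s`.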
Instance Method Details
#delete_message(message_identifier) ⇒ void
This method returns an undefined value.
Deletes a message after it has been successfully decoded and processed.
# File 'lib/sqewer/connection.rb', line 121
def delete_message(message_identifier)
  delete_multiple_messages {|via| via.delete_message(message_identifier) }
end
#delete_multiple_messages {|#delete_message| ... } ⇒ void
This method returns an undefined value.
Deletes multiple messages after they have all been successfully decoded and processed.
# File 'lib/sqewer/connection.rb', line 129
def delete_multiple_messages
  buffer = DeleteBuffer.new
  yield(buffer)
  buffer.each_batch {|batch| handle_batch_with_retries(:delete_message_batch, batch) }
end
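The yield-a-buffer pattern used here can be sketched on its own. The internals of DeleteBuffer are not shown on this page, so `SketchDeleteBuffer` and `sketch_delete_multiple` below are hypothetical stand-ins; the batch size of 10 is an assumption based on the SQS limit of 10 entries per batch request, and the sketch records batch sizes instead of calling AWS.

```ruby
# Hypothetical buffer, loosely modelled on the yield-a-buffer pattern above.
# The real DeleteBuffer is not shown on this page; this is only a sketch.
class SketchDeleteBuffer
  BATCH_SIZE = 10 # assumption: SQS batch calls accept at most 10 entries

  def initialize
    @identifiers = []
  end

  # Collects an identifier instead of deleting immediately.
  def delete_message(message_identifier)
    @identifiers << message_identifier
  end

  # Yields the collected identifiers in groups of at most BATCH_SIZE.
  def each_batch
    @identifiers.each_slice(BATCH_SIZE) { |batch| yield batch }
  end
end

# Stand-in for the batching method: returns the size of each batch that
# would have been sent, rather than performing any network call.
def sketch_delete_multiple
  buffer = SketchDeleteBuffer.new
  yield buffer
  batch_sizes = []
  buffer.each_batch { |batch| batch_sizes << batch.size }
  batch_sizes
end

batch_sizes = sketch_delete_multiple do |via|
  25.times { |i| via.delete_message("handle-#{i}") }
end
```

With 25 buffered identifiers the sketch produces three batches, so the caller can queue up any number of deletions inside the block and leave the grouping to the buffer.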
#receive_messages ⇒ Array<Message>
Receive at most 10 messages from the queue, and return the array of Message objects. Retries for at most 900 seconds (15 minutes) and then gives up, thereby crashing the read loop. If SQS is not available even after 15 minutes it is either down or the server is misconfigured. Either way it makes no sense to continue.
# File 'lib/sqewer/connection.rb', line 56
def receive_messages
  Retriable.retriable on: Seahorse::Client::NetworkingError, tries: MAX_RANDOM_RECEIVE_FAILURES do
    response = client.receive_message(queue_url: @queue_url,
      wait_time_seconds: DEFAULT_TIMEOUT_SECONDS, max_number_of_messages: BATCH_RECEIVE_SIZE)
    response.messages.map {|message| Message.new(message.receipt_handle, message.body, message.attributes) }
  end
end
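The "retry until a try count or a time budget runs out, then crash" behaviour can be sketched in plain Ruby. This is a simplified illustration, not the Retriable gem's implementation: `retry_with_limits` and `FlakyError` are hypothetical names, and the sketch sleeps a fixed interval rather than applying any backoff strategy.

```ruby
# Retries the block on the given error class, up to `tries` attempts or
# `max_elapsed_time` seconds, whichever comes first; then re-raises.
# Hypothetical helper for illustration only.
def retry_with_limits(on:, tries:, max_elapsed_time:, sleep_seconds: 0)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue on
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    # Give up by re-raising the last error once either limit is reached.
    raise if attempt >= tries || elapsed >= max_elapsed_time
    sleep(sleep_seconds)
    retry
  end
end

# Stand-in for a transient networking error.
FlakyError = Class.new(StandardError)

calls = 0
result = retry_with_limits(on: FlakyError, tries: 100, max_elapsed_time: 900) do |attempt|
  calls += 1
  raise FlakyError, "transient" if attempt < 3
  :received
end
```

Errors of any other class propagate immediately, and once a limit is hit the last error is re-raised, which is what lets a persistent outage crash the read loop.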
#send_message(message_body, **kwargs_for_send) ⇒ void
This method returns an undefined value.
Sends a message to the backing queue. Passes the arguments to the AWS SDK.
# File 'lib/sqewer/connection.rb', line 70
def send_message(message_body, **kwargs_for_send)
  send_multiple_messages {|via| via.send_message(message_body, **kwargs_for_send) }
end
#send_multiple_messages {|#send_message| ... } ⇒ void
This method returns an undefined value.
Send multiple messages. If any messages fail to send, an exception will be raised.
# File 'lib/sqewer/connection.rb', line 110
def send_multiple_messages
  buffer = SendBuffer.new
  yield(buffer)
  buffer.each_batch {|batch| handle_batch_with_retries(:send_message_batch, batch) }
end