Class: Sqewer::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/sqewer/connection.rb

Overview

Adapter that handles communication with a specific queue. In the future this could be switched to a Google PubSub queue, or to AMQP, or to any other queue with guaranteed re-delivery without ACK. The required queue semantics are very simple:

  • no message should be deleted if the receiving client has not deleted it explicitly
  • any execution that ends with an exception should cause the message to be re-enqueued

Direct Known Subclasses

LocalConnection

Defined Under Namespace

Classes: DeleteBuffer, Message, MessageBuffer, SendBuffer

Constant Summary collapse

DEFAULT_TIMEOUT_SECONDS =
5
BATCH_RECEIVE_SIZE =
10
MAX_RANDOM_FAILURES_PER_CALL =
10
MAX_RANDOM_RECEIVE_FAILURES =

sure to hit the max_elapsed_time of 900 seconds

100
NotOurFaultAwsError =
Class.new(StandardError)

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_url) ⇒ Connection

Initializes a new adapter, with access to the SQS queue at the given URL.

Parameters:

  • queue_url (String)

    the SQS queue URL (the URL can be copied from your AWS console)



45
46
47
48
# File 'lib/sqewer/connection.rb', line 45

def initialize(queue_url)
  require 'aws-sdk-sqs'
  @queue_url = queue_url
end

Class Method Details

.defaultObject

Returns the default adapter, connected to the queue set via the SQS_QUEUE_URL environment variable. Switches to SQLite-backed local queue if the SQS_QUEUE_URL is prefixed with 'sqlite3://'



30
31
32
33
34
35
36
37
38
39
40
# File 'lib/sqewer/connection.rb', line 30

def self.default
  url_str = ENV.fetch('SQS_QUEUE_URL')
  uri = URI(url_str)
  if uri.scheme == 'sqlite3'
    Sqewer::LocalConnection.new(uri.to_s)
  else
    new(uri.to_s)
  end
rescue KeyError => e
  raise "SQS_QUEUE_URL not set in the environment. This is the queue URL Sqewer uses by default."
end

Instance Method Details

#delete_message(message_identifier) ⇒ void

This method returns an undefined value.

Deletes a message after it has been succesfully decoded and processed

Parameters:

  • message_identifier (String)

    the ID of the message to delete. For SQS, it is the receipt handle



121
122
123
# File 'lib/sqewer/connection.rb', line 121

def delete_message(message_identifier)
  delete_multiple_messages {|via| via.delete_message(message_identifier) }
end

#delete_multiple_messages {|#delete_message| ... } ⇒ void

This method returns an undefined value.

Deletes multiple messages after they all have been succesfully decoded and processed.

Yields:

  • (#delete_message)

    an object you can delete an individual message through



129
130
131
132
133
134
# File 'lib/sqewer/connection.rb', line 129

def delete_multiple_messages
  buffer = DeleteBuffer.new
  yield(buffer)

  buffer.each_batch {|batch| handle_batch_with_retries(:delete_message_batch, batch) }
end

#receive_messagesArray<Message>

Receive at most 10 messages from the queue, and return the array of Message objects. Retries for at most 900 seconds (15 minutes) and then gives up, thereby crashing the read loop. If SQS is not available even after 15 minutes it is either down or the server is misconfigured. Either way it makes no sense to continue.

Returns:

  • (Array<Message>)

    an array of Message objects



56
57
58
59
60
61
62
# File 'lib/sqewer/connection.rb', line 56

def receive_messages
  Retriable.retriable on: Seahorse::Client::NetworkingError, tries: MAX_RANDOM_RECEIVE_FAILURES do
    response = client.receive_message(queue_url: @queue_url,
      wait_time_seconds: DEFAULT_TIMEOUT_SECONDS, max_number_of_messages: BATCH_RECEIVE_SIZE)
    response.messages.map {|message| Message.new(message.receipt_handle, message.body, message.attributes) }
  end
end

#send_message(message_body, **kwargs_for_send) ⇒ void

This method returns an undefined value.

Send a message to the backing queue

Passes the arguments to the AWS SDK.

Parameters:

  • message_body (String)

    the message to send

  • kwargs_for_send (Hash)

    additional arguments for the submit (such as delay_seconds).



70
71
72
# File 'lib/sqewer/connection.rb', line 70

def send_message(message_body, **kwargs_for_send)
  send_multiple_messages {|via| via.send_message(message_body, **kwargs_for_send) }
end

#send_multiple_messages {|#send_message| ... } ⇒ void

This method returns an undefined value.

Send multiple messages. If any messages fail to send, an exception will be raised.

Yields:

  • (#send_message)

    the object you can send messages through (will be flushed at method return)



110
111
112
113
114
115
# File 'lib/sqewer/connection.rb', line 110

def send_multiple_messages
  buffer = SendBuffer.new
  yield(buffer)

  buffer.each_batch {|batch| handle_batch_with_retries(:send_message_batch, batch) }
end