Class: Sqewer::Connection::SendBuffer

Inherits:
MessageBuffer show all
Defined in:
lib/sqewer/connection.rb

Overview

Saves the messages to send to the SQS queue

Constant Summary

Constants inherited from MessageBuffer

MessageBuffer::MAX_RECORDS

Instance Attribute Summary

Attributes inherited from MessageBuffer

#messages

Instance Method Summary collapse

Methods inherited from MessageBuffer

#initialize

Constructor Details

This class inherits a constructor from Sqewer::Connection::MessageBuffer

Instance Method Details

#each_batchObject

each_batch here also needs to ensure that the sum of payload lengths does not exceed 256kb



121
122
123
124
125
126
# File 'lib/sqewer/connection.rb', line 121

def each_batch
  regrouped = pack_into_batches(messages, weight_limit: 256 * 1024, batch_length_limit: 10) do |message|
    message.fetch(:message_body).bytesize
  end
  regrouped.each { |b| yield(b) }
end

#pack_into_batches(items, weight_limit:, batch_length_limit:) ⇒ Object

Optimizes a large list of items into batches of 10 items or less and with the sum of item lengths being below 256KB The block given to the method should return the weight of the given item



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/sqewer/connection.rb', line 131

def pack_into_batches(items, weight_limit:, batch_length_limit:)
  batches = []
  current_batch = []
  current_batch_weight = 0

  # Sort the items by their weight (length of the body).
  sorted_items = items.sort_by { |item| yield(item) }

  # and then take 1 item from the list and append it to the batch if it fits.
  # If it doesn't fit, no item after it will fit into this batch either (!)
  # which is how we can optimize
  sorted_items.each_with_index do |item|
    weight_of_this_item = yield(item)

    # First protect from invalid input
    if weight_of_this_item > weight_limit
      raise "#{item.inspect} was larger than the permissible limit"
    # The first limit is on the item count per batch -
    # if we are limited on that the batch needs to be closed
    elsif current_batch.length == batch_length_limit
      batches << current_batch
      current_batch = []
      current_batch_weight = 0
    # If placing this item in the batch would make the batch overweight
    # we need to close the batch, because all the items which come after
    # this one will be same size or larger. This is the key part of the optimization.
    elsif (current_batch_weight + weight_of_this_item) > weight_limit
      batches << current_batch
      current_batch = []
      current_batch_weight = 0
    end

    # and then append the item to the current batch
    current_batch_weight += weight_of_this_item
    current_batch << item
  end
  batches << current_batch unless current_batch.empty?

  batches
end

#send_message(message_body, **kwargs_for_send) ⇒ Object



112
113
114
115
116
117
118
# File 'lib/sqewer/connection.rb', line 112

def send_message(message_body, **kwargs_for_send)
  # The "id" is only valid _within_ the request, and is used when
  # an error response refers to a specific ID within a batch
  m = {message_body: message_body, id: messages.length.to_s}
  m[:delay_seconds] = kwargs_for_send[:delay_seconds] if kwargs_for_send[:delay_seconds]
  messages << m
end