Class: Sqewer::Connection::SendBuffer
- Inherits:
-
MessageBuffer
- Object
- Struct
- MessageBuffer
- Sqewer::Connection::SendBuffer
- Defined in:
- lib/sqewer/connection.rb
Overview
Saves the messages to send to the SQS queue
Constant Summary
Constants inherited from MessageBuffer
Instance Attribute Summary
Attributes inherited from MessageBuffer
Instance Method Summary collapse
-
#each_batch ⇒ Object
each_batch here also needs to ensure that the sum of payload lengths does not exceed 256kb.
-
#pack_into_batches(items, weight_limit:, batch_length_limit:) ⇒ Object
Optimizes a large list of items into batches of 10 items or less and with the sum of item lengths being below 256KB The block given to the method should return the weight of the given item.
- #send_message(message_body, **kwargs_for_send) ⇒ Object
Methods inherited from MessageBuffer
Constructor Details
This class inherits a constructor from Sqewer::Connection::MessageBuffer
Instance Method Details
#each_batch ⇒ Object
each_batch here also needs to ensure that the sum of payload lengths does not exceed 256kb
121 122 123 124 125 126 |
# File 'lib/sqewer/connection.rb', line 121 def each_batch regrouped = pack_into_batches(, weight_limit: 256 * 1024, batch_length_limit: 10) do || .fetch(:message_body).bytesize end regrouped.each { |b| yield(b) } end |
#pack_into_batches(items, weight_limit:, batch_length_limit:) ⇒ Object
Optimizes a large list of items into batches of 10 items or less and with the sum of item lengths being below 256KB The block given to the method should return the weight of the given item
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/sqewer/connection.rb', line 131 def pack_into_batches(items, weight_limit:, batch_length_limit:) batches = [] current_batch = [] current_batch_weight = 0 # Sort the items by their weight (length of the body). sorted_items = items.sort_by { |item| yield(item) } # and then take 1 item from the list and append it to the batch if it fits. # If it doesn't fit, no item after it will fit into this batch either (!) # which is how we can optimize sorted_items.each_with_index do |item| weight_of_this_item = yield(item) # First protect from invalid input if weight_of_this_item > weight_limit raise "#{item.inspect} was larger than the permissible limit" # The first limit is on the item count per batch - # if we are limited on that the batch needs to be closed elsif current_batch.length == batch_length_limit batches << current_batch current_batch = [] current_batch_weight = 0 # If placing this item in the batch would make the batch overweight # we need to close the batch, because all the items which come after # this one will be same size or larger. This is the key part of the optimization. elsif (current_batch_weight + weight_of_this_item) > weight_limit batches << current_batch current_batch = [] current_batch_weight = 0 end # and then append the item to the current batch current_batch_weight += weight_of_this_item current_batch << item end batches << current_batch unless current_batch.empty? batches end |
#send_message(message_body, **kwargs_for_send) ⇒ Object
112 113 114 115 116 117 118 |
# File 'lib/sqewer/connection.rb', line 112 def (, **kwargs_for_send) # The "id" is only valid _within_ the request, and is used when # an error response refers to a specific ID within a batch m = {message_body: , id: .length.to_s} m[:delay_seconds] = kwargs_for_send[:delay_seconds] if kwargs_for_send[:delay_seconds] << m end |