Class: Bricolage::SQSDataSource::DeleteMessageBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/bricolage/sqsdatasource.rb

Defined Under Namespace

Classes: Entry

Constant Summary collapse

BATCH_SIZE_MAX = 10 (SQS system limit)

MAX_RETRY_COUNT = 3

Instance Method Summary collapse

Constructor Details

#initialize(sqs_client, url, logger) ⇒ DeleteMessageBuffer

Returns a new instance of DeleteMessageBuffer.



214
215
216
217
218
219
# File 'lib/bricolage/sqsdatasource.rb', line 214

# Builds an empty delete-request buffer.
#
# @param sqs_client client object that #flush calls delete_message_batch on
# @param url queue URL sent as +queue_url+ with every batch request
# @param logger logger receiving info/warn/error messages from #flush
def initialize(sqs_client, url, logger)
  @sqs_client, @url, @logger = sqs_client, url, logger
  # Pending entries, keyed by entry id (filled by #put).
  @buf = {}
end

Instance Method Details

#empty? ⇒ Boolean

Returns:

  • (Boolean)


227
228
229
# File 'lib/bricolage/sqsdatasource.rb', line 227

# Tells whether the buffer currently holds no entries.
#
# @return [Boolean] true when no delete request is buffered
def empty?
  @buf.size.zero?
end

#flush(now = Time.now) ⇒ Object



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/bricolage/sqsdatasource.rb', line 245

# Sends DeleteMessageBatch requests for every buffered entry that is
# issuable at +now+, then updates the buffer from each response.
#
# Entries reported as successful are removed from the buffer. Entries
# reported as failed are marked with Entry#failed! and stay buffered for a
# later flush, unless Entry#too_many_failure? becomes true, in which case
# they are dropped with a warning. Exceptions raised by the client are not
# rescued here.
#
# @param now [Time] reference time handed to Entry#issuable?
# @return [Object, nil] nil when nothing is issuable; otherwise the value
#   returned by Array#each_slice
def flush(now = Time.now)
  entries = @buf.values.select {|ent| ent.issuable?(now) }
  return if entries.empty?
  @logger.info "flushing async delete requests"
  # One request per slice of at most BATCH_SIZE_MAX entries.
  entries.each_slice(BATCH_SIZE_MAX) do |ents|
    res = @sqs_client.delete_message_batch(queue_url: @url, entries: ents.map(&:request_params))
    @logger.info "DeleteMessageBatch executed: #{res.successful.size} succeeded, #{res.failed.size} failed"
    # Taken after the call returns; used as the issue time of failed entries.
    issued_time = Time.now
    res.successful.each do |s|
      @buf.delete s.id
    end
    res.failed.each do |f|
      ent = @buf[f.id]
      unless ent
        # The response names an id that is not buffered; log it and move on.
        @logger.error "[BUG] no corresponding DeleteMessageBuffer entry: id=#{f.id}"
        next
      end
      ent.failed!(issued_time)
      if ent.too_many_failure?
        @logger.warn "DeleteMessage failure count exceeded the limit; give up: message_id=#{ent.message.message_id}, receipt_handle=#{ent.message.receipt_handle}"
        @buf.delete f.id
        next
      end
      # Entry stays in the buffer and is retried by a later flush.
      @logger.info "DeleteMessageBatch partially failed (#{ent.n_failure} times): sender_fault=#{f.sender_fault}, code=#{f.code}, message=#{f.message}"
    end
  end
end

#flush_force ⇒ Object

Flushes all delayed delete requests, including pending requests



240
241
242
243
# File 'lib/bricolage/sqsdatasource.rb', line 240

# Flushes every buffered delete request immediately, including entries that
# are still waiting for their next retry time.
#
# @return [Object] whatever #flush returns
def flush_force
  # Retries last only about 2 minutes in total, so a reference time one hour
  # ahead is later than every entry's @next_issue_time.
  one_hour_later = Time.now + 3600
  flush(one_hour_later)
end

#full? ⇒ Boolean

Returns:

  • (Boolean)


231
232
233
# File 'lib/bricolage/sqsdatasource.rb', line 231

# Tells whether the buffer holds at least BATCH_SIZE_MAX entries.
#
# @return [Boolean] true once the entry count reaches BATCH_SIZE_MAX
def full?
  @buf.length >= BATCH_SIZE_MAX
end

#put(msg) ⇒ Object



221
222
223
224
225
# File 'lib/bricolage/sqsdatasource.rb', line 221

# Buffers a delete request for +msg+ and flushes once the buffer is full.
#
# The message is wrapped in an Entry and stored under the entry's id, so a
# later entry with the same id replaces the earlier one.
#
# @param msg message object handed to Entry.new
# @return [Object, nil] the result of #flush when the buffer was full,
#   otherwise nil
def put(msg)
  Entry.new(msg).tap {|entry| @buf[entry.id] = entry }
  flush if full?
end

#size ⇒ Object



235
236
237
# File 'lib/bricolage/sqsdatasource.rb', line 235

# Reports how many delete requests are currently buffered.
#
# @return [Integer] number of entries in the buffer
def size
  @buf.length
end