Class: Sqewer::ConnectionMessagebox

Inherits:
Object
  • Object
show all
Defined in:
lib/sqewer/connection_messagebox.rb

Overview

A recorder for send_message and delete_message calls. Will buffer those calls as if it were a Connection, and then execute them within a synchronized mutex lock, to prevent concurrent submits to the Connection object, and, consequently, concurrent calls to the SQS client. We also buffer calls to the connection in the messagebox to implement simple batching of message submits and deletes. For example, imagine your job does this:

context.submit!(dependent_job)
context.submit!(another_dependent_job)
# ...100 lines further on
context.submit!(yet_another_job)

you would be doing 3 separate SQS requests and spending more money. Whereas a messagebox will be able to buffer those sends and pack them in batches, consequently performing less requests

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ ConnectionMessagebox

Returns a new instance of ConnectionMessagebox.



20
21
22
23
24
25
# File 'lib/sqewer/connection_messagebox.rb', line 20

def initialize(connection)
  @connection = connection
  @deletes = []
  @sends = []
  @mux = Mutex.new
end

Instance Method Details

#delete_message(message_identifier) ⇒ Object

Saves the given identifier to be deleted from the queue. If there are more deletes in the same flush, they will be batched using batched deletes.

See Also:

  • Sqewer::ConnectionMessagebox.{Connection{Connection#delete_message}


41
42
43
44
45
# File 'lib/sqewer/connection_messagebox.rb', line 41

def delete_message(message_identifier)
  @mux.synchronize {
    @deletes << message_identifier
  }
end

#flush!Object

Flushes all the accumulated commands to the queue connection. First the message sends are going to be flushed, then the message deletes. All of those will use batching where possible.



50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/sqewer/connection_messagebox.rb', line 50

def flush!
  @mux.synchronize do
    @connection.send_multiple_messages do | buffer |
      @sends.each { |body, kwargs| buffer.send_message(body, **kwargs) }
    end

    @connection.delete_multiple_messages do | buffer |
      @deletes.each { |id| buffer.delete_message(id) }
    end
    (@sends.length + @deletes.length).tap{ @sends.clear; @deletes.clear }
  end
end

#send_message(message_body, **kwargs_for_send) ⇒ Object

Saves the given body and the keyword arguments (such as delay_seconds) to be sent into the queue. If there are more sends in the same flush, they will be batched using batched deletes.G

See Also:

  • Sqewer::ConnectionMessagebox.{Connection{Connection#send_message}


31
32
33
34
35
# File 'lib/sqewer/connection_messagebox.rb', line 31

def send_message(message_body, **kwargs_for_send)
  @mux.synchronize {
    @sends << [message_body, kwargs_for_send]
  }
end