Class: Sqewer::LocalConnection

Inherits:
Connection show all
Defined in:
lib/sqewer/local_connection.rb

Constant Summary collapse

ASSUME_DEADLETTER_AFTER_N_DELIVERIES =
10

Constants inherited from Connection

Connection::BATCH_RECEIVE_SIZE, Connection::DEFAULT_TIMEOUT_SECONDS, Connection::MAX_RANDOM_FAILURES_PER_CALL, Connection::MAX_RANDOM_RECEIVE_FAILURES, Connection::NotOurFaultAwsError

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Connection

default, #delete_message, #send_message

Constructor Details

#initialize(queue_url_with_sqlite3_scheme) ⇒ LocalConnection

Returns a new instance of LocalConnection.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/sqewer/local_connection.rb', line 20

# Sets up a connection backed by a SQLite3 database file.
#
# The `sqlite3` library is required lazily here, the queue URL is split
# into a database path and a queue name via `.parse_queue_url`, and the
# messages table plus its two indexes are created if they do not exist yet.
#
# @param queue_url_with_sqlite3_scheme [String] queue URL handed to `.parse_queue_url`
# @raise [LoadError] re-raised with an installation hint when a LoadError occurs
def initialize(queue_url_with_sqlite3_scheme)
  require 'sqlite3'
  @db_path, @queue_name = self.class.parse_queue_url(queue_url_with_sqlite3_scheme)

  # Executed in order: the table first, then the indexes that refer to it.
  schema_statements = [
    "CREATE TABLE IF NOT EXISTS sqewer_messages_v3 (
      id INTEGER PRIMARY KEY AUTOINCREMENT ,
      queue_name VARCHAR NOT NULL,
      receipt_handle VARCHAR NOT NULL,
      deliver_after_epoch INTEGER,
      times_delivered_so_far INTEGER DEFAULT 0,
      last_delivery_at_epoch INTEGER,
      visible BOOLEAN DEFAULT 't',
      sent_timestamp_millis INTEGER,
      message_body TEXT)",
    "CREATE INDEX IF NOT EXISTS on_receipt_handle ON sqewer_messages_v3 (receipt_handle)",
    "CREATE INDEX IF NOT EXISTS on_queue_name ON sqewer_messages_v3 (queue_name)"
  ]

  with_db do |db|
    schema_statements.each { |statement| db.execute(statement) }
  end
rescue LoadError => load_error
  raise load_error, "You need the sqlite3 gem in your Gemfile to use LocalConnection. Add it to your Gemfile (`gem 'sqlite3'')"
end

Class Method Details

.parse_queue_url(queue_url_starting_with_sqlite3_proto) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/sqewer/local_connection.rb', line 5

# Splits a `sqlite3:` queue URL into the database file path and the queue name.
#
# The URL hostname (if any) and path are joined into a single absolute path,
# which is then expanded with File.expand_path. The queue name is read from
# the `queue` query parameter and defaults to 'sqewer_local' when absent.
#
# @param queue_url_starting_with_sqlite3_proto [String] the queue URL
# @return [Array] two-element array of database file path and queue name
# @raise [RuntimeError] when the URL scheme is not `sqlite3`
def self.parse_queue_url(queue_url_starting_with_sqlite3_proto)
  parsed = URI.parse(queue_url_starting_with_sqlite3_proto)

  if parsed.scheme != 'sqlite3'
    raise "The scheme of the SQS queue URL should be with `sqlite3' but was %s" % parsed.scheme
  end

  # Drop missing/blank pieces, then collapse the repeated slashes left by joining.
  segments = ['/', parsed.hostname, parsed.path].reject { |segment| segment.nil? || segment.empty? }
  absolute_db_path = File.expand_path(segments.join('/').squeeze('/'))

  query_params = Rack::Utils.parse_nested_query(parsed.query)

  [absolute_db_path, query_params.fetch('queue', 'sqewer_local')]
end

Instance Method Details

#delete_multiple_messages {|#delete_message| ... } ⇒ void

This method returns an undefined value.

Deletes multiple messages after they all have been successfully decoded and processed.

Yields:

  • (#delete_message)

    an object you can delete an individual message through



62
63
64
65
66
# File 'lib/sqewer/local_connection.rb', line 62

# Collects deletions in a DeleteBuffer handed to the caller's block, then
# passes the buffered messages to `delete_persisted_messages` once the
# block has returned.
#
# @yield [DeleteBuffer] the buffer the block records deletions on
def delete_multiple_messages
  pending_deletes = DeleteBuffer.new
  yield pending_deletes
  delete_persisted_messages(pending_deletes.messages)
end

#receive_messages ⇒ Array<Message>

Returns an array of Message objects.

Returns:

  • (Array<Message>)

    an array of Message objects



43
44
45
46
47
# File 'lib/sqewer/local_connection.rb', line 43

# Builds one Message for every row returned by
# `load_receipt_handles_bodies_and_timestamps`. Each row is unpacked as
# receipt handle, message body and sent timestamp (milliseconds); the
# timestamp is exposed under the 'SentTimestamp' attribute.
#
# @return [Array<Message>] an array of Message objects
def receive_messages
  rows = load_receipt_handles_bodies_and_timestamps
  rows.map do |row|
    handle, body, sent_at_millis = row
    Message.new(handle, body, { 'SentTimestamp' => sent_at_millis })
  end
end

#send_multiple_messages {|#send_message| ... } ⇒ void

This method returns an undefined value.

Yields:

  • (#send_message)

    the object you can send messages through (will be flushed at method return)



51
52
53
54
55
56
# File 'lib/sqewer/local_connection.rb', line 51

# Collects outgoing messages in a SendBuffer handed to the caller's block,
# then passes everything buffered to `persist_messages` once the block has
# returned.
#
# @yield [SendBuffer] the buffer the block records messages on
def send_multiple_messages
  outgoing = SendBuffer.new
  yield outgoing
  persist_messages(outgoing.messages)
end

#truncate! ⇒ Object

Only gets used in tests



69
70
71
72
73
# File 'lib/sqewer/local_connection.rb', line 69

# Deletes every row of sqewer_messages_v3 whose queue_name matches this
# connection's queue name. Only gets used in tests.
def truncate!
  with_db { |db| db.execute("DELETE FROM sqewer_messages_v3 WHERE queue_name = ?", @queue_name) }
end