Class: Pheme::QueuePoller

Inherits:
Object
  • Object
show all
Defined in:
lib/pheme/queue_poller.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}) ⇒ QueuePoller

Returns a new instance of QueuePoller.

Raises:

  • (ArgumentError)


5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/pheme/queue_poller.rb', line 5

def initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {})
  raise ArgumentError, "must specify non-nil queue_url" unless queue_url.present?
  @queue_url = queue_url
  @queue_poller = Aws::SQS::QueuePoller.new(queue_url)
  @connection_pool_block = connection_pool_block
  @format = format
  @max_messages = max_messages
  @poller_configuration = {
    wait_time_seconds: 10, # amount of time a long polling receive call can wait for a mesage before receiving a empty response (which will trigger another polling request)
    idle_timeout: 20, # disconnects poller after 20 seconds of idle time
    skip_delete: true, # manually delete messages
  }.merge(poller_configuration || {})

  if max_messages
    queue_poller.before_request do |stats|
      throw :stop_polling if stats.received_message_count >= max_messages
    end
  end
end

Instance Attribute Details

#connection_pool_blockObject

Returns the value of attribute connection_pool_block.



3
4
5
# File 'lib/pheme/queue_poller.rb', line 3

def connection_pool_block
  @connection_pool_block
end

#formatObject

Returns the value of attribute format.



3
4
5
# File 'lib/pheme/queue_poller.rb', line 3

def format
  @format
end

#max_messagesObject

Returns the value of attribute max_messages.



3
4
5
# File 'lib/pheme/queue_poller.rb', line 3

def max_messages
  @max_messages
end

#poller_configurationObject

Returns the value of attribute poller_configuration.



3
4
5
# File 'lib/pheme/queue_poller.rb', line 3

def poller_configuration
  @poller_configuration
end

#queue_pollerObject

Returns the value of attribute queue_poller.



3
4
5
# File 'lib/pheme/queue_poller.rb', line 3

def queue_poller
  @queue_poller
end

#queue_urlObject

Returns the value of attribute queue_url.



3
4
5
# File 'lib/pheme/queue_poller.rb', line 3

def queue_url
  @queue_url
end

Instance Method Details

#handle(message) ⇒ Object

Raises:

  • (NotImplementedError)


66
67
68
# File 'lib/pheme/queue_poller.rb', line 66

def handle(message)
  raise NotImplementedError
end

#parse_csv(message_contents) ⇒ Object



56
57
58
59
# File 'lib/pheme/queue_poller.rb', line 56

def parse_csv(message_contents)
  parsed_body = SmarterCSV.process(StringIO.new(message_contents))
  parsed_body.map{ |item| RecursiveOpenStruct.new(item, recurse_over_arrays: true) }
end

#parse_json(message_contents) ⇒ Object



61
62
63
64
# File 'lib/pheme/queue_poller.rb', line 61

def parse_json(message_contents)
  parsed_body = JSON.parse(message_contents)
  RecursiveOpenStruct.new({wrapper: parsed_body}, recurse_over_arrays: true).wrapper
end

#parse_message(message) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/pheme/queue_poller.rb', line 43

def parse_message(message)
  Pheme.log(:info, "Received JSON payload: #{message.body}")
  body = JSON.parse(message.body)
  case format
  when :csv
    parse_csv(body['Message'])
  when :json
    parse_json(body['Message'])
  else
    raise ArgumentError.new("Unknown format #{format}. Valid formats: :csv, :json.")
  end
end

#pollObject



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/pheme/queue_poller.rb', line 25

def poll
  Pheme.log(:info, "Long-polling for messages on #{queue_url}")
  with_optional_connection_pool_block do
    queue_poller.poll(poller_configuration) do |message|
      data = parse_message(message)
      begin
        handle(data)
        queue_poller.delete_message(message)
      rescue => e
        Pheme.log(:error, "Exception: #{e.inspect}")
        Pheme.log(:error, e.backtrace.join("\n"))
        Pheme.rollbar(e, "#{self.class} failed to process message", data)
      end
    end
  end
  Pheme.log(:info, "Finished long-polling after #{@poller_configuration[:idle_timeout]} seconds.")
end