Class: Pheme::QueuePoller
- Inherits:
-
Object
- Object
- Pheme::QueuePoller
- Defined in:
- lib/pheme/queue_poller.rb
Instance Attribute Summary collapse
-
#connection_pool_block ⇒ Object
Returns the value of attribute connection_pool_block.
-
#format ⇒ Object
Returns the value of attribute format.
-
#max_messages ⇒ Object
Returns the value of attribute max_messages.
-
#poller_configuration ⇒ Object
Returns the value of attribute poller_configuration.
-
#queue_poller ⇒ Object
Returns the value of attribute queue_poller.
-
#queue_url ⇒ Object
Returns the value of attribute queue_url.
Instance Method Summary collapse
- #handle(message) ⇒ Object
-
#initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}) ⇒ QueuePoller
constructor
A new instance of QueuePoller.
- #parse_csv(message_contents) ⇒ Object
- #parse_json(message_contents) ⇒ Object
- #parse_message(message) ⇒ Object
- #poll ⇒ Object
Constructor Details
#initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}) ⇒ QueuePoller
Returns a new instance of QueuePoller.
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/pheme/queue_poller.rb', line 5 def initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}) raise ArgumentError, "must specify non-nil queue_url" unless queue_url.present? @queue_url = queue_url @queue_poller = Aws::SQS::QueuePoller.new(queue_url) @connection_pool_block = connection_pool_block @format = format = @poller_configuration = { wait_time_seconds: 10, # amount of time a long polling receive call can wait for a mesage before receiving a empty response (which will trigger another polling request) idle_timeout: 20, # disconnects poller after 20 seconds of idle time skip_delete: true, # manually delete messages }.merge(poller_configuration || {}) if queue_poller.before_request do |stats| throw :stop_polling if stats. >= end end end |
Instance Attribute Details
#connection_pool_block ⇒ Object
Returns the value of attribute connection_pool_block.
3 4 5 |
# File 'lib/pheme/queue_poller.rb', line 3 def connection_pool_block @connection_pool_block end |
#format ⇒ Object
Returns the value of attribute format.
3 4 5 |
# File 'lib/pheme/queue_poller.rb', line 3 def format @format end |
#max_messages ⇒ Object
Returns the value of attribute max_messages.
3 4 5 |
# File 'lib/pheme/queue_poller.rb', line 3 def end |
#poller_configuration ⇒ Object
Returns the value of attribute poller_configuration.
3 4 5 |
# File 'lib/pheme/queue_poller.rb', line 3 def poller_configuration @poller_configuration end |
#queue_poller ⇒ Object
Returns the value of attribute queue_poller.
3 4 5 |
# File 'lib/pheme/queue_poller.rb', line 3 def queue_poller @queue_poller end |
#queue_url ⇒ Object
Returns the value of attribute queue_url.
3 4 5 |
# File 'lib/pheme/queue_poller.rb', line 3 def queue_url @queue_url end |
Instance Method Details
#handle(message) ⇒ Object
66 67 68 |
# File 'lib/pheme/queue_poller.rb', line 66 def handle() raise NotImplementedError end |
#parse_csv(message_contents) ⇒ Object
56 57 58 59 |
# File 'lib/pheme/queue_poller.rb', line 56 def parse_csv() parsed_body = SmarterCSV.process(StringIO.new()) parsed_body.map{ |item| RecursiveOpenStruct.new(item, recurse_over_arrays: true) } end |
#parse_json(message_contents) ⇒ Object
61 62 63 64 |
# File 'lib/pheme/queue_poller.rb', line 61 def parse_json() parsed_body = JSON.parse() RecursiveOpenStruct.new({wrapper: parsed_body}, recurse_over_arrays: true).wrapper end |
#parse_message(message) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/pheme/queue_poller.rb', line 43 def () Pheme.log(:info, "Received JSON payload: #{message.body}") body = JSON.parse(.body) case format when :csv parse_csv(body['Message']) when :json parse_json(body['Message']) else raise ArgumentError.new("Unknown format #{format}. Valid formats: :csv, :json.") end end |
#poll ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/pheme/queue_poller.rb', line 25 def poll Pheme.log(:info, "Long-polling for messages on #{queue_url}") with_optional_connection_pool_block do queue_poller.poll(poller_configuration) do || data = () begin handle(data) queue_poller.() rescue => e Pheme.log(:error, "Exception: #{e.inspect}") Pheme.log(:error, e.backtrace.join("\n")) Pheme.(e, "#{self.class} failed to process message", data) end end end Pheme.log(:info, "Finished long-polling after #{@poller_configuration[:idle_timeout]} seconds.") end |