Class: Pheme::QueuePoller
- Inherits:
-
Object
- Object
- Pheme::QueuePoller
- Includes:
- Compression
- Defined in:
- lib/pheme/queue_poller.rb
Instance Attribute Summary collapse
-
#connection_pool_block ⇒ Object
Returns the value of attribute connection_pool_block.
-
#format ⇒ Object
Returns the value of attribute format.
-
#max_messages ⇒ Object
Returns the value of attribute max_messages.
-
#poller_configuration ⇒ Object
Returns the value of attribute poller_configuration.
-
#queue_poller ⇒ Object
Returns the value of attribute queue_poller.
-
#queue_url ⇒ Object
Returns the value of attribute queue_url.
Instance Method Summary collapse
- #get_content(body) ⇒ Object
- #get_metadata(message_body) ⇒ Object
- #handle(_message, _metadata) ⇒ Object
-
#initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}, sqs_client: nil) ⇒ QueuePoller
constructor
A new instance of QueuePoller.
-
#parse_body(queue_message) ⇒ Object
returns queue_message.body as hash, stores and parses get_content to body.
- #parse_csv(message_contents) ⇒ Object
- #parse_json(message_contents) ⇒ Object
- #parse_metadata(queue_message) ⇒ Object
- #poll ⇒ Object
Methods included from Compression
Constructor Details
#initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}, sqs_client: nil) ⇒ QueuePoller
Returns a new instance of QueuePoller.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/pheme/queue_poller.rb', line 9 def initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}, sqs_client: nil) raise ArgumentError, "must specify non-nil queue_url" if queue_url.blank? @queue_url = queue_url @queue_poller = Aws::SQS::QueuePoller.new(queue_url, client: sqs_client) @connection_pool_block = connection_pool_block @messages_processed = 0 @messages_received = 0 @format = format @max_messages = @poller_configuration = { wait_time_seconds: 10, # amount of time a long polling receive call can wait for a message before receiving a empty response (which will trigger another polling request) idle_timeout: 20, # disconnects poller after 20 seconds of idle time skip_delete: true, # manually delete messages }.merge(poller_configuration || {}) if queue_poller.before_request do |stats| throw :stop_polling if stats. >= end end end |
Instance Attribute Details
#connection_pool_block ⇒ Object
Returns the value of attribute connection_pool_block.
7 8 9 |
# File 'lib/pheme/queue_poller.rb', line 7 def connection_pool_block @connection_pool_block end |
#format ⇒ Object
Returns the value of attribute format.
7 8 9 |
# File 'lib/pheme/queue_poller.rb', line 7 def format @format end |
#max_messages ⇒ Object
Returns the value of attribute max_messages.
7 8 9 |
# File 'lib/pheme/queue_poller.rb', line 7 def @max_messages end |
#poller_configuration ⇒ Object
Returns the value of attribute poller_configuration.
7 8 9 |
# File 'lib/pheme/queue_poller.rb', line 7 def poller_configuration @poller_configuration end |
#queue_poller ⇒ Object
Returns the value of attribute queue_poller.
7 8 9 |
# File 'lib/pheme/queue_poller.rb', line 7 def queue_poller @queue_poller end |
#queue_url ⇒ Object
Returns the value of attribute queue_url.
7 8 9 |
# File 'lib/pheme/queue_poller.rb', line 7 def queue_url @queue_url end |
Instance Method Details
#get_content(body) ⇒ Object
92 93 94 |
# File 'lib/pheme/queue_poller.rb', line 92 def get_content(body) decompress(body['Message']) end |
#get_metadata(message_body) ⇒ Object
88 89 90 |
# File 'lib/pheme/queue_poller.rb', line 88 def () .except('Message', 'Records') end |
#handle(_message, _metadata) ⇒ Object
106 107 108 |
# File 'lib/pheme/queue_poller.rb', line 106 def handle(, ) raise NotImplementedError end |
#parse_body(queue_message) ⇒ Object
returns queue_message.body as hash, stores and parses get_content to body
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/pheme/queue_poller.rb', line 59 def parse_body() = JSON.parse(.body) raw_content = get_content() body = () case format when :csv parsed_content = parse_csv(raw_content) body['Message'] = raw_content when :json parsed_content = parse_json(raw_content) body['Message'] = parsed_content else method_name = "parse_#{format}".to_sym raise ArgumentError, "Unknown format #{format}" unless respond_to?(method_name) parsed_content = __send__(method_name, raw_content) body['Records'] = parsed_content end (, body) parsed_content end |
#parse_csv(message_contents) ⇒ Object
96 97 98 99 |
# File 'lib/pheme/queue_poller.rb', line 96 def parse_csv() parsed_body = SmarterCSV.process(StringIO.new()) parsed_body.map { |item| RecursiveOpenStruct.new(item, recurse_over_arrays: true) } end |
#parse_json(message_contents) ⇒ Object
101 102 103 104 |
# File 'lib/pheme/queue_poller.rb', line 101 def parse_json() parsed_body = JSON.parse() RecursiveOpenStruct.new({ wrapper: parsed_body }, recurse_over_arrays: true).wrapper end |
#parse_metadata(queue_message) ⇒ Object
83 84 85 86 |
# File 'lib/pheme/queue_poller.rb', line 83 def () = JSON.parse(.body) { timestamp: ['Timestamp'] } end |
#poll ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/pheme/queue_poller.rb', line 32 def poll time_start = log_polling_start with_optional_connection_pool_block do queue_poller.poll(poller_configuration) do || @messages_received += 1 Pheme.logger.tagged(.) do begin content = parse_body() = () handle(content, ) queue_poller.() log_delete() @messages_processed += 1 rescue SignalException throw :stop_polling rescue StandardError => e Pheme.logger.error(e) Pheme.(e, "#{self.class} failed to process message", { message: content }) end end end end log_polling_end(time_start) end |