Class: Pheme::QueuePoller
- Inherits:
-
Object
- Object
- Pheme::QueuePoller
- Includes:
- Compression
- Defined in:
- lib/pheme/queue_poller.rb
Instance Attribute Summary collapse
-
#connection_pool_block ⇒ Object
Returns the value of attribute connection_pool_block.
-
#format ⇒ Object
Returns the value of attribute format.
-
#max_messages ⇒ Object
Returns the value of attribute max_messages.
-
#poller_configuration ⇒ Object
Returns the value of attribute poller_configuration.
-
#queue_poller ⇒ Object
Returns the value of attribute queue_poller.
-
#queue_url ⇒ Object
Returns the value of attribute queue_url.
Instance Method Summary collapse
- #get_content(body) ⇒ Object
- #get_metadata(message_body) ⇒ Object
- #handle(message, metadata) ⇒ Object
-
#initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}, sqs_client: nil, idle_timeout: nil, message_handler: nil, &block_message_handler) ⇒ QueuePoller
constructor
A new instance of QueuePoller.
-
#parse_body(queue_message) ⇒ Object
returns queue_message.body as hash, stores and parses get_content to body.
- #parse_csv(message_contents) ⇒ Object
- #parse_json(message_contents) ⇒ Object
- #parse_metadata(queue_message) ⇒ Object
- #poll ⇒ Object
Methods included from Compression
Constructor Details
#initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}, sqs_client: nil, idle_timeout: nil, message_handler: nil, &block_message_handler) ⇒ QueuePoller
Returns a new instance of QueuePoller.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/pheme/queue_poller.rb', line 9 def initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}, sqs_client: nil, idle_timeout: nil, message_handler: nil, &) raise ArgumentError, "must specify non-nil queue_url" if queue_url.blank? @queue_url = queue_url @queue_poller = Aws::SQS::QueuePoller.new(queue_url, client: sqs_client) @connection_pool_block = connection_pool_block @messages_processed = 0 @messages_received = 0 @format = format @max_messages = @poller_configuration = { wait_time_seconds: 10, # amount of time a long polling receive call can wait for a message before receiving a empty response (which will trigger another polling request) idle_timeout: 20, # disconnects poller after 20 seconds of idle time skip_delete: true, # manually delete messages }.merge(poller_configuration || {}) @poller_configuration[:idle_timeout] = idle_timeout unless idle_timeout.nil? if if .ancestors.include?(Pheme::MessageHandler) @message_handler = else raise ArgumentError, 'Invalid message handler, must inherit from Pheme::MessageHandler' end end @block_message_handler = raise ArgumentError, 'only provide a message_handler or a block, not both' if @message_handler && @block_message_handler if queue_poller.before_request do |stats| throw :stop_polling if stats. >= end end end |
Instance Attribute Details
#connection_pool_block ⇒ Object
Returns the value of attribute connection_pool_block.
7 8 9 |
# File 'lib/pheme/queue_poller.rb', line 7 def connection_pool_block @connection_pool_block end |
#format ⇒ Object
Returns the value of attribute format.
7 8 9 |
# File 'lib/pheme/queue_poller.rb', line 7 def format @format end |
#max_messages ⇒ Object
Returns the value of attribute max_messages.
7 8 9 |
# File 'lib/pheme/queue_poller.rb', line 7 def @max_messages end |
#poller_configuration ⇒ Object
Returns the value of attribute poller_configuration.
7 8 9 |
# File 'lib/pheme/queue_poller.rb', line 7 def poller_configuration @poller_configuration end |
#queue_poller ⇒ Object
Returns the value of attribute queue_poller.
7 8 9 |
# File 'lib/pheme/queue_poller.rb', line 7 def queue_poller @queue_poller end |
#queue_url ⇒ Object
Returns the value of attribute queue_url.
7 8 9 |
# File 'lib/pheme/queue_poller.rb', line 7 def queue_url @queue_url end |
Instance Method Details
#get_content(body) ⇒ Object
112 113 114 |
# File 'lib/pheme/queue_poller.rb', line 112 def get_content(body) decompress(body['Message']) end |
#get_metadata(message_body) ⇒ Object
108 109 110 |
# File 'lib/pheme/queue_poller.rb', line 108 def () .except('Message', 'Records') end |
#handle(message, metadata) ⇒ Object
126 127 128 129 130 131 132 133 134 |
# File 'lib/pheme/queue_poller.rb', line 126 def handle(, ) if @message_handler @message_handler.new(message: , metadata: ).handle elsif @block_message_handler @block_message_handler.call(, ) else raise NotImplementedError end end |
#parse_body(queue_message) ⇒ Object
returns queue_message.body as hash, stores and parses get_content to body
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/pheme/queue_poller.rb', line 79 def parse_body() = JSON.parse(.body) raw_content = get_content() body = () case format when :csv parsed_content = parse_csv(raw_content) body['Message'] = raw_content when :json parsed_content = parse_json(raw_content) body['Message'] = parsed_content else method_name = "parse_#{format}".to_sym raise ArgumentError, "Unknown format #{format}" unless respond_to?(method_name) parsed_content = __send__(method_name, raw_content) body['Records'] = parsed_content end (, body) parsed_content end |
#parse_csv(message_contents) ⇒ Object
116 117 118 119 |
# File 'lib/pheme/queue_poller.rb', line 116 def parse_csv() parsed_body = SmarterCSV.process(StringIO.new()) parsed_body.map { |item| RecursiveOpenStruct.new(item, recurse_over_arrays: true) } end |
#parse_json(message_contents) ⇒ Object
121 122 123 124 |
# File 'lib/pheme/queue_poller.rb', line 121 def parse_json() parsed_body = JSON.parse() RecursiveOpenStruct.new({ wrapper: parsed_body }, recurse_over_arrays: true).wrapper end |
#parse_metadata(queue_message) ⇒ Object
103 104 105 106 |
# File 'lib/pheme/queue_poller.rb', line 103 def () = JSON.parse(.body) { timestamp: ['Timestamp'], topic_arn: ['TopicArn'] } end |
#poll ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/pheme/queue_poller.rb', line 54 def poll time_start = log_polling_start queue_poller.poll(poller_configuration) do || @messages_received += 1 Pheme.logger.tagged(.) do begin content = parse_body() = () with_optional_connection_pool_block { handle(content, ) } queue_poller.() log_delete() @messages_processed += 1 rescue SignalException throw :stop_polling rescue StandardError => e Pheme.logger.error(e) Pheme.(e, "#{self.class} failed to process message", { message: content }) end end end log_polling_end(time_start) end |