Class: Pheme::QueuePoller

Inherits:
Object
  • Object
show all
Includes:
Compression
Defined in:
lib/pheme/queue_poller.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Compression

#compress, #decompress

Constructor Details

#initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {}, sqs_client: nil, idle_timeout: nil, message_handler: nil, &block_message_handler) ⇒ QueuePoller

Returns a new instance of QueuePoller.

Raises:

  • (ArgumentError)


9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/pheme/queue_poller.rb', line 9

# Builds a poller for a single SQS queue.
#
# @param queue_url [String] URL of the queue to poll; must not be blank
# @param connection_pool_block stored as-is in @connection_pool_block
#   (NOTE(review): presumably consulted by with_optional_connection_pool_block — confirm)
# @param max_messages [Integer, nil] when given, polling stops once the poller
#   reports at least this many received messages
# @param format [Symbol] content format used by parse_body (defaults to :json)
# @param poller_configuration [Hash, nil] options merged over the defaults below
# @param sqs_client passed to Aws::SQS::QueuePoller as its client
# @param idle_timeout [Numeric, nil] when non-nil, overrides the :idle_timeout option
# @param message_handler [Class, nil] a class whose ancestors include Pheme::MessageHandler
# @param block_message_handler [Proc] alternative to message_handler
# @raise [ArgumentError] if queue_url is blank, if message_handler does not
#   descend from Pheme::MessageHandler, or if both a handler class and a block are given
def initialize(queue_url:,
               connection_pool_block: false,
               max_messages: nil,
               format: :json,
               poller_configuration: {},
               sqs_client: nil,
               idle_timeout: nil,
               message_handler: nil,
               &block_message_handler)
  raise ArgumentError, "must specify non-nil queue_url" if queue_url.blank?

  @queue_url = queue_url
  @queue_poller = Aws::SQS::QueuePoller.new(queue_url, client: sqs_client)
  @connection_pool_block = connection_pool_block
  @messages_processed = 0
  @messages_received = 0
  @format = format
  @max_messages = max_messages

  default_options = {
    # How long a long-polling receive call may wait for a message before an
    # empty response comes back (which triggers another polling request).
    wait_time_seconds: 10,
    # Disconnect the poller after 20 seconds of idle time.
    idle_timeout: 20,
    # Messages are deleted manually.
    skip_delete: true,
  }
  @poller_configuration = default_options.merge(poller_configuration || {})
  # An explicit idle_timeout argument wins over whatever the merge produced.
  @poller_configuration[:idle_timeout] = idle_timeout unless idle_timeout.nil?

  if message_handler
    unless message_handler.ancestors.include?(Pheme::MessageHandler)
      raise ArgumentError, 'Invalid message handler, must inherit from Pheme::MessageHandler'
    end

    @message_handler = message_handler
  end

  @block_message_handler = block_message_handler

  if @message_handler && @block_message_handler
    raise ArgumentError, 'only provide a message_handler or a block, not both'
  end

  return unless max_messages

  # Checked before every receive request: stop once enough messages arrived.
  queue_poller.before_request do |stats|
    throw :stop_polling if stats.received_message_count >= max_messages
  end
end

Instance Attribute Details

#connection_pool_block ⇒ Object

Returns the value of attribute connection_pool_block.



7
8
9
# File 'lib/pheme/queue_poller.rb', line 7

# Reader for @connection_pool_block, which the constructor assigns from its
# connection_pool_block: keyword argument (default false).
def connection_pool_block
  @connection_pool_block
end

#format ⇒ Object

Returns the value of attribute format.



7
8
9
# File 'lib/pheme/queue_poller.rb', line 7

# Reader for @format, which the constructor assigns from its format: keyword
# argument (default :json). parse_body switches on this value.
def format
  @format
end

#max_messages ⇒ Object

Returns the value of attribute max_messages.



7
8
9
# File 'lib/pheme/queue_poller.rb', line 7

# Reader for @max_messages, which the constructor assigns from its
# max_messages: keyword argument (default nil).
def max_messages
  @max_messages
end

#poller_configuration ⇒ Object

Returns the value of attribute poller_configuration.



7
8
9
# File 'lib/pheme/queue_poller.rb', line 7

# Reader for @poller_configuration: the constructor's default options merged
# with the caller-supplied poller_configuration, plus any idle_timeout override.
# poll passes this hash to queue_poller.poll.
def poller_configuration
  @poller_configuration
end

#queue_poller ⇒ Object

Returns the value of attribute queue_poller.



7
8
9
# File 'lib/pheme/queue_poller.rb', line 7

# Reader for @queue_poller, the Aws::SQS::QueuePoller the constructor builds
# for queue_url.
def queue_poller
  @queue_poller
end

#queue_url ⇒ Object

Returns the value of attribute queue_url.



7
8
9
# File 'lib/pheme/queue_poller.rb', line 7

# Reader for @queue_url, the queue URL given to the constructor (which rejects
# blank values).
def queue_url
  @queue_url
end

Instance Method Details

#get_content(body) ⇒ Object



112
113
114
# File 'lib/pheme/queue_poller.rb', line 112

# Pulls the 'Message' entry out of a parsed message body and runs it through
# decompress (provided by the Compression mixin).
#
# @param body [Hash] parsed message body, indexed with the string key 'Message'
# @return whatever decompress returns for that entry
def get_content(body)
  payload = body['Message']
  decompress(payload)
end

#get_metadata(message_body) ⇒ Object



108
109
110
# File 'lib/pheme/queue_poller.rb', line 108

# Returns the message body without its payload entries.
#
# @param message_body [Hash] parsed message body with string keys
# @return [Hash] a new hash holding every entry of message_body except the
#   'Message' and 'Records' keys; message_body itself is left untouched
def get_metadata(message_body)
  message_body.except('Message', 'Records')
end

#handle(message, metadata) ⇒ Object



126
127
128
129
130
131
132
133
134
# File 'lib/pheme/queue_poller.rb', line 126

# Dispatches one parsed message to whichever handler was configured.
#
# If a handler class was given to the constructor, a new instance is built with
# the message and metadata and its #handle is called. Otherwise the block given
# to the constructor is called with both values.
#
# @param message the parsed message content
# @param metadata the metadata accompanying the message
# @return the result of the handler that ran
# @raise [NotImplementedError] when neither a handler class nor a block is set
def handle(message, metadata)
  if @message_handler
    @message_handler.new(message: message, metadata: metadata).handle
  elsif @block_message_handler
    @block_message_handler.call(message, metadata)
  else
    raise NotImplementedError
  end
end

#parse_body(queue_message) ⇒ Object

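Parses queue_message.body (JSON) into a hash, parses the content obtained via get_content according to the configured format, logs the message together with the resulting body hash, and returns the parsed content.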



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/pheme/queue_poller.rb', line 79

# Parses a queue message and returns its content in the configured format.
#
# The JSON body is split into its raw content (get_content) and its metadata
# (get_metadata). The content is then parsed according to #format:
#   * :csv   - parsed with parse_csv; the logged body keeps the raw content under 'Message'
#   * :json  - parsed with parse_json; the logged body gets the parsed content under 'Message'
#   * other  - parsed with a public method named "parse_<format>"; the logged
#              body gets the parsed content under 'Records'
#
# @param queue_message an object whose #body is a JSON string
# @return the parsed content (not the logged body hash)
# @raise [ArgumentError] when the format has no matching parse_<format> method
def parse_body(queue_message)
  message_body = JSON.parse(queue_message.body)
  raw_content = get_content(message_body)
  # Start the logged body from the metadata only, so the raw payload is not
  # carried along unless a branch below puts it there on purpose.
  body = get_metadata(message_body)

  case format
  when :csv
    parsed_content = parse_csv(raw_content)
    body['Message'] = raw_content
  when :json
    parsed_content = parse_json(raw_content)
    body['Message'] = parsed_content
  else
    method_name = "parse_#{format}".to_sym
    raise ArgumentError, "Unknown format #{format}" unless respond_to?(method_name)

    parsed_content = __send__(method_name, raw_content)
    body['Records'] = parsed_content
  end

  log_message_received(queue_message, body)
  parsed_content
end

#parse_csv(message_contents) ⇒ Object



116
117
118
119
# File 'lib/pheme/queue_poller.rb', line 116

# Parses CSV text with SmarterCSV and wraps each resulting item in a
# RecursiveOpenStruct (with recurse_over_arrays enabled).
#
# @param message_contents [String] CSV text
# @return [Array<RecursiveOpenStruct>] one struct per item SmarterCSV yields
def parse_csv(message_contents)
  csv_io = StringIO.new(message_contents)
  SmarterCSV.process(csv_io).map do |row|
    RecursiveOpenStruct.new(row, recurse_over_arrays: true)
  end
end

#parse_json(message_contents) ⇒ Object



121
122
123
124
# File 'lib/pheme/queue_poller.rb', line 121

# Parses JSON text and exposes the result through RecursiveOpenStruct.
#
# The parsed value is placed under a temporary :wrapper key and read back via
# #wrapper, so the value handed to RecursiveOpenStruct is always a hash.
#
# @param message_contents [String] JSON text
# @return the parsed value as returned by RecursiveOpenStruct#wrapper
def parse_json(message_contents)
  wrapped = { wrapper: JSON.parse(message_contents) }
  RecursiveOpenStruct.new(wrapped, recurse_over_arrays: true).wrapper
end

#parse_metadata(queue_message) ⇒ Object



103
104
105
106
# File 'lib/pheme/queue_poller.rb', line 103

# Extracts the handler-facing metadata from a queue message.
#
# @param queue_message an object whose #body is a JSON string
# @return [Hash] { timestamp:, topic_arn: } taken from the body's 'Timestamp'
#   and 'TopicArn' entries (nil for entries that are absent)
def parse_metadata(queue_message)
  message_body = JSON.parse(queue_message.body)
  { timestamp: message_body['Timestamp'], topic_arn: message_body['TopicArn'] }
end

#poll ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/pheme/queue_poller.rb', line 54

# Polls the queue and processes every message it yields.
#
# For each message (inside a logger tag carrying the message id): the body is
# parsed, the metadata extracted, and the handler invoked through
# with_optional_connection_pool_block. Only after the handler returns is the
# message deleted and counted as processed.
#
# A StandardError raised while processing is logged and reported via
# Pheme.rollbar; that message is neither deleted nor counted as processed, and
# polling continues. A SignalException stops polling via throw :stop_polling.
#
# @return the value returned by log_polling_end
def poll
  time_start = log_polling_start
  queue_poller.poll(poller_configuration) do |queue_message|
    @messages_received += 1
    Pheme.logger.tagged(queue_message.message_id) do
      begin
        content = parse_body(queue_message)
        metadata = parse_metadata(queue_message)
        with_optional_connection_pool_block { handle(content, metadata) }
        queue_poller.delete_message(queue_message)
        log_delete(queue_message)
        @messages_processed += 1
      rescue SignalException
        throw :stop_polling
      rescue StandardError => e
        Pheme.logger.error(e)
        # content is nil here when parse_body itself was the step that failed.
        Pheme.rollbar(e, "#{self.class} failed to process message", { message: content })
      end
    end
  end
  log_polling_end(time_start)
end