Class: SQSProcessor::Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/sqs_processor/processor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_url:, aws_access_key_id:, aws_secret_access_key:, aws_region: 'us-east-1', aws_session_token: nil, max_messages: 10, visibility_timeout: 30, logger: nil) ⇒ Processor

Returns a new instance of Processor.



14
15
16
17
18
19
20
21
22
23
24
# File 'lib/sqs_processor/processor.rb', line 14

# Builds a processor bound to a single SQS queue.
#
# The queue URL is validated before anything else so that a bad argument
# fails fast, without first constructing a logger or an AWS client.
#
# @param queue_url [String] URL of the queue to poll; must not be nil/false or blank
# @param aws_access_key_id [String] forwarded to #setup_sqs_client
# @param aws_secret_access_key [String] forwarded to #setup_sqs_client
# @param aws_region [String] forwarded to #setup_sqs_client
# @param aws_session_token [String, nil] forwarded to #setup_sqs_client
# @param max_messages [Integer] stored; later sent as max_number_of_messages when receiving
# @param visibility_timeout [Integer] stored; later sent as visibility_timeout (logged as seconds)
# @param logger [Object, nil] forwarded to #setup_logger
# @raise [RuntimeError] when queue_url is missing or blank
def initialize(queue_url:, aws_access_key_id:, aws_secret_access_key:, aws_region: 'us-east-1',
               aws_session_token: nil, max_messages: 10, visibility_timeout: 30, logger: nil)
  # Same exception class and message as before; only the timing and the blank check are new.
  raise 'Queue URL is required.' if !queue_url || queue_url.to_s.strip.empty?

  @queue_url = queue_url
  @max_messages = max_messages
  @visibility_timeout = visibility_timeout

  setup_logger(logger)
  setup_sqs_client(aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region)
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



12
13
14
# File 'lib/sqs_processor/processor.rb', line 12

# Read-only accessor for the @logger instance variable, which #setup_logger
# assigns during construction.
#
# @return [Object] the logger this processor writes to
def logger
  @logger
end

#queue_url ⇒ Object (readonly)

Returns the value of attribute queue_url.



12
13
14
# File 'lib/sqs_processor/processor.rb', line 12

# Read-only accessor for the @queue_url instance variable, which #initialize
# assigns from its queue_url: keyword.
#
# @return [Object] the queue URL passed to the constructor
def queue_url
  @queue_url
end

#sqs_client ⇒ Object (readonly)

Returns the value of attribute sqs_client.



12
13
14
# File 'lib/sqs_processor/processor.rb', line 12

# Read-only accessor for the @sqs_client instance variable, which
# #setup_sqs_client assigns (an Aws::SQS::Client).
#
# @return [Object] the SQS client used for all queue calls
def sqs_client
  @sqs_client
end

Instance Method Details

#delete_message(message) ⇒ Object



114
115
116
117
118
119
120
121
# File 'lib/sqs_processor/processor.rb', line 114

# Deletes +message+ from the queue at @queue_url using its receipt handle.
#
# Errors are logged, not raised, so one failed delete does not abort the
# caller. The method returns nil in that case so callers can tell a failed
# delete from a successful one.
#
# @param message [#receipt_handle, #message_id] a message previously received from the queue
# @return [Object, nil] the client's response on success, nil when the delete failed
def delete_message(message)
  @sqs_client.delete_message(
    queue_url: @queue_url,
    receipt_handle: message.receipt_handle
  )
rescue StandardError => e
  logger.error "Failed to delete message #{message.message_id}: #{e.message}"
  # Without this the logger call's own return value (truthy for the stdlib
  # Logger) would leak out and look like success.
  nil
end

#get_queue_attributes ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/sqs_processor/processor.rb', line 123

# Requests all attributes of the queue at @queue_url, logs the three
# approximate message counters at info level, and hands the attribute map back.
#
# @return [Object] the +attributes+ value of the client's response
def get_queue_attributes
  queue_attrs = @sqs_client.get_queue_attributes(queue_url: @queue_url, attribute_names: ['All']).attributes

  logger.info 'Queue attributes:'
  [
    "  Approximate number of messages: #{queue_attrs['ApproximateNumberOfMessages']}",
    "  Approximate number of messages not visible: #{queue_attrs['ApproximateNumberOfMessagesNotVisible']}",
    "  Approximate number of messages delayed: #{queue_attrs['ApproximateNumberOfMessagesDelayed']}"
  ].each { |line| logger.info line }

  queue_attrs
end

#handle_message(message, body) ⇒ Object

Hook method that should be implemented by the host application. Override this method in your subclass to implement custom message processing.



109
110
111
112
# File 'lib/sqs_processor/processor.rb', line 109

# Default message hook; subclasses are expected to override it.
#
# This base implementation processes nothing: it logs a warning and reports
# failure so the message stays in the queue.
#
# +body+ is optional because #process_single_message invokes the hook with a
# single argument (the parsed payload). With two required parameters that call
# raised ArgumentError instead of reaching the warning below.
#
# @param message [Object] first argument supplied by the caller (unused here)
# @param body [Object, nil] second argument, when the caller supplies one (unused here)
# @return [false] always, signalling that the message was not handled
def handle_message(message, body = nil)
  logger.warn 'handle_message method not implemented. Override this method in your subclass.'
  false
end

#process_messages ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/sqs_processor/processor.rb', line 42

# Entry point for the polling loop: logs the configuration once, then polls
# the queue forever via #receive_messages.
#
# A StandardError raised during a cycle is logged with its backtrace and
# followed by a longer pause; the loop itself never terminates on its own.
def process_messages
  [
    'Starting SQS message processing...',
    "Queue URL: #{@queue_url}",
    "Max messages per batch: #{@max_messages}",
    "Visibility timeout: #{@visibility_timeout} seconds"
  ].each { |banner_line| logger.info banner_line }

  loop do
    receive_messages
    sleep 1 # brief pause between successful polling cycles
  rescue StandardError => error
    logger.error "Error in message processing loop: #{error.message}"
    logger.error error.backtrace.join("\n")
    sleep 5 # back off longer after a failed cycle
  end
end

#process_single_message(message) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/sqs_processor/processor.rb', line 78

# Parses one received message, passes it to #handle_message, and deletes it
# from the queue when the handler returns a truthy value.
#
# The raw body must be JSON whose 'Message' entry is itself JSON text
# (looks like an SNS-style envelope — confirm against the producer).
# Parse failures and handler errors are logged and the message is left in the
# queue; nothing is re-raised from inside the begin block.
#
# @param message [#message_id, #body] a message returned by the SQS client
# @return [Object] the return value of the last logger call
def process_single_message(message)
  logger.info "Processing message: #{message.message_id}"

  begin
    # Parse message body
    body = JSON.parse(message.body)
    logger.info "Message body: #{body}"

    payload = JSON.parse(body['Message'])

    # The hook is documented as handle_message(message, body), but it used to be
    # invoked with the payload only, so handlers written to the documented
    # signature raised ArgumentError. Handlers that require two arguments now
    # get (message, payload); everything else is still called with the payload.
    handler_arity = method(:handle_message).arity
    required_args = handler_arity.negative? ? -handler_arity - 1 : handler_arity
    result = required_args >= 2 ? handle_message(message, payload) : handle_message(payload)

    if result
      # Delete the message from queue after successful processing
      delete_message(message)
      logger.info "Successfully processed and deleted message: #{message.message_id}"
    else
      logger.warn "Message processing returned false, keeping message in queue: #{message.message_id}"
    end
  rescue JSON::ParserError => e
    logger.error "Failed to parse message body as JSON: #{e.message}"
    logger.error "Raw message body: #{message.body}"
    # Keep message in queue for manual inspection
  rescue StandardError => e
    logger.error "Error processing message #{message.message_id}: #{e.message}"
    logger.error e.backtrace.join("\n")
    # Keep message in queue for retry
  end
end

#receive_messages ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/sqs_processor/processor.rb', line 58

# Performs one long-poll receive against @queue_url and feeds every returned
# message to #process_single_message, in order.
#
# @return [nil, Object] nil when the batch is empty, otherwise the message collection
def receive_messages
  batch = @sqs_client.receive_message(
    queue_url: @queue_url,
    max_number_of_messages: @max_messages,
    visibility_timeout: @visibility_timeout,
    wait_time_seconds: 20 # long polling: wait up to 20s for messages
  ).messages

  if batch.empty?
    logger.debug 'No messages received'
    nil
  else
    logger.info "Received #{batch.length} message(s)"
    batch.each { |received| process_single_message(received) }
  end
end

#setup_logger(custom_logger = nil) ⇒ Object



26
27
28
29
30
31
32
# File 'lib/sqs_processor/processor.rb', line 26

# Chooses the logger this processor writes to and stores it in @logger.
#
# A caller-supplied logger is adopted untouched: it may be shared with the
# rest of the host application, so overwriting its level or formatter here
# would silently reconfigure logging outside this class. Only the built-in
# STDOUT logger gets the INFO level and the timestamped single-line format.
#
# @param custom_logger [Object, nil] logger to use instead of the default
# @return [Object] the logger now stored in @logger
def setup_logger(custom_logger = nil)
  return @logger = custom_logger if custom_logger

  @logger = Logger.new(STDOUT)
  @logger.level = Logger::INFO
  @logger.formatter = proc do |severity, datetime, _progname, msg|
    "#{datetime.strftime('%Y-%m-%d %H:%M:%S')} [#{severity}] #{msg}\n"
  end
  @logger
end

#setup_sqs_client(aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region) ⇒ Object



34
35
36
37
38
39
40
# File 'lib/sqs_processor/processor.rb', line 34

# Creates the Aws::SQS::Client used for every queue call and stores it in
# @sqs_client, authenticating with static credentials built from the arguments.
#
# @param aws_access_key_id [String] first argument to Aws::Credentials.new
# @param aws_secret_access_key [String] second argument to Aws::Credentials.new
# @param aws_session_token [String, nil] third argument to Aws::Credentials.new
# @param aws_region [String] passed to the client as region:
# @return [Object] the newly created client
def setup_sqs_client(aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region)
  @sqs_client = Aws::SQS::Client.new(
    region: aws_region,
    credentials: Aws::Credentials.new(aws_access_key_id, aws_secret_access_key, aws_session_token)
  )
end