Class: SQSProcessor::Processor
- Inherits:
-
Object
- Object
- SQSProcessor::Processor
- Defined in:
- lib/sqs_processor/processor.rb
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#queue_url ⇒ Object
readonly
Returns the value of attribute queue_url.
-
#sqs_client ⇒ Object
readonly
Returns the value of attribute sqs_client.
Instance Method Summary collapse
- #delete_message(message) ⇒ Object
- #get_queue_attributes ⇒ Object
-
#handle_message(message, body) ⇒ Object
Hook method that should be implemented by the host application. Override this method in your subclass to implement custom message processing.
-
#initialize(queue_url:, aws_access_key_id:, aws_secret_access_key:, aws_region: 'us-east-1', aws_session_token: nil, max_messages: 10, visibility_timeout: 30, logger: nil) ⇒ Processor
constructor
A new instance of Processor.
- #process_messages ⇒ Object
- #process_single_message(message) ⇒ Object
- #receive_messages ⇒ Object
- #setup_logger(custom_logger = nil) ⇒ Object
- #setup_sqs_client(aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region) ⇒ Object
Constructor Details
#initialize(queue_url:, aws_access_key_id:, aws_secret_access_key:, aws_region: 'us-east-1', aws_session_token: nil, max_messages: 10, visibility_timeout: 30, logger: nil) ⇒ Processor
Returns a new instance of Processor.
14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/sqs_processor/processor.rb', line 14
#
# Creates a processor for a single SQS queue: stores the polling settings,
# then sets up the logger and the SQS client.
#
# @param queue_url [String] URL of the queue; must not be nil or false
# @param aws_access_key_id [String] forwarded to #setup_sqs_client
# @param aws_secret_access_key [String] forwarded to #setup_sqs_client
# @param aws_region [String] forwarded to #setup_sqs_client
# @param aws_session_token [String, nil] forwarded to #setup_sqs_client
# @param max_messages [Integer] stored in @max_messages
# @param visibility_timeout [Integer] stored in @visibility_timeout
# @param logger [Object, nil] forwarded to #setup_logger
# @raise [RuntimeError] if queue_url is nil or false
def initialize(queue_url:, aws_access_key_id:, aws_secret_access_key:, aws_region: 'us-east-1',
               aws_session_token: nil, max_messages: 10, visibility_timeout: 30, logger: nil)
  @queue_url = queue_url
  # Each setting gets its own assignment; chaining them would silently make
  # the batch size equal to the visibility timeout.
  @max_messages = max_messages
  @visibility_timeout = visibility_timeout

  setup_logger(logger)
  setup_sqs_client(aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region)

  raise 'Queue URL is required.' unless @queue_url
end
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
12 13 14 |
# File 'lib/sqs_processor/processor.rb', line 12
#
# Read-only accessor for the logger held by this processor.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
#queue_url ⇒ Object (readonly)
Returns the value of attribute queue_url.
12 13 14 |
# File 'lib/sqs_processor/processor.rb', line 12
#
# Read-only accessor for the queue URL held by this processor.
#
# @return [Object] the current value of @queue_url
def queue_url
  @queue_url
end
#sqs_client ⇒ Object (readonly)
Returns the value of attribute sqs_client.
12 13 14 |
# File 'lib/sqs_processor/processor.rb', line 12
#
# Read-only accessor for the SQS client held by this processor.
#
# @return [Object] the current value of @sqs_client
def sqs_client
  @sqs_client
end
Instance Method Details
#delete_message(message) ⇒ Object
114 115 116 117 118 119 120 121 |
# File 'lib/sqs_processor/processor.rb', line 114
#
# Asks the SQS client to delete +message+ from the configured queue, using the
# message's receipt handle.
#
# Failures are not propagated: any StandardError raised by the client is
# logged at error level together with the message id.
#
# @param message [#receipt_handle, #message_id] the received message to delete
# @return [Object] the client's response, or the result of the error log call
#   when deletion failed
def delete_message(message)
  @sqs_client.delete_message(
    queue_url: @queue_url,
    receipt_handle: message.receipt_handle
  )
rescue StandardError => e
  logger.error "Failed to delete message #{message.message_id}: #{e.message}"
end
#get_queue_attributes ⇒ Object
123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/sqs_processor/processor.rb', line 123
#
# Requests all attributes of the configured queue from the SQS client, logs
# the three approximate message counters at info level, and hands the
# attributes back to the caller.
#
# @return [Object] the +attributes+ object from the client's response
def get_queue_attributes
  @sqs_client.get_queue_attributes(queue_url: @queue_url, attribute_names: ['All']).attributes.tap do |queue_stats|
    logger.info 'Queue attributes:'
    logger.info " Approximate number of messages: #{queue_stats['ApproximateNumberOfMessages']}"
    logger.info " Approximate number of messages not visible: #{queue_stats['ApproximateNumberOfMessagesNotVisible']}"
    logger.info " Approximate number of messages delayed: #{queue_stats['ApproximateNumberOfMessagesDelayed']}"
  end
end
#handle_message(message, body) ⇒ Object
Hook method that should be implemented by the host application. Override this method in your subclass to implement custom message processing.
109 110 111 112 |
# File 'lib/sqs_processor/processor.rb', line 109
#
# Hook method that should be implemented by the host application. Override
# this method in your subclass to implement custom message processing.
#
# This default implementation processes nothing: it logs a warning and
# returns false.
#
# @param message [Object] the received queue message (unused here)
# @param body [Object] the parsed message payload (unused here)
# @return [Boolean] always false in this default implementation
def handle_message(message, body)
  logger.warn 'handle_message method not implemented. Override this method in your subclass.'
  false
end
#process_messages ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/sqs_processor/processor.rb', line 42
#
# Logs the current configuration, then polls the queue in an endless loop.
#
# Each cycle calls #receive_messages and pauses for one second. A
# StandardError raised during a cycle is logged with its backtrace and
# followed by a five-second pause, after which polling continues. The loop
# has no exit condition, so this method does not return normally.
#
# @return [void]
def process_messages
  logger.info 'Starting SQS message processing...'
  logger.info "Queue URL: #{@queue_url}"
  logger.info "Max messages per batch: #{@max_messages}"
  logger.info "Visibility timeout: #{@visibility_timeout} seconds"

  loop do
    receive_messages
    sleep 1 # Small delay between polling cycles
  rescue StandardError => e
    logger.error "Error in message processing loop: #{e.message}"
    logger.error e.backtrace.join("\n")
    sleep 5 # Longer delay on error
  end
end
#process_single_message(message) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/sqs_processor/processor.rb', line 78
#
# Processes one received message: parses its body, passes the message and the
# decoded payload to the #handle_message hook, and deletes the message when
# the hook returns a truthy value.
#
# The body is parsed as JSON and its 'Message' field is parsed as JSON again.
# NOTE(review): this presumably targets an SNS-style envelope - confirm
# against the producers feeding this queue.
#
# No error escapes the parsing/handling section: JSON parse failures and any
# other StandardError are logged and the message is left in the queue.
#
# @param message [#message_id, #body] the received queue message
# @return [Object] result of the last logging call
def process_single_message(message)
  logger.info "Processing message: #{message.message_id}"

  begin
    # Parse message body
    body = JSON.parse(message.body)
    logger.info "Message body: #{body}"

    # Call the hook method that should be implemented by the host application.
    # The hook takes (message, body), so the raw message is passed along with
    # the decoded payload.
    result = handle_message(message, JSON.parse(body['Message']))

    if result
      # Delete the message from queue after successful processing
      delete_message(message)
      logger.info "Successfully processed and deleted message: #{message.message_id}"
    else
      logger.warn "Message processing returned false, keeping message in queue: #{message.message_id}"
    end
  rescue JSON::ParserError => e
    logger.error "Failed to parse message body as JSON: #{e.message}"
    logger.error "Raw message body: #{message.body}"
    # Keep message in queue for manual inspection
  rescue StandardError => e
    logger.error "Error processing message #{message.message_id}: #{e.message}"
    logger.error e.backtrace.join("\n")
    # Keep message in queue for retry
  end
end
#receive_messages ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/sqs_processor/processor.rb', line 58
#
# Fetches one batch of messages from the configured queue and processes each
# of them with #process_single_message.
#
# The request uses the stored batch size and visibility timeout and waits up
# to 20 seconds for messages (long polling).
#
# @return [nil, Object] nil when the batch is empty, otherwise the result of
#   iterating over the received messages
def receive_messages
  response = @sqs_client.receive_message(
    queue_url: @queue_url,
    max_number_of_messages: @max_messages,
    visibility_timeout: @visibility_timeout,
    wait_time_seconds: 20 # Long polling
  )

  if response.messages.empty?
    logger.debug 'No messages received'
    return
  end

  logger.info "Received #{response.messages.length} message(s)"

  response.messages.each do |message|
    process_single_message(message)
  end
end
#setup_logger(custom_logger = nil) ⇒ Object
26 27 28 29 30 31 32 |
# File 'lib/sqs_processor/processor.rb', line 26
#
# Chooses the logger for this processor and stores it in @logger.
#
# A caller-supplied logger is used exactly as given: its level and formatter
# belong to the host application and are left untouched. When no logger is
# given, a Logger writing to standard output is created at INFO level with a
# "YYYY-MM-DD HH:MM:SS [SEVERITY] message" line format.
#
# @param custom_logger [Object, nil] logger to use instead of the default
# @return [Object] the logger now stored in @logger
def setup_logger(custom_logger = nil)
  if custom_logger
    @logger = custom_logger
    return @logger
  end

  default_logger = Logger.new($stdout)
  default_logger.level = Logger::INFO
  default_logger.formatter = proc do |severity, datetime, _progname, msg|
    "#{datetime.strftime('%Y-%m-%d %H:%M:%S')} [#{severity}] #{msg}\n"
  end
  @logger = default_logger
end
#setup_sqs_client(aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region) ⇒ Object
34 35 36 37 38 39 40 |
# File 'lib/sqs_processor/processor.rb', line 34
#
# Builds the SQS client for this processor from explicit credentials and a
# region, and stores it in @sqs_client.
#
# @param aws_access_key_id [String] first argument to Aws::Credentials.new
# @param aws_secret_access_key [String] second argument to Aws::Credentials.new
# @param aws_session_token [String, nil] third argument to Aws::Credentials.new
# @param aws_region [String] passed to the client as +region+
# @return [Object] the client now stored in @sqs_client
def setup_sqs_client(aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region)
  @sqs_client = Aws::SQS::Client.new(
    region: aws_region,
    credentials: Aws::Credentials.new(aws_access_key_id, aws_secret_access_key, aws_session_token)
  )
end