Class: SQSProcessor::Processor
- Inherits:
-
Object
- Object
- SQSProcessor::Processor
- Defined in:
- lib/sqs_processor/processor.rb
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#queue_url ⇒ Object
readonly
Returns the value of attribute queue_url.
-
#sqs_client ⇒ Object
readonly
Returns the value of attribute sqs_client.
Instance Method Summary collapse
- #delete_message(message) ⇒ Object
- #get_queue_attributes ⇒ Object
-
#handle_message(message, body) ⇒ Object
Hook method that should be implemented by the host application. Override this method in your subclass to implement custom message processing.
-
#initialize(queue_url:, aws_access_key_id:, aws_secret_access_key:, aws_region: 'us-east-1', aws_session_token: nil, max_messages: 10, visibility_timeout: 30, logger: nil) ⇒ Processor
constructor
A new instance of Processor.
- #process_messages ⇒ Object
- #process_single_message(message) ⇒ Object
- #receive_messages ⇒ Object
- #request_shutdown ⇒ Object
- #setup_logger(custom_logger = nil) ⇒ Object
- #setup_signal_handlers ⇒ Object
- #setup_sqs_client(aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region) ⇒ Object
- #shutdown_requested? ⇒ Boolean
Constructor Details
#initialize(queue_url:, aws_access_key_id:, aws_secret_access_key:, aws_region: 'us-east-1', aws_session_token: nil, max_messages: 10, visibility_timeout: 30, logger: nil) ⇒ Processor
Returns a new instance of Processor.
14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/sqs_processor/processor.rb', line 14
# Builds a processor bound to a single SQS queue.
#
# Stores the polling settings on the instance, then delegates to
# setup_logger, setup_sqs_client and setup_signal_handlers.
#
# @param queue_url [String] URL of the queue to poll; must not be nil or blank
# @param aws_access_key_id [String] forwarded to setup_sqs_client
# @param aws_secret_access_key [String] forwarded to setup_sqs_client
# @param aws_region [String] forwarded to setup_sqs_client
# @param aws_session_token [String, nil] forwarded to setup_sqs_client
# @param max_messages [Integer] stored in @max_messages (reported by
#   process_messages as the per-batch maximum)
# @param visibility_timeout [Integer] stored in @visibility_timeout (reported
#   by process_messages in seconds)
# @param logger [Object, nil] forwarded to setup_logger
# @raise [RuntimeError] 'Queue URL is required.' when queue_url is nil or blank
def initialize(queue_url:, aws_access_key_id:, aws_secret_access_key:,
               aws_region: 'us-east-1', aws_session_token: nil,
               max_messages: 10, visibility_timeout: 30, logger: nil)
  # Validate before any side effect so a rejected call never installs
  # process-wide signal handlers or builds an AWS client.
  raise 'Queue URL is required.' if queue_url.nil? || queue_url.to_s.strip.empty?

  # One assignment per setting: a chained `a = b = c` would store the
  # visibility timeout in @queue_url and leave @max_messages unset.
  @queue_url = queue_url
  @max_messages = max_messages
  @visibility_timeout = visibility_timeout
  @shutdown_requested = false

  setup_logger(logger)
  setup_sqs_client(aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region)
  setup_signal_handlers
end
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
12 13 14 |
# File 'lib/sqs_processor/processor.rb', line 12
# Read-only accessor for the logger held in @logger.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
#queue_url ⇒ Object (readonly)
Returns the value of attribute queue_url.
12 13 14 |
# File 'lib/sqs_processor/processor.rb', line 12
# Read-only accessor for the queue URL held in @queue_url.
#
# @return [Object] the current value of @queue_url
def queue_url
  @queue_url
end
#sqs_client ⇒ Object (readonly)
Returns the value of attribute sqs_client.
12 13 14 |
# File 'lib/sqs_processor/processor.rb', line 12
# Read-only accessor for the SQS client held in @sqs_client.
#
# @return [Object] the current value of @sqs_client
def sqs_client
  @sqs_client
end
Instance Method Details
#delete_message(message) ⇒ Object
152 153 154 155 156 157 158 159 |
# File 'lib/sqs_processor/processor.rb', line 152
# Removes a received message from the queue using its receipt handle.
#
# Failures are logged and swallowed, so this method does not raise
# StandardError to its caller.
#
# @param message [#receipt_handle, #message_id] the message to delete
# @return [Object] the client's response on success, or the logger's return
#   value when the deletion failed
def delete_message(message)
  @sqs_client.delete_message(
    queue_url: @queue_url,
    receipt_handle: message.receipt_handle
  )
rescue StandardError => e
  logger.error "Failed to delete message #{message.message_id}: #{e.message}"
end
#get_queue_attributes ⇒ Object
161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/sqs_processor/processor.rb', line 161
# Fetches every attribute of the queue and logs the three approximate
# message counters at info level.
#
# @return [Object] the `attributes` value from the client's response
def get_queue_attributes
  attributes = @sqs_client.get_queue_attributes(
    queue_url: @queue_url,
    attribute_names: ['All']
  ).attributes

  logger.info 'Queue attributes:'
  {
    'Approximate number of messages' => 'ApproximateNumberOfMessages',
    'Approximate number of messages not visible' => 'ApproximateNumberOfMessagesNotVisible',
    'Approximate number of messages delayed' => 'ApproximateNumberOfMessagesDelayed'
  }.each do |label, key|
    logger.info " #{label}: #{attributes[key]}"
  end

  attributes
end
#handle_message(message, body) ⇒ Object
Hook method that should be implemented by the host application. Override this method in your subclass to implement custom message processing.
147 148 149 150 |
# File 'lib/sqs_processor/processor.rb', line 147
# Hook for application-specific message handling; subclasses are expected
# to override it. This default implementation only logs a warning.
#
# @param message [Object] the received message (unused here)
# @param body [Object] the parsed message body (unused here)
# @return [false] always, in this default implementation
def handle_message(message, body)
  logger.warn 'handle_message method not implemented. Override this method in your subclass.'
  false
end
#process_messages ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/sqs_processor/processor.rb', line 67
# Runs the polling loop until a shutdown is requested.
#
# Each cycle calls receive_messages and then pauses for one second. A
# StandardError raised during a cycle is logged with its backtrace and
# followed by a five-second pause. An Interrupt raised anywhere in the
# loop, including during the error pause, sets the shutdown flag and ends
# the loop.
#
# NOTE(review): the polling call was missing from the rendered listing;
# receive_messages is assumed here - confirm against
# lib/sqs_processor/processor.rb.
#
# @return [Object] the return value of the final logger call
def process_messages
  logger.info 'Starting SQS message processing...'
  logger.info "Queue URL: #{@queue_url}"
  logger.info "Max messages per batch: #{@max_messages}"
  logger.info "Visibility timeout: #{@visibility_timeout} seconds"

  begin
    until @shutdown_requested
      begin
        receive_messages
        sleep 1 # Small delay between polling cycles
      rescue StandardError => e
        logger.error "Error in message processing loop: #{e.message}"
        logger.error e.backtrace.join("\n")
        sleep 5 # Longer delay on error
      end
    end
  rescue Interrupt
    # Rescued around the whole loop so an Interrupt delivered during the
    # error backoff still reaches the graceful-shutdown path below.
    logger.info 'Interrupted, initiating graceful shutdown...'
    @shutdown_requested = true
  end

  logger.info 'Graceful shutdown completed.'
end
#process_single_message(message) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/sqs_processor/processor.rb', line 116
# Parses one message body as JSON, passes it to handle_message and deletes
# the message when the hook returns a truthy value.
#
# A body that is not valid JSON, or any StandardError raised while
# handling, is logged and the message is not deleted.
#
# @param message [#message_id, #body] the received message
# @return [Object] the return value of the last logger call
def process_single_message(message)
  logger.info "Processing message: #{message.message_id}"

  # Parse message body
  body = JSON.parse(message.body)
  logger.info "Message body: #{body}"

  # Call the hook method that should be implemented by the host application
  if handle_message(message, body)
    # Delete the message from queue after successful processing
    delete_message(message)
    logger.info "Successfully processed and deleted message: #{message.message_id}"
  else
    logger.warn "Message processing returned false, keeping message in queue: #{message.message_id}"
  end
rescue JSON::ParserError => e
  # Keep message in queue for manual inspection
  logger.error "Failed to parse message body as JSON: #{e.message}"
  logger.error "Raw message body: #{message.body}"
rescue StandardError => e
  # Keep message in queue for retry
  logger.error "Error processing message #{message.message_id}: #{e.message}"
  logger.error e.backtrace.join("\n")
end
#receive_messages ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/sqs_processor/processor.rb', line 91
# Requests one batch from the queue and hands each message to
# process_single_message, stopping early once a shutdown is requested.
#
# NOTE(review): the client method name and the max_number_of_messages value
# were missing from the rendered listing; receive_message and @max_messages
# are assumed - confirm against lib/sqs_processor/processor.rb.
#
# @return [nil, Object] nil when the batch is empty, otherwise the result of
#   iterating the batch
# @raise [Interrupt] re-raised after being logged
def receive_messages
  response = @sqs_client.receive_message(
    queue_url: @queue_url,
    max_number_of_messages: @max_messages,
    visibility_timeout: @visibility_timeout,
    wait_time_seconds: 20 # Long polling
  )

  if response.messages.empty?
    logger.debug 'No messages received'
    return
  end

  logger.info "Received #{response.messages.length} message(s)"

  response.messages.each do |message|
    break if @shutdown_requested

    process_single_message(message)
  end
rescue Interrupt
  logger.info 'Interrupted during message reception...'
  raise
end
#request_shutdown ⇒ Object
180 181 182 183 |
# File 'lib/sqs_processor/processor.rb', line 180
# Asks the processor to stop: logs the request, then raises the
# @shutdown_requested flag.
#
# @return [true] the value assigned to the flag
def request_shutdown
  logger.info 'Shutdown requested manually...'
  @shutdown_requested = true
end
#setup_logger(custom_logger = nil) ⇒ Object
28 29 30 31 32 33 34 |
# File 'lib/sqs_processor/processor.rb', line 28
# Chooses the logger stored in @logger.
#
# A caller-supplied logger is stored untouched, so its level and formatter
# stay under the caller's control. When none is given, a Logger writing to
# standard output is created at INFO level with a
# "YYYY-MM-DD HH:MM:SS [SEVERITY] message" line format.
#
# @param custom_logger [Object, nil] logger to use instead of the default
# @return [Object] the logger now stored in @logger
def setup_logger(custom_logger = nil)
  return @logger = custom_logger if custom_logger

  @logger = Logger.new($stdout)
  @logger.level = Logger::INFO
  @logger.formatter = proc do |severity, datetime, _progname, msg|
    "#{datetime.strftime('%Y-%m-%d %H:%M:%S')} [#{severity}] #{msg}\n"
  end
  @logger
end
#setup_signal_handlers ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/sqs_processor/processor.rb', line 44
# Installs handlers for SIGTERM, SIGINT and SIGQUIT. Each handler raises
# the @shutdown_requested flag, logs the signal, and raises Interrupt in
# the main thread.
#
# @return [Array<String>] the signal names that were trapped
def setup_signal_handlers
  %w[TERM INT QUIT].each do |signal|
    Signal.trap(signal) do
      @shutdown_requested = true

      # Logger takes a lock when writing, which Ruby forbids inside a trap
      # handler ("can't be called from trap context"), so the message is
      # written from a short-lived thread instead.
      Thread.new { logger.info "Received SIG#{signal}, initiating graceful shutdown..." }

      # Immediately interrupt any blocking operations
      Thread.main.raise Interrupt.new
    end
  end
end
#setup_sqs_client(aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region) ⇒ Object
36 37 38 39 40 41 42 |
# File 'lib/sqs_processor/processor.rb', line 36
# Builds the SQS client from explicit credentials and stores it in
# @sqs_client.
#
# @param aws_access_key_id [String] first argument to Aws::Credentials.new
# @param aws_secret_access_key [String] second argument to Aws::Credentials.new
# @param aws_session_token [String, nil] third argument to Aws::Credentials.new
# @param aws_region [String] passed to the client as `region`
# @return [Aws::SQS::Client] the client that was stored
def setup_sqs_client(aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region)
  @sqs_client = Aws::SQS::Client.new(
    region: aws_region,
    credentials: Aws::Credentials.new(aws_access_key_id, aws_secret_access_key, aws_session_token)
  )
end
#shutdown_requested? ⇒ Boolean
176 177 178 |
# File 'lib/sqs_processor/processor.rb', line 176
# Reports whether a shutdown has been requested.
#
# @return [Boolean] the current value of @shutdown_requested
def shutdown_requested?
  @shutdown_requested
end