SQS Processor Gem
A Ruby gem for processing messages from Amazon SQS queues with configurable message handling and error recovery.
Features
- Long Polling: Efficiently polls SQS with configurable wait times
- Batch Processing: Processes multiple messages per batch
- Error Handling: Robust error handling with message retention on failure
- Customizable: Extensible message processing logic
- Logging: Comprehensive logging with configurable levels
- Command Line Options: Flexible configuration via command line arguments
- Environment Variables: Support for AWS credentials and configuration via environment variables
Prerequisites
- Ruby 2.6 or higher
- AWS credentials configured (via environment variables, IAM roles, or AWS CLI)
- SQS queue URL
Installation
As a Gem
gem install sqs_processor
From Source
- Clone the repository
- Install dependencies:
bundle install
- Build and install the gem:
bundle exec rake install
- Copy the environment example and configure your settings:
cp env.example .env
- Edit .env with your AWS credentials and SQS queue URL:
DATA_SYNC_AWS_ACCESS_KEY_ID=your_access_key_here
DATA_SYNC_AWS_SECRET_ACCESS_KEY=your_secret_key_here
DATA_SYNC_AWS_REGION=us-east-1
DATA_SYNC_SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/your-queue-name
Usage
The gem is designed to be used programmatically in your Ruby applications.
Programmatic Usage
Basic Usage
require 'sqs_processor'
# Create a processor instance with AWS credentials
processor = SQSProcessor::Processor.new(
  queue_url: 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
  aws_access_key_id: 'your-access-key',
  aws_secret_access_key: 'your-secret-key',
  aws_region: 'us-east-1',
  max_messages: 10,
  visibility_timeout: 30
)
# Start processing messages
processor.
Message Processing
The gem uses a hook-based approach for message processing. You must implement the handle_message method in your subclass to define how messages should be processed.
Hook Method
The handle_message(body) method receives:
- body: The parsed JSON body of the message
Return true if processing was successful (message will be deleted from queue), or false if processing failed (message will remain in queue for retry).
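The return-value contract above can be illustrated with a small simulation that needs no AWS access. This is only a sketch: dispatch is a hypothetical stand-in for the gem's internal polling loop, and treating any truthy return value as success is an assumption, since this README only specifies true and false.

```ruby
require 'json'

# Hypothetical stand-in for the gem's polling loop; `dispatch` is an
# illustrative name, not part of the gem's API.
# Returns two arrays: messages that would be deleted and messages that
# would stay in the queue for retry.
def dispatch(raw_messages, handler)
  raw_messages.partition do |raw|
    begin
      # A truthy return value is treated as success here (an assumption).
      handler.call(JSON.parse(raw))
    rescue JSON::ParserError
      # Invalid JSON is kept in the queue rather than deleted.
      false
    end
  end
end

# Handler that only succeeds for 'data_sync' events.
handler = ->(body) { body['event_type'] == 'data_sync' }

deleted, retained = dispatch(
  [
    '{"event_type": "data_sync", "dataset_id": "12345"}',
    '{"event_type": "unknown"}',
    'not json'
  ],
  handler
)

puts "deleted: #{deleted.size}, retained: #{retained.size}"
```

In a real deployment the retained messages become visible again once the visibility timeout expires, which is what makes the retry happen.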
Example Message Format
{
"event_type": "data_sync",
"dataset_id": "12345",
"timestamp": "2024-01-01T00:00:00Z"
}
Custom Processing
To implement custom message processing logic, create a subclass of SQSProcessor::Processor and override the handle_message method:
require 'sqs_processor'

class MyCustomProcessor < SQSProcessor::Processor
  def handle_message(body)
    # This method receives the parsed message body
    # Return true if processing was successful, false otherwise
    case body['event_type']
    when 'data_sync'
      process_data_sync(body)
    when 'report_generation'
      process_report_generation(body)
    else
      logger.warn "Unknown event type: #{body['event_type']}"
      false
    end
  end

  private

  def process_data_sync(body)
    logger.info "Processing data sync for dataset: #{body['dataset_id']}"
    # Your custom logic here
    true
  end

  def process_report_generation(body)
    logger.info "Processing report generation for report: #{body['report_id']}"
    # Your custom logic here
    true
  end
end
# Usage
processor = MyCustomProcessor.new(
  queue_url: 'your-queue-url',
  aws_access_key_id: 'your-access-key',
  aws_secret_access_key: 'your-secret-key',
  aws_region: 'us-east-1'
)
processor.
Configuration
Constructor Parameters
The SQSProcessor::Processor.new method accepts the following parameters:
- queue_url: (required) - The SQS queue URL
- aws_access_key_id: (required) - AWS access key ID
- aws_secret_access_key: (required) - AWS secret access key
- aws_region: (optional, default: 'us-east-1') - AWS region
- aws_session_token: (optional) - AWS session token for temporary credentials
- max_messages: (optional, default: 10) - Maximum messages per batch
- visibility_timeout: (optional, default: 30) - Message visibility timeout in seconds
- logger: (optional) - Custom logger instance
Environment Variables
- DATA_SYNC_AWS_ACCESS_KEY_ID: Your AWS access key
- DATA_SYNC_AWS_SECRET_ACCESS_KEY: Your AWS secret key
- DATA_SYNC_AWS_SESSION_TOKEN: Your AWS session token (optional, for temporary credentials)
- DATA_SYNC_AWS_REGION: AWS region (default: us-east-1)
- DATA_SYNC_SQS_QUEUE_URL: Your SQS queue URL
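One way to connect these variables to the constructor parameters is to collect them into a hash of keyword arguments. The sketch below is an assumption about how you might wire this up yourself; processor_options_from_env is a hypothetical helper, not something the gem is documented to provide.

```ruby
# Hypothetical helper: maps the DATA_SYNC_ variables onto the constructor's
# keyword arguments. Works with ENV or any Hash-like object.
def processor_options_from_env(env = ENV)
  options = {
    # fetch raises KeyError when a required variable is missing.
    queue_url: env.fetch('DATA_SYNC_SQS_QUEUE_URL'),
    aws_access_key_id: env.fetch('DATA_SYNC_AWS_ACCESS_KEY_ID'),
    aws_secret_access_key: env.fetch('DATA_SYNC_AWS_SECRET_ACCESS_KEY'),
    # Falls back to the documented default region.
    aws_region: env.fetch('DATA_SYNC_AWS_REGION', 'us-east-1')
  }

  # The session token is optional, so it is only passed along when set.
  token = env['DATA_SYNC_AWS_SESSION_TOKEN']
  options[:aws_session_token] = token if token && !token.empty?

  options
end

# Intended use (requires the gem and real settings):
#   processor = SQSProcessor::Processor.new(**processor_options_from_env)
```

Failing fast with KeyError on a missing required variable surfaces configuration mistakes at startup rather than on the first poll.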
AWS Credentials
The gem supports multiple ways to provide AWS credentials:
- Initializer Configuration: Set credentials in the initializer block
- Environment Variables: Use the DATA_SYNC_ prefixed environment variables
- AWS SDK Default Chain: If no credentials are provided, the AWS SDK will use its default credential provider chain (IAM roles, AWS CLI, etc.)
- Direct Parameter: Pass credentials directly to the processor constructor
The script supports multiple ways to provide AWS credentials:
- Environment Variables: Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
- IAM Roles: If running on EC2 with IAM roles
- AWS CLI: If you have AWS CLI configured
- AWS SDK Default Credential Provider Chain: Automatic credential resolution
Error Handling
- JSON Parse Errors: Messages with invalid JSON are logged but kept in queue
- Processing Errors: Failed messages remain in queue for retry
- Network Errors: Automatic retry with exponential backoff
- Queue Errors: Comprehensive error logging with stack traces
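To make "retry with exponential backoff" concrete, here is a minimal, generic sketch of the technique. It is not the gem's implementation: with_backoff and backoff_delay are hypothetical helpers, and the base delay, cap, and attempt limit are arbitrary assumptions.

```ruby
# Hypothetical helpers illustrating exponential backoff in general;
# these names and defaults are not part of the gem's documented API.

# Delay in seconds before retry number `attempt` (0-based): base * 2^attempt,
# limited to `cap`.
def backoff_delay(attempt, base: 1.0, cap: 30.0)
  [base * (2**attempt), cap].min
end

# Runs the block, retrying on StandardError with growing delays.
# Re-raises the last error once max_attempts is reached.
# `sleeper` is injectable so the delays can be observed without waiting.
def with_backoff(max_attempts: 5, base: 1.0, cap: 30.0, sleeper: ->(s) { sleep(s) })
  attempt = 0
  begin
    yield
  rescue StandardError
    attempt += 1
    raise if attempt >= max_attempts

    sleeper.call(backoff_delay(attempt - 1, base: base, cap: cap))
    retry
  end
end

# Example: a flaky operation that succeeds on the third call.
calls = 0
delays = []
result = with_backoff(sleeper: ->(s) { delays << s }) do
  calls += 1
  raise 'temporary network error' if calls < 3

  :ok
end

puts "result=#{result} calls=#{calls} delays=#{delays.inspect}"
```

Production code would usually rescue only network-related error classes and add random jitter to the delay so that many workers do not retry in lockstep.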
Monitoring
The script provides detailed logging including:
- Queue attributes (message counts)
- Message processing status
- Error details with stack traces
- Processing performance metrics
Best Practices
- Set appropriate visibility timeout: Should be longer than your processing time
- Use long polling: Reduces API calls and costs
- Handle errors gracefully: Return false from processing methods to keep messages in queue
- Monitor queue depth: Use the built-in queue attribute reporting
- Use appropriate batch sizes: Balance between throughput and memory usage
Troubleshooting
Common Issues
- "Queue URL is required": Set the DATA_SYNC_SQS_QUEUE_URL environment variable or use the -q option
- "Access Denied": Check AWS credentials and SQS permissions
- "Queue does not exist": Verify queue URL and region
- Messages not being processed: Check visibility timeout and processing logic
Debug Mode
To enable debug logging, modify the logger level in the script:
@logger.level = Logger::DEBUG
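Because the constructor accepts an optional logger: parameter, another option is to build a debug-level logger yourself and pass it in. The following is a sketch using Ruby's standard Logger; build_logger and the LOG_LEVEL environment variable are hypothetical names chosen for illustration, not settings the gem is documented to read.

```ruby
require 'logger'

# Hypothetical helper: builds a standard-library Logger whose level comes
# from a LOG_LEVEL environment variable (an assumed name), defaulting to INFO.
def build_logger(io: $stdout, level: ENV.fetch('LOG_LEVEL', 'INFO'))
  logger = Logger.new(io)
  # Resolves names such as 'DEBUG' or 'warn' to Logger::DEBUG, Logger::WARN.
  logger.level = Logger.const_get(level.upcase)
  logger
end

# Intended use (requires the gem and real settings):
#   processor = SQSProcessor::Processor.new(
#     queue_url: 'your-queue-url',
#     aws_access_key_id: 'your-access-key',
#     aws_secret_access_key: 'your-secret-key',
#     logger: build_logger(level: 'DEBUG')
#   )
```

An unrecognized level name raises NameError, which makes typos in the setting visible immediately.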
Development
Running Tests
bundle exec rspec
Code Style
bundle exec rubocop
Building the Gem
bundle exec rake build
Publishing the Gem
bundle exec rake release
License
This gem is licensed under the MIT License - see the LICENSE file for details.