Module: Shoryuken::Middleware
Defined in:
- lib/shoryuken/middleware/chain.rb
- lib/shoryuken/middleware/entry.rb
- lib/shoryuken/middleware/server/timing.rb
- lib/shoryuken/middleware/server/auto_delete.rb
- lib/shoryuken/middleware/server/active_record.rb
- lib/shoryuken/middleware/server/auto_extend_visibility.rb
- lib/shoryuken/middleware/server/exponential_backoff_retry.rb
Overview
Middleware provides a way to wrap message processing with custom logic, similar to Rack middleware in web applications. Middleware runs on the server side and can perform setup, teardown, error handling, and monitoring around job execution.
Middleware classes must implement a `call` method that accepts the worker instance, queue name, SQS message, and message body, and must yield to continue the middleware chain.
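The effect of yielding (or not yielding) can be illustrated with a simplified stand-in for a chain runner. The `run_chain` helper and the `PassThrough` and `ShortCircuit` classes below are hypothetical and are not Shoryuken's own implementation; they only mimic the `call(worker_instance, queue, sqs_msg, body)` signature used by the examples in this document.

```ruby
# Hypothetical, simplified chain runner (not Shoryuken's own code).
# Each middleware receives a block that invokes the rest of the chain.
def run_chain(middlewares, worker, queue, sqs_msg, body, &final)
  remaining = middlewares.dup
  step = lambda do
    if remaining.empty?
      final.call
    else
      remaining.shift.call(worker, queue, sqs_msg, body, &step)
    end
  end
  step.call
end

class PassThrough
  def call(_worker, _queue, _sqs_msg, _body)
    yield # continue to the next middleware (or the worker)
  end
end

class ShortCircuit
  def call(_worker, _queue, _sqs_msg, _body)
    # No yield: nothing after this middleware runs.
  end
end

processed = []
run_chain([PassThrough.new], nil, "default", nil, "a") { processed << "a" }
run_chain([PassThrough.new, ShortCircuit.new], nil, "default", nil, "b") { processed << "b" }
p processed # => ["a"]
```

In this sketch the second message is never processed, because `ShortCircuit` returns without yielding.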
## Global Middleware Configuration
Configure middleware globally for all workers:
    Shoryuken.configure_server do |config|
      config.server_middleware do |chain|
        chain.add MyServerHook
        chain.remove Shoryuken::Middleware::Server::ActiveRecord
      end
    end
## Per-Worker Middleware Configuration
Configure middleware for specific workers:
    class MyWorker
      include Shoryuken::Worker

      server_middleware do |chain|
        chain.add MyWorkerSpecificMiddleware
      end
    end
## Middleware Ordering
Insert middleware at specific positions in the chain:
    # Insert before existing middleware
    chain.insert_before Shoryuken::Middleware::Server::ActiveRecord, MyDatabaseSetup

    # Insert after existing middleware
    chain.insert_after Shoryuken::Middleware::Server::Timing, MyMetricsCollector

    # Add to beginning of chain
    chain.prepend MyFirstMiddleware
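Why position matters can be seen in a toy model. The sketch below assumes entries run in list order, so that an earlier entry wraps every later one; a plain Array stands in for the chain, and the `Tracer` class and `trace_order` helper are hypothetical names that are not part of Shoryuken.

```ruby
# Toy model: an Array stands in for the chain; entries run in list order.
class Tracer
  def initialize(name, log)
    @name = name
    @log = log
  end

  def call(_worker, _queue, _sqs_msg, _body)
    @log << "#{@name}:before"
    yield
    @log << "#{@name}:after"
  end
end

def trace_order(names)
  log = []
  entries = names.map { |name| Tracer.new(name, log) }
  # Build the nested call from the innermost entry outward.
  runner = entries.reverse.reduce(-> { log << "worker" }) do |inner, entry|
    -> { entry.call(nil, "default", nil, nil, &inner) }
  end
  runner.call
  log
end

order = ["timing", "active_record"]
order.unshift("first")                                 # comparable to chain.prepend
order.insert(order.index("active_record"), "db_setup") # comparable to chain.insert_before
p trace_order(order)
```

Under that assumption, `first` is entered first and exited last, while `db_setup` runs its setup just before `active_record` and its teardown just after it.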
## Example Middleware Implementations
    # Basic logging middleware
    class LoggingMiddleware
      def call(worker_instance, queue, sqs_msg, body)
        puts "Processing #{sqs_msg.message_id} on #{queue}"
        start_time = Time.now
        yield
        puts "Completed in #{Time.now - start_time}s"
      end
    end

    # Error reporting middleware
    class ErrorReportingMiddleware
      def call(worker_instance, queue, sqs_msg, body)
        yield
      rescue => error
        ErrorReporter.notify(error, {
          worker: worker_instance.class.name,
          queue: queue,
          message_id: sqs_msg.message_id
        })
        raise # re-raise so later error handling still sees the failure
      end
    end

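    # Performance monitoring middleware
    class MetricsMiddleware
      def call(worker_instance, queue, sqs_msg, body)
        # A monotonic clock is unaffected by system clock adjustments.
        start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        yield
        # Duration is in seconds; convert if the StatsD client expects milliseconds.
        duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
        # String#underscore is provided by ActiveSupport.
        StatsD.timing("shoryuken.#{worker_instance.class.name.underscore}.duration", duration)
      end
    end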
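Because a middleware is a plain object with a `call` method, it can be exercised directly by passing a block in place of the rest of the chain. The following is a minimal sketch, assuming stand-ins for everything external: `FakeMessage`, `FakeWorker`, the in-memory `ErrorReporter` module, and the `report_failure` helper are all hypothetical and exist only so the snippet runs without Shoryuken.

```ruby
# Hypothetical stand-ins so the sketch runs without Shoryuken or a real reporter.
FakeMessage = Struct.new(:message_id)
class FakeWorker; end

module ErrorReporter
  def self.notifications
    @notifications ||= []
  end

  def self.notify(error, context)
    notifications << [error, context]
  end
end

# Same shape as the error reporting middleware above.
class ErrorReportingMiddleware
  def call(worker_instance, queue, sqs_msg, body)
    yield
  rescue => error
    ErrorReporter.notify(error, {
      worker: worker_instance.class.name,
      queue: queue,
      message_id: sqs_msg.message_id
    })
    raise
  end
end

def report_failure
  middleware = ErrorReportingMiddleware.new
  message = FakeMessage.new("msg-1")
  begin
    middleware.call(FakeWorker.new, "default", message, "payload") { raise ArgumentError, "boom" }
  rescue ArgumentError => error
    error # the middleware re-raised the original exception
  end
end

raised = report_failure
p raised.message                      # => "boom"
p ErrorReporter.notifications.last[1]
```

The block passed to `call` plays the role of the worker, so the sketch shows both halves of the contract: the error is recorded with its context, and the original exception still propagates.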