Module: Shoryuken::Middleware

Defined in:
lib/shoryuken/middleware/chain.rb,
lib/shoryuken/middleware/entry.rb,
lib/shoryuken/middleware/server/timing.rb,
lib/shoryuken/middleware/server/auto_delete.rb,
lib/shoryuken/middleware/server/active_record.rb,
lib/shoryuken/middleware/server/auto_extend_visibility.rb,
lib/shoryuken/middleware/server/exponential_backoff_retry.rb

Overview

Middleware provides a way to wrap message processing with custom logic, similar to Rack middleware in web applications. Middleware runs on the server side and can perform setup, teardown, error handling, and monitoring around job execution.

Middleware classes must implement a `call` method that accepts the worker instance, queue name, SQS message, and message body, and must yield to continue the middleware chain.
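
The calling convention can be illustrated without Shoryuken itself. The following is a minimal sketch, not the library's implementation: `run_chain` and `TraceMiddleware` are hypothetical names, and the queue name and message values are placeholders. It shows how each middleware's `yield` hands control to the next entry and, finally, to the worker.

```ruby
# Hypothetical middleware that records when it runs around the inner block.
class TraceMiddleware
  def initialize(name, log)
    @name = name
    @log = log
  end

  def call(worker_instance, queue, sqs_msg, body)
    @log << "#{@name}:before"
    yield
    @log << "#{@name}:after"
  end
end

# Toy stand-in for a middleware chain (an assumption, not Shoryuken's code):
# calls each middleware in order and runs the final block innermost.
def run_chain(middlewares, worker, queue, sqs_msg, body, &final)
  remaining = middlewares.dup
  step = lambda do
    if remaining.empty?
      final.call
    else
      remaining.shift.call(worker, queue, sqs_msg, body, &step)
    end
  end
  step.call
end

log = []
chain = [TraceMiddleware.new("outer", log), TraceMiddleware.new("inner", log)]
run_chain(chain, Object.new, "default", nil, "payload") { log << "perform" }
```

A middleware that does not `yield` stops the chain at that point, so the inner entries and the worker never run.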

## Global Middleware Configuration

Configure middleware globally for all workers:

Shoryuken.configure_server do |config|
  config.server_middleware do |chain|
    chain.add MyServerHook
    chain.remove Shoryuken::Middleware::Server::ActiveRecord
  end
end

## Per-Worker Middleware Configuration

Configure middleware for specific workers:

class MyWorker
  include Shoryuken::Worker

  server_middleware do |chain|
    chain.add MyWorkerSpecificMiddleware
  end
end

## Middleware Ordering

Insert middleware at specific positions in the chain:

# Insert before existing middleware
chain.insert_before Shoryuken::Middleware::Server::ActiveRecord, MyDatabaseSetup

# Insert after existing middleware
chain.insert_after Shoryuken::Middleware::Server::Timing, MyMetricsCollector

# Add to beginning of chain
chain.prepend MyFirstMiddleware
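
As an illustration of the ordering these calls are meant to produce, the sketch below models a chain as a plain Ruby array of symbols. The starting contents are an assumption made for the example, and the array operations only mimic the positions described above; they are not the `Chain` implementation.

```ruby
# Hypothetical starting chain, represented as symbols for illustration only.
chain = [:Timing, :ActiveRecord]

# Equivalent position to: chain.insert_before ActiveRecord, MyDatabaseSetup
chain.insert(chain.index(:ActiveRecord), :MyDatabaseSetup)

# Equivalent position to: chain.insert_after Timing, MyMetricsCollector
chain.insert(chain.index(:Timing) + 1, :MyMetricsCollector)

# Equivalent position to: chain.prepend MyFirstMiddleware
chain.unshift(:MyFirstMiddleware)

# chain is now:
# [:MyFirstMiddleware, :Timing, :MyMetricsCollector, :MyDatabaseSetup, :ActiveRecord]
```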

## Example Middleware Implementations

# Basic logging middleware
class LoggingMiddleware
  def call(worker_instance, queue, sqs_msg, body)
    puts "Processing #{sqs_msg.message_id} on #{queue}"
    start_time = Time.now
    yield
    puts "Completed in #{Time.now - start_time}s"
  end
end

# Error reporting middleware
class ErrorReportingMiddleware
  def call(worker_instance, queue, sqs_msg, body)
    yield
  rescue => error
    ErrorReporter.notify(error, {
      worker: worker_instance.class.name,
      queue: queue,
      message_id: sqs_msg.message_id
    })
    raise
  end
end

# Performance monitoring middleware
class MetricsMiddleware
  def call(worker_instance, queue, sqs_msg, body)
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
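    # Duration is in seconds; convert if your StatsD client expects milliseconds
    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
    # String#underscore is provided by ActiveSupport, not core Ruby
    StatsD.timing("shoryuken.#{worker_instance.class.name.underscore}.duration", duration)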
  end
end
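
Because a middleware is a plain object with a `call` method, it can be exercised directly, without a running Shoryuken process. The sketch below assumes a hypothetical `FakeMessage` struct in place of a real SQS message and a hypothetical `CollectingErrorMiddleware` that stores reports in memory instead of calling an error reporting service.

```ruby
# Hypothetical stand-in for an SQS message; only message_id is needed here.
FakeMessage = Struct.new(:message_id)

# Variant of the error reporting middleware that collects reports in memory.
class CollectingErrorMiddleware
  attr_reader :reported

  def initialize
    @reported = []
  end

  def call(worker_instance, queue, sqs_msg, body)
    yield
  rescue => error
    @reported << {
      worker: worker_instance.class.name,
      queue: queue,
      message_id: sqs_msg.message_id,
      error: error.message
    }
    raise # re-raise so the failure is still visible to the caller
  end
end

middleware = CollectingErrorMiddleware.new
message = FakeMessage.new("msg-1")
reraised = false

begin
  # The block plays the role of the rest of the chain and the worker.
  middleware.call(Object.new, "default", message, "payload") do
    raise ArgumentError, "boom"
  end
rescue ArgumentError
  reraised = true
end
```

Passing a block that raises stands in for a failing worker, which makes it possible to check both that the error was recorded and that it was re-raised.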

Defined Under Namespace

Modules: Server

Classes: Chain, Entry