Module: Shoryuken::Worker::ClassMethods

Defined in:
lib/shoryuken/worker.rb

Instance Method Details

#auto_delete? ⇒ Boolean

Checks whether automatic message deletion is enabled for this worker, through either the auto_delete option or the delete option. When enabled, successfully processed messages are automatically deleted from the SQS queue. When disabled, you must delete each message yourself; otherwise it becomes visible again once its visibility timeout expires.

Examples:

Manual message deletion when auto_delete is false

def perform(sqs_msg, body)
  process_message(body)
  # Manually delete the message after successful processing
  sqs_msg.delete
end

Returns:

  • (Boolean)

    true if auto delete is enabled

# File 'lib/shoryuken/worker.rb', line 211

def auto_delete?
  !!(get_shoryuken_options['delete'] || get_shoryuken_options['auto_delete'])
end
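
The reader above consults two string keys. A minimal sketch of that lookup follows; `auto_delete_enabled?` is a hypothetical stand-in written for illustration, not part of Shoryuken, and its `options` argument plays the role of the hash returned by get_shoryuken_options.

```ruby
# Hypothetical stand-in that mirrors the lookup in auto_delete? above.
# `options` stands for the hash returned by get_shoryuken_options.
def auto_delete_enabled?(options)
  !!(options['delete'] || options['auto_delete'])
end

auto_delete_enabled?('auto_delete' => true)    # true
auto_delete_enabled?('delete' => true)         # true, the alternate key counts as well
auto_delete_enabled?('queue' => 'default')     # false, neither key is set
auto_delete_enabled?({ auto_delete: true })    # false, symbol keys are not consulted
```

The last call only matters for a hash built by hand: shoryuken_options converts keys to strings before storing them.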

#auto_visibility_timeout? ⇒ Boolean

Checks if automatic visibility timeout extension is enabled for this worker. When enabled, Shoryuken automatically extends the message visibility timeout during processing to prevent the message from becoming visible to other consumers.

Returns:

  • (Boolean)

    true if auto visibility timeout is enabled

# File 'lib/shoryuken/worker.rb', line 177

def auto_visibility_timeout?
  !!get_shoryuken_options['auto_visibility_timeout']
end

#exponential_backoff? ⇒ Boolean

Checks if exponential backoff retry is configured for this worker. When retry intervals are specified, failed jobs will be retried with increasing delays between attempts.

Examples:

Configuring exponential backoff

shoryuken_options retry_intervals: [1, 5, 25, 125, 625]
# Will retry after 1s, 5s, 25s, 125s, then 625s before giving up

Returns:

  • (Boolean)

    true if retry intervals are configured

# File 'lib/shoryuken/worker.rb', line 192

def exponential_backoff?
  !!get_shoryuken_options['retry_intervals']
end

#get_shoryuken_options ⇒ Object

:nodoc:



# File 'lib/shoryuken/worker.rb', line 215

def get_shoryuken_options # :nodoc:
  shoryuken_options_hash || Shoryuken.default_worker_options
end

#perform_async(body, options = {}) ⇒ String

Enqueues a job to be processed asynchronously by a Shoryuken worker.

Examples:

Basic job enqueueing

MyWorker.perform_async({ user_id: 123, action: 'send_email' })

FIFO queue with ordering

MyWorker.perform_async(data, message_group_id: 'user_123')

Parameters:

  • body (Object)

    The job payload that will be passed to the worker’s perform method

  • options (Hash) (defaults to: {})

    Additional options for job enqueueing

Options Hash (options):

  • :message_group_id (String)

    FIFO queue group ID for message ordering

  • :message_deduplication_id (String)

    FIFO queue deduplication ID

  • :message_attributes (Hash)

    Custom SQS message attributes

Returns:

  • (String)

    The message ID of the enqueued job



# File 'lib/shoryuken/worker.rb', line 65

def perform_async(body, options = {})
  Shoryuken.worker_executor.perform_async(self, body, options)
end

#perform_in(interval, body, options = {}) ⇒ String

Also known as: perform_at

Enqueues a job to be processed after a specified time interval.

Examples:

Delay job by 5 minutes

MyWorker.perform_in(5.minutes, { user_id: 123 })

Delay job by specific number of seconds

MyWorker.perform_in(300, { user_id: 123 })

Parameters:

  • interval (Integer, ActiveSupport::Duration)

    Delay in seconds, or duration object

  • body (Object)

    The job payload that will be passed to the worker’s perform method

  • options (Hash) (defaults to: {})

    Additional options for job enqueueing (see #perform_async)

Returns:

  • (String)

    The message ID of the enqueued job



# File 'lib/shoryuken/worker.rb', line 81

def perform_in(interval, body, options = {})
  Shoryuken.worker_executor.perform_in(self, interval, body, options)
end

#server_middleware {|Shoryuken::Middleware::Chain| ... } ⇒ Shoryuken::Middleware::Chain

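Configures the server-side middleware chain for this worker class. The chain starts as a copy of the global Shoryuken.server_middleware chain and is created on first use. Middleware runs before and after job processing, similar to Rack middleware.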

Examples:

Adding custom middleware

class MyWorker
  include Shoryuken::Worker

  server_middleware do |chain|
    chain.add MyCustomMiddleware
    chain.remove Shoryuken::Middleware::Server::ActiveRecord
  end
end

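Yields:

  • (Shoryuken::Middleware::Chain)

    the middleware chain for this worker class, when a block is given

Returns:

  • (Shoryuken::Middleware::Chain)

    the middleware chain for this worker class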



# File 'lib/shoryuken/worker.rb', line 102

def server_middleware
  @_server_chain ||= Shoryuken.server_middleware.dup
  yield @_server_chain if block_given?
  @_server_chain
end
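
The method body shows that the per-worker chain is a memoized duplicate of the global one. The sketch below illustrates that pattern with stand-ins: `SketchChain`, `GLOBAL_CHAIN`, and `SketchWorker` are hypothetical names, and the sketch assumes the duplicate does not share its entry list with the original, which the source shown here does not confirm for Shoryuken::Middleware::Chain.

```ruby
# Hypothetical stand-in for a middleware chain; it only tracks entries.
class SketchChain
  attr_reader :entries

  def initialize(entries = [])
    @entries = entries
  end

  def add(entry)
    @entries << entry unless @entries.include?(entry)
  end

  # Assumption: a duplicated chain gets its own entry list.
  def initialize_copy(source)
    super
    @entries = source.entries.dup
  end
end

# Stand-in for the global chain returned by Shoryuken.server_middleware.
GLOBAL_CHAIN = SketchChain.new([:timing])

class SketchWorker
  # Same shape as server_middleware above: duplicate once, then reuse.
  def self.server_middleware
    @_server_chain ||= GLOBAL_CHAIN.dup
    yield @_server_chain if block_given?
    @_server_chain
  end
end

SketchWorker.server_middleware { |chain| chain.add(:audit) }

SketchWorker.server_middleware.entries  # [:timing, :audit]
GLOBAL_CHAIN.entries                    # [:timing], the global chain is unchanged
```

Because the duplicate is taken on the first call, global middleware added after that point would not appear in this worker's chain under the same assumption.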

#shoryuken_class_attribute(*attrs) ⇒ Object

:nodoc:



# File 'lib/shoryuken/worker.rb', line 225

def shoryuken_class_attribute(*attrs) # :nodoc:
  attrs.each do |name|
    singleton_class.instance_eval do
      undef_method(name) if method_defined?(name) || private_method_defined?(name)
    end
    define_singleton_method(name) { nil }

    ivar = "@#{name}"

    singleton_class.instance_eval do
      m = "#{name}="
      undef_method(m) if method_defined?(m) || private_method_defined?(m)
    end

    define_singleton_method("#{name}=") do |val|
      singleton_class.class_eval do
        undef_method(name) if method_defined?(name) || private_method_defined?(name)
        define_method(name) { val }
      end

      # singleton? backwards compatibility for ruby < 2.1
      singleton_klass = respond_to?(:singleton?) ? singleton? : self != ancestors.first

      if singleton_klass
        class_eval do
          undef_method(name) if method_defined?(name) || private_method_defined?(name)
          define_method(name) do
            if instance_variable_defined? ivar
              instance_variable_get ivar
            else
              singleton_class.send name
            end
          end
        end
      end
      val
    end

    # instance reader
    undef_method(name) if method_defined?(name) || private_method_defined?(name)
    define_method(name) do
      if instance_variable_defined?(ivar)
        instance_variable_get ivar
      else
        self.class.public_send name
      end
    end

    # instance writer
    m = "#{name}="
    undef_method(m) if method_defined?(m) || private_method_defined?(m)
    attr_writer name
  end
end
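
The class-level part of this helper can be hard to follow, so here is a reduced sketch of the inheritance behavior it aims for. `SketchClassAttribute`, `BaseWorker`, and `ChildWorker` are hypothetical names; the sketch leaves out the instance reader, the instance writer, and the undef_method guards, so it is an illustration rather than a replacement.

```ruby
module SketchClassAttribute
  def sketch_class_attribute(name)
    # The reader returns nil until a value is assigned.
    define_singleton_method(name) { nil }

    define_singleton_method("#{name}=") do |val|
      # Redefine the reader on the receiver's own singleton class, so an
      # assignment on a subclass never changes the parent's reader.
      singleton_class.class_eval do
        define_method(name) { val }
      end
      val
    end
  end
end

class BaseWorker
  extend SketchClassAttribute
  sketch_class_attribute :options_hash
end

class ChildWorker < BaseWorker; end

BaseWorker.options_hash = { 'queue' => 'default' }
inherited = ChildWorker.options_hash   # { 'queue' => 'default' }, read through the parent

ChildWorker.options_hash = { 'queue' => 'child' }
ChildWorker.options_hash               # { 'queue' => 'child' }
BaseWorker.options_hash                # { 'queue' => 'default' }, parent is unaffected
```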

#shoryuken_options(opts = {}) ⇒ Object

Configures worker options including queue assignment, processing behavior, and SQS-specific settings. This is the main configuration method for workers.

Examples:

Basic worker configuration

class MyWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'my_queue'

  def perform(sqs_msg, body)
    # Process the message
  end
end

Worker with auto-delete and retries

class ReliableWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'important_queue',
                    auto_delete: true,
                    retry_intervals: [1, 5, 25, 125]
end

Batch processing worker

class BatchWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'batch_queue', batch: true

  def perform(sqs_msgs, bodies)
    # Process array of up to 10 messages
    bodies.each { |body| process_item(body) }
  end
end

Worker registered for multiple queues

class MultiQueueWorker
  include Shoryuken::Worker
  shoryuken_options queue: ['high_priority', 'low_priority']
end

Auto-extending visibility timeout for long-running jobs

class LongRunningWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'slow_queue',
                    auto_visibility_timeout: true

  def perform(sqs_msg, body)
    # Long processing that might exceed visibility timeout
    complex_processing(body)
  end
end

Parameters:

  • opts (Hash) (defaults to: {})

    Configuration options for the worker

Options Hash (opts):

  • :queue (String, Array<String>)

    Queue name(s) this worker processes

  • :batch (Boolean) — default: false

    Process messages in batches of up to 10

  • :auto_delete (Boolean) — default: false

    Automatically delete messages after processing

  • :auto_visibility_timeout (Boolean) — default: false

    Automatically extend message visibility

  • :retry_intervals (Array<Integer>)

    Exponential backoff retry intervals in seconds

  • :sqs (Hash)

    Additional SQS client options



# File 'lib/shoryuken/worker.rb', line 165

def shoryuken_options(opts = {})
  self.shoryuken_options_hash = get_shoryuken_options.merge(stringify_keys(opts || {}))
  normalize_worker_queue!
end
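
As the method body shows, each call merges the new options over the current ones (or over the defaults on the first call), so repeated calls accumulate. A minimal sketch of that merge follows; `sketch_defaults`, `sketch_stringify_keys`, and `sketch_merge` are hypothetical helpers, the default values are made up for illustration, and the queue normalization step is omitted.

```ruby
# Hypothetical defaults standing in for Shoryuken.default_worker_options.
def sketch_defaults
  { 'queue' => 'default', 'auto_delete' => false }
end

# Same top-level key conversion as stringify_keys in this module.
def sketch_stringify_keys(hash)
  hash.each_with_object({}) { |(key, value), out| out[key.to_s] = value }
end

# Mirrors the merge in shoryuken_options: the current options (or the
# defaults) overlaid with the newly supplied ones. A nil opts is tolerated.
def sketch_merge(current, opts)
  (current || sketch_defaults).merge(sketch_stringify_keys(opts || {}))
end

first  = sketch_merge(nil, { queue: 'mail' })
second = sketch_merge(first, { auto_delete: true })
third  = sketch_merge(second, nil)

first   # { 'queue' => 'mail', 'auto_delete' => false }
second  # { 'queue' => 'mail', 'auto_delete' => true }
third   # same contents as second
```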

#stringify_keys(hash) ⇒ Object

:nodoc:



# File 'lib/shoryuken/worker.rb', line 219

def stringify_keys(hash) # :nodoc:
  new_hash = {}
  hash.each { |key, value| new_hash[key.to_s] = value }
  new_hash
end
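
The conversion is shallow and non-destructive, which the following usage sketch makes visible. The method is repeated so the snippet runs on its own; the option names in `opts`, including `delay_seconds`, are illustrative values only.

```ruby
def stringify_keys(hash)
  new_hash = {}
  hash.each { |key, value| new_hash[key.to_s] = value }
  new_hash
end

opts   = { queue: 'mail', sqs: { delay_seconds: 5 } }
result = stringify_keys(opts)

result.keys         # ['queue', 'sqs']
result['sqs'].keys  # [:delay_seconds], nested keys are left untouched
opts.keys           # [:queue, :sqs], the input hash is not modified
```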