Class: Sidekiq::ActiveRecord::ManagerWorker

Inherits:
Object
  • Object
show all
Includes:
Worker
Defined in:
lib/sidekiq/active_record/manager_worker.rb

Constant Summary collapse

DEFAULT_IDENTIFIER_KEY =
:id
DEFAULT_BATCH_SIZE =
1000

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.additional_keysObject



130
131
132
# File 'lib/sidekiq/active_record/manager_worker.rb', line 130

# Extra model attributes that are sent to the task worker after the identifier.
#
# Reads `:additional_keys` from `manager_options`. The value is wrapped with
# `Array()` so that a `nil` or a single key behaves like a list and callers
# can safely iterate over the result.
#
# @return [Array] the configured additional keys; empty when none are set
def additional_keys
  Array(manager_options[:additional_keys])
end

.batch_sizeObject



134
135
136
# File 'lib/sidekiq/active_record/manager_worker.rb', line 134

# The batch size currently in effect.
#
# @return [Object] the `:batch_size` entry of `manager_options`
def batch_size
  effective_options = manager_options
  effective_options[:batch_size]
end

.default_models_query(query) ⇒ Object

The default query to run when the worker's `perform` is invoked without an explicit query. Example:

class UserManagerWorker < Sidekiq::ActiveRecord::ManagerWorker
  sidekiq_delegate_task_to UserTaskWorker
  default_models_query -> { User.active }
end

UserManagerWorker.perform_async(:batch_size => 300)


92
93
94
# File 'lib/sidekiq/active_record/manager_worker.rb', line 92

# Registers the query used when no explicit query is supplied.
#
# The value is stored in `@query`; `get_default_models_query` later invokes
# it with `call`, so it is expected to be callable (e.g. a lambda).
#
# @param query [#call] callable that produces the models to process
# @return [Object] the given `query`
def default_models_query(query)
  @query = query
end

.default_worker_manager_optionsObject



100
101
102
103
104
105
106
# File 'lib/sidekiq/active_record/manager_worker.rb', line 100

# Builds a fresh hash of default manager options.
#
# @return [Hash] `:identifier_key`, `:additional_keys` and `:batch_size`
#   populated from the class defaults
def default_worker_manager_options
  { identifier_key: DEFAULT_IDENTIFIER_KEY, additional_keys: [], batch_size: DEFAULT_BATCH_SIZE }
end

.get_default_models_queryObject



96
97
98
# File 'lib/sidekiq/active_record/manager_worker.rb', line 96

# Evaluates the query registered via `default_models_query`.
#
# @return [Object, nil] the result of calling the stored query, or nil when
#   no query is present
def get_default_models_query
  return unless @query.present?

  @query.call
end

.get_sidekiq_manager_optionsObject



142
143
144
# File 'lib/sidekiq/active_record/manager_worker.rb', line 142

# Returns the stored manager options, initializing them on first use.
#
# The hash is memoized in `@sidekiq_manager_options_hash`, so repeated calls
# return the same object.
#
# @return [Hash] the stored options
def get_sidekiq_manager_options
  return @sidekiq_manager_options_hash if @sidekiq_manager_options_hash

  @sidekiq_manager_options_hash = default_worker_manager_options
end

.identifier_keyObject



126
127
128
# File 'lib/sidekiq/active_record/manager_worker.rb', line 126

# The identifier key currently in effect.
#
# @return [Object] the `:identifier_key` entry of `manager_options`
def identifier_key
  effective_options = manager_options
  effective_options[:identifier_key]
end

.manager_optionsObject



138
139
140
# File 'lib/sidekiq/active_record/manager_worker.rb', line 138

# Effective options: the stored manager options overlaid with the runtime
# options, which win on conflicting keys.
#
# @return [Hash] a new merged hash
def manager_options
  stored = get_sidekiq_manager_options
  stored.merge(runtime_options)
end

.model_attributes(model) ⇒ Object

returns the model attributes array:

model_id, attr1, attr2, …


110
111
112
113
114
# File 'lib/sidekiq/active_record/manager_worker.rb', line 110

# Collects the values that are pushed as job arguments for one model.
#
# @param model [Object] object responding to the identifier key and to every
#   additional key
# @return [Array] the identifier value followed by the additional values,
#   i.e. `[model_id, attr1, attr2, ...]`
def model_attributes(model)
  extra_values = additional_keys.map { |attribute| model.send(attribute) }
  [model.send(identifier_key)] + extra_values
end

.perform_query_async(models_query, options = {}) ⇒ Object

For a given model collection, it delegates each model to a sub-worker (e.g. TaskWorker). Specify the TaskWorker with the `sidekiq_delegate_task_to` method.

@example:

class UserTaskWorker
  def perform(user_id)
    # user task logic
  end
end

class UserSyncer < Sidekiq::ActiveRecord::ManagerWorker

  sidekiq_delegate_task_to :user_task_worker # or UserTaskWorker
  sidekiq_manager_options :batch_size => 500,
                          :identifier_key => :user_token,
                          :additional_keys => [:status]
end

UserSyncer.perform_query_async(User.active, :batch_size => 300)

is equivalent to doing:

User.active.each { |user| UserTaskWorker.perform_async(user.id) }

Parameters:

  • models_query

    ActiveRecord::Relation

  • options (defaults to: {})

    Hash of options. `:worker_class` - the worker class to delegate the task to (alternative to the default `sidekiq_delegate_task_to`). `:identifier_key` - the model identifier column (default 'id'). `:additional_keys` - additional model keys. `:batch_size` - specifies the size of each batch to push in bulk.

    This is also the number of models to fetch in each find_in_batches query.
    Default is 1000.
    


51
52
53
54
55
56
57
58
# File 'lib/sidekiq/active_record/manager_worker.rb', line 51

# Pushes one job per model in `models_query` to the delegated worker class.
#
# The runtime options are stored first; the query is then narrowed by
# `prepare_models_query` and walked with `find_in_batches`. Each batch is
# mapped to its job arguments and pushed in a single bulk call.
#
# @param models_query [ActiveRecord::Relation] the models to process
# @param options [Hash] runtime overrides (see `set_runtime_options`)
# @return [Object] whatever `find_in_batches` returns
def perform_query_async(models_query, options = {})
  set_runtime_options(options)
  prepare_models_query(models_query).find_in_batches(batch_size: batch_size) do |batch|
    job_args = batch.map { |record| model_attributes(record) }
    Sidekiq::Client.push_bulk(class: worker_class, args: job_args)
  end
end

.prepare_models_query(models_query) ⇒ Object



116
117
118
119
# File 'lib/sidekiq/active_record/manager_worker.rb', line 116

# Restricts the query to the columns needed to build the job arguments.
#
# The selected columns are the primary key, the identifier key and every
# additional key. The additional keys are splatted into the list (rather than
# nested as one array element) so that `uniq` can drop a key that duplicates
# the primary/identifier key, and so that an empty list contributes nothing.
#
# @param models_query [ActiveRecord::Relation] responds to `primary_key` and
#   `select`
# @return [Object] the result of `models_query.select`
def prepare_models_query(models_query)
  selected_attributes = [models_query.primary_key.to_sym, identifier_key, *additional_keys].uniq
  models_query.select(selected_attributes)
end

.runtime_optionsObject



146
147
148
# File 'lib/sidekiq/active_record/manager_worker.rb', line 146

# Options stored by the most recent `set_runtime_options` call.
#
# @return [Hash] the stored runtime options, or an empty hash when none are
#   stored
def runtime_options
  return {} unless @sidekiq_manager_runtime_options

  @sidekiq_manager_runtime_options
end

.set_runtime_options(options = {}) ⇒ Object



150
151
152
# File 'lib/sidekiq/active_record/manager_worker.rb', line 150

# Stores the per-call option overrides.
#
# Entries whose value is blank (its string form is empty after stripping) are
# dropped. Keys are converted to Symbols because the option readers look up
# Symbol keys, whereas a hash that went through Sidekiq's JSON serialization
# (as the argument of `perform` does) arrives with String keys.
# The caller's hash is left untouched; `nil` is treated as no options.
#
# @param options [Hash, nil] runtime overrides, e.g. `batch_size: 300`
# @return [Hash] the cleaned, Symbol-keyed hash that was stored
def set_runtime_options(options = {})
  @sidekiq_manager_runtime_options = (options || {}).each_with_object({}) do |(key, value), cleaned|
    next if value.to_s.strip == ''

    cleaned[key.respond_to?(:to_sym) ? key.to_sym : key] = value
  end
end

.sidekiq_delegate_task_to(worker_klass) ⇒ Object

The task worker to delegate to.

Parameters:

  • worker_klass (Sidekiq::Worker, Symbol)
    • UserTaskWorker or :user_task_worker



63
64
65
66
67
68
69
70
71
# File 'lib/sidekiq/active_record/manager_worker.rb', line 63

# Sets the task worker that jobs are delegated to.
#
# A String or Symbol is converted to a class name by upcasing the first
# letter of each underscore-separated segment (so both `:user_task_worker`
# and `'UserTaskWorker'` resolve to `UserTaskWorker`) and is then resolved to
# the constant. Anything else is stored as given.
# The resolved value - not the raw argument - is what gets stored under
# `:worker_class`.
#
# @param worker_klass [Class, String, Symbol] e.g. `UserTaskWorker` or
#   `:user_task_worker`
# @return [Object] the stored worker class
# @raise [NameError] when a String/Symbol does not name an existing constant
def sidekiq_delegate_task_to(worker_klass)
  resolved_klass =
    case worker_klass
    when String, Symbol
      # Upcase only the leading letter of each segment; `capitalize` would
      # also downcase the remainder and mangle an already-CamelCase name.
      class_name = worker_klass.to_s.split('_').map { |part| part.sub(/\A[a-z]/) { |char| char.upcase } }.join
      class_name.constantize
    else
      worker_klass
    end
  get_sidekiq_manager_options[:worker_class] = resolved_klass
end

.sidekiq_manager_options(opts = {}) ⇒ Object

Allows customization for this type of ManagerWorker. Legal options:

:worker_class - the worker class to delegate the task to. Alternative to `sidekiq_delegate_task_to`
:identifier_key - the model identifier column. Default 'id'
:additional_keys - additional model keys
:batch_size - Specifies the size of the batch. Defaults to 1000.


80
81
82
# File 'lib/sidekiq/active_record/manager_worker.rb', line 80

# Customizes the stored manager options.
#
# The given options are merged over the currently stored ones and the result
# replaces the stored hash. A nil argument is treated as an empty hash.
#
# @param opts [Hash, nil] any of `:worker_class`, `:identifier_key`,
#   `:additional_keys`, `:batch_size`
# @return [Hash] the new stored options
def sidekiq_manager_options(opts = {})
  overrides = opts || {}
  @sidekiq_manager_options_hash = get_sidekiq_manager_options.merge(overrides)
end

.worker_classObject



121
122
123
124
# File 'lib/sidekiq/active_record/manager_worker.rb', line 121

# The worker class that jobs are delegated to.
#
# @return [Object] the `:worker_class` entry of `manager_options`
# @raise [NotImplementedError] when no worker class is present in the options
def worker_class
  delegate = manager_options[:worker_class]
  raise NotImplementedError, '`worker_class` was not specified' unless delegate.present?

  delegate
end

Instance Method Details

#perform(options = {}) ⇒ Object



9
10
11
12
# File 'lib/sidekiq/active_record/manager_worker.rb', line 9

# Job entry point: runs the class's default models query and delegates every
# resulting model through `perform_query_async`.
#
# @param options [Hash] runtime overrides forwarded to `perform_query_async`
# @return [Object] whatever `perform_query_async` returns
def perform(options = {})
  manager = self.class
  manager.perform_query_async(manager.get_default_models_query, options)
end