Class: Sidekiq::ActiveRecord::ManagerWorker
- Inherits:
-
Object
- Object
- Sidekiq::ActiveRecord::ManagerWorker
- Includes:
- Worker
- Defined in:
- lib/sidekiq/active_record/manager_worker.rb
Constant Summary collapse
- DEFAULT_IDENTIFIER_KEY =
:id- DEFAULT_BATCH_SIZE =
1000
Class Method Summary collapse
- .additional_keys ⇒ Object
- .batch_size ⇒ Object
-
.default_models_query(query) ⇒ Object
The default of query to run, when the workers runs perform example class UserManagerWorker < Sidekiq::ActiveRecord::ManagerWorker sidekiq_delegate_task_to UserTaskWorker default_models_query -> { User.active } end.
- .default_worker_manager_options ⇒ Object
- .get_default_models_query ⇒ Object
- .get_sidekiq_manager_options ⇒ Object
- .identifier_key ⇒ Object
- .manager_options ⇒ Object
-
.model_attributes(model) ⇒ Object
returns the model attributes array: [model_id, attr1, attr2, …].
-
.perform_query_async(models_query, options = {}) ⇒ Object
For a given model collection, it delegates each model to a sub-worker (e.g TaskWorker) Specify the TaskWorker with the ‘sidekiq_delegate_task_to` method.
- .prepare_models_query(models_query) ⇒ Object
- .runtime_options ⇒ Object
- .set_runtime_options(options = {}) ⇒ Object
-
.sidekiq_delegate_task_to(worker_klass) ⇒ Object
The task worker to delegate to.
-
.sidekiq_manager_options(opts = {}) ⇒ Object
Allows customization for this type of ManagerWorker.
- .worker_class ⇒ Object
Instance Method Summary collapse
Class Method Details
.additional_keys ⇒ Object
130 131 132 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 130 def additional_keys [:additional_keys] end |
.batch_size ⇒ Object
134 135 136 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 134 def batch_size [:batch_size] end |
.default_models_query(query) ⇒ Object
The default of query to run, when the workers runs perform example
class UserManagerWorker < Sidekiq::ActiveRecord::ManagerWorker
sidekiq_delegate_task_to UserTaskWorker
default_models_query -> { User.active }
end
UserManagerWorker.perform_async(:batch_size => 300)
92 93 94 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 92 def default_models_query(query) @query = query end |
.default_worker_manager_options ⇒ Object
100 101 102 103 104 105 106 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 100 def { identifier_key: DEFAULT_IDENTIFIER_KEY, additional_keys: [], batch_size: DEFAULT_BATCH_SIZE } end |
.get_default_models_query ⇒ Object
96 97 98 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 96 def get_default_models_query @query.call() if @query.present? end |
.get_sidekiq_manager_options ⇒ Object
142 143 144 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 142 def @sidekiq_manager_options_hash ||= end |
.identifier_key ⇒ Object
126 127 128 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 126 def identifier_key [:identifier_key] end |
.manager_options ⇒ Object
138 139 140 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 138 def .merge() end |
.model_attributes(model) ⇒ Object
returns the model attributes array:
- model_id, attr1, attr2, …
110 111 112 113 114 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 110 def model_attributes(model) additional_attributes = additional_keys.map { |key| model.send(key) } id_attribute = model.send(identifier_key) additional_attributes.unshift(id_attribute) end |
.perform_query_async(models_query, options = {}) ⇒ Object
For a given model collection, it delegates each model to a sub-worker (e.g TaskWorker) Specify the TaskWorker with the ‘sidekiq_delegate_task_to` method.
@example:
class UserTaskWorker
def perform(user_id)
# user task logic
end
end
class UserSyncer
include Sidekiq::ActiveRecord::ManagerWorker
sidekiq_delegate_task_to :user_task_worker # or UserTaskWorker
:batch_size => 500,
:identifier_key => :user_token,
:additional_keys => [:status]
end
UserSyncer.perform_query_async(User.active, :batch_size => 300)
is equivalent to doing:
User.active.each {|user| UserTaskWorker.perform(user.id) }
51 52 53 54 55 56 57 58 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 51 def perform_query_async(models_query, = {}) () models = prepare_models_query(models_query) models.find_in_batches(batch_size: batch_size) do |models_batch| model_attributes = models_batch.map { |model| model_attributes(model) } Sidekiq::Client.push_bulk(class: worker_class, args: model_attributes) end end |
.prepare_models_query(models_query) ⇒ Object
116 117 118 119 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 116 def prepare_models_query(models_query) selected_attributes = [models_query.primary_key.to_sym, identifier_key, additional_keys].uniq models_query.select(selected_attributes) end |
.runtime_options ⇒ Object
146 147 148 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 146 def @sidekiq_manager_runtime_options || {} end |
.set_runtime_options(options = {}) ⇒ Object
150 151 152 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 150 def (={}) @sidekiq_manager_runtime_options = .delete_if { |_, v| v.to_s.strip == '' } end |
.sidekiq_delegate_task_to(worker_klass) ⇒ Object
The task worker to delegate to.
63 64 65 66 67 68 69 70 71 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 63 def sidekiq_delegate_task_to(worker_klass) case worker_klass when String, Symbol worker_klass.to_s.split('_').map(&:capitalize).join.constantize else worker_klass end [:worker_class] = worker_klass end |
.sidekiq_manager_options(opts = {}) ⇒ Object
Allows customization for this type of ManagerWorker. Legal options:
:worker_class - the worker class to delegate the task to. Alternative to `sidekiq_delegate_task_to`
:identifier_key - the model identifier column. Default 'id'
:additional_keys - additional model keys
:batch_size - Specifies the size of the batch. Default to 1000.
80 81 82 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 80 def (opts = {}) @sidekiq_manager_options_hash = .merge((opts || {})) end |
.worker_class ⇒ Object
121 122 123 124 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 121 def worker_class fail NotImplementedError.new('`worker_class` was not specified') unless [:worker_class].present? [:worker_class] end |
Instance Method Details
#perform(options = {}) ⇒ Object
9 10 11 12 |
# File 'lib/sidekiq/active_record/manager_worker.rb', line 9 def perform( = {}) default_query = self.class.get_default_models_query self.class.perform_query_async(default_query, ) end |