Class: Qyu::Workers::Split
- Includes:
- Concerns::Split
- Defined in:
- lib/qyu/workers/split.rb
Overview
Qyu::Workers::Split
Starts a worker to split a certain payload key into multiple jobs
Qyu::SplitWorker.new do
slice_size 25
payload_key 'array'
end
Instance Attribute Summary collapse
-
#splittable ⇒ Object
Returns the value of attribute splittable.
Attributes inherited from Base
Instance Method Summary collapse
-
#work(queue_name) ⇒ Object
Assign a splittable variable by being at the end of a block worker.work(‘queue’) do # do anything # splittable variable must be at the end [1, 2, 3, 4, 5, 6, 6] end.
Methods included from Concerns::Split
#payload_key, #sample, #slice_size
Methods inherited from Base
Methods included from Concerns::Timeout
Methods included from Concerns::PayloadValidator
#validate_payload!, #validates
Methods included from Concerns::FailureQueue
Methods included from Concerns::Callback
Constructor Details
This class inherits a constructor from Qyu::Workers::Base
Instance Attribute Details
#splittable ⇒ Object
Returns the value of attribute splittable.
17 18 19 |
# File 'lib/qyu/workers/split.rb', line 17 def splittable @splittable end |
Instance Method Details
#work(queue_name) ⇒ Object
Assign a splittable variable by being at the end of a block worker.work(‘queue’) do
# do anything
# splittable variable must be at the end
[1, 2, 3, 4, 5, 6, 6]
end
or by passing payload_key ‘array’ to worker initializer then just worker.work(‘queue’)
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/qyu/workers/split.rb', line 32 def work(queue_name) validate_split_parameters! super do |task| if block_given? @splittable = yield(task) else # or by passing # payload_key 'array' # to worker initializer @splittable = task.payload[@payload_key] end @splittable.each_slice(@slice_size).with_index do |slice, i| log(:debug, "Split started for queue '#{queue_name}'") input = @sample ? slice.sample : slice new_payload = task.payload.merge({ @payload_key => input }) task_names_to_start = task.descriptor['starts_parallel'] || task.descriptor['starts_manually'] task_names_to_start.each do |task_name_to_start| task.job.create_task(task, task_name_to_start, new_payload) end end end end |