Class: Qyu::Workers::Split

Inherits:
Base
  • Object
show all
Includes:
Concerns::Split
Defined in:
lib/qyu/workers/split.rb

Overview

Qyu::Workers::Split

Starts a worker that splits the collection stored under a given payload key into slices and creates new tasks for each slice

Qyu::Workers::Split.new do

slice_size 25
payload_key 'array'

end

Instance Attribute Summary collapse

Attributes inherited from Base

#id, #processed_tasks

Instance Method Summary collapse

Methods included from Concerns::Split

#payload_key, #sample, #slice_size

Methods inherited from Base

#initialize

Methods included from Concerns::Timeout

#timeout

Methods included from Concerns::PayloadValidator

#validate_payload!, #validates

Methods included from Concerns::FailureQueue

#failure_queue

Methods included from Concerns::Callback

#callback, #run_callbacks

Constructor Details

This class inherits a constructor from Qyu::Workers::Base

Instance Attribute Details

#splittableObject

Returns the value of attribute splittable


17
18
19
# File 'lib/qyu/workers/split.rb', line 17

# Reader for the +splittable+ attribute.
#
# @return [Object, nil] the current value of +@splittable+, or nil when it
#   has not been assigned yet
def splittable
  instance_variable_get(:@splittable)
end

Instance Method Details

#work(queue_name, blocking: true) ⇒ Object

Assign the splittable value by making it the last expression of the block passed to work:

worker.work('queue') do

# do anything
# splittable variable must be at the end
[1, 2, 3, 4, 5, 6, 6]

end

Alternatively, pass payload_key 'array' to the worker initializer and simply call worker.work('queue') without a block.


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/qyu/workers/split.rb', line 32

# Works on +queue_name+ and, for every task handed over by the parent
# implementation, splits a collection into slices and creates follow-up
# tasks for each slice.
#
# The collection is taken from the return value of the given block (called
# with the task) or, when no block is given, from the task payload under
# +@payload_key+. It is stored in +@splittable+.
#
# @param queue_name [Object] queue identifier; also interpolated into the
#   debug log message
# @param blocking [Boolean] not used here directly; forwarded unchanged to
#   the parent implementation by the bare +super+
# @yield [task] optional block whose return value is the collection to split
# @return [Object] whatever the parent implementation of +work+ returns
def work(queue_name, blocking: true)
  validate_split_parameters!

  # Bare `super` re-sends queue_name and blocking: to the parent method.
  super do |task|
    @splittable = block_given? ? yield(task) : task.payload[@payload_key]

    # Log once per task rather than once per slice.
    log(:debug, "Split started for queue '#{queue_name}'")

    # The follow-up task names do not depend on the slice, so look them up once.
    task_names_to_start = task.descriptor['starts_parallel'] || task.descriptor['starts_manually']

    @splittable.each_slice(@slice_size) do |slice|
      # With @sample set, only one randomly picked element of the slice is
      # passed on instead of the whole slice.
      input = @sample ? slice.sample : slice
      new_payload = task.payload.merge(@payload_key => input)
      task_names_to_start.each do |task_name_to_start|
        task.job.create_task(task, task_name_to_start, new_payload)
      end
    end
  end
end