Class: SidekiqWorkflows::Worker
- Inherits:
-
Object
- Object
- SidekiqWorkflows::Worker
- Includes:
- Sidekiq::Worker
- Defined in:
- lib/sidekiq_workflows/worker.rb
Class Method Summary collapse
- .perform_async(workflow, *args) ⇒ Object
- .perform_workflow(workflow, on_complete: nil, on_complete_options: {}) ⇒ Object
Instance Method Summary collapse
- #on_complete(status, options) ⇒ Object
- #perform(workflow) ⇒ Object
Class Method Details
.perform_async(workflow, *args) ⇒ Object
45 46 47 |
# File 'lib/sidekiq_workflows/worker.rb', line 45
#
# Enqueues this worker for the given workflow on the queue returned by
# +worker_queue+. The workflow is passed to the job in its serialized form
# (+workflow.serialize+), followed by any extra arguments.
#
# @param workflow an object responding to +serialize+
# @param args additional job arguments, forwarded unchanged
# @return whatever the object returned by +set+ returns from +perform_async+
def self.perform_async(workflow, *args)
  queued_worker = set(queue: worker_queue)
  serialized = workflow.serialize
  # Dispatch by name on the object returned by +set+ so the call does not
  # resolve back to this override.
  queued_worker.send(:perform_async, serialized, *args)
end
.perform_workflow(workflow, on_complete: nil, on_complete_options: {}) ⇒ Object
49 50 51 52 53 54 55 56 57 58 |
# File 'lib/sidekiq_workflows/worker.rb', line 49
#
# Starts a workflow inside a fresh root Sidekiq batch and returns that
# batch's id.
#
# @param workflow the workflow to run; must respond to +workflow_uuid+ and be
#   accepted by +perform_async+
# @param on_complete callback target registered for the batch's :complete
#   event; no callback is registered when this is nil/false
# @param on_complete_options [Hash] options handed to the callback; the
#   workflow's uuid is merged in under +:workflow_uuid+ (the caller's hash is
#   not mutated)
# @return the root batch's +bid+
def self.perform_workflow(workflow, on_complete: nil, on_complete_options: {})
  batch = Sidekiq::Batch.new
  # Only override the batch's callback queue when one has been configured.
  batch.callback_queue = SidekiqWorkflows.callback_queue unless SidekiqWorkflows.callback_queue.nil?
  batch.description = "Workflow #{workflow.workflow_uuid || '-'} root batch"

  if on_complete
    callback_options = on_complete_options.merge(workflow_uuid: workflow.workflow_uuid)
    batch.on(:complete, on_complete, callback_options)
  end

  batch.jobs do
    perform_async(workflow)
  end

  batch.bid
end
Instance Method Details
#on_complete(status, options) ⇒ Object
34 35 36 37 38 39 40 41 42 43 |
# File 'lib/sidekiq_workflows/worker.rb', line 34
#
# Batch :complete callback (registered by #perform as
# 'SidekiqWorkflows::Worker#on_complete').
#
# Reads the workflow node from +options['workflow']+. If the node defines
# +on_partial_complete+ (a 'ClassName#method' string), that class is
# instantiated and the method is invoked with +status+ and +options+. The
# node's children are then started in the parent batch, unless the status
# reports any failures.
#
# @param status batch status; must respond to +failures+ and +parent_batch+
# @param options [Hash] callback options; +'workflow'+ holds the node in a
#   form accepted by +ensure_deserialized+
def on_complete(status, options)
  workflow = ensure_deserialized(options['workflow'])

  if workflow.on_partial_complete
    class_name, method_name = workflow.on_partial_complete.split('#')
    ActiveSupport::Inflector.constantize(class_name).new.send(method_name, status, options)
  end

  # A failed stage stops the workflow here: children are not started.
  perform_children(status.parent_batch, workflow) unless status.failures > 0
end
#perform(workflow) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/sidekiq_workflows/worker.rb', line 9
#
# Job entry point: runs one node of a workflow.
#
# The argument is first passed through +ensure_deserialized+, then handled
# according to the node's class name:
#
# * 'SidekiqWorkflows::RootNode' - the current batch and the node are handed
#   to +perform_children+.
# * 'SidekiqWorkflows::WorkerNode' - inside the current batch's +jobs+ block a
#   child batch is created whose :complete callback is
#   'SidekiqWorkflows::Worker#on_complete', and every entry of the node's
#   +workers+ is enqueued in it.
#
# Any other node class is ignored.
#
# @param workflow the workflow node, in a form accepted by +ensure_deserialized+
def perform(workflow)
  node = ensure_deserialized(workflow)
  node_type = node.class.name

  if node_type == 'SidekiqWorkflows::RootNode'
    perform_children(batch, node)
  elsif node_type == 'SidekiqWorkflows::WorkerNode'
    batch.jobs do
      stage = Sidekiq::Batch.new
      # Only override the callback queue when one has been configured.
      unless SidekiqWorkflows.callback_queue.nil?
        stage.callback_queue = SidekiqWorkflows.callback_queue
      end
      stage.description = "Workflow #{node.workflow_uuid || '-'}"

      callback_options = { workflow: node.serialize, workflow_uuid: node.workflow_uuid }
      stage.on(:complete, 'SidekiqWorkflows::Worker#on_complete', **callback_options)

      stage.jobs do
        node.workers.each do |job|
          worker_class = job[:worker]
          delay = job[:delay]

          # Entries carrying a :delay are scheduled; the rest run immediately.
          if delay
            worker_class.perform_in(delay, *job[:payload])
          else
            worker_class.perform_async(*job[:payload])
          end
        end
      end
    end
  end
end