Class: Simplekiq::OrchestrationExecutor

Inherits:
Object
Defined in:
lib/simplekiq/orchestration_executor.rb

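Class Method Summary

  • .execute(args:, job:, workflow:) ⇒ Object

Instance Method Summary

  • #on_success(status, options) ⇒ Object
  • #run_step(workflow, step) ⇒ Object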

Class Method Details

.execute(args:, job:, workflow:) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/simplekiq/orchestration_executor.rb', line 5

# Kicks off the orchestration described by +workflow+ on behalf of +job+.
#
# An empty workflow only triggers Simplekiq.run_empty_callbacks and yields
# nil. Otherwise a new Sidekiq::Batch is created, described, and handed to
# Simplekiq.auto_define_callbacks; inside that batch a
# Simplekiq::BatchTrackerJob is enqueued and the first step (index 0) is run.
#
# @param args [Object] forwarded to the callbacks and to the tracker job
# @param job [Object] the orchestrating job; its class name labels the batch
# @param workflow [#empty?] the steps to run (an Array of steps, judging by
#   how run_step indexes it)
# @return [nil, Object] nil for an empty workflow, otherwise whatever the
#   batch's +jobs+ call returns
def self.execute(args:, job:, workflow:)
  if workflow.empty?
    Simplekiq.run_empty_callbacks(job, args: args)
    nil
  else
    job_name = job.class.name

    batch = Sidekiq::Batch.new
    batch.description = "#{job_name} Simplekiq orchestration"
    Simplekiq.auto_define_callbacks(batch, args: args, job: job)

    batch.jobs do
      # The tracker job is enqueued before the first step is started.
      Simplekiq::BatchTrackerJob.perform_async(job_name, batch.bid, args)

      new.run_step(workflow, 0)
    end
  end
end

Instance Method Details

#on_success(status, options) ⇒ Object



43
44
45
46
47
48
49
# File 'lib/simplekiq/orchestration_executor.rb', line 43

# Callback that advances the workflow to the step recorded in +options+.
#
# Does nothing once options["step"] equals the number of workflow steps.
# Otherwise it opens the batch identified by status.parent_bid and runs that
# step inside it.
#
# @param status [#parent_bid] status object supplying the parent batch id
# @param options [Hash] reads "orchestration_workflow" and "step"
#   (run_step registers this class with exactly those two keys)
# @return [nil, Object] nil when no step remains, otherwise whatever the
#   batch's +jobs+ call returns
def on_success(status, options)
  workflow = options["orchestration_workflow"]
  step = options["step"]
  return if step == workflow.length

  parent_batch = Sidekiq::Batch.new(status.parent_bid)
  parent_batch.jobs { run_step(workflow, step) }
end

#run_step(workflow, step) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/simplekiq/orchestration_executor.rb', line 22

# Enqueues every job of workflow step +step+ inside a fresh Sidekiq::Batch.
#
# The batch registers this object's class under the "success" event, passing
# the whole workflow and the index of the following step as options
# (presumably so that on_success can continue from there).
#
# @param workflow [Array] steps; each step is a single job description or a
#   list of them, where a description responds to ["klass"] and ["args"]
# @param step [Integer] index of the step to run
# @return [Object] whatever the batch's +jobs+ call returns
def run_step(workflow, step)
  # Splat assignment wraps a lone job description in a list and leaves a list
  # of descriptions untouched.
  # Per the original author: this will never be empty because
  # Orchestration#serialized_workflow skips inserting a new step for
  # in_parallel if there were no inner jobs specified.
  *step_jobs = workflow.at(step)

  following_step = step + 1
  callback_options = {"orchestration_workflow" => workflow, "step" => following_step}

  batch = Sidekiq::Batch.new
  batch.description = "Simplekiq orchestrated step #{following_step}"
  batch.on("success", self.class, callback_options)

  batch.jobs do
    step_jobs.each do |spec|
      worker = Object.const_get(spec["klass"])
      worker.perform_async(*spec["args"])
    end
  end
end