Module: Legion::Extensions::Tasker::Runners::CheckSubtask
- Extended by:
- Helpers::FindSubtask
- Includes:
- Helpers::Lex
- Defined in:
- lib/legion/extensions/tasker/runners/check_subtask.rb
Instance Method Summary
collapse
find_subtasks, find_trigger
Instance Method Details
#check_subtasks(runner_class:, function:, **opts) ⇒ Object
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/legion/extensions/tasker/runners/check_subtask.rb', line 9
def check_subtasks(runner_class:, function:, **opts)
trigger = find_trigger(runner_class: runner_class, function: function)
find_subtasks(trigger_id: trigger[:function_id]).each do |relationship|
unless relationship[:allow_new_chains]
next if relationship[:chain_id].nil?
next unless opts.key? :chain_id
next unless relationship[:chain_id] == opts[:chain_id]
end
task_hash = relationship
task_hash[:status] = relationship[:delay].zero? ? 'conditioner.queued' : 'task.delayed'
task_hash[:payload] = opts
if opts.key? :master_id
task_hash[:master_id] = opts[:master_id]
elsif opts.key? :parent_id
task_hash[:master_id] = opts[:parent_id]
elsif opts.key? :task_id
task_hash[:master_id] = opts[:task_id]
end
task_hash[:parent_id] = opts[:task_id] if opts.key? :task_id
task_hash[:routing_key] = if relationship[:conditions].is_a?(String) && relationship[:conditions].length > 4
'task.subtask.conditioner'
elsif relationship[:transformation].is_a?(String) && relationship[:transformation].length > 4 'task.subtask.transformation'
else
relationship[:runner_routing_key]
end
if opts[:result].is_a? Array
opts[:result].each do |result|
send_task(results: result,
trigger_runner_id: trigger[:runner_id],
trigger_function_id: trigger[:function_id],
**task_hash)
end
else
results = if opts[:results].is_a? Hash
opts[:results]
elsif opts[:result].is_a? Hash
opts[:result]
else
opts
end
send_task(
results: results,
trigger_runner_id: trigger[:runner_id],
trigger_function_id: trigger[:function_id],
**task_hash
)
end
end
end
|
#insert_task(relationship_id:, function_id:, status: 'task.queued', master_id: nil, parent_id: nil, **opts) ⇒ Object
82
83
84
85
86
87
88
89
90
91
92
93
94
|
# File 'lib/legion/extensions/tasker/runners/check_subtask.rb', line 82
def insert_task(relationship_id:, function_id:, status: 'task.queued', master_id: nil, parent_id: nil, **opts)
insert_hash = { relationship_id: relationship_id, function_id: function_id, status: status }
insert_hash[:master_id] = if master_id.is_a? Integer
master_id
elsif parent_id.is_a? Integer
parent_id
end
insert_hash[:parent_id] = parent_id if parent_id.is_a? Integer
insert_hash[:payload] = Legion::JSON.dump(opts)
Legion::Data::Model::Task.insert(insert_hash)
end
|
#send_task(**opts) ⇒ Object
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
# File 'lib/legion/extensions/tasker/runners/check_subtask.rb', line 65
def send_task(**opts)
opts[:results] = opts[:result] if opts.key?(:result) && !opts.key?(:results)
opts[:success] = if opts.key?(:result) && opts.key?(:success)
opts[:result][:success]
elsif opts.key?(:success)
opts[:success]
else
1
end
opts[:task_id] = insert_task(**opts)
return { status: true } unless opts[:delay].zero?
Legion::Transport::Messages::SubTask.new(**opts).publish
end
|