Module: Legion::Extensions::Tasker::Runners::FetchDelayed
- Extended by:
- Helpers::FetchDelayed
- Includes:
- Helpers::Lex, Helpers::Task
- Defined in:
- lib/legion/extensions/tasker/runners/fetch_delayed.rb
Instance Method Summary
collapse
find_delayed, find_subtasks, find_trigger
Instance Method Details
#fetch(**_opts) ⇒ Object
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
# File 'lib/legion/extensions/tasker/runners/fetch_delayed.rb', line 6
def fetch(**_opts)
find_delayed.each do |task|
if task[:relationship_delay].is_a?(Integer) && task[:relationship_delay].positive?
next if Time.now < task[:created] + task[:relationship_delay] end
if task[:task_delay].is_a?(Integer) && task[:task_delay].positive?
next if Time.now < task[:created] + task[:task_delay] end
subtask_hash = {
relationship_id: task[:relationship_id],
chain_id: task[:chain_id],
function_id: task[:function_id],
function: task[:function_name],
runner_id: task[:runner_id],
runner_class: task[:runner_class],
task_id: task[:id],
exchange: task[:exchange],
queue: task[:queue]
}
subtask_hash[:conditions] = task[:conditions] if task[:conditions].is_a?(String)
subtask_hash[:transformation] = task[:transformation] if task[:transformation].is_a?(String)
subtask_hash[:routing_key] = if task[:conditions].is_a?(String) && task[:conditions].length > 4
'task.subtask.conditioner'
elsif task[:transformation].is_a?(String) && task[:transformation].length > 4
'task.subtask.transformation'
else
task[:runner_routing_key]
end
send_task(**subtask_hash)
case subtask_hash[:routing_key]
when 'task.subtask.conditioner'
task_update(task[:id], 'conditioner.queued')
when 'task.subtask.transformation'
task_update(task[:id], 'transformer.queued')
else
task_update(task[:id], 'task.queued')
end
end
end
|
#send_task(**opts) ⇒ Object
51
52
53
54
55
56
57
58
59
60
61
62
|
# File 'lib/legion/extensions/tasker/runners/fetch_delayed.rb', line 51
def send_task(**opts)
opts[:results] = opts[:result] if opts.key?(:result) && !opts.key?(:results)
opts[:success] = if opts.key?(:result) && opts.key?(:success)
opts[:result][:success]
elsif opts.key?(:success)
opts[:success]
else
1
end
log.debug 'pushing delayed task to worker'
Legion::Transport::Messages::Task.new(**opts).publish
end
|