5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# File 'lib/legion/extensions/tasker/runners/fetch_delayed.rb', line 5
# Publishes every delayed task whose delay has elapsed as a SubTask message.
#
# Loads all tasks with status 'task.delayed'. A task is skipped (left
# untouched) while `created + delay` is still in the future; the
# relationship's delay is checked first (when the task has a relationship),
# then the task's own delay. Due tasks are published and moved to
# 'conditioner.queued'.
#
# A failure while handling a single task is logged and that task is moved to
# 'task.push_exception'; the remaining tasks are still processed. A due task
# without a relationship cannot be turned into a SubTask and is treated the
# same way, with an explicit log message.
#
# @param _opts [Hash] accepted for interface compatibility and ignored
# @return [Hash] `{ success: true, count: Integer, tasks: Array }`, where
#   `tasks` holds the ids of the tasks that were published. If an error
#   escapes the per-task handling it is logged through Legion::Logging and
#   the return value is whatever that logging call returns.
def fetch(**_opts)
  tasks = Legion::Data::Model::Task.where(status: 'task.delayed')
  tasks_pushed = []
  log.debug "tasks.count = #{tasks.count}"
  tasks.each do |task|
    relationship = task.relationship
    created = task.values[:created]
    now = Time.now

    # Both delays must have elapsed before the task is pushed.
    next if relationship && now < created + relationship.values[:delay]
    next if now < created + task.values[:delay]

    # Every SubTask field below comes from the relationship, so a due task
    # without one can never be published. Fail it explicitly instead of
    # relying on a NoMethodError on nil being swallowed by the rescue.
    if relationship.nil?
      task.update(status: 'task.push_exception')
      log.error "task #{task.values[:id]} has no relationship, unable to publish subtask"
      next
    end

    action = relationship.action
    subtask = Legion::Transport::Messages::SubTask.new(
      relationship_id: relationship.values[:id],
      chain_id: relationship.values[:chain_id],
      trigger_runner_id: relationship.trigger.runner.values[:id],
      trigger_function_id: relationship.values[:trigger_id],
      function_id: action.values[:id],
      function: action.values[:name],
      runner_id: action.values[:runner_id],
      runner_class: action.runner.values[:namespace],
      conditions: relationship.values[:conditions],
      transformation: relationship.values[:transformation],
      task_id: task.values[:id]
    )
    log.debug 'publishing task'
    subtask.publish
    task.update(status: 'conditioner.queued')
    tasks_pushed.push(task.values[:id])
  rescue StandardError => e
    # Isolate the failure to this task so the rest of the batch still runs.
    task.update(status: 'task.push_exception')
    log.error e.message
    log.error e.backtrace
  end
  { success: true, count: tasks_pushed.count, tasks: tasks_pushed }
rescue StandardError => e
  Legion::Logging.error e.message
  Legion::Logging.error e.backtrace
end
|