Module: Legion::Extensions::Tasker::Runners::FetchDelayed

Extended by:
Helpers::FetchDelayed
Includes:
Helpers::Lex, Helpers::Task
Defined in:
lib/legion/extensions/tasker/runners/fetch_delayed.rb

Instance Method Summary

Methods included from Helpers::FetchDelayed

find_delayed, find_subtasks, find_trigger

Instance Method Details

#fetch(**_opts) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/legion/extensions/tasker/runners/fetch_delayed.rb', line 6

# Pushes every delayed task whose waiting period has elapsed on to its next
# processing stage.
#
# Each row yielded by +find_delayed+ is skipped while +Time.now+ is still
# earlier than +task[:created]+ plus either of its positive Integer delays
# (+:relationship_delay+, +:task_delay+). Every other row is handed to
# +send_task+ and then has its status recorded through +task_update+.
#
# Routing:
# * +:conditions+ is a String longer than 4 characters
#   -> 'task.subtask.conditioner' / 'conditioner.queued'
# * otherwise +:transformation+ is a String longer than 4 characters
#   -> 'task.subtask.transformation' / 'transformer.queued'
# * otherwise -> +task[:runner_routing_key]+ / 'task.queued'
#
# @param _opts [Hash] accepted for interface compatibility; not used
# @return [Object] whatever +find_delayed.each+ returns
def fetch(**_opts)
  # Status to record for the two special routing keys; every other routing
  # key falls back to 'task.queued'.
  queued_status = {
    'task.subtask.conditioner'    => 'conditioner.queued',
    'task.subtask.transformation' => 'transformer.queued'
  }

  find_delayed.each do |task|
    # Only positive Integer delays postpone a task; anything else is ignored.
    still_waiting = %i[relationship_delay task_delay].any? do |delay_key|
      delay = task[delay_key]
      delay.is_a?(Integer) && delay.positive? && Time.now < task[:created] + delay
    end
    next if still_waiting

    conditions = task[:conditions]
    transformation = task[:transformation]

    # NOTE(review): the "> 4" length threshold presumably filters out empty
    # placeholders such as '{}' or 'null' -- confirm against the data model.
    routing_key =
      if conditions.is_a?(String) && conditions.length > 4
        'task.subtask.conditioner'
      elsif transformation.is_a?(String) && transformation.length > 4
        'task.subtask.transformation'
      else
        task[:runner_routing_key]
      end

    message = {
      relationship_id: task[:relationship_id],
      chain_id:        task[:chain_id],
      function_id:     task[:function_id],
      function:        task[:function_name],
      runner_id:       task[:runner_id],
      runner_class:    task[:runner_class],
      task_id:         task[:id],
      exchange:        task[:exchange],
      queue:           task[:queue]
    }
    # Any String value is forwarded, even one too short to affect routing.
    message[:conditions] = conditions if conditions.is_a?(String)
    message[:transformation] = transformation if transformation.is_a?(String)
    message[:routing_key] = routing_key

    send_task(**message)
    task_update(task[:id], queued_status.fetch(routing_key, 'task.queued'))
  end
end

#push(**_opts) ⇒ Object



64
65
66
67
# File 'lib/legion/extensions/tasker/runners/fetch_delayed.rb', line 64

# Publishes a new +FetchDelayed+ transport message and reports success.
#
# @param _opts [Hash] accepted for interface compatibility; not used
# @return [Hash] always +{ success: true }+ once +publish+ has returned
def push(**_opts)
  message = Legion::Extensions::Tasker::Transport::Messages::FetchDelayed.new
  message.publish

  { success: true }
end

#send_task(**opts) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/legion/extensions/tasker/runners/fetch_delayed.rb', line 51

# Normalises the given options and publishes them as a
# +Legion::Transport::Messages::Task+ message.
#
# Normalisation rules:
# * +:results+ is copied from +:result+ when only +:result+ was given.
# * +:success+ is taken from +opts[:result][:success]+ when both +:result+
#   and +:success+ were given, is left untouched when only +:success+ was
#   given, and defaults to +1+ when +:success+ is absent.
#
# NOTE(review): when both keys are present the caller's +:success+ is replaced
# by +opts[:result][:success]+, which raises if +:result+ cannot be indexed
# with a Symbol -- confirm this is the intended contract.
#
# @param opts [Hash] message attributes forwarded to the Task message
# @return [Object] whatever +publish+ returns
def send_task(**opts)
  # Capture key presence up front, before opts is modified below.
  result_given = opts.key?(:result)
  success_given = opts.key?(:success)

  opts[:results] = opts[:result] if result_given && !opts.key?(:results)
  opts[:success] = opts[:result][:success] if result_given && success_given
  opts[:success] = 1 unless success_given

  log.debug 'pushing delayed task to worker'
  Legion::Transport::Messages::Task.new(**opts).publish
end