Module: Legion::Extensions::Tasker::Runners::CheckSubtask

Includes:
Helpers::Lex
Defined in:
lib/legion/extensions/tasker/runners/check_subtask.rb

Instance Method Summary collapse

Instance Method Details

#check_subtasks(runner_class:, function:, **opts) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/legion/extensions/tasker/runners/check_subtask.rb', line 8

# Finds the active relationships triggered by +function+ on +runner_class+ and
# hands each eligible one to #send_task.
#
# A relationship is eligible when its +allow_new_chains+ value is truthy, or
# when it has a chain and its +chain_id+ equals the +:chain_id+ given in +opts+.
#
# @param runner_class [Object] value matched against the runner's +namespace+
# @param function [Object] value matched against the trigger function's +name+
# @param opts [Hash] remaining task context; the keys read here are +:chain_id+,
#   +:master_id+, +:parent_id+, +:task_id+ and +:result+. The whole hash is
#   stored as the new task's +:payload+.
# @return [nil] when the runner or function record cannot be found
# @return [Hash] +{ success: true, count: 0 }+ when there are no active relationships
# @return [Object] otherwise whatever +relationships.each+ returns; if a
#   StandardError is raised it is logged and the last logger call's value is returned
def check_subtasks(runner_class:, function:, **opts)
  runner_record = Legion::Data::Model::Runner[namespace: runner_class]
  return if runner_record.nil?

  function_record = runner_record.functions_dataset[name: function]
  return if function_record.nil?

  # Chain filtering is done per relationship inside the loop below, because
  # relationships that allow new chains must pass regardless of chain_id.
  relationships = function_record.trigger_relationships_dataset.where(:active)
  relationship_count = relationships.count # query the count once and reuse it
  return { success: true, count: relationship_count } if relationship_count.zero?

  # The master task is the first of these ids present in opts (presence, not truthiness).
  master_key = %i[master_id parent_id task_id].find { |key| opts.key?(key) }

  relationships.each do |relationship|
    unless relationship.values[:allow_new_chains]
      next if relationship.chain.nil?
      next unless opts.key?(:chain_id)
      next unless relationship.values[:chain_id] == opts[:chain_id]
    end

    action_function = relationship.action
    action_runner = action_function.runner

    status = relationship.values[:delay].zero? ? 'conditioner.queued' : 'task.delayed'

    task_id_hash = { runner_class:    action_runner.values[:namespace],
                     function:        action_function.values[:name],
                     status:          status,
                     relationship_id: relationship.values[:id],
                     payload:         opts }
    task_id_hash[:master_id] = opts[master_key] if master_key
    task_id_hash[:parent_id] = opts[:task_id] if opts.key?(:task_id)

    records = { relationship:    relationship,
                runner_record:   runner_record,
                function_record: function_record,
                action_function: action_function,
                action_runner:   action_runner }

    if opts[:result].is_a?(Array)
      # One subtask per element; only that element is forwarded, not the rest of opts.
      opts[:result].each { |result| send_task(task_id_hash, **records, result: result) }
    else
      send_task(task_id_hash, **records, **opts)
    end
  end
rescue StandardError => e
  # Best effort: failures are logged with their context instead of being re-raised.
  Legion::Logging.fatal e.message
  Legion::Logging.fatal e.backtrace
  Legion::Logging.fatal runner_class
  Legion::Logging.fatal function
  Legion::Logging.fatal opts.keys
  Legion::Logging.fatal opts[:entry]
end

#send_task(task_id_hash, relationship:, runner_record:, function_record:, action_function:, action_runner:, **opts) ⇒ Object

rubocop:disable Layout/LineLength



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/legion/extensions/tasker/runners/check_subtask.rb', line 75

# Generates a task id for one relationship and, unless the relationship is
# delayed, publishes the matching SubTask message.
#
# @param task_id_hash [Hash] keyword arguments passed to
#   Legion::Runner::Status.generate_task_id
# @param relationship [#values] relationship row; reads +:id+, +:chain_id+,
#   +:delay+, +:conditions+, +:transformation+ and +:debug+
# @param runner_record [#values, #extension] trigger runner; reads +:id+,
#   +:queue+ and the extension's +:exchange+
# @param function_record [#values] trigger function; reads +:id+
# @param action_function [#values] function to run; reads +:id+ and +:name+
# @param action_runner [#values] runner of the action function; reads +:id+
#   and +:namespace+
# @param opts [Hash] only +:result+ and +:success+ are read
# @return [Hash] +{ status: true }+ when the relationship's delay is non-zero
#   (the task id is still generated, nothing is published)
# @return [Object] otherwise the value returned by SubTask#publish
def send_task(task_id_hash, relationship:, runner_record:, function_record:, action_function:, action_runner:, **opts) # rubocop:disable Layout/LineLength
  task_id = Legion::Runner::Status.generate_task_id(**task_id_hash)[:task_id]

  return { status: true } unless relationship.values[:delay].zero?

  conditions = relationship.values[:conditions]
  transformation = relationship.values[:transformation]

  subtask_hash = {
    relationship_id:     relationship.values[:id],
    chain_id:            relationship.values[:chain_id],
    trigger_runner_id:   runner_record.values[:id],
    trigger_function_id: function_record.values[:id],
    function_id:         action_function.values[:id],
    function:            action_function.values[:name],
    runner_id:           action_runner.values[:id],
    runner_class:        action_runner.values[:namespace],
    conditions:          conditions,
    transformation:      transformation,
    debug:               relationship.values[:debug] ? 1 : 0,
    task_id:             task_id,
    results:             opts[:result]
  }

  # Strings of four characters or fewer are ignored
  # (presumably empty payloads such as '{}' or 'null' -- TODO confirm).
  subtask_hash[:routing_key] = if conditions.is_a?(String) && conditions.length > 4
                                 'task.subtask.conditioner'
                               elsif transformation.is_a?(String) && transformation.length > 4
                                 'task.subtask.transform'
                               else
                                 # NOTE(review): this uses the trigger runner's exchange/queue with the
                                 # action function's name -- confirm action_runner is not the intended source.
                                 "#{runner_record.extension.values[:exchange]}.#{runner_record.values[:queue]}.#{subtask_hash[:function]}" # rubocop:disable Layout/LineLength
                               end

  # A supplied :result always counts as success; otherwise honour :success, defaulting to 1.
  subtask_hash[:success] = opts.key?(:result) ? 1 : opts.fetch(:success, 1)

  Legion::Transport::Messages::SubTask.new(**subtask_hash).publish
end