Class: Qyu::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/qyu/models/job.rb

Overview

Qyu::Job

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#created_atObject (readonly)

Returns the value of attribute created_at


6
7
8
# File 'lib/qyu/models/job.rb', line 6

def created_at
  @created_at
end

#descriptorObject (readonly)

Returns the value of attribute descriptor


6
7
8
# File 'lib/qyu/models/job.rb', line 6

def descriptor
  @descriptor
end

#idObject (readonly)

Returns the value of attribute id


6
7
8
# File 'lib/qyu/models/job.rb', line 6

def id
  @id
end

#payloadObject (readonly)

Returns the value of attribute payload


6
7
8
# File 'lib/qyu/models/job.rb', line 6

def payload
  @payload
end

#updated_atObject (readonly)

Returns the value of attribute updated_at


6
7
8
# File 'lib/qyu/models/job.rb', line 6

def updated_at
  @updated_at
end

#workflowObject (readonly)

Returns the value of attribute workflow


6
7
8
# File 'lib/qyu/models/job.rb', line 6

def workflow
  @workflow
end

Class Method Details

.clear_completedInteger

Clears completed jobs


59
60
61
# File 'lib/qyu/models/job.rb', line 59

def self.clear_completed
  Qyu.store.clear_completed_jobs
end

.countInteger

Counts job in state store


44
45
46
# File 'lib/qyu/models/job.rb', line 44

def self.count
  Qyu.store.count_jobs
end

.create(workflow:, payload:) ⇒ Qyu::Job

Creates a new job via a workflow name / object and a payload


15
16
17
18
19
20
# File 'lib/qyu/models/job.rb', line 15

def self.create(workflow:, payload:)
  workflow = Workflow.find_by(name: workflow) if workflow.is_a?(String)
  id = persist(workflow, payload)
  time = Time.now
  new(id, workflow, payload, time, time)
end

.delete(id) ⇒ Object

Deletes job from state store by ID


52
53
54
# File 'lib/qyu/models/job.rb', line 52

def self.delete(id)
  Qyu.store.delete_job(id)
end

.find(id) ⇒ Qyu::Job?

Finds Job by ID. Returns `nil` if job is not present in store


25
26
27
28
29
30
31
# File 'lib/qyu/models/job.rb', line 25

def self.find(id)
  job_attrs = Qyu.store.find_job(id)
  if job_attrs
    new(id, job_attrs['workflow'], job_attrs['payload'],
        job_attrs['created_at'], job_attrs['updated_at'])
  end
end

.select(limit: 30, offset: 0, order: :asc) ⇒ Object


33
34
35
36
37
38
39
# File 'lib/qyu/models/job.rb', line 33

def self.select(limit: 30, offset: 0, order: :asc)
  job_records = Qyu.store.select_jobs(limit, offset, order)
  job_records.map do |record|
    new(record['id'], record['workflow'], record['payload'],
        record['created_at'], record['updated_at'])
  end
end

Instance Method Details

#[](attribute) ⇒ Object


164
165
166
# File 'lib/qyu/models/job.rb', line 164

def [](attribute)
  public_send(attribute)
end

#create_next_tasks(parent_task, payload) ⇒ Object


107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/qyu/models/job.rb', line 107

def create_next_tasks(parent_task, payload)
  Qyu.logger.debug "Creating next tasks for task (ID=#{parent_task.id})"
  next_tasks = next_task_names(parent_task.name)
  Qyu.logger.debug "Next task names: #{next_tasks}"

  next_tasks['without_params']&.each do |next_task_name|
    create_task(parent_task, next_task_name, payload)
  end

  next_tasks['with_params']&.each do |next_task_name, params|
    updated_payload = payload.dup
    params.each do |param_name, value_eqs|
      f = value_eqs.keys[0]
      x = value_eqs.values[0]
      updated_payload[param_name] = calc_func_x(parent_task, f, x)
    end
    create_task(parent_task, next_task_name, updated_payload)
  end
end

#create_task(parent_task, task_name, payload) ⇒ Object


94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/qyu/models/job.rb', line 94

def create_task(parent_task, task_name, payload)
  parent_task_id = parent_task.nil? ? nil : parent_task.id
  Qyu.logger.debug "Task (ID=#{parent_task_id}) created a new task"
  Qyu::Task.create(
    queue_name: queue_name(task_name),
    attributes: {
                  'name' => task_name,
                  'parent_task_id' => parent_task_id,
                  'job_id' => id,
                  'payload' => task_payload(payload, task_name)
                })
end

#find_task_ids_by_name(task_name) ⇒ Object


127
128
129
# File 'lib/qyu/models/job.rb', line 127

def find_task_ids_by_name(task_name)
  Qyu.store.find_task_ids_by_job_id_and_name(id, task_name)
end

#find_task_ids_by_name_and_ancestor_task_id(task_name, ancestor_task_id) ⇒ Object


131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/qyu/models/job.rb', line 131

def find_task_ids_by_name_and_ancestor_task_id(task_name, ancestor_task_id)
  ancestor_task_name = Qyu.store.find_task(ancestor_task_id)['name']
  tasks_path = [task_name]
  key_idx = 0

  while tasks_path[-1] != ancestor_task_name
    found_task = descriptor['tasks'].detect do |_, desc|
      all_task_names = []
      all_task_names.concat(desc['starts'] || [])
      all_task_names.concat((desc['starts_with_params'] || {}).keys)
      all_task_names.concat(desc['starts_parallel'] || [])
      all_task_names.concat(desc['starts_manually'] || [])
      all_task_names.include?(tasks_path[-1])
    end
    tasks_path << found_task[key_idx] if found_task
  end

  tasks_topdown_path = tasks_path.reverse
  # remove topmost task (ancestor_task) from the path
  tasks_topdown_path.shift

  # traverse task tree from top down, and find the <task_name> "descendants" of <ancestor_task>
  parent_task_ids = [ancestor_task_id]
  tasks_topdown_path.each do |t_name|
    parent_task_ids = Qyu.store.find_task_ids_by_job_id_name_and_parent_task_ids(id, t_name, parent_task_ids)
  end
  parent_task_ids
end

#next_task_names(src_task_name) ⇒ Object


79
80
81
82
83
84
# File 'lib/qyu/models/job.rb', line 79

def next_task_names(src_task_name)
  {
    'without_params' => descriptor['tasks'][src_task_name]['starts'],
    'with_params' => descriptor['tasks'][src_task_name]['starts_with_params']
  }
end

#queue_name(task_name) ⇒ Object


75
76
77
# File 'lib/qyu/models/job.rb', line 75

def queue_name(task_name)
  descriptor['tasks'][task_name]['queue']
end

#startObject

Starts job execution Enqueues all tasks scheduled to start at the beginning (`starts` key in workflow descriptor)

#=> job.start


69
70
71
72
73
# File 'lib/qyu/models/job.rb', line 69

def start
  descriptor['starts'].each do |task_name|
    create_task(nil, task_name, payload)
  end
end

#sync_condition(task, next_task_name) ⇒ Object


90
91
92
# File 'lib/qyu/models/job.rb', line 90

def sync_condition(task, next_task_name)
  descriptor['tasks'][task.name]['waits_for'][next_task_name]['condition']
end

#task_status_countsObject


160
161
162
# File 'lib/qyu/models/job.rb', line 160

def task_status_counts
  Qyu.store.task_status_counts(id)
end

#tasks_to_wait_for(task) ⇒ Object


86
87
88
# File 'lib/qyu/models/job.rb', line 86

def tasks_to_wait_for(task)
  descriptor['tasks'][task.name]['waits_for'].keys
end