Class: Qyu::Job
- Inherits: Object
- Inheritance chain: Object → Qyu::Job
- Defined in:
- lib/qyu/models/job.rb
Instance Attribute Summary collapse
-
#created_at ⇒ Object
readonly
Returns the value of attribute created_at.
-
#descriptor ⇒ Object
readonly
Returns the value of attribute descriptor.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#payload ⇒ Object
readonly
Returns the value of attribute payload.
-
#updated_at ⇒ Object
readonly
Returns the value of attribute updated_at.
Class Method Summary collapse
- .clear_completed ⇒ Object
- .count ⇒ Object
- .create(workflow:, payload:) ⇒ Object
- .delete(id) ⇒ Object
- .find(id) ⇒ Object
- .select(limit: 30, offset: 0, order: :asc) ⇒ Object
Instance Method Summary collapse
- #[](attribute) ⇒ Object
- #create_next_tasks(parent_task, payload) ⇒ Object
- #create_task(parent_task, task_name, payload) ⇒ Object
- #find_task_ids_by_name(task_name) ⇒ Object
- #find_task_ids_by_name_and_ancestor_task_id(task_name, ancestor_task_id) ⇒ Object
- #next_task_names(src_task_name) ⇒ Object
- #queue_name(task_name) ⇒ Object
- #start ⇒ Object
- #sync_condition(task, task_name) ⇒ Object
- #task_status_counts ⇒ Object
- #tasks_to_wait_for(task) ⇒ Object
Instance Attribute Details
#created_at ⇒ Object (readonly)
Returns the value of attribute created_at.
5 6 7 |
# Plain reader: returns @created_at as stored; this page lists no writer (readonly).
# NOTE(review): reported at line 5 like the other readers — presumably one attr_reader declaration; confirm in job.rb.
# File 'lib/qyu/models/job.rb', line 5 def created_at @created_at end |
#descriptor ⇒ Object (readonly)
Returns the value of attribute descriptor.
5 6 7 |
# Plain reader: returns @descriptor as stored; this page lists no writer (readonly).
# Other methods on this page index it with 'tasks' and 'starts', so it is presumably the workflow descriptor hash — confirm.
# File 'lib/qyu/models/job.rb', line 5 def descriptor @descriptor end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
5 6 7 |
# Plain reader: returns @id as stored; this page lists no writer (readonly).
# The value is what the instance methods on this page pass to Qyu.store as the job identifier.
# File 'lib/qyu/models/job.rb', line 5 def id @id end |
#payload ⇒ Object (readonly)
Returns the value of attribute payload.
5 6 7 |
# Plain reader: returns @payload as stored; this page lists no writer (readonly).
# #start hands this value to every initial task it creates.
# File 'lib/qyu/models/job.rb', line 5 def payload @payload end |
#updated_at ⇒ Object (readonly)
Returns the value of attribute updated_at.
5 6 7 |
# Plain reader: returns @updated_at as stored; this page lists no writer (readonly).
# NOTE(review): reported at line 5 like the other readers — presumably one attr_reader declaration; confirm in job.rb.
# File 'lib/qyu/models/job.rb', line 5 def updated_at @updated_at end |
Class Method Details
.clear_completed ⇒ Object
36 37 38 |
# File 'lib/qyu/models/job.rb', line 36
# Asks the configured store to clear completed jobs.
#
# @return [Object] whatever +Qyu.store.clear_completed_jobs+ returns
def self.clear_completed
  store = Qyu.store
  store.clear_completed_jobs
end
.count ⇒ Object
28 29 30 |
# File 'lib/qyu/models/job.rb', line 28
# Reports the job count as computed by the configured store.
#
# @return [Object] whatever +Qyu.store.count_jobs+ returns (presumably an Integer — confirm against the store)
def self.count
  store = Qyu.store
  store.count_jobs
end
.create(workflow:, payload:) ⇒ Object
7 8 9 10 11 12 |
# File 'lib/qyu/models/job.rb', line 7
# Persists a new job and returns an instance describing it.
#
# A String +workflow+ is treated as a workflow name and resolved through
# +Workflow.find_by(name:)+; any other value is used as given.
#
# @param workflow [String, Object] workflow name, or an already resolved workflow
# @param payload [Object] job payload handed to +persist+ and to the new instance
# @return [Qyu::Job] instance whose created_at and updated_at are the same +Time.now+ value
def self.create(workflow:, payload:)
  resolved_workflow = workflow.is_a?(String) ? Workflow.find_by(name: workflow) : workflow
  job_id = persist(resolved_workflow, payload)
  timestamp = Time.now
  new(job_id, resolved_workflow, payload, timestamp, timestamp)
end
.delete(id) ⇒ Object
32 33 34 |
# File 'lib/qyu/models/job.rb', line 32
# Asks the configured store to delete the job with the given ID.
#
# @param id [Object] identifier forwarded unchanged to the store
# @return [Object] whatever +Qyu.store.delete_job+ returns
def self.delete(id)
  store = Qyu.store
  store.delete_job(id)
end
.find(id) ⇒ Object
14 15 16 17 18 |
# File 'lib/qyu/models/job.rb', line 14
# Loads a job record from the store and wraps it in an instance.
#
# @param id [Object] identifier passed to +Qyu.store.find_job+ and reused as the instance's id
# @return [Qyu::Job] instance built from the record's 'workflow', 'payload',
#   'created_at' and 'updated_at' entries
def self.find(id)
  record = Qyu.store.find_job(id)
  # Read the fields in constructor order so they can be splatted after the id.
  fields = %w[workflow payload created_at updated_at].map { |key| record[key] }
  new(id, *fields)
end
.select(limit: 30, offset: 0, order: :asc) ⇒ Object
20 21 22 23 24 25 26 |
# File 'lib/qyu/models/job.rb', line 20
# Fetches a page of job records from the store and wraps each in an instance.
#
# @param limit [Integer] forwarded to the store as the first positional argument (default 30)
# @param offset [Integer] forwarded to the store as the second positional argument (default 0)
# @param order [Symbol] forwarded to the store as the third positional argument (default :asc)
# @return [Array<Qyu::Job>] one instance per record, in the order the store returned them
def self.select(limit: 30, offset: 0, order: :asc)
  Qyu.store.select_jobs(limit, offset, order).map do |row|
    # Read the fields in constructor order so they can be splatted directly.
    fields = %w[id workflow payload created_at updated_at].map { |key| row[key] }
    new(*fields)
  end
end
Instance Method Details
#[](attribute) ⇒ Object
134 135 136 |
# File 'lib/qyu/models/job.rb', line 134
# Hash-style access: calls the public method named by +attribute+ and returns its result.
#
# NOTE(review): this dispatches to ANY public method (e.g. +start+), not only the
# readers — confirm callers never pass untrusted names.
#
# @param attribute [Symbol, String] name of the public method to invoke
# @return [Object] the invoked method's return value
def [](attribute)
  reader_name = attribute
  public_send(reader_name)
end
#create_next_tasks(parent_task, payload) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/qyu/models/job.rb', line 78
# Creates the tasks that the descriptor lists as successors of +parent_task+.
#
# Successors listed without params receive +payload+ as is. Successors listed
# with params receive a shallow copy of +payload+ in which each param is set to
# the result of +calc_func_x+.
#
# @param parent_task [Object] task whose +id+ and +name+ are read
# @param payload [Object] payload for the successors; it is +dup+-ed, never mutated in place
# @return [Object] result of iterating the 'with_params' successors (nil when there are none)
def create_next_tasks(parent_task, payload)
  Qyu.logger.debug "Creating next tasks for task (ID=#{parent_task.id})"
  successors = next_task_names(parent_task.name)
  Qyu.logger.debug "Next task names: #{successors}"

  plain_successors = successors['without_params']
  plain_successors&.each { |successor_name| create_task(parent_task, successor_name, payload) }

  parameterised_successors = successors['with_params']
  parameterised_successors&.each do |successor_name, params|
    derived_payload = payload.dup
    params.each do |param_name, value_eqs|
      # Only the first function/argument pair of each param definition is used.
      func, arg = value_eqs.first
      derived_payload[param_name] = calc_func_x(parent_task, func, arg)
    end
    create_task(parent_task, successor_name, derived_payload)
  end
end
#create_task(parent_task, task_name, payload) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/qyu/models/job.rb', line 65
# Creates a task of this job on the queue configured for +task_name+.
#
# @param parent_task [Object, nil] parent task; nil yields a nil 'parent_task_id'
# @param task_name [String] descriptor name of the task to create
# @param payload [Object] raw payload, passed through +task_payload+ before being stored
# @return [Object] whatever +Qyu::Task.create+ returns
def create_task(parent_task, task_name, payload)
  parent_task_id = parent_task&.id
  Qyu.logger.debug "Task (ID=#{parent_task_id}) created a new task"

  # Resolve the queue before building the attributes, as the original call did.
  target_queue = queue_name(task_name)
  task_attributes = {
    'name' => task_name,
    'parent_task_id' => parent_task_id,
    'job_id' => id,
    'payload' => task_payload(payload, task_name)
  }
  Qyu::Task.create(queue_name: target_queue, attributes: task_attributes)
end
#find_task_ids_by_name(task_name) ⇒ Object
98 99 100 |
# File 'lib/qyu/models/job.rb', line 98
# Looks up the IDs of this job's tasks that carry the given name.
#
# @param task_name [String] task name forwarded to the store together with this job's id
# @return [Object] whatever +Qyu.store.find_task_ids_by_job_id_and_name+ returns
def find_task_ids_by_name(task_name)
  store = Qyu.store
  store.find_task_ids_by_job_id_and_name(id, task_name)
end
#find_task_ids_by_name_and_ancestor_task_id(task_name, ancestor_task_id) ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/qyu/models/job.rb', line 102
# Finds the IDs of this job's tasks named +task_name+ that descend from the
# task identified by +ancestor_task_id+.
#
# The descriptor is first walked bottom-up, from +task_name+ to the ancestor's
# name, following whichever task lists the current one under 'starts',
# 'starts_with_params' or 'starts_manually' (the first match wins). The
# resulting path is then replayed top-down against the store, narrowing the
# set of parent task IDs one level at a time.
#
# @param task_name [String] name of the descendant tasks to look up
# @param ancestor_task_id [Object] ID of the ancestor task; its name is read via +Qyu.store.find_task+
# @return [Object] result of the last store lookup, or +[ancestor_task_id]+ when
#   +task_name+ is the ancestor's own name
# @raise [ArgumentError] when the descriptor offers no acyclic chain of starts
#   from the ancestor's name down to +task_name+ (this used to loop forever)
def find_task_ids_by_name_and_ancestor_task_id(task_name, ancestor_task_id)
  ancestor_task_name = Qyu.store.find_task(ancestor_task_id)['name']

  tasks_path = [task_name]
  until tasks_path.last == ancestor_task_name
    child_name = tasks_path.last
    parent_entry = descriptor['tasks'].find do |_, desc|
      started_names = (desc['starts'] || []) +
                      (desc['starts_with_params'] || {}).keys +
                      (desc['starts_manually'] || [])
      started_names.include?(child_name)
    end
    parent_name = parent_entry&.first

    # No parent, or a parent already on the path, means the walk can never
    # reach the ancestor: fail loudly instead of spinning.
    if parent_name.nil? || tasks_path.include?(parent_name)
      raise ArgumentError,
            "task '#{task_name}' is not reachable from ancestor task '#{ancestor_task_name}' in the job descriptor"
    end

    tasks_path << parent_name
  end

  # Drop the ancestor itself, then traverse the task tree from the top down to
  # find the <task_name> "descendants" of <ancestor_task>.
  parent_task_ids = [ancestor_task_id]
  tasks_path.reverse.drop(1).each do |t_name|
    parent_task_ids = Qyu.store.find_task_ids_by_job_id_name_and_parent_task_ids(id, t_name, parent_task_ids)
  end
  parent_task_ids
end
#next_task_names(src_task_name) ⇒ Object
50 51 52 53 54 55 |
# File 'lib/qyu/models/job.rb', line 50
# Collects the successor definitions the descriptor holds for a task.
#
# @param src_task_name [String] name of the task whose successors are wanted
# @return [Hash] 'without_params' => the task's 'starts' entry,
#   'with_params' => its 'starts_with_params' entry (either may be nil)
def next_task_names(src_task_name)
  task_spec = descriptor['tasks'][src_task_name]
  {
    'without_params' => task_spec['starts'],
    'with_params' => task_spec['starts_with_params']
  }
end
#queue_name(task_name) ⇒ Object
46 47 48 |
# File 'lib/qyu/models/job.rb', line 46
# Reads the 'queue' entry the descriptor defines for a task.
#
# @param task_name [String] name of a task present under descriptor['tasks']
# @return [Object] the task's 'queue' value (nil when the entry has none)
def queue_name(task_name)
  task_spec = descriptor['tasks'][task_name]
  task_spec['queue']
end
#start ⇒ Object
40 41 42 43 44 |
# File 'lib/qyu/models/job.rb', line 40
# Creates one parentless task for every name under descriptor['starts'],
# each receiving the job's payload.
#
# @return [Object] descriptor['starts'], as returned by +each+
def start
  initial_task_names = descriptor['starts']
  initial_task_names.each { |task_name| create_task(nil, task_name, payload) }
end
#sync_condition(task, task_name) ⇒ Object
61 62 63 |
# File 'lib/qyu/models/job.rb', line 61
# Reads the 'condition' the descriptor attaches to one of the tasks that
# +task+ waits for.
#
# @param task [Object] waiting task; only its +name+ is read
# @param task_name [String] name of the awaited task, a key of the 'waits_for' entry
# @return [Object] the 'condition' value stored for that awaited task
def sync_condition(task, task_name)
  waits_for = descriptor['tasks'][task.name]['waits_for']
  awaited_spec = waits_for[task_name]
  awaited_spec['condition']
end
#task_status_counts ⇒ Object
130 131 132 |
# File 'lib/qyu/models/job.rb', line 130
# Asks the store for the task status counts of this job.
#
# @return [Object] whatever +Qyu.store.task_status_counts+ returns for this job's id
def task_status_counts
  store = Qyu.store
  store.task_status_counts(id)
end
#tasks_to_wait_for(task) ⇒ Object
57 58 59 |
# File 'lib/qyu/models/job.rb', line 57
# Lists the names of the tasks that +task+ waits for, per the descriptor.
#
# @param task [Object] waiting task; only its +name+ is read
# @return [Array] keys of the task's 'waits_for' entry
def tasks_to_wait_for(task)
  waits_for = descriptor['tasks'][task.name]['waits_for']
  waits_for.keys
end