Class: Qyu::Task
- Inherits:
-
Object
- Object
- Qyu::Task
- Defined in:
- lib/qyu/models/task.rb
Overview
Qyu::Task A Task represents a unit of work in a workflow. Conceptually a Task:
-
may not exist outside the context of a queue.
-
it is created ON the queue
-
it remains on the queue until it was successfully processed (or failed “enough” times)
Constant Summary collapse
- LEASE_PERCENTAGE_THRESHOLD_BEFORE_RENEWAL =
0.8
- POLL_INTERVAL =
0.5
Instance Attribute Summary collapse
-
#created_at ⇒ Object
readonly
Returns the value of attribute created_at.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#job_id ⇒ Object
readonly
Returns the value of attribute job_id.
-
#message_id ⇒ Object
readonly
Returns the value of attribute message_id.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#parent_task_id ⇒ Object
readonly
Returns the value of attribute parent_task_id.
-
#payload ⇒ Object
readonly
Returns the value of attribute payload.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#updated_at ⇒ Object
readonly
Returns the value of attribute updated_at.
Class Method Summary collapse
- .acknowledge_message(queue_name, message_id) ⇒ Object
-
.create(queue_name: nil, attributes: nil) ⇒ Object
by defintion Task.create does 2 things: - persists the Task in the Store - enqueues the Task to the Queue We have to make sure that a Task is unique in the Store.
- .enqueue_in_failure_queue(queue_name, id, message_id) ⇒ Object
- .fetch(queue_name) ⇒ Object
- .requeue(queue_name, id, message_id) ⇒ Object
- .select(job_id:) ⇒ Object
- .valid_attributes?(_attributes) ⇒ Boolean
- .valid_queue_name?(queue_name) ⇒ Boolean
Instance Method Summary collapse
- #[](attribute) ⇒ Object
- #acknowledge_message ⇒ Object
- #acknowledgeable? ⇒ Boolean
- #completed? ⇒ Boolean
-
#descriptor ⇒ Hash
Returns task descriptor.
- #enqueue_in_failure_queue ⇒ Object
-
#job ⇒ Qyu::Job
Returns parent job.
- #lock! ⇒ Object
- #locked? ⇒ Boolean
- #mark_completed ⇒ Object
- #mark_failed ⇒ Object
- #mark_invalid_payload ⇒ Object
- #mark_queued ⇒ Object
- #mark_working ⇒ Object
- #requeue ⇒ Object
- #unlock! ⇒ Object
-
#workflow ⇒ Qyu::Workflow
Returns workflow specified in parent job.
-
#workflow_descriptor ⇒ Hash
Returns workflow descriptor from parent job.
Instance Attribute Details
#created_at ⇒ Object (readonly)
Returns the value of attribute created_at.
11 12 13 |
# File 'lib/qyu/models/task.rb', line 11 def created_at @created_at end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
11 12 13 |
# File 'lib/qyu/models/task.rb', line 11 def id @id end |
#job_id ⇒ Object (readonly)
Returns the value of attribute job_id.
11 12 13 |
# File 'lib/qyu/models/task.rb', line 11 def job_id @job_id end |
#message_id ⇒ Object (readonly)
Returns the value of attribute message_id.
11 12 13 |
# File 'lib/qyu/models/task.rb', line 11 def end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
11 12 13 |
# File 'lib/qyu/models/task.rb', line 11 def name @name end |
#parent_task_id ⇒ Object (readonly)
Returns the value of attribute parent_task_id.
11 12 13 |
# File 'lib/qyu/models/task.rb', line 11 def parent_task_id @parent_task_id end |
#payload ⇒ Object (readonly)
Returns the value of attribute payload.
11 12 13 |
# File 'lib/qyu/models/task.rb', line 11 def payload @payload end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
11 12 13 |
# File 'lib/qyu/models/task.rb', line 11 def queue_name @queue_name end |
#status ⇒ Object (readonly)
Returns the value of attribute status.
11 12 13 |
# File 'lib/qyu/models/task.rb', line 11 def status @status end |
#updated_at ⇒ Object (readonly)
Returns the value of attribute updated_at.
11 12 13 |
# File 'lib/qyu/models/task.rb', line 11 def updated_at @updated_at end |
Class Method Details
.acknowledge_message(queue_name, message_id) ⇒ Object
145 146 147 148 |
# File 'lib/qyu/models/task.rb', line 145 def self.(queue_name, ) Qyu.logger.debug "Acknowledging message with ID=#{message_id} from queue `#{queue_name}`" Qyu.queue.(queue_name, ) end |
.create(queue_name: nil, attributes: nil) ⇒ Object
by defintion Task.create does 2 things:
-
persists the Task in the Store
-
enqueues the Task to the Queue
We have to make sure that a Task is unique in the Store. Because of this create first looks up if the task has already been persisted. If it exists then there is no need to persist it again, only to enqueue it. Double (or multiple) delivery of messages is allowed and handled at worker level. Possible scenario: A Job failed at some point. A few of its tasks completed successfully, others failed. Because of this, certain tasks haven’t even been created. When we restart the job, the tasks will be recreated. If a task has already existed, and completed, then that state will be unchanged, and when the worker picks it up, will notice the completed state, acknowledge the message, and continue the next steps.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/qyu/models/task.rb', line 31 def self.create(queue_name: nil, attributes: nil) fail Qyu::Errors::InvalidTaskAttributes unless valid_attributes?(attributes) fail Qyu::Errors::InvalidQueueName unless valid_queue_name?(queue_name) Qyu.logger.debug "find_or_persist queue_name=#{queue_name} and attributes=#{attributes}" task_id = Qyu.store.find_or_persist_task( attributes['name'], queue_name, attributes['payload'], attributes['job_id'], attributes['parent_task_id'] ) do |t_id| Qyu.logger.debug "enqueue queue_name=#{queue_name} and task_id=#{t_id}" Qyu.queue.enqueue_task(queue_name, t_id) end new(task_id, attributes, queue_name) end |
.enqueue_in_failure_queue(queue_name, id, message_id) ⇒ Object
169 170 171 172 |
# File 'lib/qyu/models/task.rb', line 169 def self.enqueue_in_failure_queue(queue_name, id, ) Qyu.logger.debug "Enqueuing failed message with ID=#{message_id} in #{queue_name} failures queue" Qyu.queue.enqueue_task_to_failed_queue(queue_name, id) end |
.fetch(queue_name) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/qyu/models/task.rb', line 50 def self.fetch(queue_name) fail Qyu::Errors::InvalidQueueName unless valid_queue_name?(queue_name) begin = Qyu.queue.(queue_name) task_id = ['task_id'] task_attrs = Qyu.store.find_task(task_id) rescue => ex ||= {} raise Qyu::Errors::CouldNotFetchTask.new(queue_name, ['id'], ['task_id'], ex) end new(task_id, task_attrs, queue_name, ['id']) end |
.requeue(queue_name, id, message_id) ⇒ Object
157 158 159 160 161 |
# File 'lib/qyu/models/task.rb', line 157 def self.requeue(queue_name, id, ) # TODO For FIFO queues (future use) Qyu.logger.debug "Re-enqueuing message with ID=#{message_id} in queue `#{queue_name}`" Qyu.queue.enqueue_task(queue_name, id) end |
.select(job_id:) ⇒ Object
63 64 65 66 67 |
# File 'lib/qyu/models/task.rb', line 63 def self.select(job_id:) Qyu.store.select_tasks_by_job_id(job_id).map do |task| new(task['id'], task, task['queue_name']) end end |
.valid_attributes?(_attributes) ⇒ Boolean
69 70 71 |
# File 'lib/qyu/models/task.rb', line 69 def self.valid_attributes?(_attributes) true end |
.valid_queue_name?(queue_name) ⇒ Boolean
73 74 75 |
# File 'lib/qyu/models/task.rb', line 73 def self.valid_queue_name?(queue_name) !queue_name.nil? && queue_name != '' end |
Instance Method Details
#[](attribute) ⇒ Object
202 203 204 |
# File 'lib/qyu/models/task.rb', line 202 def [](attribute) public_send(attribute) end |
#acknowledge_message ⇒ Object
140 141 142 143 |
# File 'lib/qyu/models/task.rb', line 140 def fail Qyu::Errors::MessageNotReceived if .nil? self.class.(queue_name, ) end |
#acknowledgeable? ⇒ Boolean
77 78 79 |
# File 'lib/qyu/models/task.rb', line 77 def acknowledgeable? @status.completed? || @status.invalid_payload? end |
#completed? ⇒ Boolean
81 82 83 |
# File 'lib/qyu/models/task.rb', line 81 def completed? @status.completed? end |
#descriptor ⇒ Hash
Returns task descriptor
198 199 200 |
# File 'lib/qyu/models/task.rb', line 198 def descriptor workflow_descriptor['tasks'][name] end |
#enqueue_in_failure_queue ⇒ Object
163 164 165 166 167 |
# File 'lib/qyu/models/task.rb', line 163 def enqueue_in_failure_queue fail Qyu::Errors::MessageNotReceived if .nil? self.class.(queue_name, ) self.class.enqueue_in_failure_queue(queue_name, id, ) end |
#job ⇒ Qyu::Job
Returns parent job
191 192 193 |
# File 'lib/qyu/models/task.rb', line 191 def job @job ||= Qyu::Job.find(job_id) end |
#lock! ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/qyu/models/task.rb', line 89 def lock! fail Qyu::Errors::LockAlreadyAcquired if locked? Qyu.logger.debug "Task with ID=#{id} lock!" @lease_token, @locked_until = Qyu.store.lock_task!(id, Qyu.config.store[:lease_period]) Qyu.logger.debug "lease_token = #{@lease_token} | locked_until = #{@locked_until}" return false if @lease_token.nil? schedule_renewal true end |
#locked? ⇒ Boolean
85 86 87 |
# File 'lib/qyu/models/task.rb', line 85 def locked? !@lease_token.nil? && !@locked_until.nil? && Time.now < @locked_until end |
#mark_completed ⇒ Object
125 126 127 128 |
# File 'lib/qyu/models/task.rb', line 125 def mark_completed Qyu.store.update_status(id, Status::COMPLETED) Qyu.logger.info "Task with ID=#{id} marked completed." end |
#mark_failed ⇒ Object
130 131 132 133 |
# File 'lib/qyu/models/task.rb', line 130 def mark_failed Qyu.store.update_status(id, Status::FAILED) Qyu.logger.debug "Task with ID=#{id} marked failed." end |
#mark_invalid_payload ⇒ Object
135 136 137 138 |
# File 'lib/qyu/models/task.rb', line 135 def mark_invalid_payload Qyu.store.update_status(id, Status::INVALID_PAYLOAD) Qyu.logger.debug "Task with ID=#{id} has invalid payload." end |
#mark_queued ⇒ Object
115 116 117 118 |
# File 'lib/qyu/models/task.rb', line 115 def mark_queued Qyu.store.update_status(id, Status::QUEUED) Qyu.logger.debug "Task with ID=#{id} marked queued." end |
#mark_working ⇒ Object
120 121 122 123 |
# File 'lib/qyu/models/task.rb', line 120 def mark_working Qyu.store.update_status(id, Status::WORKING) Qyu.logger.debug "Task with ID=#{id} marked working." end |
#requeue ⇒ Object
150 151 152 153 154 155 |
# File 'lib/qyu/models/task.rb', line 150 def requeue # TODO For FIFO queues (future use) fail Qyu::Errors::MessageNotReceived if .nil? self.class.(queue_name, ) self.class.requeue(queue_name, id, ) end |
#unlock! ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/qyu/models/task.rb', line 101 def unlock! fail Qyu::Errors::LockNotAcquired unless locked? Qyu.logger.debug "Task with ID=#{id} unlocking!" @lease_thread&.kill success = Qyu.store.unlock_task!(id, @lease_token) if success @lease_token = nil @locked_until = nil end success end |
#workflow ⇒ Qyu::Workflow
Returns workflow specified in parent job
177 178 179 |
# File 'lib/qyu/models/task.rb', line 177 def workflow job.workflow end |
#workflow_descriptor ⇒ Hash
Returns workflow descriptor from parent job
184 185 186 |
# File 'lib/qyu/models/task.rb', line 184 def workflow_descriptor job.descriptor end |