Class: Qyu::Task
- Inherits:
-
Object
- Object
- Qyu::Task
- Defined in:
- lib/qyu/models/task.rb
Overview
A Task represents a unit of work in a workflow. Conceptually a Task:
-
may not exist outside the context of a queue.
-
it is created ON the queue
-
it remains on the queue until it was successfully processed (or failed “enough” times)
Constant Summary collapse
- LEASE_PERCENTAGE_THRESHOLD_BEFORE_RENEWAL =
0.8
- POLL_INTERVAL =
0.5
Instance Attribute Summary collapse
-
#created_at ⇒ Object
readonly
Returns the value of attribute created_at.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#job_id ⇒ Object
readonly
Returns the value of attribute job_id.
-
#message_id ⇒ Object
readonly
Returns the value of attribute message_id.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#parent_task_id ⇒ Object
readonly
Returns the value of attribute parent_task_id.
-
#payload ⇒ Object
readonly
Returns the value of attribute payload.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#updated_at ⇒ Object
readonly
Returns the value of attribute updated_at.
Class Method Summary collapse
- .acknowledge_message(queue_name, message_id) ⇒ Object
-
.create(queue_name: nil, attributes: nil) ⇒ Object
by defintion Task.create does 2 things: - persists the Task in the Store - enqueues the Task to the Queue We have to make sure that a Task is unique in the Store.
- .enqueue_in_failure_queue(queue_name, id, message_id) ⇒ Object
- .fetch(queue_name) ⇒ Object
- .requeue(queue_name, id, message_id) ⇒ Object
- .select(job_id:) ⇒ Object
- .valid_attributes?(_attributes) ⇒ Boolean
- .valid_queue_name?(queue_name) ⇒ Boolean
Instance Method Summary collapse
- #[](attribute) ⇒ Object
- #acknowledge_message ⇒ Object
- #acknowledgeable? ⇒ Boolean
- #completed? ⇒ Boolean
- #enqueue_in_failure_queue ⇒ Object
- #job ⇒ Object
- #lock! ⇒ Object
- #locked? ⇒ Boolean
- #mark_completed ⇒ Object
- #mark_failed ⇒ Object
- #mark_invalid_payload ⇒ Object
- #mark_queued ⇒ Object
- #mark_working ⇒ Object
- #requeue ⇒ Object
- #unlock! ⇒ Object
Instance Attribute Details
#created_at ⇒ Object (readonly)
Returns the value of attribute created_at.
10 11 12 |
# File 'lib/qyu/models/task.rb', line 10 def created_at @created_at end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
10 11 12 |
# File 'lib/qyu/models/task.rb', line 10 def id @id end |
#job_id ⇒ Object (readonly)
Returns the value of attribute job_id.
10 11 12 |
# File 'lib/qyu/models/task.rb', line 10 def job_id @job_id end |
#message_id ⇒ Object (readonly)
Returns the value of attribute message_id.
10 11 12 |
# File 'lib/qyu/models/task.rb', line 10 def @message_id end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
10 11 12 |
# File 'lib/qyu/models/task.rb', line 10 def name @name end |
#parent_task_id ⇒ Object (readonly)
Returns the value of attribute parent_task_id.
10 11 12 |
# File 'lib/qyu/models/task.rb', line 10 def parent_task_id @parent_task_id end |
#payload ⇒ Object (readonly)
Returns the value of attribute payload.
10 11 12 |
# File 'lib/qyu/models/task.rb', line 10 def payload @payload end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
10 11 12 |
# File 'lib/qyu/models/task.rb', line 10 def queue_name @queue_name end |
#status ⇒ Object (readonly)
Returns the value of attribute status.
10 11 12 |
# File 'lib/qyu/models/task.rb', line 10 def status @status end |
#updated_at ⇒ Object (readonly)
Returns the value of attribute updated_at.
10 11 12 |
# File 'lib/qyu/models/task.rb', line 10 def updated_at @updated_at end |
Class Method Details
.acknowledge_message(queue_name, message_id) ⇒ Object
144 145 146 147 |
# File 'lib/qyu/models/task.rb', line 144 def self.(queue_name, ) Qyu.logger.debug "Acknowledging message with ID=#{} from queue `#{queue_name}`" Qyu.queue.(queue_name, ) end |
.create(queue_name: nil, attributes: nil) ⇒ Object
by defintion Task.create does 2 things:
-
persists the Task in the Store
-
enqueues the Task to the Queue
We have to make sure that a Task is unique in the Store. Because of this create first looks up if the task has already been persisted. If it exists then there is no need to persist it again, only to enqueue it. Double (or multiple) delivery of messages is allowed and handled at worker level. Possible scenario: A Job failed at some point. A few of its tasks completed successfully, others failed. Because of this, certain tasks haven’t even been created. When we restart the job, the tasks will be recreated. If a task has already existed, and completed, then that state will be unchanged, and when the worker picks it up, will notice the completed state, acknowledge the message, and continue the next steps.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/qyu/models/task.rb', line 30 def self.create(queue_name: nil, attributes: nil) fail Qyu::Errors::InvalidTaskAttributes unless valid_attributes?(attributes) fail Qyu::Errors::InvalidQueueName unless valid_queue_name?(queue_name) Qyu.logger.debug "find_or_persist queue_name=#{queue_name} and attributes=#{attributes}" task_id = Qyu.store.find_or_persist_task( attributes['name'], queue_name, attributes['payload'], attributes['job_id'], attributes['parent_task_id'] ) do |t_id| Qyu.logger.debug "enqueue queue_name=#{queue_name} and task_id=#{t_id}" Qyu.queue.enqueue_task(queue_name, t_id) end new(task_id, attributes, queue_name) end |
.enqueue_in_failure_queue(queue_name, id, message_id) ⇒ Object
168 169 170 171 |
# File 'lib/qyu/models/task.rb', line 168 def self.enqueue_in_failure_queue(queue_name, id, ) Qyu.logger.debug "Enqueuing failed message with ID=#{} in #{queue_name} failures queue" Qyu.queue.enqueue_task_to_failed_queue(queue_name, id) end |
.fetch(queue_name) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/qyu/models/task.rb', line 49 def self.fetch(queue_name) fail Qyu::Errors::InvalidQueueName unless valid_queue_name?(queue_name) begin = Qyu.queue.(queue_name) task_id = ['task_id'] task_attrs = Qyu.store.find_task(task_id) rescue => ex ||= {} raise Qyu::Errors::CouldNotFetchTask.new(queue_name, ['id'], ['task_id'], ex) end new(task_id, task_attrs, queue_name, ['id']) end |
.requeue(queue_name, id, message_id) ⇒ Object
156 157 158 159 160 |
# File 'lib/qyu/models/task.rb', line 156 def self.requeue(queue_name, id, ) # TODO For FIFO queues (future use) Qyu.logger.debug "Re-enqueuing message with ID=#{} in queue `#{queue_name}`" Qyu.queue.enqueue_task(queue_name, id) end |
.select(job_id:) ⇒ Object
62 63 64 65 66 |
# File 'lib/qyu/models/task.rb', line 62 def self.select(job_id:) Qyu.store.select_tasks_by_job_id(job_id).map do |task| new(task['id'], task, task['queue_name']) end end |
.valid_attributes?(_attributes) ⇒ Boolean
68 69 70 |
# File 'lib/qyu/models/task.rb', line 68 def self.valid_attributes?(_attributes) true end |
.valid_queue_name?(queue_name) ⇒ Boolean
72 73 74 |
# File 'lib/qyu/models/task.rb', line 72 def self.valid_queue_name?(queue_name) !queue_name.nil? && queue_name != '' end |
Instance Method Details
#[](attribute) ⇒ Object
177 178 179 |
# File 'lib/qyu/models/task.rb', line 177 def [](attribute) public_send(attribute) end |
#acknowledge_message ⇒ Object
139 140 141 142 |
# File 'lib/qyu/models/task.rb', line 139 def fail Qyu::Errors::MessageNotReceived if .nil? self.class.(queue_name, ) end |
#acknowledgeable? ⇒ Boolean
76 77 78 |
# File 'lib/qyu/models/task.rb', line 76 def acknowledgeable? @status.completed? || @status.invalid_payload? end |
#completed? ⇒ Boolean
80 81 82 |
# File 'lib/qyu/models/task.rb', line 80 def completed? @status.completed? end |
#enqueue_in_failure_queue ⇒ Object
162 163 164 165 166 |
# File 'lib/qyu/models/task.rb', line 162 def enqueue_in_failure_queue fail Qyu::Errors::MessageNotReceived if .nil? self.class.(queue_name, ) self.class.enqueue_in_failure_queue(queue_name, id, ) end |
#job ⇒ Object
173 174 175 |
# File 'lib/qyu/models/task.rb', line 173 def job @job ||= Qyu::Job.find(job_id) end |
#lock! ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/qyu/models/task.rb', line 88 def lock! fail Qyu::Errors::LockAlreadyAcquired if locked? Qyu.logger.debug "Task with ID=#{id} lock!" @lease_token, @locked_until = Qyu.store.lock_task!(id, Qyu.config.store[:lease_period]) Qyu.logger.debug "lease_token = #{@lease_token} | locked_until = #{@locked_until}" return false if @lease_token.nil? schedule_renewal true end |
#locked? ⇒ Boolean
84 85 86 |
# File 'lib/qyu/models/task.rb', line 84 def locked? !@lease_token.nil? && !@locked_until.nil? && Time.now < @locked_until end |
#mark_completed ⇒ Object
124 125 126 127 |
# File 'lib/qyu/models/task.rb', line 124 def mark_completed Qyu.store.update_status(id, Status::COMPLETED) Qyu.logger.info "Task with ID=#{id} marked completed." end |
#mark_failed ⇒ Object
129 130 131 132 |
# File 'lib/qyu/models/task.rb', line 129 def mark_failed Qyu.store.update_status(id, Status::FAILED) Qyu.logger.debug "Task with ID=#{id} marked failed." end |
#mark_invalid_payload ⇒ Object
134 135 136 137 |
# File 'lib/qyu/models/task.rb', line 134 def mark_invalid_payload Qyu.store.update_status(id, Status::INVALID_PAYLOAD) Qyu.logger.debug "Task with ID=#{id} has invalid payload." end |
#mark_queued ⇒ Object
114 115 116 117 |
# File 'lib/qyu/models/task.rb', line 114 def mark_queued Qyu.store.update_status(id, Status::QUEUED) Qyu.logger.debug "Task with ID=#{id} marked queued." end |
#mark_working ⇒ Object
119 120 121 122 |
# File 'lib/qyu/models/task.rb', line 119 def mark_working Qyu.store.update_status(id, Status::WORKING) Qyu.logger.debug "Task with ID=#{id} marked working." end |
#requeue ⇒ Object
149 150 151 152 153 154 |
# File 'lib/qyu/models/task.rb', line 149 def requeue # TODO For FIFO queues (future use) fail Qyu::Errors::MessageNotReceived if .nil? self.class.(queue_name, ) self.class.requeue(queue_name, id, ) end |
#unlock! ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/qyu/models/task.rb', line 100 def unlock! fail Qyu::Errors::LockNotAcquired unless locked? Qyu.logger.debug "Task with ID=#{id} unlocking!" @lease_thread&.kill success = Qyu.store.unlock_task!(id, @lease_token) if success @lease_token = nil @locked_until = nil end success end |