Class: Qyu::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/qyu/models/task.rb

Overview

Qyu::Task A Task represents a unit of work in a workflow. Conceptually a Task:

  • may not exist outside the context of a queue.

  • it is created ON the queue

  • it remains on the queue until it has been successfully processed (or has failed “enough” times)

Constant Summary collapse

LEASE_PERCENTAGE_THRESHOLD_BEFORE_RENEWAL =
0.8
POLL_INTERVAL =
0.5

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#created_atObject (readonly)

Returns the value of attribute created_at


11
12
13
# File 'lib/qyu/models/task.rb', line 11

# Read-only accessor for the task's creation timestamp attribute.
#
# @return [Object] the current value of @created_at
def created_at
  @created_at
end

#idObject (readonly)

Returns the value of attribute id


11
12
13
# File 'lib/qyu/models/task.rb', line 11

# Read-only accessor for the task's identifier.
#
# @return [Object] the current value of @id
def id
  @id
end

#job_idObject (readonly)

Returns the value of attribute job_id


11
12
13
# File 'lib/qyu/models/task.rb', line 11

# Read-only accessor for the identifier of the job this task refers to.
#
# @return [Object] the current value of @job_id
def job_id
  @job_id
end

#message_idObject (readonly)

Returns the value of attribute message_id


11
12
13
# File 'lib/qyu/models/task.rb', line 11

# Read-only accessor for the queue message identifier held by this task.
# Several instance methods treat a nil value as "no message received".
#
# @return [Object, nil] the current value of @message_id
def message_id
  @message_id
end

#nameObject (readonly)

Returns the value of attribute name


11
12
13
# File 'lib/qyu/models/task.rb', line 11

# Read-only accessor for the task's name.
#
# @return [Object] the current value of @name
def name
  @name
end

#parent_task_idObject (readonly)

Returns the value of attribute parent_task_id


11
12
13
# File 'lib/qyu/models/task.rb', line 11

# Read-only accessor for the identifier of this task's parent task.
#
# @return [Object, nil] the current value of @parent_task_id
def parent_task_id
  @parent_task_id
end

#payloadObject (readonly)

Returns the value of attribute payload


11
12
13
# File 'lib/qyu/models/task.rb', line 11

# Read-only accessor for the task's payload.
#
# @return [Object] the current value of @payload
def payload
  @payload
end

#queue_nameObject (readonly)

Returns the value of attribute queue_name


11
12
13
# File 'lib/qyu/models/task.rb', line 11

# Read-only accessor for the name of the queue this task is associated with.
#
# @return [Object] the current value of @queue_name
def queue_name
  @queue_name
end

#statusObject (readonly)

Returns the value of attribute status


11
12
13
# File 'lib/qyu/models/task.rb', line 11

# Read-only accessor for the task's status object.
#
# @return [Object] the current value of @status
def status
  @status
end

#updated_atObject (readonly)

Returns the value of attribute updated_at


11
12
13
# File 'lib/qyu/models/task.rb', line 11

# Read-only accessor for the task's last-update timestamp attribute.
#
# @return [Object] the current value of @updated_at
def updated_at
  @updated_at
end

Class Method Details

.acknowledge_message(queue_name, message_id) ⇒ Object


145
146
147
148
# File 'lib/qyu/models/task.rb', line 145

# Logs the acknowledgement at debug level, then asks the configured queue
# (Qyu.queue) to acknowledge the given message.
#
# @param queue_name [Object] name of the queue the message belongs to
# @param message_id [Object] identifier of the message to acknowledge
# @return [Object] whatever Qyu.queue.acknowledge_message returns
def self.acknowledge_message(queue_name, message_id)
  Qyu.logger.debug "Acknowledging message with ID=#{message_id} from queue `#{queue_name}`"
  Qyu.queue.acknowledge_message(queue_name, message_id)
end

.create(queue_name: nil, attributes: nil) ⇒ Object

By definition, Task.create does two things:

  • persists the Task in the Store

  • enqueues the Task to the Queue

We have to make sure that a Task is unique in the Store. Because of this, create first checks whether the task has already been persisted. If it has, there is no need to persist it again; it only needs to be enqueued. Double (or multiple) delivery of messages is allowed and is handled at the worker level. Possible scenario: a Job failed at some point. A few of its tasks completed successfully and others failed, so certain tasks were never even created. When we restart the job, the tasks will be recreated. If a task already existed and had completed, its state is left unchanged; when a worker picks it up, it notices the completed state, acknowledges the message, and continues with the next steps.


31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/qyu/models/task.rb', line 31

# Finds or persists a task in the store and returns a Task built from the
# resulting ID.
#
# The store call receives a block that logs and enqueues the yielded task ID
# on +queue_name+; whether and when that block runs is up to
# Qyu.store.find_or_persist_task.
#
# @param queue_name [Object] queue to enqueue the task on
# @param attributes [#[]] task attributes, read through the string keys
#   'name', 'payload', 'job_id' and 'parent_task_id'
# @raise [Qyu::Errors::InvalidTaskAttributes] unless valid_attributes? accepts +attributes+
# @raise [Qyu::Errors::InvalidQueueName] unless valid_queue_name? accepts +queue_name+
# @return [Object] the result of new(task_id, attributes, queue_name)
def self.create(queue_name: nil, attributes: nil)
  raise Qyu::Errors::InvalidTaskAttributes unless valid_attributes?(attributes)
  raise Qyu::Errors::InvalidQueueName unless valid_queue_name?(queue_name)

  Qyu.logger.debug "find_or_persist queue_name=#{queue_name} and attributes=#{attributes}"

  # Positional arguments, in the order the store call receives them.
  lookup_args = [
    attributes['name'], queue_name, attributes['payload'],
    attributes['job_id'], attributes['parent_task_id']
  ]
  persisted_id = Qyu.store.find_or_persist_task(*lookup_args) do |yielded_id|
    Qyu.logger.debug "enqueue queue_name=#{queue_name} and task_id=#{yielded_id}"
    Qyu.queue.enqueue_task(queue_name, yielded_id)
  end

  new(persisted_id, attributes, queue_name)
end

.enqueue_in_failure_queue(queue_name, id, message_id) ⇒ Object


169
170
171
172
# File 'lib/qyu/models/task.rb', line 169

# Logs at debug level, then asks the configured queue (Qyu.queue) to enqueue
# the task ID on the failed queue for +queue_name+.
#
# @param queue_name [Object] name of the originating queue
# @param id [Object] task identifier passed to the queue
# @param message_id [Object] only used in the log line
# @return [Object] whatever Qyu.queue.enqueue_task_to_failed_queue returns
def self.enqueue_in_failure_queue(queue_name, id, message_id)
  Qyu.logger.debug "Enqueuing failed message with ID=#{message_id} in #{queue_name} failures queue"
  Qyu.queue.enqueue_task_to_failed_queue(queue_name, id)
end

.fetch(queue_name) ⇒ Object


50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/qyu/models/task.rb', line 50

# Fetches the next message from +queue_name+, loads the referenced task's
# attributes from the store and builds a Task from them.
#
# Any StandardError raised while fetching the message, reading its 'task_id'
# or loading the task is wrapped in CouldNotFetchTask, together with whatever
# message details were available at that point (nil when no message was
# obtained).
#
# @param queue_name [Object] queue to fetch from
# @raise [Qyu::Errors::InvalidQueueName] unless valid_queue_name? accepts +queue_name+
# @raise [Qyu::Errors::CouldNotFetchTask] when fetching or loading fails
# @return [Object] the result of new(task_id, task_attrs, queue_name, message id)
def self.fetch(queue_name)
  raise Qyu::Errors::InvalidQueueName unless valid_queue_name?(queue_name)

  received = nil
  begin
    received    = Qyu.queue.fetch_next_message(queue_name)
    fetched_id  = received['task_id']
    stored_task = Qyu.store.find_task(fetched_id)
  rescue StandardError => e
    # Fall back to an empty hash so the error can still be built when no
    # message was returned.
    received ||= {}
    raise Qyu::Errors::CouldNotFetchTask.new(queue_name, received['id'], received['task_id'], e)
  end

  new(fetched_id, stored_task, queue_name, received['id'])
end

.requeue(queue_name, id, message_id) ⇒ Object


157
158
159
160
161
# File 'lib/qyu/models/task.rb', line 157

# Logs at debug level, then enqueues the task ID on +queue_name+ again
# through Qyu.queue.enqueue_task.
#
# @param queue_name [Object] queue to enqueue the task on
# @param id [Object] task identifier passed to the queue
# @param message_id [Object] only used in the log line
# @return [Object] whatever Qyu.queue.enqueue_task returns
def self.requeue(queue_name, id, message_id)
  # TODO For FIFO queues (future use)
  Qyu.logger.debug "Re-enqueuing message with ID=#{message_id} in queue `#{queue_name}`"
  Qyu.queue.enqueue_task(queue_name, id)
end

.select(job_id:) ⇒ Object


63
64
65
66
67
# File 'lib/qyu/models/task.rb', line 63

# Builds one Task per record the store returns for the given job.
#
# Each record is read through the string keys 'id' and 'queue_name' and is
# also passed on whole as the task's attributes.
#
# @param job_id [Object] identifier handed to Qyu.store.select_tasks_by_job_id
# @return [Array] the objects built by new, in the order the store returned them
def self.select(job_id:)
  stored_tasks = Qyu.store.select_tasks_by_job_id(job_id)
  stored_tasks.map { |row| new(row['id'], row, row['queue_name']) }
end

.valid_attributes?(_attributes) ⇒ Boolean


69
70
71
# File 'lib/qyu/models/task.rb', line 69

# Checks whether the attributes handed to Task.create can be used at all.
#
# Task.create defaults +attributes+ to nil and then reads it with
# attributes['name'] etc., so nil has to be rejected here; otherwise the
# caller gets a NoMethodError instead of InvalidTaskAttributes. Any non-nil
# value is still accepted, as before.
#
# @param attributes [Object, nil] candidate task attributes
# @return [Boolean] false for nil, true otherwise
def self.valid_attributes?(attributes)
  !attributes.nil?
end

.valid_queue_name?(queue_name) ⇒ Boolean


73
74
75
# File 'lib/qyu/models/task.rb', line 73

# Tells whether +queue_name+ is usable: it must be neither nil nor the empty
# string.
#
# @param queue_name [Object, nil] candidate queue name
# @return [Boolean]
def self.valid_queue_name?(queue_name)
  return false if queue_name.nil?

  queue_name != ''
end

Instance Method Details

#[](attribute) ⇒ Object


202
203
204
# File 'lib/qyu/models/task.rb', line 202

# Hash-style access: forwards +attribute+ to public_send, so the result is
# whatever the public method of that name returns.
#
# NOTE(review): public_send reaches every public method of the task, not
# only the attribute readers — confirm callers never pass untrusted names.
#
# @param attribute [Symbol, String] name of the public method to call
# @return [Object] the result of that method
def [](attribute)
  public_send(attribute)
end

#acknowledge_messageObject


140
141
142
143
# File 'lib/qyu/models/task.rb', line 140

# Acknowledges the message held by this task by delegating to the
# class-level acknowledge_message with this task's queue name and message ID.
#
# @raise [Qyu::Errors::MessageNotReceived] when message_id is nil
# @return [Object] the result of the class-level acknowledge_message
def acknowledge_message
  fail Qyu::Errors::MessageNotReceived if message_id.nil?
  self.class.acknowledge_message(queue_name, message_id)
end

#acknowledgeable?Boolean


77
78
79
# File 'lib/qyu/models/task.rb', line 77

# Tells whether the status reports either completed? or invalid_payload?.
# The second predicate is only consulted when the first one is falsy.
#
# @return [Boolean] the value returned by the first truthy predicate, or by
#   invalid_payload? when neither is truthy
def acknowledgeable?
  finished = @status.completed?
  finished || @status.invalid_payload?
end

#completed?Boolean


81
82
83
# File 'lib/qyu/models/task.rb', line 81

# Delegates to the status object's completed? predicate.
#
# @return [Boolean] whatever @status.completed? returns
def completed?
  @status.completed?
end

#descriptorHash

Returns task descriptor


198
199
200
# File 'lib/qyu/models/task.rb', line 198

# Looks up this task's entry in the workflow descriptor: the value stored
# under the task's name inside the descriptor's 'tasks' entry.
#
# @return [Hash] task descriptor
def descriptor
  workflow_descriptor['tasks'][name]
end

#enqueue_in_failure_queueObject


163
164
165
166
167
# File 'lib/qyu/models/task.rb', line 163

# Moves this task to the failure queue: acknowledges the message held by the
# task, then delegates to the class-level enqueue_in_failure_queue.
#
# NOTE(review): the acknowledge runs before the enqueue; if the enqueue
# raises, the task presumably ends up on neither queue — confirm this
# ordering is intended.
#
# @raise [Qyu::Errors::MessageNotReceived] when message_id is nil
# @return [Object] the result of the class-level enqueue_in_failure_queue
def enqueue_in_failure_queue
  fail Qyu::Errors::MessageNotReceived if message_id.nil?
  self.class.acknowledge_message(queue_name, message_id)
  self.class.enqueue_in_failure_queue(queue_name, id, message_id)
end

#jobQyu::Job

Returns parent job


191
192
193
# File 'lib/qyu/models/task.rb', line 191

# Looks up the job via Qyu::Job.find(job_id) and caches the result in @job;
# because of ||=, the lookup is repeated on later calls while the cached
# value is nil or false.
#
# @return [Qyu::Job] parent job
def job
  @job ||= Qyu::Job.find(job_id)
end

#lock!Object


89
90
91
92
93
94
95
96
97
98
99
# File 'lib/qyu/models/task.rb', line 89

# Tries to take a lease on this task in the store.
#
# The store's lock_task! is called with the task ID and the configured
# lease period (Qyu.config.store[:lease_period]); its result is destructured
# into @lease_token and @locked_until. Renewal is scheduled only when a
# lease token came back.
#
# @raise [Qyu::Errors::LockAlreadyAcquired] when locked? is already true
# @return [Boolean] true when a lease token was obtained, false otherwise
def lock!
  raise Qyu::Errors::LockAlreadyAcquired if locked?

  Qyu.logger.debug "Task with ID=#{id} lock!"

  lease_period = Qyu.config.store[:lease_period]
  @lease_token, @locked_until = Qyu.store.lock_task!(id, lease_period)
  Qyu.logger.debug "lease_token = #{@lease_token} | locked_until = #{@locked_until}"

  if @lease_token.nil?
    false
  else
    schedule_renewal
    true
  end
end

#locked?Boolean


85
86
87
# File 'lib/qyu/models/task.rb', line 85

# Tells whether this instance currently holds an unexpired lease: both
# @lease_token and @locked_until must be set, and @locked_until must still
# lie in the future.
#
# @return [Boolean]
def locked?
  return false if @lease_token.nil? || @locked_until.nil?

  Time.now < @locked_until
end

#mark_completedObject


125
126
127
128
# File 'lib/qyu/models/task.rb', line 125

# Asks the store to set this task's status to Status::COMPLETED, then logs
# the change at info level.
#
# @return [Object] whatever the logger call returns
def mark_completed
  Qyu.store.update_status(id, Status::COMPLETED)
  Qyu.logger.info "Task with ID=#{id} marked completed."
end

#mark_failedObject


130
131
132
133
# File 'lib/qyu/models/task.rb', line 130

# Asks the store to set this task's status to Status::FAILED, then logs the
# change at debug level.
#
# @return [Object] whatever the logger call returns
def mark_failed
  Qyu.store.update_status(id, Status::FAILED)
  Qyu.logger.debug "Task with ID=#{id} marked failed."
end

#mark_invalid_payloadObject


135
136
137
138
# File 'lib/qyu/models/task.rb', line 135

# Asks the store to set this task's status to Status::INVALID_PAYLOAD, then
# logs the change at debug level.
#
# @return [Object] whatever the logger call returns
def mark_invalid_payload
  Qyu.store.update_status(id, Status::INVALID_PAYLOAD)
  Qyu.logger.debug "Task with ID=#{id} has invalid payload."
end

#mark_queuedObject


115
116
117
118
# File 'lib/qyu/models/task.rb', line 115

# Asks the store to set this task's status to Status::QUEUED, then logs the
# change at debug level.
#
# @return [Object] whatever the logger call returns
def mark_queued
  Qyu.store.update_status(id, Status::QUEUED)
  Qyu.logger.debug "Task with ID=#{id} marked queued."
end

#mark_workingObject


120
121
122
123
# File 'lib/qyu/models/task.rb', line 120

# Asks the store to set this task's status to Status::WORKING, then logs the
# change at debug level.
#
# @return [Object] whatever the logger call returns
def mark_working
  Qyu.store.update_status(id, Status::WORKING)
  Qyu.logger.debug "Task with ID=#{id} marked working."
end

#requeueObject


150
151
152
153
154
155
# File 'lib/qyu/models/task.rb', line 150

# Puts this task back on its queue: acknowledges the message held by the
# task, then delegates to the class-level requeue with the task's queue name,
# ID and message ID.
#
# NOTE(review): the acknowledge runs before the re-enqueue; if the
# re-enqueue raises, the task presumably ends up on no queue — confirm this
# ordering is intended.
#
# @raise [Qyu::Errors::MessageNotReceived] when message_id is nil
# @return [Object] the result of the class-level requeue
def requeue
  # TODO For FIFO queues (future use)
  fail Qyu::Errors::MessageNotReceived if message_id.nil?
  self.class.acknowledge_message(queue_name, message_id)
  self.class.requeue(queue_name, id, message_id)
end

#unlock!Object


101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/qyu/models/task.rb', line 101

# Releases the lease this instance holds on the task.
#
# The lease thread (if any) is killed first, then the store is asked to
# unlock the task with the current lease token. The local lease state
# (@lease_token, @locked_until) is cleared only when the store reports
# success.
#
# @raise [Qyu::Errors::LockNotAcquired] unless locked? is true
# @return [Object] whatever Qyu.store.unlock_task! returned
def unlock!
  raise Qyu::Errors::LockNotAcquired unless locked?

  Qyu.logger.debug "Task with ID=#{id} unlocking!"

  @lease_thread.kill unless @lease_thread.nil?
  released = Qyu.store.unlock_task!(id, @lease_token)
  @lease_token = @locked_until = nil if released

  released
end

#workflowQyu::Workflow

Returns workflow specified in parent job


177
178
179
# File 'lib/qyu/models/task.rb', line 177

# Delegates to the parent job's workflow.
#
# @return [Qyu::Workflow] workflow specified in the parent job
def workflow
  job.workflow
end

#workflow_descriptorHash

Returns workflow descriptor from parent job


184
185
186
# File 'lib/qyu/models/task.rb', line 184

# Delegates to the parent job's descriptor.
#
# @return [Hash] workflow descriptor from the parent job
def workflow_descriptor
  job.descriptor
end