Class: Qyu::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/qyu/models/task.rb

Overview

A Task represents a unit of work in a workflow. Conceptually a Task:

  • may not exist outside the context of a queue.

  • it is created ON the queue

  • it remains on the queue until it has been successfully processed (or has failed “enough” times)

Constant Summary collapse

# NOTE(review): presumably the fraction of the lease period that may elapse
# before the lease is renewed; the code using it is not shown here -- confirm.
LEASE_PERCENTAGE_THRESHOLD_BEFORE_RENEWAL =
0.8
# NOTE(review): presumably a polling interval (unit not shown, likely seconds);
# the code using it is not shown here -- confirm.
POLL_INTERVAL =
0.5

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#created_atObject (readonly)

Returns the value of attribute created_at.



10
11
12
# File 'lib/qyu/models/task.rb', line 10

# Reader for the +@created_at+ instance variable.
#
# @return [Object] the value currently held in +@created_at+
def created_at
  @created_at
end

#idObject (readonly)

Returns the value of attribute id.



10
11
12
# File 'lib/qyu/models/task.rb', line 10

# Reader for the +@id+ instance variable.
#
# @return [Object] the value currently held in +@id+
def id
  @id
end

#job_idObject (readonly)

Returns the value of attribute job_id.



10
11
12
# File 'lib/qyu/models/task.rb', line 10

# Reader for the +@job_id+ instance variable.
#
# @return [Object] the value currently held in +@job_id+
def job_id
  @job_id
end

#message_idObject (readonly)

Returns the value of attribute message_id.



10
11
12
# File 'lib/qyu/models/task.rb', line 10

# Reader for the +@message_id+ instance variable.
#
# @return [Object, nil] the value currently held in +@message_id+
def message_id
  @message_id
end

#nameObject (readonly)

Returns the value of attribute name.



10
11
12
# File 'lib/qyu/models/task.rb', line 10

# Reader for the +@name+ instance variable.
#
# @return [Object] the value currently held in +@name+
def name
  @name
end

#parent_task_idObject (readonly)

Returns the value of attribute parent_task_id.



10
11
12
# File 'lib/qyu/models/task.rb', line 10

# Reader for the +@parent_task_id+ instance variable.
#
# @return [Object, nil] the value currently held in +@parent_task_id+
def parent_task_id
  @parent_task_id
end

#payloadObject (readonly)

Returns the value of attribute payload.



10
11
12
# File 'lib/qyu/models/task.rb', line 10

# Reader for the +@payload+ instance variable.
#
# @return [Object] the value currently held in +@payload+
def payload
  @payload
end

#queue_nameObject (readonly)

Returns the value of attribute queue_name.



10
11
12
# File 'lib/qyu/models/task.rb', line 10

# Reader for the +@queue_name+ instance variable.
#
# @return [Object] the value currently held in +@queue_name+
def queue_name
  @queue_name
end

#statusObject (readonly)

Returns the value of attribute status.



10
11
12
# File 'lib/qyu/models/task.rb', line 10

# Reader for the +@status+ instance variable.
#
# @return [Object] the value currently held in +@status+
def status
  @status
end

#updated_atObject (readonly)

Returns the value of attribute updated_at.



10
11
12
# File 'lib/qyu/models/task.rb', line 10

# Reader for the +@updated_at+ instance variable.
#
# @return [Object] the value currently held in +@updated_at+
def updated_at
  @updated_at
end

Class Method Details

.acknowledge_message(queue_name, message_id) ⇒ Object



144
145
146
147
# File 'lib/qyu/models/task.rb', line 144

# Logs the acknowledgement at debug level, then asks the configured queue to
# acknowledge the message.
#
# @param queue_name [Object] queue the message was received from
# @param message_id [Object] identifier of the message to acknowledge
# @return [Object] whatever +Qyu.queue.acknowledge_message+ returns
def self.acknowledge_message(queue_name, message_id)
  log_line = "Acknowledging message with ID=#{message_id} from queue `#{queue_name}`"
  Qyu.logger.debug(log_line)

  Qyu.queue.acknowledge_message(queue_name, message_id)
end

.create(queue_name: nil, attributes: nil) ⇒ Object

By definition, Task.create does two things:

  • persists the Task in the Store

  • enqueues the Task to the Queue

We have to make sure that a Task is unique in the Store. For this reason, create first checks whether the task has already been persisted. If it exists, there is no need to persist it again; it only needs to be enqueued. Double (or multiple) delivery of messages is allowed and is handled at the worker level. Possible scenario: a Job failed at some point. A few of its tasks completed successfully and others failed, so certain tasks were never even created. When we restart the job, the tasks will be recreated. If a task already existed and had completed, its state is left unchanged; when the worker picks it up, it will notice the completed state, acknowledge the message, and continue with the next steps.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/qyu/models/task.rb', line 30

# Finds or persists a task in the store and builds a Task for it.
#
# The block handed to +Qyu.store.find_or_persist_task+ enqueues the task id
# on +queue_name+; when (and whether) it runs is decided by the store.
#
# @param queue_name [Object] queue to enqueue the task on
# @param attributes [#[]] read with the string keys 'name', 'payload',
#   'job_id' and 'parent_task_id'
# @return [Object] the result of +new(task_id, attributes, queue_name)+
# @raise [Qyu::Errors::InvalidTaskAttributes] if +valid_attributes?+ rejects +attributes+
# @raise [Qyu::Errors::InvalidQueueName] if +valid_queue_name?+ rejects +queue_name+
def self.create(queue_name: nil, attributes: nil)
  raise Qyu::Errors::InvalidTaskAttributes unless valid_attributes?(attributes)
  raise Qyu::Errors::InvalidQueueName unless valid_queue_name?(queue_name)

  Qyu.logger.debug "find_or_persist queue_name=#{queue_name} and attributes=#{attributes}"

  store = Qyu.store
  persist_args = [
    attributes['name'],
    queue_name,
    attributes['payload'],
    attributes['job_id'],
    attributes['parent_task_id']
  ]
  task_id = store.find_or_persist_task(*persist_args) do |persisted_id|
    Qyu.logger.debug "enqueue queue_name=#{queue_name} and task_id=#{persisted_id}"
    Qyu.queue.enqueue_task(queue_name, persisted_id)
  end

  new(task_id, attributes, queue_name)
end

.enqueue_in_failure_queue(queue_name, id, message_id) ⇒ Object



168
169
170
171
# File 'lib/qyu/models/task.rb', line 168

# Logs at debug level, then asks the configured queue to put the task on the
# failures queue of +queue_name+.
#
# @param queue_name [Object] queue the task belongs to
# @param id [Object] task identifier handed to the queue
# @param message_id [Object] only used in the log line
# @return [Object] whatever +Qyu.queue.enqueue_task_to_failed_queue+ returns
def self.enqueue_in_failure_queue(queue_name, id, message_id)
  log_line = "Enqueuing failed message with ID=#{message_id} in #{queue_name} failures queue"
  Qyu.logger.debug(log_line)

  Qyu.queue.enqueue_task_to_failed_queue(queue_name, id)
end

.fetch(queue_name) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/qyu/models/task.rb', line 49

# Takes the next message from +queue_name+, loads the attributes of the task
# it refers to from the store, and builds a Task from them.
#
# Any StandardError raised while fetching the message or looking up the task
# is wrapped in Qyu::Errors::CouldNotFetchTask together with whatever message
# data had been received up to that point.
#
# @param queue_name [Object] queue to read from
# @return [Object] the result of +new(task_id, task_attrs, queue_name, message['id'])+
# @raise [Qyu::Errors::InvalidQueueName] if +valid_queue_name?+ rejects +queue_name+
# @raise [Qyu::Errors::CouldNotFetchTask] if the queue or store lookup fails
def self.fetch(queue_name)
  raise Qyu::Errors::InvalidQueueName unless valid_queue_name?(queue_name)

  message = nil
  begin
    message = Qyu.queue.fetch_next_message(queue_name)
    task_id = message['task_id']
    attrs   = Qyu.store.find_task(task_id)
  rescue StandardError => e
    # The failure may have happened before any message was received.
    message ||= {}
    raise Qyu::Errors::CouldNotFetchTask.new(queue_name, message['id'], message['task_id'], e)
  end

  new(task_id, attrs, queue_name, message['id'])
end

.requeue(queue_name, id, message_id) ⇒ Object



156
157
158
159
160
# File 'lib/qyu/models/task.rb', line 156

# Logs at debug level, then enqueues the task id on +queue_name+ again.
#
# TODO: revisit for FIFO queues (future use).
#
# @param queue_name [Object] queue to enqueue the task on
# @param id [Object] task identifier handed to the queue
# @param message_id [Object] only used in the log line
# @return [Object] whatever +Qyu.queue.enqueue_task+ returns
def self.requeue(queue_name, id, message_id)
  log_line = "Re-enqueuing message with ID=#{message_id} in queue `#{queue_name}`"
  Qyu.logger.debug(log_line)

  Qyu.queue.enqueue_task(queue_name, id)
end

.select(job_id:) ⇒ Object



62
63
64
65
66
# File 'lib/qyu/models/task.rb', line 62

# Builds one Task per record the store returns for +job_id+.
#
# Each record is read with the string keys 'id' and 'queue_name' and is also
# passed whole as the attributes argument of +new+.
#
# @param job_id [Object] job whose tasks are looked up
# @return [Array] results of +new+ in the order the store returned the records
def self.select(job_id:)
  records = Qyu.store.select_tasks_by_job_id(job_id)
  records.map { |record| new(record['id'], record, record['queue_name']) }
end

.valid_attributes?(_attributes) ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/qyu/models/task.rb', line 68

# Reports whether +_attributes+ may be used to create a task.
#
# +nil+ is rejected: it is the default value of +create+'s +attributes+
# keyword, and +create+ reads the attributes with +[]+ right after this check,
# so accepting +nil+ would surface as a NoMethodError instead of
# Qyu::Errors::InvalidTaskAttributes. Every non-nil value is still accepted.
#
# @param _attributes [Object, nil] candidate task attributes
# @return [Boolean] +false+ for +nil+, +true+ otherwise
def self.valid_attributes?(_attributes)
  !_attributes.nil?
end

.valid_queue_name?(queue_name) ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/qyu/models/task.rb', line 72

# Reports whether +queue_name+ is usable: it must be neither +nil+ nor the
# empty string.
#
# @param queue_name [Object, nil] candidate queue name
# @return [Boolean]
def self.valid_queue_name?(queue_name)
  return false if queue_name.nil?

  queue_name != ''
end

Instance Method Details

#[](attribute) ⇒ Object



177
178
179
# File 'lib/qyu/models/task.rb', line 177

# Hash-style access: calls the public method named +attribute+ on this object
# and returns its result.
#
# NOTE(review): +public_send+ dispatches to any public method, not only to
# attribute readers -- confirm callers never pass untrusted names.
#
# @param attribute [String, Symbol] name of the public method to call
# @return [Object] the called method's return value
# @raise [NoMethodError] if there is no public method with that name
def [](attribute)
  public_send(attribute)
end

#acknowledge_messageObject



139
140
141
142
# File 'lib/qyu/models/task.rb', line 139

# Acknowledges the queue message this task was built from.
#
# @return [Object] whatever the class-level +acknowledge_message+ returns
# @raise [Qyu::Errors::MessageNotReceived] if +message_id+ is +nil+
def acknowledge_message
  received_id = message_id
  raise Qyu::Errors::MessageNotReceived if received_id.nil?

  self.class.acknowledge_message(queue_name, received_id)
end

#acknowledgeable?Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/qyu/models/task.rb', line 76

# Reports whether +@status+ says the task is completed or has an invalid
# payload.
#
# @return [Boolean]
def acknowledgeable?
  completed = @status.completed?
  return completed if completed

  @status.invalid_payload?
end

#completed?Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/qyu/models/task.rb', line 80

# Reports whether +@status+ says the task is completed.
#
# @return [Boolean] the result of +@status.completed?+
def completed?
  @status.completed?
end

#enqueue_in_failure_queueObject



162
163
164
165
166
# File 'lib/qyu/models/task.rb', line 162

# Acknowledges the received message, then puts this task on the failures
# queue of its queue.
#
# @return [Object] whatever the class-level +enqueue_in_failure_queue+ returns
# @raise [Qyu::Errors::MessageNotReceived] if +message_id+ is +nil+
def enqueue_in_failure_queue
  raise Qyu::Errors::MessageNotReceived if message_id.nil?

  task_class = self.class
  task_class.acknowledge_message(queue_name, message_id)
  task_class.enqueue_in_failure_queue(queue_name, id, message_id)
end

#jobObject



173
174
175
# File 'lib/qyu/models/task.rb', line 173

# Returns the job found via +Qyu::Job.find(job_id)+, caching a truthy result
# in +@job+ so later calls skip the lookup.
#
# @return [Object] the cached or freshly looked-up job
def job
  return @job if @job

  @job = Qyu::Job.find(job_id)
end

#lock!Object



88
89
90
91
92
93
94
95
96
97
98
# File 'lib/qyu/models/task.rb', line 88

# Tries to take a lease on this task in the store.
#
# The store is called with the task id and the configured
# +Qyu.config.store[:lease_period]+; its result is destructured into
# +@lease_token+ and +@locked_until+. When a token was obtained,
# +schedule_renewal+ is invoked.
#
# @return [Boolean] +true+ if a lease token was obtained, +false+ otherwise
# @raise [Qyu::Errors::LockAlreadyAcquired] if +locked?+ is already true
def lock!
  raise Qyu::Errors::LockAlreadyAcquired if locked?

  Qyu.logger.debug "Task with ID=#{id} lock!"

  lease = Qyu.store.lock_task!(id, Qyu.config.store[:lease_period])
  @lease_token, @locked_until = lease
  Qyu.logger.debug "lease_token = #{@lease_token} | locked_until = #{@locked_until}"

  acquired = !@lease_token.nil?
  schedule_renewal if acquired
  acquired
end

#locked?Boolean

Returns:

  • (Boolean)


84
85
86
# File 'lib/qyu/models/task.rb', line 84

# Reports whether this object holds a lease that has not run out yet: both
# +@lease_token+ and +@locked_until+ are set and +@locked_until+ is still in
# the future.
#
# @return [Boolean]
def locked?
  return false if @lease_token.nil? || @locked_until.nil?

  Time.now < @locked_until
end

#mark_completedObject



124
125
126
127
# File 'lib/qyu/models/task.rb', line 124

# Writes Status::COMPLETED for this task to the store and logs the change at
# info level. Only the store is updated here; +@status+ is not touched.
#
# @return [Object] whatever +Qyu.logger.info+ returns
def mark_completed
  task_id = id
  Qyu.store.update_status(task_id, Status::COMPLETED)
  Qyu.logger.info("Task with ID=#{task_id} marked completed.")
end

#mark_failedObject



129
130
131
132
# File 'lib/qyu/models/task.rb', line 129

# Writes Status::FAILED for this task to the store and logs the change at
# debug level. Only the store is updated here; +@status+ is not touched.
#
# @return [Object] whatever +Qyu.logger.debug+ returns
def mark_failed
  task_id = id
  Qyu.store.update_status(task_id, Status::FAILED)
  Qyu.logger.debug("Task with ID=#{task_id} marked failed.")
end

#mark_invalid_payloadObject



134
135
136
137
# File 'lib/qyu/models/task.rb', line 134

# Writes Status::INVALID_PAYLOAD for this task to the store and logs the
# change at debug level. Only the store is updated here; +@status+ is not
# touched.
#
# @return [Object] whatever +Qyu.logger.debug+ returns
def mark_invalid_payload
  task_id = id
  Qyu.store.update_status(task_id, Status::INVALID_PAYLOAD)
  Qyu.logger.debug("Task with ID=#{task_id} has invalid payload.")
end

#mark_queuedObject



114
115
116
117
# File 'lib/qyu/models/task.rb', line 114

# Writes Status::QUEUED for this task to the store and logs the change at
# debug level. Only the store is updated here; +@status+ is not touched.
#
# @return [Object] whatever +Qyu.logger.debug+ returns
def mark_queued
  task_id = id
  Qyu.store.update_status(task_id, Status::QUEUED)
  Qyu.logger.debug("Task with ID=#{task_id} marked queued.")
end

#mark_workingObject



119
120
121
122
# File 'lib/qyu/models/task.rb', line 119

# Writes Status::WORKING for this task to the store and logs the change at
# debug level. Only the store is updated here; +@status+ is not touched.
#
# @return [Object] whatever +Qyu.logger.debug+ returns
def mark_working
  task_id = id
  Qyu.store.update_status(task_id, Status::WORKING)
  Qyu.logger.debug("Task with ID=#{task_id} marked working.")
end

#requeueObject



149
150
151
152
153
154
# File 'lib/qyu/models/task.rb', line 149

# Acknowledges the received message, then enqueues this task on its queue
# again.
#
# TODO: revisit for FIFO queues (future use).
#
# @return [Object] whatever the class-level +requeue+ returns
# @raise [Qyu::Errors::MessageNotReceived] if +message_id+ is +nil+
def requeue
  raise Qyu::Errors::MessageNotReceived if message_id.nil?

  task_class = self.class
  task_class.acknowledge_message(queue_name, message_id)
  task_class.requeue(queue_name, id, message_id)
end

#unlock!Object



100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/qyu/models/task.rb', line 100

# Releases the lease held on this task.
#
# Kills +@lease_thread+ if one is set, then asks the store to unlock the task
# using the current lease token. +@lease_token+ and +@locked_until+ are
# cleared only when the store reports success.
#
# @return [Object] the result of +Qyu.store.unlock_task!+
# @raise [Qyu::Errors::LockNotAcquired] unless +locked?+ is true
def unlock!
  raise Qyu::Errors::LockNotAcquired unless locked?

  Qyu.logger.debug "Task with ID=#{id} unlocking!"

  @lease_thread.kill unless @lease_thread.nil?
  released = Qyu.store.unlock_task!(id, @lease_token)
  @lease_token = @locked_until = nil if released

  released
end