Class: Resque::Durable::QueueAudit

Inherits:
ActiveRecord::Base
  • Object
show all
Extended by:
Recovery
Defined in:
lib/resque/durable/queue_audit.rb

Defined Under Namespace

Modules: Recovery

Constant Summary collapse

JobCollision =
Class.new(StandardError)
DEFAULT_DURATION =

id enqueued_id queue_name payload enqueue_count enqueued_at completed_at timeout_at updated_at created_at

10.minutes

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Recovery

cleanup, recover

Class Method Details

.initialize_by_klass_and_args(job_klass, args) ⇒ Object



55
56
57
# File 'lib/resque/durable/queue_audit.rb', line 55

# Instantiates an audit (via +new+) for the given job class and arguments.
#
# @param job_klass the job class (or its name) to record on the audit
# @param args the job arguments, stored as the audit's payload
# @return the object returned by +new+, carrying a freshly generated GUID
#   as its enqueued_id
def self.initialize_by_klass_and_args(job_klass, args)
  guid = GUID.generate
  new(job_klass: job_klass, payload: args, enqueued_id: guid)
end

Instance Method Details

#complete! ⇒ Object



114
115
116
117
# File 'lib/resque/durable/queue_audit.rb', line 114

# Marks the audit as finished: stores the current UTC time in
# completed_at and then persists the record with save!.
def complete!
  finished_at = Time.now.utc
  self.completed_at = finished_at
  save!
end

#complete? ⇒ Boolean

Returns:

  • (Boolean)


119
120
121
# File 'lib/resque/durable/queue_audit.rb', line 119

# @return [Boolean] true when completed_at holds a non-blank value
def complete?
  !completed_at.blank?
end

#delay ⇒ Object

Retry backoff, computed as the cube of enqueue_count in minutes: 1, 8, 27, 64, 125, 216, etc.



128
129
130
# File 'lib/resque/durable/queue_audit.rb', line 128

# Backoff interval: the cube of enqueue_count, expressed in minutes
# (1, 8, 27, 64, 125, 216, ...).
def delay
  attempts_cubed = enqueue_count ** 3
  attempts_cubed.minutes
end

#duration ⇒ Object



83
84
85
# File 'lib/resque/durable/queue_audit.rb', line 83

# @return whatever the audited job class reports as its job_timeout
def duration
  klass = job_klass
  klass.job_timeout
end

#enqueue ⇒ Object



79
80
81
# File 'lib/resque/durable/queue_audit.rb', line 79

# Enqueues the job through its own class: the decoded payload is passed as
# the argument list, with this record (converted via +becomes+ to the job
# class's auditor) appended as the final argument.
def enqueue
  klass = job_klass
  arguments = payload
  arguments.push(becomes(klass.auditor))
  klass.enqueue(*arguments)
end

#enqueued! ⇒ Object



107
108
109
110
111
112
# File 'lib/resque/durable/queue_audit.rb', line 107

# Records an enqueue: stamps enqueued_at with the current UTC time, sets
# timeout_at to enqueued_at + duration, increments enqueue_count and
# persists the record with save!.
def enqueued!
  now = Time.now.utc
  self.enqueued_at = now
  # Read enqueued_at back through the attribute rather than reusing `now`.
  self.timeout_at = enqueued_at + duration
  self.enqueue_count = enqueue_count + 1
  save!
end

#fail! ⇒ Object



103
104
105
# File 'lib/resque/durable/queue_audit.rb', line 103

# Sets timeout_at to the current UTC time via update_attribute.
def fail!
  expired_at = Time.now.utc
  update_attribute(:timeout_at, expired_at)
end

#heartbeat! ⇒ Object



87
88
89
# File 'lib/resque/durable/queue_audit.rb', line 87

# Pushes timeout_at out to the current UTC time plus duration, written via
# update_attribute.
def heartbeat!
  extended_deadline = Time.now.utc + duration
  update_attribute(:timeout_at, extended_deadline)
end

#job_klass ⇒ Object



59
60
61
# File 'lib/resque/durable/queue_audit.rb', line 59

# Resolves the stored job_klass attribute (written as a string by the
# setter) to a constant via +constantize+.
def job_klass
  class_name = read_attribute(:job_klass)
  class_name.constantize
end

#job_klass=(klass) ⇒ Object



63
64
65
# File 'lib/resque/durable/queue_audit.rb', line 63

# Stores the job class on the record as its string form (+klass.to_s+).
#
# @param klass the job class, or anything whose +to_s+ is the class name
def job_klass=(klass)
  class_name = klass.to_s
  write_attribute(:job_klass, class_name)
end

#optimistic_heartbeat!(last_timeout_at) ⇒ Object

Bumps the `timeout_at` column, but raises a `JobCollision` exception if another process has changed the value, indicating we may have multiple workers processing the same job.

Raises:

  • (JobCollision)



94
95
96
97
98
99
100
101
# File 'lib/resque/durable/queue_audit.rb', line 94

# Bumps the `timeout_at` column to the current UTC time plus duration, but
# only when the stored value still equals +last_timeout_at+. The update is
# issued as a single conditional update_all scoped to this record's id, so
# a changed value means the update matches no row.
#
# The write goes through the class-level relation; this method does not
# assign the record's own timeout_at attribute.
#
# @param last_timeout_at the timeout_at value the caller expects to be stored
# @return the new timeout_at value that was written
# @raise [JobCollision] unless exactly one row was updated, indicating that
#   another process may be working on the same job
def optimistic_heartbeat!(last_timeout_at)
  next_timeout_at = Time.now.utc + duration
  updated_rows = self.class
    .where(id: id, timeout_at: last_timeout_at)
    .update_all(timeout_at: next_timeout_at)
  raise JobCollision unless updated_rows == 1

  next_timeout_at
end

#payload ⇒ Object



67
68
69
# File 'lib/resque/durable/queue_audit.rb', line 67

# Reads the stored payload through the inherited reader and decodes it
# from JSON. A new object is decoded on every call.
def payload
  raw_json = super
  ActiveSupport::JSON.decode(raw_json)
end

#payload=(value) ⇒ Object



71
72
73
# File 'lib/resque/durable/queue_audit.rb', line 71

# Serializes +value+ with +to_json+ and hands the resulting string to the
# inherited writer.
#
# @param value the job arguments to store
def payload=(value)
  encoded = value.to_json
  super(encoded)
end

#queue ⇒ Object



75
76
77
# File 'lib/resque/durable/queue_audit.rb', line 75

# @return the queue that Resque.queue_from_class reports for the job class
def queue
  klass = job_klass
  Resque.queue_from_class(klass)
end

#reset_backoff!(timeout_at = Time.now.utc) ⇒ Object



132
133
134
135
136
137
138
# File 'lib/resque/durable/queue_audit.rb', line 132

# Resets the retry backoff and persists the record with save!.
#
# @param timeout_at the value to store in timeout_at; defaults to the
#   current UTC time
def reset_backoff!(timeout_at = Time.now.utc)
  # With enqueue_count back at 1 and timeout_at at (by default) the current
  # time, the Durable Monitor can pick the job up as soon as possible.
  self.enqueue_count = 1
  self.timeout_at = timeout_at
  save!
end

#retryable? ⇒ Boolean

Returns:

  • (Boolean)


123
124
125
# File 'lib/resque/durable/queue_audit.rb', line 123

# @return [Boolean] true once the current UTC time is later than
#   timeout_at plus the backoff delay
def retryable?
  now = Time.now.utc
  retry_after = timeout_at + delay
  now > retry_after
end