Class: Delayed::Backend::ActiveRecord::Job

Inherits:
ActiveRecord::Base
  • Object
show all
Includes:
Base
Defined in:
lib/delayed/backend/active_record.rb

Overview

A job object that is persisted to the database. Contains the work object as a YAML field.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.after_fork ⇒ Object



40
41
42
# File 'lib/delayed/backend/active_record.rb', line 40

# Calls +establish_connection+ on ::ActiveRecord::Base and returns its result.
#
# NOTE(review): judging by the name, this is presumably a hook run after the
# process forks so that a fresh database connection is opened — confirm
# against the caller.
def self.after_fork
  connection_owner = ::ActiveRecord::Base
  connection_owner.establish_connection
end

.before_fork ⇒ Object



36
37
38
# File 'lib/delayed/backend/active_record.rb', line 36

# Calls +clear_all_connections!+ on ::ActiveRecord::Base and returns its result.
#
# NOTE(review): judging by the name, this is presumably a hook run just before
# the process forks so that no open connection is shared with the child —
# confirm against the caller.
def self.before_fork
  connection_owner = ::ActiveRecord::Base
  connection_owner.clear_all_connections!
end

.by_priority ⇒ Object



25
26
27
# File 'lib/delayed/backend/active_record.rb', line 25

# Applies an ordering of ascending +priority+, then ascending +run_at+,
# and returns whatever +order+ returns.
def self.by_priority
  sort_clause = 'priority ASC, run_at ASC'
  order(sort_clause)
end

.clear_locks!(worker_name) ⇒ Object

When a worker is exiting, make sure we don’t have any locked jobs.



45
46
47
# File 'lib/delayed/backend/active_record.rb', line 45

# Releases every lock held by the given worker: sets +locked_by+ and
# +locked_at+ to NULL on all rows whose +locked_by+ equals +worker_name+.
#
# worker_name - value matched against the +locked_by+ column.
#
# Returns whatever +update_all+ returns.
def self.clear_locks!(worker_name)
  release_lock_sql = "locked_by = null, locked_at = null"
  held_by_worker = ["locked_by = ?", worker_name]
  update_all(release_lock_sql, held_by_worker)
end

.db_time_now ⇒ Object

Get the current time (GMT or local, depending on the DB). Note: This does not ping the DB to get the time, so all your clients must have synchronized clocks.



87
88
89
90
91
92
93
94
95
# File 'lib/delayed/backend/active_record.rb', line 87

# Returns the current time in the representation used for database columns.
#
# The time comes from the local clock; no database query is made here.
# Resolution order:
#   1. +Time.zone.now+ when +Time.zone+ is set;
#   2. +Time.now.utc+ when ::ActiveRecord::Base.default_timezone is +:utc+;
#   3. +Time.now+ otherwise.
def self.db_time_now
  return Time.zone.now if Time.zone
  return Time.now.utc if ::ActiveRecord::Base.default_timezone == :utc

  Time.now
end

.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object

Find a few candidate jobs to run (in case some immediately get locked by others).



50
51
52
53
54
55
56
57
58
59
# File 'lib/delayed/backend/active_record.rb', line 50

# Fetches up to +limit+ candidate jobs for a worker.
#
# Starts from +ready_to_run+ and narrows it by the Worker settings that are
# present: +min_priority+, +max_priority+ and a non-empty +queues+ list. The
# result is ordered with +by_priority+ and loaded inside
# ::ActiveRecord::Base.silence.
#
# worker_name  - passed through to +ready_to_run+.
# limit        - maximum number of jobs to load (default 5).
# max_run_time - passed through to +ready_to_run+ (default Worker.max_run_time).
#
# Returns the result of +all(:limit => limit)+ on the narrowed scope.
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
  candidates = ready_to_run(worker_name, max_run_time)

  if Worker.min_priority
    candidates = candidates.scoped(:conditions => ['priority >= ?', Worker.min_priority])
  end

  if Worker.max_priority
    candidates = candidates.scoped(:conditions => ['priority <= ?', Worker.max_priority])
  end

  if Worker.queues.any?
    candidates = candidates.scoped(:conditions => ["queue IN (?)", Worker.queues])
  end

  ::ActiveRecord::Base.silence { candidates.by_priority.all(:limit => limit) }
end

.rails3? ⇒ Boolean

Returns:

  • (Boolean)


16
17
18
# File 'lib/delayed/backend/active_record.rb', line 16

# Reports whether the loaded ActiveRecord has major version 3.
#
# Returns true when ::ActiveRecord::VERSION::MAJOR equals 3, false otherwise.
def self.rails3?
  major_version = ::ActiveRecord::VERSION::MAJOR
  major_version == 3
end

.ready_to_run(worker_name, max_run_time) ⇒ Object



22
23
24
# File 'lib/delayed/backend/active_record.rb', line 22

# Builds the condition selecting jobs this worker may pick up.
#
# A job matches when it has not failed (+failed_at+ is NULL) and either
#   * its +run_at+ is not in the future and it is unlocked or its lock is
#     older than +max_run_time+, or
#   * it is already locked by +worker_name+.
#
# worker_name  - value compared against the +locked_by+ column.
# max_run_time - amount subtracted from the current time to find expired locks.
#
# Returns whatever +where+ returns.
def self.ready_to_run(worker_name, max_run_time)
  runnable_sql = '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL'
  where(
    runnable_sql,
    db_time_now,
    db_time_now - max_run_time,
    worker_name
  )
end

Instance Method Details

#lock_exclusively!(max_run_time, worker) ⇒ Object

Lock this job for this worker. Returns true if we have the lock, false otherwise.



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/delayed/backend/active_record.rb', line 63

# Tries to take (or refresh) the lock on this job for +worker+.
#
# max_run_time - converted with +to_i+ and subtracted from the current time;
#                a lock older than that is treated as expired.
# worker       - name written to / compared against +locked_by+.
#
# The lock is taken with a single conditional UPDATE, so it only succeeds
# when exactly one row was changed. On success the in-memory +locked_at+ and
# +locked_by+ are updated to match.
#
# Returns true if this worker now holds the lock, false otherwise.
def lock_exclusively!(max_run_time, worker)
  current_time = self.class.db_time_now

  rows_updated =
    if locked_by == worker
      # Already ours (e.g. the job queue crashed mid-run): just resume and
      # refresh the lock timestamp.
      self.class.update_all(["locked_at = ?", current_time], ["id = ? and locked_by = ?", id, worker])
    else
      # Not ours: claim it only if it is unlocked or its lock has expired,
      # and it is due to run.
      lock_expiry = current_time - max_run_time.to_i
      self.class.update_all(
        ["locked_at = ?, locked_by = ?", current_time, worker],
        ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, lock_expiry, current_time]
      )
    end

  return false unless rows_updated == 1

  self.locked_at = current_time
  self.locked_by = worker
  locked_at_will_change!
  locked_by_will_change!
  true
end

#reload(*args) ⇒ Object



97
98
99
100
# File 'lib/delayed/backend/active_record.rb', line 97

# Calls +reset+ on this job, then delegates to the inherited +reload+ with
# the same arguments and returns its result.
#
# NOTE(review): +reset+ is defined outside this view; presumably it clears
# cached state derived from the record — confirm.
def reload(*args)
  reset
  super(*args)
end