Class: Delayed::Backend::DataMapper::Job

Inherits:
Object
  • Object
show all
Includes:
DataMapper::Resource, Base
Defined in:
lib/delayed/backend/data_mapper.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.clear_locks!(worker_name) ⇒ Object

When a worker is exiting, make sure we don’t have any locked jobs.



63
64
65
# File 'lib/delayed/backend/data_mapper.rb', line 63

# Releases every lock held by +worker_name+: selects the jobs whose
# locked_by equals +worker_name+ and sets both locked_at and locked_by
# to nil on that collection.
#
# Returns whatever the collection's +update+ call returns.
def self.clear_locks!(worker_name)
  held_by_worker = all(:locked_by => worker_name)
  held_by_worker.update(:locked_at => nil, :locked_by => nil)
end

.db_time_now ⇒ Object



24
25
26
# File 'lib/delayed/backend/data_mapper.rb', line 24

# Current wall-clock time, converted to UTC and returned as a DateTime.
def self.db_time_now
  utc_now = Time.now.utc
  utc_now.to_datetime
end

.delete_all ⇒ Object

These methods are common to the other backends, so we provide an implementation here as well.



100
101
102
# File 'lib/delayed/backend/data_mapper.rb', line 100

# Removes all jobs by calling auto_migrate! on Delayed::Job.
#
# NOTE(review): in DataMapper, auto_migrate! is understood to drop and
# recreate the model's storage rather than issue a plain DELETE -- confirm
# before calling this outside of test setup.
def self.delete_all
  job_model = Delayed::Job
  job_model.auto_migrate!
end

.expired(max_run_time = Worker.max_run_time) ⇒ Object



34
35
36
37
38
39
# File 'lib/delayed/backend/data_mapper.rb', line 34

# Jobs whose lock is absent or stale: the union of jobs with a nil
# locked_at and jobs whose locked_at is earlier than
# db_time_now - max_run_time.
#
# max_run_time - amount subtracted from the current time to obtain the
#                cutoff (defaults to Worker.max_run_time).
def self.expired(max_run_time = Worker.max_run_time)
  never_locked = all(:locked_at => nil)
  lock_cutoff = db_time_now - max_run_time
  lock_expired = all(:locked_at.lt => lock_cutoff)
  never_locked | lock_expired
end

.find(id) ⇒ Object



104
105
106
# File 'lib/delayed/backend/data_mapper.rb', line 104

# Alias-style lookup: forwards +id+ to +get+ and returns its result, so
# this backend answers +find+ like the other backends do.
def self.find(id)
  get(id)
end

.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object



53
54
55
56
57
58
59
60
# File 'lib/delayed/backend/data_mapper.rb', line 53

# Finds up to +limit+ jobs that +worker_name+ may lock, ordered by
# ascending priority and then ascending run_at.
#
# The base set comes from lockable(worker_name, max_run_time). It is
# narrowed further by:
# * priority >= Worker.min_priority, when that setting is truthy
# * priority <= Worker.max_priority, when that setting is truthy
# * queue matching Worker.queues, when that list has any entries
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
  query = {:limit => limit, :order => [:priority.asc, :run_at.asc]}

  if Worker.min_priority
    query[:priority.gte] = Worker.min_priority
  end
  if Worker.max_priority
    query[:priority.lte] = Worker.max_priority
  end
  if Worker.queues.any?
    query[:queue] = Worker.queues
  end

  candidates = lockable(worker_name, max_run_time)
  candidates.all(query)
end

.lockable(worker_name, max_run_time = Worker.max_run_time) ⇒ Object



28
29
30
31
32
# File 'lib/delayed/backend/data_mapper.rb', line 28

# Jobs that +worker_name+ is allowed to take: the intersection of
# never_failed and never_run, intersected with the union of jobs
# already locked by +worker_name+ and jobs returned by
# expired(max_run_time).
def self.lockable(worker_name, max_run_time = Worker.max_run_time)
  runnable = never_failed & never_run
  claimable = locked_by(worker_name) | expired(max_run_time)
  runnable & claimable
end

.locked_by(worker_name) ⇒ Object



41
42
43
# File 'lib/delayed/backend/data_mapper.rb', line 41

# Jobs whose locked_by column equals +worker_name+.
def self.locked_by(worker_name)
  owner_filter = {:locked_by => worker_name}
  all(owner_filter)
end

.never_failed ⇒ Object



49
50
51
# File 'lib/delayed/backend/data_mapper.rb', line 49

# Jobs whose failed_at column is nil.
def self.never_failed
  not_failed_filter = {:failed_at => nil}
  all(not_failed_filter)
end

.never_run ⇒ Object



45
46
47
# File 'lib/delayed/backend/data_mapper.rb', line 45

# Jobs that are due: the union of jobs with a nil run_at and jobs whose
# run_at is less than or equal to db_time_now.
def self.never_run
  unscheduled = all(:run_at => nil)
  already_due = all(:run_at.lte => db_time_now)
  unscheduled | already_due
end

Instance Method Details

#==(other) ⇒ Object



120
121
122
# File 'lib/delayed/backend/data_mapper.rb', line 120

def ==(other)
  id == other.id
end

#lock_exclusively!(max_run_time, worker = worker_name) ⇒ Object

Lock this job for this worker. Returns true if we have the lock, false otherwise.



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/delayed/backend/data_mapper.rb', line 69

# Tries to lock this job for +worker+.
#
# max_run_time - passed to the class-level +expired+ scope when the job
#                is not already locked by +worker+.
# worker       - name recorded in locked_by (defaults to worker_name).
#
# Returns true when exactly one row was updated (the record is then
# reloaded so it reflects the new lock), false otherwise.
def lock_exclusively!(max_run_time, worker = worker_name)
  lock_time = self.class.db_time_now

  # If this worker already holds the lock we only need to refresh it;
  # otherwise the job must be unlocked/expired and due to run.
  scope = if locked_by == worker
    self.class.locked_by(worker)
  else
    self.class.expired(max_run_time).never_run
  end
  target = scope.all(:id => id)

  # FIXME - this is a bit gross
  # DM doesn't give us the number of rows affected by a collection update,
  # so we go through the repository directly instead of DM::Collection#update.
  lock_values = target.model.new(:locked_at => lock_time, :locked_by => worker).dirty_attributes
  rows_changed = repository.update(lock_values, target)

  # Anything other than exactly one updated row counts as "no lock".
  # Presumably that means zero rows, since the collection is restricted to
  # this job's id -- the original author was unsure as well.
  return false unless rows_changed == 1

  reload # pick up the updates above
  true
end

#reload(*args) ⇒ Object



115
116
117
118
# File 'lib/delayed/backend/data_mapper.rb', line 115

# Calls +reset+ on this record and then delegates to the inherited
# +reload+, forwarding any arguments unchanged.
#
# NOTE(review): +reset+ is not defined in this listing; presumably it
# clears cached state such as the deserialized payload -- confirm.
def reload(*args)
  reset
  super(*args)
end

#reschedule_at ⇒ Object



93
94
95
96
97
# File 'lib/delayed/backend/data_mapper.rb', line 93

# Time at which this job should next be attempted.
#
# When the payload object responds to +reschedule_at+, its answer is
# used (it receives the current db time and the attempt count).
# Otherwise the job is pushed back by (attempts ** 4) + 5 seconds from
# the current db time.
def reschedule_at
  if payload_object.respond_to?(:reschedule_at)
    payload_object.reschedule_at(self.class.db_time_now, attempts)
  else
    retry_from = self.class.db_time_now
    retry_from + ((attempts ** 4) + 5).seconds
  end
end

#update_attributes(attributes) ⇒ Object



108
109
110
111
112
113
# File 'lib/delayed/backend/data_mapper.rb', line 108

# Assigns each key/value pair in +attributes+ through []= and then
# saves the record.
#
# Returns the result of +save+.
def update_attributes(attributes)
  attributes.each { |name, value| self[name] = value }
  save
end