Class: Delayed::Backend::DataMapper::Job
- Inherits:
-
Object
- Object
- Delayed::Backend::DataMapper::Job
- Includes:
- DataMapper::Resource, Base
- Defined in:
- lib/delayed/backend/data_mapper.rb
Class Method Summary collapse
-
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
- .db_time_now ⇒ Object
-
.delete_all ⇒ Object
these are common to the other backends, so we provide an implementation.
- .expired(max_run_time = Worker.max_run_time) ⇒ Object
- .find(id) ⇒ Object
- .find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object
- .lockable(worker_name, max_run_time = Worker.max_run_time) ⇒ Object
- .locked_by(worker_name) ⇒ Object
- .never_failed ⇒ Object
- .never_run ⇒ Object
Instance Method Summary collapse
- #==(other) ⇒ Object
-
#lock_exclusively!(max_run_time, worker = worker_name) ⇒ Object
Lock this job for this worker.
- #reload(*args) ⇒ Object
- #reschedule_at ⇒ Object
- #update_attributes(attributes) ⇒ Object
Class Method Details
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
63 64 65 |
# File 'lib/delayed/backend/data_mapper.rb', line 63 def self.clear_locks!(worker_name) all(:locked_by => worker_name).update(:locked_at => nil, :locked_by => nil) end |
.db_time_now ⇒ Object
24 25 26 |
# File 'lib/delayed/backend/data_mapper.rb', line 24 def self.db_time_now Time.now.utc.to_datetime end |
.delete_all ⇒ Object
these are common to the other backends, so we provide an implementation
100 101 102 |
# File 'lib/delayed/backend/data_mapper.rb', line 100 def self.delete_all Delayed::Job.auto_migrate! end |
.expired(max_run_time = Worker.max_run_time) ⇒ Object
34 35 36 37 38 39 |
# File 'lib/delayed/backend/data_mapper.rb', line 34 def self.expired(max_run_time = Worker.max_run_time) ( all(:locked_at => nil) | # never locked all(:locked_at.lt => db_time_now - max_run_time) # lock expired ) end |
.find(id) ⇒ Object
104 105 106 |
# File 'lib/delayed/backend/data_mapper.rb', line 104 def self.find id get id end |
.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object
53 54 55 56 57 58 59 60 |
# File 'lib/delayed/backend/data_mapper.rb', line 53 def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) simple_conditions = {:limit => limit, :order => [:priority.asc, :run_at.asc]} simple_conditions[:priority.gte] = Worker.min_priority if Worker.min_priority simple_conditions[:priority.lte] = Worker.max_priority if Worker.max_priority simple_conditions[:queue] = Worker.queues if Worker.queues.any? lockable(worker_name, max_run_time).all(simple_conditions) end |
.lockable(worker_name, max_run_time = Worker.max_run_time) ⇒ Object
28 29 30 31 32 |
# File 'lib/delayed/backend/data_mapper.rb', line 28 def self.lockable(worker_name, max_run_time = Worker.max_run_time) never_failed & never_run & (locked_by(worker_name) | expired(max_run_time)) end |
.locked_by(worker_name) ⇒ Object
41 42 43 |
# File 'lib/delayed/backend/data_mapper.rb', line 41 def self.locked_by(worker_name) all(:locked_by => worker_name) end |
.never_failed ⇒ Object
49 50 51 |
# File 'lib/delayed/backend/data_mapper.rb', line 49 def self.never_failed all(:failed_at => nil) end |
.never_run ⇒ Object
45 46 47 |
# File 'lib/delayed/backend/data_mapper.rb', line 45 def self.never_run (all(:run_at => nil) | all(:run_at.lte => db_time_now)) end |
Instance Method Details
#==(other) ⇒ Object
120 121 122 |
# File 'lib/delayed/backend/data_mapper.rb', line 120 def ==(other) id == other.id end |
#lock_exclusively!(max_run_time, worker = worker_name) ⇒ Object
Lock this job for this worker. Returns true if we have the lock, false otherwise.
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/delayed/backend/data_mapper.rb', line 69 def lock_exclusively!(max_run_time, worker = worker_name) now = self.class.db_time_now # FIXME - this is a bit gross # DM doesn't give us the number of rows affected by a collection update # so we have to circumvent some niceness in DM::Collection here collection = if locked_by != worker self.class.expired(max_run_time).never_run.all(:id => id) else self.class.locked_by(worker).all(:id => id) end attributes = collection.model.new(:locked_at => now, :locked_by => worker).dirty_attributes affected_rows = self.repository.update(attributes, collection) if affected_rows == 1 reload # pick up the updates above true else # does this mean > 1 was locked, or none? false end end |
#reload(*args) ⇒ Object
115 116 117 118 |
# File 'lib/delayed/backend/data_mapper.rb', line 115 def reload(*args) reset super end |
#reschedule_at ⇒ Object
93 94 95 96 97 |
# File 'lib/delayed/backend/data_mapper.rb', line 93 def reschedule_at payload_object.respond_to?(:reschedule_at) ? payload_object.reschedule_at(self.class.db_time_now, attempts) : self.class.db_time_now + ((attempts ** 4) + 5).seconds end |
#update_attributes(attributes) ⇒ Object
108 109 110 111 112 113 |
# File 'lib/delayed/backend/data_mapper.rb', line 108 def update_attributes(attributes) attributes.each do |k,v| self[k] = v end self.save end |