Class: Delayed::Backend::Mongoid::Job
- Inherits:
- Object (hierarchy: Delayed::Backend::Mongoid::Job < Object)
- Includes:
- Base, Mongoid::Document, Mongoid::Timestamps
- Defined in:
- lib/delayed/backend/mongoid.rb
Class Method Summary
- .after_fork ⇒ Object
  Hook method that is called after a new worker is forked.
- .clear_locks!(worker_name) ⇒ Object
  When a worker is exiting, make sure we don’t have any locked jobs.
- .db_time_now ⇒ Object
- .reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object
  Reserves a job for the worker.
Instance Method Summary
- #reload(*args) ⇒ Object
Class Method Details
.after_fork ⇒ Object
Hook method that is called after a new worker is forked.
    # File 'lib/delayed/backend/mongoid.rb', line 67
    def self.after_fork
      # to avoid `failed with error "unauthorized"` errors in Mongoid 4.0.alpha2
      ::Mongoid.default_session.disconnect
    end
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
    # File 'lib/delayed/backend/mongoid.rb', line 57
    def self.clear_locks!(worker_name)
      where(:locked_by => worker_name).update_all(:locked_at => nil, :locked_by => nil)
    end
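The effect of .clear_locks! can be illustrated without a database. The following is a minimal in-memory sketch, assuming jobs are plain structs rather than Mongoid documents; `FakeJob` and `clear_locks_for` are hypothetical names introduced for illustration and are not part of delayed_job or Mongoid.

```ruby
# Hypothetical in-memory stand-in for a job document; not the Mongoid model.
FakeJob = Struct.new(:id, :locked_by, :locked_at)

# Mimics where(:locked_by => worker_name).update_all(:locked_at => nil, :locked_by => nil):
# every job held by the named worker is released, all others are left untouched.
# Returns the number of jobs that were released.
def clear_locks_for(jobs, worker_name)
  released = jobs.select { |job| job.locked_by == worker_name }
  released.each do |job|
    job.locked_by = nil
    job.locked_at = nil
  end
  released.size
end

now = Time.now.utc
jobs = [
  FakeJob.new(1, "worker-a", now), # held by the exiting worker
  FakeJob.new(2, "worker-b", now), # held by a different worker
  FakeJob.new(3, nil, nil)         # not locked at all
]

released_count = clear_locks_for(jobs, "worker-a")
```

Only the job held by "worker-a" is unlocked here; the lock owned by "worker-b" survives, which is the property that lets one worker exit without disturbing jobs that other workers are still running.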
.db_time_now ⇒ Object
    # File 'lib/delayed/backend/mongoid.rb', line 27
    def self.db_time_now
      Time.now.utc
    end
.reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object
Reserves a job for the worker.
Uses Mongo’s findAndModify operation to atomically pick and lock one job from the collection.
    # File 'lib/delayed/backend/mongoid.rb', line 35
    def self.reserve(worker, max_run_time = Worker.max_run_time)
      right_now = db_time_now

      criteria = where(
        :run_at => {'$lte' => right_now},
        :failed_at => nil
      ).any_of(
        {:locked_by => worker.name},
        {:locked_at => nil},
        {:locked_at => {'$lt' => (right_now - max_run_time)}}
      )

      criteria = criteria.gte(:priority => Worker.min_priority.to_i) if Worker.min_priority
      criteria = criteria.lte(:priority => Worker.max_priority.to_i) if Worker.max_priority
      criteria = criteria.any_in(:queue => Worker.queues) if Worker.queues.any?

      criteria.desc(:locked_by).asc(:priority).asc(:run_at).find_and_modify(
        {'$set' => {:locked_at => right_now, :locked_by => worker.name}},
        :new => true
      )
    end
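The selection and locking rules of .reserve can be approximated in plain Ruby. This is a rough sketch under stated assumptions, not the backend's implementation: `SketchJob`, `RESERVE_LOCK`, and `reserve_sketch` are hypothetical names, a Mutex stands in for the atomicity that findAndModify provides on the server, the priority and queue filters are omitted, and the `desc(:locked_by)` ordering is simplified to "jobs that already carry a lock owner sort ahead of unlocked ones".

```ruby
# Hypothetical in-memory stand-in for a job document; not the Mongoid model.
SketchJob = Struct.new(:id, :priority, :run_at, :failed_at, :locked_by, :locked_at)

# Assumption: a Mutex replaces the server-side atomicity of findAndModify.
RESERVE_LOCK = Mutex.new

def reserve_sketch(jobs, worker_name, max_run_time, now = Time.now.utc)
  RESERVE_LOCK.synchronize do
    candidates = jobs.select do |job|
      # Due and not failed (run_at <= now, failed_at is nil).
      runnable = job.run_at <= now && job.failed_at.nil?
      # Already ours, never locked, or locked longer ago than max_run_time.
      available = job.locked_by == worker_name ||
                  job.locked_at.nil? ||
                  job.locked_at < now - max_run_time
      runnable && available
    end

    # Simplified ordering: lock owner present first, then priority, then run_at.
    picked = candidates.min_by do |job|
      [job.locked_by.nil? ? 1 : 0, job.priority, job.run_at]
    end

    if picked
      picked.locked_at = now
      picked.locked_by = worker_name
    end
    picked
  end
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
jobs = [
  SketchJob.new(1, 5, now - 60, nil, nil, nil),         # due, priority 5
  SketchJob.new(2, 0, now - 30, nil, nil, nil),         # due, priority 0
  SketchJob.new(3, 0, now + 60, nil, nil, nil),         # not due yet
  SketchJob.new(4, 0, now - 90, nil, "other", now - 10) # freshly locked elsewhere
]

first  = reserve_sketch(jobs, "worker-a", 3600, now) # picks job 2
second = reserve_sketch(jobs, "worker-b", 3600, now) # picks job 1
third  = reserve_sketch(jobs, "worker-c", 3600, now) # nothing left: nil
```

Doing the select and the update as one step is what prevents two workers from reserving the same job; in the real backend that guarantee comes from the database rather than from an in-process lock.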
Instance Method Details
#reload(*args) ⇒ Object
    # File 'lib/delayed/backend/mongoid.rb', line 61
    def reload(*args)
      reset
      super
    end
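The reset-then-super pattern in #reload can be sketched as follows. The sketch assumes that `reset` discards memoized state derived from the stored attributes; that is an assumption, since `reset` is defined outside this file. `RELOAD_STORE`, `ReloadDocument`, `ReloadableJob`, and `payload` are hypothetical names used only for illustration.

```ruby
# Hypothetical in-memory "database": id => stored handler string.
RELOAD_STORE = { 1 => "first payload" }

# Stand-in for the persistence layer; its reload re-reads stored attributes.
class ReloadDocument
  attr_reader :id, :handler

  def initialize(id)
    @id = id
    @handler = RELOAD_STORE.fetch(id)
  end

  def reload(*_args)
    @handler = RELOAD_STORE.fetch(id)
    self
  end
end

class ReloadableJob < ReloadDocument
  # Memoized value derived from the handler attribute.
  def payload
    @payload ||= handler.upcase
  end

  # Assumption: reset drops cached, derived state.
  def reset
    @payload = nil
  end

  # Same shape as the method above: clear the cache, then reload attributes.
  def reload(*args)
    reset
    super
  end
end

job = ReloadableJob.new(1)
before = job.payload                # computed from "first payload"
RELOAD_STORE[1] = "second payload"  # the stored record changes underneath
stale = job.payload                 # still the memoized old value
job.reload
after = job.payload                 # recomputed from the reloaded handler
```

Without the call to `reset`, the memoized value would outlive the reload and the object would report data that no longer matches the stored record.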