Class: Delayed::Backend::Mongoid::Job

Inherits:
Object
  • Object
show all
Includes:
Base, Mongoid::Document, Mongoid::Timestamps
Defined in:
lib/delayed/backend/mongoid.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.after_forkObject

Hook method that is called after a new worker is forked



87
88
89
90
91
92
# File 'lib/delayed/backend/mongoid.rb', line 87

def self.after_fork
  if ::Mongoid::Compatibility::Version.mongoid4?
    # to avoid `failed with error "unauthorized"` errors in Mongoid 4.0.alpha2
    ::Mongoid.default_session.disconnect
  end
end

.clear_locks!(worker_name) ⇒ Object

When a worker is exiting, make sure we don’t have any locked jobs.



77
78
79
# File 'lib/delayed/backend/mongoid.rb', line 77

def self.clear_locks!(worker_name)
  where(:locked_by => worker_name).update_all(:locked_at => nil, :locked_by => nil)
end

.db_time_nowObject



28
29
30
# File 'lib/delayed/backend/mongoid.rb', line 28

def self.db_time_now
  Time.now.utc
end

.reservation_criteria(worker, right_now, max_run_time) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Mongo criteria matching all the jobs the worker can reserver

Jobs are sorted by priority and run_at.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/delayed/backend/mongoid.rb', line 58

def self.reservation_criteria(worker, right_now, max_run_time)
  criteria = where(
    :run_at => {'$lte' => right_now},
    :failed_at => nil
  ).any_of(
    {:locked_by => worker.name},
    {:locked_at => nil},
    :locked_at => {'$lt' => (right_now - max_run_time)}
  )

  criteria = criteria.gte(:priority => Worker.min_priority.to_i) if Worker.min_priority
  criteria = criteria.lte(:priority => Worker.max_priority.to_i) if Worker.max_priority
  criteria = criteria.any_in(:queue => Worker.queues) if Worker.queues.any?
  criteria = criteria.desc(:locked_by).asc(:priority).asc(:run_at)

  criteria
end

.reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object

Reserves one job for the worker.

Atomically picks and locks one job from the collection.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/delayed/backend/mongoid.rb', line 35

def self.reserve(worker, max_run_time = Worker.max_run_time)
  right_now = db_time_now

  criteria = reservation_criteria worker, right_now, max_run_time

  if ::Mongoid::Compatibility::Version.mongoid5?
    criteria.find_one_and_update(
      {'$set' => {:locked_at => right_now, :locked_by => worker.name}},
      :return_document => :after
    )
  else
    criteria.find_and_modify(
      {'$set' => {:locked_at => right_now, :locked_by => worker.name}},
      :new => true
    )
  end
end

Instance Method Details

#reload(*args) ⇒ Object



81
82
83
84
# File 'lib/delayed/backend/mongoid.rb', line 81

def reload(*args)
  reset
  super
end