Class: Que::Job
- Inherits:
  - Object
- Inheritance hierarchy:
  - Object
  - Que::Job
- Defined in:
- lib/que/job.rb
Class Attribute Summary collapse
-
.retry_interval ⇒ Object
readonly
Returns the value of attribute retry_interval.
Instance Attribute Summary collapse
-
#_error ⇒ Object
readonly
Returns the value of attribute _error.
-
#attrs ⇒ Object
readonly
Returns the value of attribute attrs.
Class Method Summary collapse
Instance Method Summary collapse
- #_run ⇒ Object
-
#initialize(attrs) ⇒ Job
constructor
A new instance of Job.
-
#run(*args) ⇒ Object
Subclasses should define their own run methods, but keep an empty one here so that Que::Job.enqueue can queue an empty job in testing.
Constructor Details
#initialize(attrs) ⇒ Job
Returns a new instance of Job.
7 8 9 |
# File 'lib/que/job.rb', line 7
#
# Builds a job around the given attributes.
#
# @param attrs [Object] stored unchanged in @attrs and exposed through #attrs;
#   #_run reads attrs[:args] from it, so callers are expected to pass a
#   Hash-like value carrying an :args entry.
def initialize(attrs)
  @attrs = attrs
end
Class Attribute Details
.retry_interval ⇒ Object (readonly)
Returns the value of attribute retry_interval.
60 61 62 |
# File 'lib/que/job.rb', line 60
#
# Reader for the class-level @retry_interval setting.
#
# @return [Object, nil] whatever was assigned to @retry_interval, or nil when
#   nothing was assigned. .work accepts either a callable (invoked with the
#   error count) or a plain delay value here.
def retry_interval
  @retry_interval
end
Instance Attribute Details
#_error ⇒ Object (readonly)
Returns the value of attribute _error.
5 6 7 |
# File 'lib/que/job.rb', line 5
#
# Reader for the exception captured by #_run.
#
# @return [Object, nil] the error #_run rescued and stored in @_error, or nil
#   when no error has been recorded on this instance.
def _error
  @_error
end
#attrs ⇒ Object (readonly)
Returns the value of attribute attrs.
5 6 7 |
# File 'lib/que/job.rb', line 5
#
# Reader for the attributes this job was constructed with.
#
# @return [Object] the value handed to #initialize (nil if never set).
def attrs
  @attrs
end
Class Method Details
.enqueue(*args) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/que/job.rb', line 62
#
# Enqueues a job of this class, or runs it inline when Que is in :sync mode
# and no run time applies.
#
# A trailing Hash argument is treated as options. The keys :queue,
# :job_class, :run_at and :priority are consumed here; any keys left over
# are handed back to the job as a trailing Hash argument.
#
# Resolution order:
# * run time  - :run_at option, then @run_at.call, then @default_run_at.call
# * priority  - :priority option, then @priority, then @default_priority
# * queue     - :queue option (nil/false becomes ''), then @queue
# @default_run_at and @default_priority are deprecated and emit a warning.
#
# @param args [Array] job arguments, optionally followed by an options Hash
# @return [Object] the result of .run in the inline case, otherwise a new
#   instance built from the first row returned by the :insert_job statement
def enqueue(*args)
  if args.last.is_a?(Hash)
    options   = args.pop
    # Only override the queue when the key was explicitly supplied.
    queue     = options.delete(:queue) || '' if options.key?(:queue)
    job_class = options.delete(:job_class)
    run_at    = options.delete(:run_at)
    priority  = options.delete(:priority)
    # Whatever is left belongs to the job itself, not to Que.
    args << options if options.any?
  end

  attrs = {:job_class => job_class || to_s, :args => args}

  warn "@default_run_at in #{to_s} has been deprecated and will be removed in Que version 1.0.0. Please use @run_at instead." if @default_run_at

  if t = run_at || @run_at && @run_at.call || @default_run_at && @default_run_at.call
    attrs[:run_at] = t
  end

  warn "@default_priority in #{to_s} has been deprecated and will be removed in Que version 1.0.0. Please use @priority instead." if @default_priority

  if p = priority || @priority || @default_priority
    attrs[:priority] = p
  end

  if q = queue || @queue
    attrs[:queue] = q
  end

  if Que.mode == :sync && !t
    # Sync mode with no scheduled time: execute right away, no insert.
    run(*attrs[:args])
  else
    values = Que.execute(:insert_job, attrs.values_at(:queue, :priority, :run_at, :job_class, :args)).first
    # A job scheduled for a specific time does not need an immediate wake-up.
    Que.adapter.wake_worker_after_commit unless t
    new(values)
  end
end
.queue(*args) ⇒ Object
99 100 101 102 |
# File 'lib/que/job.rb', line 99
#
# Deprecated alias for .enqueue: prints a deprecation warning, then forwards
# every argument to .enqueue.
#
# @param args [Array] passed through unchanged to .enqueue
# @return [Object] whatever .enqueue returns
def queue(*args)
  warn "#{to_s}.queue(*args) is deprecated and will be removed in Que version 1.0.0. Please use #{to_s}.enqueue(*args) instead."
  enqueue(*args)
end
.run(*args) ⇒ Object
104 105 106 107 |
# File 'lib/que/job.rb', line 104
#
# Instantiates the job with the given arguments and runs it immediately in
# the calling thread by invoking the instance's #run directly.
#
# @param args [Array] arguments stored as attrs[:args] and passed to #run
# @return [Object] the job instance (not the return value of #run)
def run(*args)
  # Should not fail if there's no DB connection.
  new(:args => args).tap { |job| job.run(*args) }
end
.work(queue = '') ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# Summary of the listing below (.work):
#
# Tries to lock one job from `queue` via the :lock_job statement, on a
# connection held through Que.adapter.checkout, and returns a Hash describing
# what happened:
#   {:event => :job_unavailable}     - :lock_job returned no row
#   {:event => :job_race_condition}  - :check_job found that the locked row
#                                      no longer exists
#   {:event => :job_worked, ...}     - the job's #_run left no #_error
#   {:event => :job_errored, ...}    - #_run recorded an error, or an
#                                      exception was raised in this method
#
# When an exception is rescued here, the retry delay comes from the job
# class's retry_interval if it responds to that, otherwise from this class's
# retry_interval; a callable interval is invoked with the incremented error
# count. Failures while recording the error, calling Que.error_notifier, or
# releasing the advisory lock are swallowed so the work loop keeps running.
#
# NOTE(review): this rendering of the source has lost identifiers. The
# statement shown as ` = (error)` is missing both its assignment target and
# the name of the call, and `[delay, ]` is missing its second element.
# Confirm against lib/que/job.rb before relying on this listing.
# NOTE(review): the `return` in the rescue branch sits inside the checkout
# block, so it looks like Que.adapter.cleanup! is skipped on that path -
# confirm whether that is intended.
#
# File 'lib/que/job.rb', line 109 def work(queue = '') # Since we're taking session-level advisory locks, we have to hold the # same connection throughout the process of getting a job, working it, # deleting it, and removing the lock. return_value = Que.adapter.checkout do begin if job = Que.execute(:lock_job, [queue]).first # Edge case: It's possible for the lock_job query to have # grabbed a job that's already been worked, if it took its MVCC # snapshot while the job was processing, but didn't attempt the # advisory lock until it was finished. Since we have the lock, a # previous worker would have deleted it by now, so we just # double check that it still exists before working it. # Note that there is currently no spec for this behavior, since # I'm not sure how to reliably commit a transaction that deletes # the job in a separate thread between lock_job and check_job. if Que.execute(:check_job, job.values_at(:queue, :priority, :run_at, :job_id)).none? {:event => :job_race_condition} else klass = class_for(job[:job_class]) instance = klass.new(job) instance._run if e = instance._error {:event => :job_errored, :job => job, :error => e} else {:event => :job_worked, :job => job} end end else {:event => :job_unavailable} end rescue => error begin if job count = job[:error_count].to_i + 1 interval = klass && klass.respond_to?(:retry_interval) && klass.retry_interval || retry_interval delay = interval.respond_to?(:call) ? interval.call(count) : interval = (error) Que.execute :set_error, [delay, ] + job.values_at(:queue, :priority, :run_at, :job_id) end rescue # If we can't reach the database for some reason, too bad, but # don't let it crash the work loop. end if Que.error_notifier # Similarly, protect the work loop from a failure of the error notifier. Que.error_notifier.call(error, job) rescue nil end return {:event => :job_errored, :error => error, :job => job} ensure # Clear the advisory lock we took when locking the job. 
Important # to do this so that they don't pile up in the database. Again, if # we can't reach the database, don't crash the work loop. begin Que.execute "SELECT pg_advisory_unlock($1)", [job[:job_id]] if job rescue end end end Que.adapter.cleanup! return_value end |
Instance Method Details
#_run ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/que/job.rb', line 16
#
# Internal entry point used to work a job: calls #run with attrs[:args] and
# then removes the job, unless it has already been marked destroyed.
#
# Any StandardError raised along the way is captured in @_error (readable
# through #_error) instead of propagating. The error is passed to
# handle_error, whose return value decides whether Que.error_notifier is
# invoked. The job is then destroyed unless @retried or @destroyed is set.
#
# @return [Object] not meaningful; callers inspect #_error instead
def _run
  run(*attrs[:args])
  destroy unless @destroyed
rescue => error
  @_error = error
  run_error_notifier = handle_error(error)
  destroy unless @retried || @destroyed

  if run_error_notifier && Que.error_notifier
    # Protect the work loop from a failure of the error notifier.
    Que.error_notifier.call(error, @attrs) rescue nil
  end
end
#run(*args) ⇒ Object
Subclasses should define their own run methods, but keep an empty one here so that Que::Job.enqueue can queue an empty job in testing.
13 14 |
# File 'lib/que/job.rb', line 13
#
# Default no-op job body. Subclasses are expected to override this; the
# empty version exists so Que::Job itself can be enqueued in tests.
#
# @param args [Array] accepted and ignored
# @return [nil]
def run(*args)
end