Class: Que::Job
- Inherits:
-
Object
- Object
- Que::Job
- Defined in:
- lib/que/job.rb
Class Attribute Summary collapse
-
.retry_interval ⇒ Object
readonly
Returns the value of attribute retry_interval.
Instance Attribute Summary collapse
-
#attrs ⇒ Object
readonly
Returns the value of attribute attrs.
Class Method Summary collapse
Instance Method Summary collapse
- #_run ⇒ Object
-
#initialize(attrs) ⇒ Job
constructor
A new instance of Job.
-
#run(*args) ⇒ Object
Subclasses should define their own run methods, but keep an empty one here so that Que::Job.enqueue can queue an empty job in testing.
Constructor Details
#initialize(attrs) ⇒ Job
Returns a new instance of Job.
5 6 7 |
# File 'lib/que/job.rb', line 5 def initialize(attrs) @attrs = attrs end |
Class Attribute Details
.retry_interval ⇒ Object (readonly)
Returns the value of attribute retry_interval.
29 30 31 |
# File 'lib/que/job.rb', line 29 def retry_interval @retry_interval end |
Instance Attribute Details
#attrs ⇒ Object (readonly)
Returns the value of attribute attrs.
3 4 5 |
# File 'lib/que/job.rb', line 3 def attrs @attrs end |
Class Method Details
.enqueue(*args) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/que/job.rb', line 31 def enqueue(*args) if args.last.is_a?(Hash) = args.pop queue = .delete(:queue) || '' if .key?(:queue) job_class = .delete(:job_class) run_at = .delete(:run_at) priority = .delete(:priority) args << if .any? end attrs = {:job_class => job_class || to_s, :args => args} warn "@default_run_at in #{to_s} has been deprecated and will be removed in Que version 1.0.0. Please use @run_at instead." if @default_run_at if t = run_at || @run_at && @run_at.call || @default_run_at && @default_run_at.call attrs[:run_at] = t end warn "@default_priority in #{to_s} has been deprecated and will be removed in Que version 1.0.0. Please use @priority instead." if @default_priority if p = priority || @priority || @default_priority attrs[:priority] = p end if q = queue || @queue attrs[:queue] = q end if Que.mode == :sync && !t run(*attrs[:args]) else values = Que.execute(:insert_job, attrs.values_at(:queue, :priority, :run_at, :job_class, :args)).first Que.adapter.wake_worker_after_commit unless t new(values) end end |
.queue(*args) ⇒ Object
68 69 70 71 |
# File 'lib/que/job.rb', line 68 def queue(*args) warn "#{to_s}.queue(*args) is deprecated and will be removed in Que version 1.0.0. Please use #{to_s}.enqueue(*args) instead." enqueue(*args) end |
.run(*args) ⇒ Object
73 74 75 76 |
# File 'lib/que/job.rb', line 73 def run(*args) # Should not fail if there's no DB connection. new(:args => args).tap { |job| job.run(*args) } end |
.work(queue = '') ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/que/job.rb', line 78 def work(queue = '') # Since we're taking session-level advisory locks, we have to hold the # same connection throughout the process of getting a job, working it, # deleting it, and removing the lock. Que.adapter.checkout do begin if job = Que.execute(:lock_job, [queue]).first # Edge case: It's possible for the lock_job query to have # grabbed a job that's already been worked, if it took its MVCC # snapshot while the job was processing, but didn't attempt the # advisory lock until it was finished. Since we have the lock, a # previous worker would have deleted it by now, so we just # double check that it still exists before working it. # Note that there is currently no spec for this behavior, since # I'm not sure how to reliably commit a transaction that deletes # the job in a separate thread between lock_job and check_job. if Que.execute(:check_job, job.values_at(:queue, :priority, :run_at, :job_id)).none? {:event => :job_race_condition} else klass = class_for(job[:job_class]) klass.new(job)._run {:event => :job_worked, :job => job} end else {:event => :job_unavailable} end rescue => error begin if job count = job[:error_count].to_i + 1 interval = klass && klass.respond_to?(:retry_interval) && klass.retry_interval || retry_interval delay = interval.respond_to?(:call) ? interval.call(count) : interval = "#{error.}\n#{error.backtrace.join("\n")}" Que.execute :set_error, [count, delay, ] + job.values_at(:queue, :priority, :run_at, :job_id) end rescue # If we can't reach the database for some reason, too bad, but # don't let it crash the work loop. end if Que.error_handler # Similarly, protect the work loop from a failure of the error handler. Que.error_handler.call(error, job) rescue nil end return {:event => :job_errored, :error => error, :job => job} ensure # Clear the advisory lock we took when locking the job. Important # to do this so that they don't pile up in the database. Again, if # we can't reach the database, don't crash the work loop. begin Que.execute "SELECT pg_advisory_unlock($1)", [job[:job_id]] if job rescue end end end end |
Instance Method Details
#_run ⇒ Object
14 15 16 17 |
# File 'lib/que/job.rb', line 14 def _run run(*attrs[:args]) destroy unless @destroyed end |
#run(*args) ⇒ Object
Subclasses should define their own run methods, but keep an empty one here so that Que::Job.enqueue can queue an empty job in testing.
11 12 |
# File 'lib/que/job.rb', line 11 def run(*args) end |