Class: Que::Job
- Inherits:
-
Object
- Object
- Que::Job
- Defined in:
- lib/que/job.rb
Class Attribute Summary collapse
-
.retry_interval ⇒ Object
readonly
Returns the value of attribute retry_interval.
Instance Attribute Summary collapse
-
#attrs ⇒ Object
readonly
Returns the value of attribute attrs.
Class Method Summary collapse
Instance Method Summary collapse
- #_run ⇒ Object
-
#initialize(attrs) ⇒ Job
constructor
A new instance of Job.
-
#run(*args) ⇒ Object
Subclasses should define their own run methods, but keep an empty one here so that Que::Job.enqueue can queue an empty job in testing.
Constructor Details
#initialize(attrs) ⇒ Job
Returns a new instance of Job.
5 6 7 |
# File 'lib/que/job.rb', line 5
#
# Builds a job around the given attributes.
#
# attrs - stored untouched in @attrs. Within this file, .run passes
#         {:args => [...]}, while .enqueue and .work pass a row returned by
#         Que.execute.
def initialize(attrs)
  @attrs = attrs
end
Class Attribute Details
.retry_interval ⇒ Object (readonly)
Returns the value of attribute retry_interval.
29 30 31 |
# File 'lib/que/job.rb', line 29
#
# Reader for the class-level @retry_interval.
#
# .work uses the value either directly as the retry delay or, when it
# responds to #call, invokes it with the job's new error count.
def retry_interval
  @retry_interval
end
Instance Attribute Details
#attrs ⇒ Object (readonly)
Returns the value of attribute attrs.
3 4 5 |
# File 'lib/que/job.rb', line 3
#
# Reader for the attributes this job was constructed with (see #initialize).
# #_run reads attrs[:args] to obtain the arguments handed to #run.
def attrs
  @attrs
end
Class Method Details
.enqueue(*args) ⇒ Object Also known as: queue
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/que/job.rb', line 31
#
# Enqueues a job of this class (also known as: queue).
#
# args - arguments for the job. A trailing Hash is treated as options: the
#        :queue, :job_class, :run_at and :priority keys are extracted from
#        it, and any keys left over are passed on to the job as its final
#        argument.
#
# When Que.mode is :sync and no run time applies, the job is run on the spot
# via run(*args) and that call's result is returned. Otherwise the job is
# inserted with the :insert_job statement and a new instance built from the
# first returned row is returned.
def enqueue(*args)
  if args.last.is_a?(Hash)
    # Work on a copy so the caller's hash is not stripped of its option keys.
    options   = args.pop.dup
    # An explicit :queue => nil still selects the default ('') queue.
    queue     = options.delete(:queue) || '' if options.key?(:queue)
    job_class = options.delete(:job_class)
    run_at    = options.delete(:run_at)
    priority  = options.delete(:priority)
    args << options if options.any?
  end

  attrs = {:job_class => job_class || to_s, :args => args}

  # Explicit option first, then the class-level callables.
  if (t = run_at || @run_at && @run_at.call || @default_run_at && @default_run_at.call)
    attrs[:run_at] = t
  end

  if (p = priority || @priority || @default_priority)
    attrs[:priority] = p
  end

  if (q = queue || @queue)
    attrs[:queue] = q
  end

  if Que.mode == :sync && !t
    run(*attrs[:args])
  else
    values = Que.execute(:insert_job, attrs.values_at(:queue, :priority, :run_at, :job_class, :args)).first
    # Only wake a worker when the job is runnable right away.
    Que.adapter.wake_worker_after_commit unless t
    new(values)
  end
end
.run(*args) ⇒ Object
66 67 68 |
# File 'lib/que/job.rb', line 66
#
# Runs a job immediately: builds an instance whose attrs are
# {:args => args}, calls #_run on it and returns the instance.
def run(*args)
  new(:args => args).tap(&:_run)
end
.work(queue = '') ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/que/job.rb', line 70
#
# Attempts to lock and work a single job from the given queue.
#
# queue - name passed to the :lock_job statement (defaults to '').
#
# Returns a Hash describing the outcome:
#   {:event => :job_unavailable}                    - nothing could be locked
#   {:event => :job_race_condition}                 - locked job no longer exists
#   {:event => :job_worked, :job => job}            - job ran without raising
#   {:event => :job_errored, :error => e, :job => job} - a StandardError was rescued
def work(queue = '')
  # Since we're taking session-level advisory locks, we have to hold the
  # same connection throughout the process of getting a job, working it,
  # deleting it, and removing the lock.
  Que.adapter.checkout do
    begin
      if job = Que.execute(:lock_job, [queue]).first
        # Edge case: It's possible for the lock_job query to have
        # grabbed a job that's already been worked, if it took its MVCC
        # snapshot while the job was processing, but didn't attempt the
        # advisory lock until it was finished. Since we have the lock, a
        # previous worker would have deleted it by now, so we just
        # double check that it still exists before working it.

        # Note that there is currently no spec for this behavior, since
        # I'm not sure how to reliably commit a transaction that deletes
        # the job in a separate thread between lock_job and check_job.
        if Que.execute(:check_job, job.values_at(:queue, :priority, :run_at, :job_id)).none?
          {:event => :job_race_condition}
        else
          klass = class_for(job[:job_class])
          klass.new(job)._run
          {:event => :job_worked, :job => job}
        end
      else
        {:event => :job_unavailable}
      end
    rescue => error
      begin
        if job
          count    = job[:error_count].to_i + 1
          # klass is nil if the failure happened before the class was resolved;
          # fall back to this class's own retry_interval in that case.
          interval = klass && klass.retry_interval || retry_interval
          delay    = interval.respond_to?(:call) ? interval.call(count) : interval
          message  = "#{error.message}\n#{error.backtrace.join("\n")}"
          Que.execute :set_error, [count, delay, message] + job.values_at(:queue, :priority, :run_at, :job_id)
        end
      rescue
        # If we can't reach the database for some reason, too bad, but
        # don't let it crash the work loop.
      end

      if Que.error_handler
        # Similarly, protect the work loop from a failure of the error handler.
        Que.error_handler.call(error) rescue nil
      end

      return {:event => :job_errored, :error => error, :job => job}
    ensure
      # Clear the advisory lock we took when locking the job. Important
      # to do this so that they don't pile up in the database. Again, if
      # we can't reach the database, don't crash the work loop.
      begin
        Que.execute "SELECT pg_advisory_unlock($1)", [job[:job_id]] if job
      rescue
      end
    end
  end
end
Instance Method Details
#_run ⇒ Object
14 15 16 17 |
# File 'lib/que/job.rb', line 14
#
# Invokes #run with the arguments stored under attrs[:args], then calls
# destroy unless @destroyed is already set.
# NOTE(review): destroy is not part of this listing; presumably it removes
# the job's row and sets @destroyed - confirm against the full class.
def _run
  run(*attrs[:args])
  destroy unless @destroyed
end
#run(*args) ⇒ Object
Subclasses should define their own run methods, but keep an empty one here so that Que::Job.enqueue can queue an empty job in testing.
11 12 |
# File 'lib/que/job.rb', line 11
#
# Intentionally empty default: accepts any arguments and returns nil.
# Subclasses define their own run; this one exists so that
# Que::Job.enqueue can queue an empty job in testing.
def run(*args)
end