Method: Que::Job.work

Defined in:
lib/que/job.rb

.work(queue = '') ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/que/job.rb', line 106

def work(queue = '')
  # Since we're taking session-level advisory locks, we have to hold the
  # same connection throughout the process of getting a job, working it,
  # deleting it, and removing the lock.
  return_value =
    Que.adapter.checkout do
      begin
        if job = Que.execute(:lock_job, [queue]).first
          # Edge case: It's possible for the lock_job query to have
          # grabbed a job that's already been worked, if it took its MVCC
          # snapshot while the job was processing, but didn't attempt the
          # advisory lock until it was finished. Since we have the lock, a
          # previous worker would have deleted it by now, so we just
          # double check that it still exists before working it.

          # Note that there is currently no spec for this behavior, since
          # I'm not sure how to reliably commit a transaction that deletes
          # the job in a separate thread between lock_job and check_job.
          if Que.execute(:check_job, job.values_at(:queue, :priority, :run_at, :job_id)).none?
            {:event => :job_race_condition}
          else
            klass = class_for(job[:job_class])
            instance = klass.new(job)
            instance._run
            if e = instance._error
              {:event => :job_errored, :job => job, :error => e}
            else
              {:event => :job_worked, :job => job}
            end
          end
        else
          {:event => :job_unavailable}
        end
      rescue => error
        begin
          if job
            count    = job[:error_count].to_i + 1
            interval = klass && klass.respond_to?(:retry_interval) && klass.retry_interval || retry_interval
            delay    = interval.respond_to?(:call) ? interval.call(count) : interval
            message  = "#{error.message}\n#{error.backtrace.join("\n")}"
            Que.execute :set_error, [delay, message] + job.values_at(:queue, :priority, :run_at, :job_id)
          end
        rescue
          # If we can't reach the database for some reason, too bad, but
          # don't let it crash the work loop.
        end

        if Que.error_notifier
          # Similarly, protect the work loop from a failure of the error notifier.
          Que.error_notifier.call(error, job) rescue nil
        end

        return {:event => :job_errored, :error => error, :job => job}
      ensure
        # Clear the advisory lock we took when locking the job. Important
        # to do this so that they don't pile up in the database. Again, if
        # we can't reach the database, don't crash the work loop.
        begin
          Que.execute "SELECT pg_advisory_unlock($1)", [job[:job_id]] if job
        rescue
        end
      end
    end

  Que.adapter.cleanup!

  return_value
end