Class: Que::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/que/job.rb

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(attrs) ⇒ Job

Returns a new instance of Job.



7
8
9
# File 'lib/que/job.rb', line 7

# Builds a job around the given attributes.
#
# attrs - kept as-is in @attrs and exposed through the attrs reader. Within
#         this class it is either the job row handed over by work, or a
#         {:args => [...]} hash built by the class-level run; _run reads
#         attrs[:args] from it.
def initialize(attrs)
  @attrs = attrs
end

Class Attribute Details

.retry_interval ⇒ Object (readonly)

Returns the value of attribute retry_interval.



60
61
62
# File 'lib/que/job.rb', line 60

# Class-level reader for @retry_interval (nil when it has never been set).
#
# work uses this value as the retry delay after a failure: if it responds to
# #call it is invoked with the job's new error count, otherwise it is used
# directly.
def retry_interval
  @retry_interval
end

Instance Attribute Details

#_error ⇒ Object (readonly)

Returns the value of attribute _error.



5
6
7
# File 'lib/que/job.rb', line 5

# Returns the error captured by _run when the job's run raised, or nil if no
# error has been recorded. work inspects this to decide between reporting
# :job_errored and :job_worked.
def _error
  @_error
end

#attrs ⇒ Object (readonly)

Returns the value of attribute attrs.



5
6
7
# File 'lib/que/job.rb', line 5

# Returns the attributes this job was constructed with (the object passed to
# initialize, unchanged). _run reads attrs[:args] to obtain the job's
# arguments.
def attrs
  @attrs
end

Class Method Details

.enqueue(*args) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/que/job.rb', line 62

# Enqueues a job of this class, or runs it inline when Que is in sync mode.
#
# args - the job's arguments. If the last one is a Hash it is treated as
#        options: :queue, :job_class, :run_at and :priority are taken out of
#        it, and whatever keys remain are passed to the job as a trailing
#        Hash argument.
#
# Per-call options take precedence over the class-level @queue, @priority and
# @run_at settings (and the deprecated @default_priority / @default_run_at,
# which trigger a warning whenever they are set).
#
# Returns the result of the class-level run when Que.mode is :sync and no run
# time applies; otherwise a new job built from the row returned by the
# :insert_job query.
def enqueue(*args)
  if args.last.is_a?(Hash)
    # Work on a copy: deleting Que's own keys straight from the caller's hash
    # would silently strip them from an options hash the caller may reuse,
    # and would raise outright if that hash were frozen.
    options   = args.pop.dup
    queue     = options.delete(:queue) || '' if options.key?(:queue)
    job_class = options.delete(:job_class)
    run_at    = options.delete(:run_at)
    priority  = options.delete(:priority)
    args << options if options.any?
  end

  attrs = {:job_class => job_class || to_s, :args => args}

  warn "@default_run_at in #{to_s} has been deprecated and will be removed in Que version 1.0.0. Please use @run_at instead." if @default_run_at

  # An explicit :run_at wins; otherwise the class-level callables are invoked.
  scheduled_at = run_at || @run_at && @run_at.call || @default_run_at && @default_run_at.call
  attrs[:run_at] = scheduled_at if scheduled_at

  warn "@default_priority in #{to_s} has been deprecated and will be removed in Que version 1.0.0. Please use @priority instead." if @default_priority

  job_priority = priority || @priority || @default_priority
  attrs[:priority] = job_priority if job_priority

  # `queue` is a local here (nil when no :queue option was given), not a call
  # to the deprecated .queue method.
  queue_name = queue || @queue
  attrs[:queue] = queue_name if queue_name

  if Que.mode == :sync && !scheduled_at
    # Sync mode with nothing scheduled: run the job right away.
    run(*attrs[:args])
  else
    values = Que.execute(:insert_job, attrs.values_at(:queue, :priority, :run_at, :job_class, :args)).first
    # Only wake a worker when the job is not scheduled for a specific time.
    Que.adapter.wake_worker_after_commit unless scheduled_at
    new(values)
  end
end

.queue(*args) ⇒ Object



99
100
101
102
# File 'lib/que/job.rb', line 99

# Deprecated entry point kept for backwards compatibility: emits a
# deprecation warning and then forwards every argument to enqueue, returning
# whatever enqueue returns.
def queue(*args)
  deprecation = "#{to_s}.queue(*args) is deprecated and will be removed in Que version 1.0.0. Please use #{to_s}.enqueue(*args) instead."
  warn deprecation
  enqueue(*args)
end

.run(*args) ⇒ Object



104
105
106
107
# File 'lib/que/job.rb', line 104

# Runs a job of this class immediately, in-process.
#
# Builds an instance whose attrs are {:args => args}, calls its run with the
# same arguments, and returns the instance (not run's return value).
def run(*args)
  # Should not fail if there's no DB connection.
  job = new(:args => args)
  job.run(*args)
  job
end

.work(queue = '') ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/que/job.rb', line 109

# Attempts to lock and work a single job from the given queue.
#
# queue - queue name passed to the :lock_job query (defaults to '').
#
# Returns a Hash describing the outcome. Its :event is one of
# :job_unavailable, :job_race_condition, :job_worked or :job_errored; the
# :job_worked and :job_errored hashes also carry the locked :job row, and
# :job_errored additionally carries the :error.
def work(queue = '')
  # Since we're taking session-level advisory locks, we have to hold the
  # same connection throughout the process of getting a job, working it,
  # deleting it, and removing the lock.
  return_value =
    Que.adapter.checkout do
      begin
        if job = Que.execute(:lock_job, [queue]).first
          # Edge case: It's possible for the lock_job query to have
          # grabbed a job that's already been worked, if it took its MVCC
          # snapshot while the job was processing, but didn't attempt the
          # advisory lock until it was finished. Since we have the lock, a
          # previous worker would have deleted it by now, so we just
          # double check that it still exists before working it.

          # Note that there is currently no spec for this behavior, since
          # I'm not sure how to reliably commit a transaction that deletes
          # the job in a separate thread between lock_job and check_job.
          if Que.execute(:check_job, job.values_at(:queue, :priority, :run_at, :job_id)).none?
            {:event => :job_race_condition}
          else
            # Instantiate the job's own class around the locked row and run it.
            # _run rescues errors raised by the job's run and records them in
            # _error, so a failed job is reported here rather than via the
            # rescue clause below.
            klass = class_for(job[:job_class])
            instance = klass.new(job)
            instance._run
            if e = instance._error
              {:event => :job_errored, :job => job, :error => e}
            else
              {:event => :job_worked, :job => job}
            end
          end
        else
          {:event => :job_unavailable}
        end
      rescue => error
        # Reached for errors that _run did not absorb (for example from the
        # queries or class lookup above, or from _run's own error handling).
        begin
          if job
            # Prefer the job class's retry_interval when it provides one,
            # otherwise fall back to this class's. An interval that responds
            # to #call is invoked with the new error count; anything else is
            # used as the delay directly.
            count    = job[:error_count].to_i + 1
            interval = klass && klass.respond_to?(:retry_interval) && klass.retry_interval || retry_interval
            delay    = interval.respond_to?(:call) ? interval.call(count) : interval
            message  = error_message(error)
            Que.execute :set_error, [delay, message] + job.values_at(:queue, :priority, :run_at, :job_id)
          end
        rescue
          # If we can't reach the database for some reason, too bad, but
          # don't let it crash the work loop.
        end

        if Que.error_notifier
          # Similarly, protect the work loop from a failure of the error notifier.
          Que.error_notifier.call(error, job) rescue nil
        end

        # NOTE(review): this `return` leaves `work` directly from inside the
        # checkout block, so the Que.adapter.cleanup! call further down is not
        # reached on this path (the ensure below still runs) — confirm that
        # skipping cleanup here is intended.
        return {:event => :job_errored, :error => error, :job => job}
      ensure
        # Clear the advisory lock we took when locking the job. Important
        # to do this so that they don't pile up in the database. Again, if
        # we can't reach the database, don't crash the work loop.
        begin
          Que.execute "SELECT pg_advisory_unlock($1)", [job[:job_id]] if job
        rescue
        end
      end
    end

  Que.adapter.cleanup!

  return_value
end

Instance Method Details

#_run ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/que/job.rb', line 16

# Runs the job with its stored arguments and destroys it afterwards.
#
# If run (or the destroy that follows it) raises a StandardError, the error
# is stored in @_error and passed to handle_error, whose return value decides
# whether the global error notifier is invoked. The job is then destroyed
# unless it has been marked as retried or already destroyed.
def _run
  run(*attrs[:args])
  destroy unless @destroyed
rescue => error
  @_error = error
  should_notify = handle_error(error)
  destroy if !@retried && !@destroyed

  if should_notify && Que.error_notifier
    begin
      # Protect the work loop from a failure of the error notifier.
      Que.error_notifier.call(error, @attrs)
    rescue
      nil
    end
  end
end

#run(*args) ⇒ Object

Subclasses should define their own run methods, but keep an empty one here so that Que::Job.enqueue can queue an empty job in testing.



13
14
# File 'lib/que/job.rb', line 13

# Default no-op job body; subclasses are expected to override it.
#
# args - the job's arguments (_run passes attrs[:args] splatted). They are
#        ignored by this empty implementation.
def run(*args)
end