Class: Que::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/que/job.rb

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(attrs) ⇒ Job

Returns a new instance of Job.



5
6
7
# File 'lib/que/job.rb', line 5

# Builds a job wrapping the given attributes.
#
# @param attrs [Object] stored as-is and exposed through #attrs; other
#   methods in this class index it with Symbol keys such as :args
def initialize(attrs)
  @attrs = attrs
end

Class Attribute Details

.retry_interval ⇒ Object (readonly)

Returns the value of attribute retry_interval.



29
30
31
# File 'lib/que/job.rb', line 29

# Reader for the class-level retry interval.
#
# @return [Object, nil] the value of @retry_interval, or nil when it has
#   not been set
def retry_interval
  @retry_interval
end

Instance Attribute Details

#attrs ⇒ Object (readonly)

Returns the value of attribute attrs.



3
4
5
# File 'lib/que/job.rb', line 3

# Reader for the attributes this job was constructed with.
#
# @return [Object, nil] the value of @attrs
def attrs
  @attrs
end

Class Method Details

.enqueue(*args) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/que/job.rb', line 31

# Enqueues a job, or runs it immediately when Que.mode is :sync and no
# run-at time applies.
#
# If the last argument is a Hash, its :queue, :job_class, :run_at and
# :priority entries are treated as job options and stripped out; any
# remaining entries are passed on to the job as a trailing Hash argument.
# The caller's Hash itself is never modified.
#
# @param args [Array] job arguments, optionally ending in an options Hash
# @return [Object] the result of run(*args) in sync mode; otherwise a new
#   instance built from the first row returned by the :insert_job query
def enqueue(*args)
  if args.last.is_a?(Hash)
    # Work on a copy: deleting keys from the Hash the caller handed us
    # would silently strip their options (and raise on a frozen Hash).
    options   = args.pop.dup
    # An explicit :queue => nil means the default queue (''), which also
    # overrides any class-level @queue below.
    queue     = options.delete(:queue) || '' if options.key?(:queue)
    job_class = options.delete(:job_class)
    run_at    = options.delete(:run_at)
    priority  = options.delete(:priority)
    args << options if options.any?
  end

  attrs = {:job_class => job_class || to_s, :args => args}

  warn "@default_run_at in #{to_s} has been deprecated and will be removed in Que version 1.0.0. Please use @run_at instead." if @default_run_at

  # Precedence: explicit option, then @run_at, then deprecated @default_run_at.
  if t = run_at || @run_at && @run_at.call || @default_run_at && @default_run_at.call
    attrs[:run_at] = t
  end

  warn "@default_priority in #{to_s} has been deprecated and will be removed in Que version 1.0.0. Please use @priority instead." if @default_priority

  # Precedence: explicit option, then @priority, then deprecated @default_priority.
  if p = priority || @priority || @default_priority
    attrs[:priority] = p
  end

  if q = queue || @queue
    attrs[:queue] = q
  end

  if Que.mode == :sync && !t
    # Sync mode with no scheduled time: run inline instead of inserting.
    run(*attrs[:args])
  else
    values = Que.execute(:insert_job, attrs.values_at(:queue, :priority, :run_at, :job_class, :args)).first
    # Only wake a worker when the job is not scheduled for a specific time.
    Que.adapter.wake_worker_after_commit unless t
    new(values)
  end
end

.queue(*args) ⇒ Object



68
69
70
71
# File 'lib/que/job.rb', line 68

# Deprecated alias for enqueue: emits a deprecation warning, then forwards
# every argument unchanged.
#
# @param args [Array] forwarded verbatim to enqueue
# @return [Object] whatever enqueue returns
def queue(*args)
  name = to_s
  warn "#{name}.queue(*args) is deprecated and will be removed in Que version 1.0.0. Please use #{name}.enqueue(*args) instead."
  enqueue(*args)
end

.run(*args) ⇒ Object



73
74
75
76
# File 'lib/que/job.rb', line 73

# Builds a job holding the given args, runs it straight away, and hands the
# job back.
#
# @param args [Array] arguments stored under :args and passed to the job's run
# @return [Object] the job instance that was run
def run(*args)
  # Should not fail if there's no DB connection.
  job = new(:args => args)
  job.run(*args)
  job
end

.work(queue = '') ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/que/job.rb', line 78

# Locks the next available job in the given queue, runs it, records any
# failure, and always releases the advisory lock afterwards.
#
# @param queue [String] queue name passed to the :lock_job query
#   (defaults to '')
# @return [Hash] a Hash with an :event key. On error this is
#   {:event => :job_errored, :error => ..., :job => ...}. Otherwise the
#   block yields :job_worked (with :job), :job_race_condition or
#   :job_unavailable. NOTE(review): those non-error results are the
#   checkout block's value, so this assumes Que.adapter.checkout returns
#   its block's result — confirm.
def work(queue = '')
  # Since we're taking session-level advisory locks, we have to hold the
  # same connection throughout the process of getting a job, working it,
  # deleting it, and removing the lock.
  Que.adapter.checkout do
    begin
      if job = Que.execute(:lock_job, [queue]).first
        # Edge case: It's possible for the lock_job query to have
        # grabbed a job that's already been worked, if it took its MVCC
        # snapshot while the job was processing, but didn't attempt the
        # advisory lock until it was finished. Since we have the lock, a
        # previous worker would have deleted it by now, so we just
        # double check that it still exists before working it.

        # Note that there is currently no spec for this behavior, since
        # I'm not sure how to reliably commit a transaction that deletes
        # the job in a separate thread between lock_job and check_job.
        if Que.execute(:check_job, job.values_at(:queue, :priority, :run_at, :job_id)).none?
          {:event => :job_race_condition}
        else
          klass = class_for(job[:job_class])
          klass.new(job)._run
          {:event => :job_worked, :job => job}
        end
      else
        {:event => :job_unavailable}
      end
    rescue => error
      begin
        if job
          count    = job[:error_count].to_i + 1
          # klass is nil when the failure happened before class_for returned;
          # in that case (or when klass has no retry_interval of its own)
          # fall back to this class's retry_interval.
          interval = klass && klass.respond_to?(:retry_interval) && klass.retry_interval || retry_interval
          # The interval may be a callable that receives the error count, or
          # a plain value used as-is.
          delay    = interval.respond_to?(:call) ? interval.call(count) : interval
          message  = "#{error.message}\n#{error.backtrace.join("\n")}"
          Que.execute :set_error, [count, delay, message] + job.values_at(:queue, :priority, :run_at, :job_id)
        end
      rescue
        # If we can't reach the database for some reason, too bad, but
        # don't let it crash the work loop.
      end

      if Que.error_handler
        # Similarly, protect the work loop from a failure of the error handler.
        Que.error_handler.call(error, job) rescue nil
      end

      # Explicit return exits the whole method from inside the checkout block.
      return {:event => :job_errored, :error => error, :job => job}
    ensure
      # Clear the advisory lock we took when locking the job. Important
      # to do this so that they don't pile up in the database. Again, if
      # we can't reach the database, don't crash the work loop.
      begin
        Que.execute "SELECT pg_advisory_unlock($1)", [job[:job_id]] if job
      rescue
      end
    end
  end
end

Instance Method Details

#_run ⇒ Object



14
15
16
17
# File 'lib/que/job.rb', line 14

# Runs the job with the arguments stored under attrs[:args], then calls
# destroy unless @destroyed is already set.
#
# @return [Object, nil] the result of destroy, or nil when the job was
#   already marked destroyed
def _run
  job_args = attrs[:args]
  run(*job_args)
  return if @destroyed

  destroy
end

#run(*args) ⇒ Object

Subclasses should define their own run methods, but keep an empty one here so that Que::Job.enqueue can queue an empty job in testing.



11
12
# File 'lib/que/job.rb', line 11

# Intentionally empty default. Subclasses are expected to supply their own
# run; this no-op accepts any arguments so a bare job can still be enqueued
# in tests.
#
# @param args [Array] ignored
# @return [nil]
def run(*args)
end