Class: Que::Job

Inherits:
Object
  • Object
show all
Includes:
JobMethods
Defined in:
lib/que/job.rb

Constant Summary collapse

MAXIMUM_TAGS_COUNT =
5
MAXIMUM_TAG_LENGTH =
100

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from JobMethods

#_run, #log_level

Constructor Details

#initialize(attrs) ⇒ Job

Returns a new instance of Job.



53
54
55
56
# File 'lib/que/job.rb', line 53

# Builds a job instance around the given attributes.
#
# @param attrs [Object] stored as-is in @que_attrs. NOTE(review): callers
#   visible here pass a Hash ({} or a row returned by the insert) — confirm
#   that is the only shape in use.
def initialize(attrs)
  @que_attrs = attrs
  # attrs is supplied through a block; whether and when that block is
  # evaluated is decided by Que.internal_log.
  Que.internal_log(:job_instantiate, self) { attrs }
end

Class Attribute Details

.maximum_retry_count ⇒ Object

Job class configuration options.



72
73
74
# File 'lib/que/job.rb', line 72

# Job class configuration option: reads the @maximum_retry_count instance
# variable of the receiver. Returns nil when it has never been assigned.
def maximum_retry_count
  @maximum_retry_count
end

.priority ⇒ Object

Job class configuration options.



72
73
74
# File 'lib/que/job.rb', line 72

# Job class configuration option: reads the @priority instance variable of
# the receiver. Returns nil when it has never been assigned.
def priority
  @priority
end

.queue ⇒ Object

Job class configuration options.



72
73
74
# File 'lib/que/job.rb', line 72

# Job class configuration option: reads the @queue instance variable of the
# receiver. Returns nil when it has never been assigned.
def queue
  @queue
end

.retry_interval ⇒ Object

Job class configuration options.



72
73
74
# File 'lib/que/job.rb', line 72

# Job class configuration option: reads the @retry_interval instance
# variable of the receiver. Returns nil when it has never been assigned.
def retry_interval
  @retry_interval
end

.run_at ⇒ Object

Job class configuration options.



72
73
74
# File 'lib/que/job.rb', line 72

# Job class configuration option: reads the @run_at instance variable of the
# receiver. Returns nil when it has never been assigned.
def run_at
  @run_at
end

.run_synchronously ⇒ Object

Job class configuration options.



72
73
74
# File 'lib/que/job.rb', line 72

# Job class configuration option: reads the @run_synchronously instance
# variable of the receiver. Returns nil when it has never been assigned.
def run_synchronously
  @run_synchronously
end

Instance Attribute Details

#que_attrs ⇒ Object (readonly)

Returns the value of attribute que_attrs.



50
51
52
# File 'lib/que/job.rb', line 50

# Reader for @que_attrs, the value handed to #initialize.
def que_attrs
  @que_attrs
end

#que_error ⇒ Object

Returns the value of attribute que_error.



51
52
53
# File 'lib/que/job.rb', line 51

# Reader for @que_error. Returns nil when it has never been assigned.
# NOTE(review): presumably holds the error raised while the job ran —
# confirm against the code that assigns it.
def que_error
  @que_error
end

#que_resolved ⇒ Object

Returns the value of attribute que_resolved.



51
52
53
# File 'lib/que/job.rb', line 51

# Reader for @que_resolved. Returns nil when it has never been assigned.
# NOTE(review): presumably a flag marking the job as finished/handled —
# confirm against the code that assigns it.
def que_resolved
  @que_resolved
end

Class Method Details

._bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/que/job.rb', line 155

# Inserts a batch of jobs sharing one set of job options, or runs them
# immediately when no run_at is resolved and run_synchronously is set.
#
# @param args_and_kwargs_array [Array<Hash>] one hash per job; :args and
#   :kwargs default to [] and {} when a hash omits them.
# @param job_options [Hash] options applied to every job; :queue, :priority,
#   :run_at, :tags and :job_class are read here.
# @param notify [Boolean] when falsy, `que.skip_notify` is set to true for
#   the inserting transaction.
# @return [Array] in synchronous mode, the results of _run_attrs for each
#   job; otherwise one new instance per row returned by the bulk insert.
# @raise [RuntimeError] if args_and_kwargs_array is not an Array of Hashes.
# @raise [Que::Error] if more than MAXIMUM_TAGS_COUNT tags are given or a tag
#   is longer than MAXIMUM_TAG_LENGTH.
# @raise [Error] if neither job_options[:job_class] nor the class name is
#   available (anonymous subclass).
def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:)
  raise 'Unexpected bulk args format' if !args_and_kwargs_array.is_a?(Array) || !args_and_kwargs_array.all? { |a| a.is_a?(Hash) }

  if job_options[:tags]
    if job_options[:tags].length > MAXIMUM_TAGS_COUNT
      raise Que::Error, "Can't enqueue a job with more than #{MAXIMUM_TAGS_COUNT} tags! (passed #{job_options[:tags].length})"
    end

    job_options[:tags].each do |tag|
      if tag.length > MAXIMUM_TAG_LENGTH
        # Interpolate the constant so the message cannot drift from the
        # limit that is actually enforced.
        raise Que::Error, "Can't enqueue a job with a tag longer than #{MAXIMUM_TAG_LENGTH} characters! (\"#{tag}\")"
      end
    end
  end

  # Normalise every entry so later code can rely on both keys being present.
  args_and_kwargs_array = args_and_kwargs_array.map do |args_and_kwargs|
    args_and_kwargs.merge(
      args: args_and_kwargs.fetch(:args, []),
      kwargs: args_and_kwargs.fetch(:kwargs, {}),
    )
  end

  # Explicit job_options win over class-level settings.
  attrs = {
    queue:    job_options[:queue]    || resolve_que_setting(:queue) || Que.default_queue,
    priority: job_options[:priority] || resolve_que_setting(:priority),
    run_at:   job_options[:run_at]   || resolve_que_setting(:run_at),
    args_and_kwargs_array: args_and_kwargs_array,
    data:     job_options[:tags] ? { tags: job_options[:tags] } : {},
    job_class: \
      job_options[:job_class] || name ||
        raise(Error, "Can't enqueue an anonymous subclass of Que::Job"),
  }

  if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
    # Round-trip through JSON so the arguments look the way they would after
    # being stored and read back.
    args_and_kwargs_array = Que.deserialize_json(Que.serialize_json(attrs.delete(:args_and_kwargs_array)))
    args_and_kwargs_array.map do |args_and_kwargs|
      _run_attrs(
        attrs.merge(
          args: args_and_kwargs.fetch(:args),
          kwargs: args_and_kwargs.fetch(:kwargs),
        ),
      )
    end
  else
    attrs.merge!(
      args_and_kwargs_array: Que.serialize_json(attrs[:args_and_kwargs_array]),
      data: Que.serialize_json(attrs[:data]),
    )
    values_array =
      Que.transaction do
        # SET LOCAL keeps the setting scoped to this transaction.
        Que.execute('SET LOCAL que.skip_notify TO true') unless notify
        Que.execute(
          :bulk_insert_jobs,
          attrs.values_at(:queue, :priority, :run_at, :job_class, :args_and_kwargs_array, :data),
        )
      end
    values_array.map(&method(:new))
  end
end

.bulk_enqueue(job_options: {}, notify: false) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/que/job.rb', line 140

# Collects every job enqueued inside the given block and inserts them with a
# single call to _bulk_enqueue_insert.
#
# While the block runs, a thread-local (:que_jobs_to_bulk_insert) holds the
# collected job attributes; it is always cleared when this call finishes.
#
# @param job_options [Hash] options shared by all jobs in the batch.
# @param notify [Boolean] forwarded to _bulk_enqueue_insert.
# @yield the block in which jobs are enqueued.
# @return [Array] [] when the block enqueued nothing, otherwise whatever
#   _bulk_enqueue_insert returns.
# @raise [Que::Error] if called while another bulk_enqueue is active on this
#   thread, or if the collected jobs are not all of one job class.
def bulk_enqueue(job_options: {}, notify: false)
  # This guard must stay outside the begin/ensure below: a rejected nested
  # call must not clear the thread-local that belongs to the outer call.
  raise Que::Error, "Can't nest .bulk_enqueue" unless Thread.current[:que_jobs_to_bulk_insert].nil?

  begin
    Thread.current[:que_jobs_to_bulk_insert] = { jobs_attrs: [], job_options: job_options }
    yield
    jobs_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs]
    job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options]
    return [] if jobs_attrs.empty?
    raise Que::Error, "When using .bulk_enqueue, all jobs enqueued must be of the same job class" unless jobs_attrs.map { |attrs| attrs[:job_class] }.uniq.one?
    args_and_kwargs_array = jobs_attrs.map { |attrs| attrs.slice(:args, :kwargs) }
    # With an explicit :job_class option the insert goes through Que::Job;
    # otherwise the class named by the first collected job is used.
    klass = job_options[:job_class] ? Que::Job : Que.constantize(jobs_attrs.first[:job_class])
    klass._bulk_enqueue_insert(args_and_kwargs_array, job_options: job_options, notify: notify)
  ensure
    Thread.current[:que_jobs_to_bulk_insert] = nil
  end
end

.enqueue(*args) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/que/job.rb', line 80

# Enqueues a single job of this class.
#
# Positional and keyword arguments are separated with
# Que.split_out_ruby2_keywords; a :job_options keyword, if present, is
# removed from the job's kwargs and used to configure the job.
#
# Three outcomes are possible:
# * inside .bulk_enqueue (thread-local batch present): the attributes are
#   appended to the batch and a placeholder instance (`new({})`) is returned;
# * no run_at resolved and run_synchronously set: arguments are round-tripped
#   through JSON and passed to _run_attrs;
# * otherwise: the job is inserted via Que.execute(:insert_job, ...) and an
#   instance wrapping the first returned row is returned.
#
# @param args [Array] job arguments, optionally ending in keywords.
# @return [Object] see the three outcomes above.
# @raise [Que::Error] if more than MAXIMUM_TAGS_COUNT tags are given, a tag
#   is longer than MAXIMUM_TAG_LENGTH, or — inside .bulk_enqueue — the class
#   is the ActiveJob wrapper or job_options were passed.
# @raise [Error] if neither job_options[:job_class] nor the class name is
#   available (anonymous subclass).
def enqueue(*args)
  args, kwargs = Que.split_out_ruby2_keywords(args)

  job_options = kwargs.delete(:job_options) || {}

  if job_options[:tags]
    if job_options[:tags].length > MAXIMUM_TAGS_COUNT
      raise Que::Error, "Can't enqueue a job with more than #{MAXIMUM_TAGS_COUNT} tags! (passed #{job_options[:tags].length})"
    end

    job_options[:tags].each do |tag|
      if tag.length > MAXIMUM_TAG_LENGTH
        # Interpolate the constant so the message cannot drift from the
        # limit that is actually enforced.
        raise Que::Error, "Can't enqueue a job with a tag longer than #{MAXIMUM_TAG_LENGTH} characters! (\"#{tag}\")"
      end
    end
  end

  # Explicit job_options win over class-level settings.
  attrs = {
    queue:    job_options[:queue]    || resolve_que_setting(:queue) || Que.default_queue,
    priority: job_options[:priority] || resolve_que_setting(:priority),
    run_at:   job_options[:run_at]   || resolve_que_setting(:run_at),
    args:     args,
    kwargs:   kwargs,
    data:     job_options[:tags] ? { tags: job_options[:tags] } : {},
    job_class: \
      job_options[:job_class] || name ||
        raise(Error, "Can't enqueue an anonymous subclass of Que::Job"),
  }

  if Thread.current[:que_jobs_to_bulk_insert]
    if self.name == 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'
      raise Que::Error, "Que.bulk_enqueue does not support ActiveJob."
    end

    raise Que::Error, "When using .bulk_enqueue, job_options must be passed to that method rather than .enqueue" unless job_options == {}

    Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] << attrs
    # Nothing has been inserted yet, so the instance carries no attributes.
    new({})
  elsif attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
    # Round-trip through JSON so the arguments look the way they would after
    # being stored and read back.
    attrs.merge!(
      args: Que.deserialize_json(Que.serialize_json(attrs[:args])),
      kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])),
      data: Que.deserialize_json(Que.serialize_json(attrs[:data])),
    )
    _run_attrs(attrs)
  else
    attrs.merge!(
      args: Que.serialize_json(attrs[:args]),
      kwargs: Que.serialize_json(attrs[:kwargs]),
      data: Que.serialize_json(attrs[:data]),
    )
    values = Que.execute(
      :insert_job,
      attrs.values_at(:queue, :priority, :run_at, :job_class, :args, :kwargs, :data),
    ).first
    new(values)
  end
end

.resolve_que_setting(setting, *args) ⇒ Object



227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/que/job.rb', line 227

# Looks up a configuration setting on the receiver, falling back to its
# superclass chain when the receiver has no value of its own.
#
# @param setting [Symbol] name of the setting reader to call.
# @param args [Array] passed through to the value when it is callable.
# @return [Object, nil] the setting itself, the result of calling it when it
#   responds to :call, or nil when no ancestor that responds to
#   resolve_que_setting provides a value.
def resolve_que_setting(setting, *args)
  own = respond_to?(setting) ? send(setting) : nil

  # Only nil counts as "unset"; an explicit false is a real value.
  unless own.nil?
    return own.call(*args) if own.respond_to?(:call)
    return own
  end

  parent = superclass
  return nil unless parent.respond_to?(:resolve_que_setting)

  parent.resolve_que_setting(setting, *args)
end

.run(*args) ⇒ Object



215
216
217
218
219
220
221
222
223
224
# File 'lib/que/job.rb', line 215

# Runs the job immediately with the given arguments, without enqueueing it.
#
# @param args [Array] job arguments, optionally ending in keywords.
# @return [Object] whatever _run_attrs returns.
def run(*args)
  positional, keywords = Que.split_out_ruby2_keywords(args)

  # Push both argument sets through a JSON round trip so the job sees them
  # exactly as it would after being stored in and loaded from the DB.
  round_trip = ->(value) { Que.deserialize_json(Que.serialize_json(value)) }
  positional = round_trip.call(positional)
  keywords = round_trip.call(keywords)

  # Should not fail if there's no DB connection.
  _run_attrs(args: positional, kwargs: keywords)
end

Instance Method Details

#run(*args) ⇒ Object

Subclasses should define their own run methods, but keep an empty one here so that Que::Job.enqueue can queue an empty job in testing.



60
61
# File 'lib/que/job.rb', line 60

# Intentionally empty default implementation. Subclasses should define their
# own run; this no-op exists so that Que::Job.enqueue can queue an empty job
# in testing. Accepts any positional arguments and returns nil.
def run(*args)
end