Module: ActiveJob::Core

Extended by:
ActiveSupport::Concern
Included in:
Base
Defined in:
lib/active_job/core.rb

Overview

Active Job Core

Provides general behavior that will be included into every Active Job object that inherits from ActiveJob::Base.

Defined Under Namespace

Modules: ClassMethods

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#_scheduled_at_timeObject (readonly)

:nodoc:



18
19
20
# File 'lib/active_job/core.rb', line 18

def _scheduled_at_time
  @_scheduled_at_time
end

#argumentsObject

Job arguments



12
13
14
# File 'lib/active_job/core.rb', line 12

def arguments
  @arguments
end

#enqueue_errorObject

Track any exceptions raised by the backend so callers can inspect the errors.



58
59
60
# File 'lib/active_job/core.rb', line 58

def enqueue_error
  @enqueue_error
end

#enqueued_atObject

Track when a job was enqueued



48
49
50
# File 'lib/active_job/core.rb', line 48

def enqueued_at
  @enqueued_at
end

#exception_executionsObject

Hash that contains the number of times this job handled errors for each specific retry_on declaration. Keys are the string representation of the exceptions listed in the retry_on declaration, while its associated value holds the number of executions where the corresponding retry_on declaration handled one of its listed exceptions.



39
40
41
# File 'lib/active_job/core.rb', line 39

def exception_executions
  @exception_executions
end

#executionsObject

Number of times this job has been executed (which increments on every retry, like after an exception).



33
34
35
# File 'lib/active_job/core.rb', line 33

def executions
  @executions
end

#job_idObject

Job Identifier



21
22
23
# File 'lib/active_job/core.rb', line 21

def job_id
  @job_id
end

#localeObject

I18n.locale to be used during the job.



42
43
44
# File 'lib/active_job/core.rb', line 42

def locale
  @locale
end

#priority=(value) ⇒ Object (writeonly)

Priority that the job will have (lower is more priority).



27
28
29
# File 'lib/active_job/core.rb', line 27

def priority=(value)
  @priority = value
end

#provider_job_idObject

ID optionally provided by adapter



30
31
32
# File 'lib/active_job/core.rb', line 30

def provider_job_id
  @provider_job_id
end

#queue_name=(value) ⇒ Object (writeonly)

Queue in which the job will reside.



24
25
26
# File 'lib/active_job/core.rb', line 24

def queue_name=(value)
  @queue_name = value
end

#scheduled_atObject

Time when the job should be performed



16
17
18
# File 'lib/active_job/core.rb', line 16

def scheduled_at
  @scheduled_at
end

#serialized_arguments=(value) ⇒ Object (writeonly)

Sets the attribute serialized_arguments

Parameters:

  • value

    the value to set the attribute serialized_arguments to.



13
14
15
# File 'lib/active_job/core.rb', line 13

def serialized_arguments=(value)
  @serialized_arguments = value
end

#successfully_enqueued=(value) ⇒ Object (writeonly)

Track whether the adapter received the job successfully.



51
52
53
# File 'lib/active_job/core.rb', line 51

def successfully_enqueued=(value)
  @successfully_enqueued = value
end

#timezoneObject

Timezone to be used during the job.



45
46
47
# File 'lib/active_job/core.rb', line 45

def timezone
  @timezone
end

Instance Method Details

#deserialize(job_data) ⇒ Object

Attaches the stored job data to the current instance. Receives a hash returned from serialize

Examples

class DeliverWebhookJob < ActiveJob::Base
  attr_writer :attempt_number

  def attempt_number
    @attempt_number ||= 0
  end

  def serialize
    super.merge('attempt_number' => attempt_number + 1)
  end

  def deserialize(job_data)
    super
    self.attempt_number = job_data['attempt_number']
  end

  rescue_from(Timeout::Error) do |exception|
    raise exception if attempt_number > 5
    retry_job(wait: 10)
  end
end


153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/active_job/core.rb', line 153

def deserialize(job_data)
  self.job_id               = job_data["job_id"]
  self.provider_job_id      = job_data["provider_job_id"]
  self.queue_name           = job_data["queue_name"]
  self.priority             = job_data["priority"]
  self.serialized_arguments = job_data["arguments"]
  self.executions           = job_data["executions"]
  self.exception_executions = job_data["exception_executions"]
  self.locale               = job_data["locale"] || I18n.locale.to_s
  self.timezone             = job_data["timezone"] || Time.zone&.name
  self.enqueued_at          = Time.iso8601(job_data["enqueued_at"]) if job_data["enqueued_at"]
  self.scheduled_at         = Time.iso8601(job_data["scheduled_at"]) if job_data["scheduled_at"]
end

#initialize(*arguments) ⇒ Object

Creates a new job instance. Takes the arguments that will be passed to the perform method.



95
96
97
98
99
100
101
102
103
104
105
# File 'lib/active_job/core.rb', line 95

def initialize(*arguments)
  @arguments  = arguments
  @job_id     = SecureRandom.uuid
  @queue_name = self.class.queue_name
  @scheduled_at = nil
  @_scheduled_at_time = nil
  @priority   = self.class.priority
  @executions = 0
  @exception_executions = {}
  @timezone   = Time.zone&.name
end

#serializeObject

Returns a hash with the job data that can safely be passed to the queuing adapter.



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/active_job/core.rb', line 110

def serialize
  {
    "job_class"  => self.class.name,
    "job_id"     => job_id,
    "provider_job_id" => provider_job_id,
    "queue_name" => queue_name,
    "priority"   => priority,
    "arguments"  => serialize_arguments_if_needed(arguments),
    "executions" => executions,
    "exception_executions" => exception_executions,
    "locale"     => I18n.locale.to_s,
    "timezone"   => timezone,
    "enqueued_at" => Time.now.utc.iso8601(9),
    "scheduled_at" => _scheduled_at_time ? _scheduled_at_time.utc.iso8601(9) : nil,
  }
end

#set(options = {}) ⇒ Object

Configures the job with the given options.



168
169
170
171
172
173
174
175
# File 'lib/active_job/core.rb', line 168

def set(options = {}) # :nodoc:
  self.scheduled_at = options[:wait].seconds.from_now if options[:wait]
  self.scheduled_at = options[:wait_until] if options[:wait_until]
  self.queue_name   = self.class.queue_name_from_part(options[:queue]) if options[:queue]
  self.priority     = options[:priority].to_i if options[:priority]

  self
end

#successfully_enqueued?Boolean

Returns:

  • (Boolean)


53
54
55
# File 'lib/active_job/core.rb', line 53

def successfully_enqueued?
  @successfully_enqueued
end