Class: GoodJob::Adapter

Inherits:
Object
  • Object
show all
Defined in:
lib/good_job/adapter.rb,
lib/good_job/adapter/inline_buffer.rb

Overview

ActiveJob Adapter.

Defined Under Namespace

Classes: InlineBuffer

Class Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(execution_mode: nil, _capsule: GoodJob.capsule) ⇒ Adapter



28
29
30
31
32
33
34
35
# File 'lib/good_job/adapter.rb', line 28

def initialize(execution_mode: nil, _capsule: GoodJob.capsule) # rubocop:disable Lint/UnderscorePrefixedVariableName
  @_execution_mode_override = execution_mode
  GoodJob::Configuration.validate_execution_mode(@_execution_mode_override) if @_execution_mode_override
  @capsule = _capsule

  start_async if GoodJob.async_ready?
  self.class.instances << self
end

Class Attribute Details

.instancesArray<GoodJob::Adapter>? (readonly)

List of all instantiated Adapters in the current process.



12
# File 'lib/good_job/adapter.rb', line 12

cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false

Instance Method Details

#async_started?Boolean

Whether the async executors are running



224
225
226
# File 'lib/good_job/adapter.rb', line 224

def async_started?
  @_async_started
end

#enqueue(active_job) ⇒ GoodJob::Job

Enqueues the ActiveJob job to be performed. For use by Rails; you should generally not call this directly.



41
42
43
# File 'lib/good_job/adapter.rb', line 41

def enqueue(active_job)
  enqueue_at(active_job, nil)
end

#enqueue_after_transaction_commit?Boolean

Defines if enqueueing this job from inside an Active Record transaction automatically defers the enqueue to after the transaction commit.



47
48
49
# File 'lib/good_job/adapter.rb', line 47

def enqueue_after_transaction_commit?
  GoodJob.configuration.enqueue_after_transaction_commit
end

#enqueue_all(active_jobs) ⇒ Integer

Enqueues multiple ActiveJob instances at once



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/good_job/adapter.rb', line 54

def enqueue_all(active_jobs)
  active_jobs = Array(active_jobs)
  return 0 if active_jobs.empty?

  Rails.application.executor.wrap do
    current_time = Time.current
    jobs = active_jobs.map do |active_job|
      GoodJob::Job.build_for_enqueue(active_job).tap do |job|
        job.scheduled_at = current_time if job.scheduled_at == job.created_at
        job.created_at = current_time
        job.updated_at = current_time
      end
    end

    inline_jobs = []
    GoodJob::Job.transaction(requires_new: true, joinable: false) do
      job_attributes = jobs.map(&:attributes)
      results = GoodJob::Job.insert_all(job_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations

      job_id_to_provider_job_id = results.each_with_object({}) { |result, hash| hash[result['active_job_id']] = result['id'] }
      active_jobs.each do |active_job|
        active_job.provider_job_id = job_id_to_provider_job_id[active_job.job_id]
        active_job.successfully_enqueued = active_job.provider_job_id.present? if active_job.respond_to?(:successfully_enqueued=)
      end
      jobs.each do |job|
        job.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[job.active_job_id]
      end
      jobs = jobs.select(&:persisted?) # prune unpersisted jobs

      if execute_inline?
        inline_jobs = jobs.select { |job| job.scheduled_at.nil? || job.scheduled_at <= Time.current }
        inline_jobs.each(&:advisory_lock!)
      end
    end

    if inline_jobs.any?
      deferred = InlineBuffer.defer?
      InlineBuffer.perform_now_or_defer do
        @capsule.tracker.register do
          until inline_jobs.empty?
            inline_job = inline_jobs.shift
            perform_inline(inline_job, notify: deferred ? send_notify?(inline_job) : false)
          end
        ensure
          inline_jobs.each(&:advisory_unlock)
        end
      end
    end

    non_inline_jobs = if InlineBuffer.defer?
                        jobs - inline_jobs
                      else
                        jobs.reject(&:finished_at)
                      end
    if non_inline_jobs.any?
      job_id_to_active_jobs = active_jobs.index_by(&:job_id)
      non_inline_jobs.group_by(&:queue_name).each do |queue_name, jobs_by_queue|
        jobs_by_queue.group_by(&:scheduled_at).each do |scheduled_at, jobs_by_queue_and_scheduled_at|
          state = { queue_name: queue_name, count: jobs_by_queue_and_scheduled_at.size }
          state[:scheduled_at] = scheduled_at if scheduled_at

          executed_locally = execute_async? && @capsule&.create_thread(state)
          unless executed_locally
            state[:count] = job_id_to_active_jobs.values_at(*jobs_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) }
            Notifier.notify(state) unless state[:count].zero?
          end
        end
      end
    end
  end

  active_jobs.count(&:provider_job_id)
end

#enqueue_at(active_job, timestamp) ⇒ GoodJob::Job

Enqueues an ActiveJob job to be run at a specific time. For use by Rails; you should generally not call this directly.



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/good_job/adapter.rb', line 133

def enqueue_at(active_job, timestamp)
  scheduled_at = timestamp ? Time.zone.at(timestamp) : nil

  # If there is a currently open Bulk in the current thread, direct the
  # job there to be enqueued using enqueue_all
  return if GoodJob::Bulk.capture(active_job, queue_adapter: self)

  Rails.application.executor.wrap do
    will_execute_inline = execute_inline? && (scheduled_at.nil? || scheduled_at <= Time.current)
    will_retry_inline = will_execute_inline && CurrentThread.job&.active_job_id == active_job.job_id && !CurrentThread.retry_now

    if will_retry_inline
      job = GoodJob::Job.enqueue(
        active_job,
        scheduled_at: scheduled_at
      )
    elsif will_execute_inline
      job = GoodJob::Job.enqueue(
        active_job,
        scheduled_at: scheduled_at,
        create_with_advisory_lock: true
      )
      InlineBuffer.perform_now_or_defer do
        @capsule.tracker.register do
          perform_inline(job, notify: send_notify?(active_job))
        end
      end
    else
      job = GoodJob::Job.enqueue(
        active_job,
        scheduled_at: scheduled_at
      )

      executed_locally = execute_async? && @capsule&.create_thread(job.job_state)
      Notifier.notify(job.job_state) if !executed_locally && send_notify?(active_job)
    end

    job
  end
end

#execute_async?Boolean

Whether in :async execution mode.



194
195
196
197
# File 'lib/good_job/adapter.rb', line 194

def execute_async?
  execution_mode == :async_all ||
    (execution_mode.in?([:async, :async_server]) && GoodJob.configuration.in_webserver?)
end

#execute_externally?Boolean

Whether in :external execution mode.



201
202
203
204
205
# File 'lib/good_job/adapter.rb', line 201

def execute_externally?
  execution_mode.nil? ||
    execution_mode == :external ||
    (execution_mode.in?([:async, :async_server]) && !GoodJob.configuration.in_webserver?)
end

#execute_inline?Boolean

Whether in :inline execution mode.



209
210
211
# File 'lib/good_job/adapter.rb', line 209

def execute_inline?
  execution_mode == :inline
end

#execution_modeSymbol?

This adapter’s execution mode



188
189
190
# File 'lib/good_job/adapter.rb', line 188

def execution_mode
  @_execution_mode_override || GoodJob.configuration.execution_mode
end

#shutdown(timeout: NONE) ⇒ void

This method returns an undefined value.

Shut down the thread pool executors.



181
182
183
184
# File 'lib/good_job/adapter.rb', line 181

def shutdown(timeout: NONE)
  @capsule&.shutdown(timeout: timeout)
  @_async_started = false
end

#start_asyncvoid

This method returns an undefined value.

Start async executors



215
216
217
218
219
220
# File 'lib/good_job/adapter.rb', line 215

def start_async
  return unless execute_async?

  @capsule.start
  @_async_started = true
end