Module: ActiveJob::Continuable
- Extended by:
- ActiveSupport::Concern
- Defined in:
- lib/active_job/continuable.rb
Overview
Active Job Continuable
The Continuable module provides the ability to track the progress of your jobs, and continue from where they left off if interrupted.
Mix ActiveJob::Continuable into your job to enable continuations.
See ActiveJob::Continuation for usage.
Instance Attribute Summary collapse
-
#continuation ⇒ Object
:nodoc:.
-
#resumptions ⇒ Object
The number of times the job has been resumed.
Instance Method Summary collapse
-
#checkpoint! ⇒ Object
:nodoc:.
-
#deserialize(job_data) ⇒ Object
:nodoc:.
-
#interrupt!(reason:) ⇒ Object
:nodoc:.
-
#serialize ⇒ Object
:nodoc:.
-
#step(step_name, start: nil, isolated: false, &block) ⇒ Object
Start a new continuation step.
Instance Attribute Details
#continuation ⇒ Object
:nodoc:
33 34 35 |
# File 'lib/active_job/continuable.rb', line 33 def continuation @continuation end |
#resumptions ⇒ Object
The number of times the job has been resumed.
31 32 33 |
# File 'lib/active_job/continuable.rb', line 31 def resumptions @resumptions end |
Instance Method Details
#checkpoint! ⇒ Object
:nodoc:
62 63 64 |
# File 'lib/active_job/continuable.rb', line 62 def checkpoint! # :nodoc: interrupt!(reason: :stopping) if queue_adapter.stopping? end |
#deserialize(job_data) ⇒ Object
:nodoc:
56 57 58 59 60 |
# File 'lib/active_job/continuable.rb', line 56 def deserialize(job_data) # :nodoc: super self.continuation = Continuation.new(self, job_data.fetch("continuation", {})) self.resumptions = job_data.fetch("resumptions", 0) end |
#interrupt!(reason:) ⇒ Object
:nodoc:
66 67 68 69 |
# File 'lib/active_job/continuable.rb', line 66 def interrupt!(reason:) # :nodoc: instrument :interrupt, reason: reason, **continuation.instrumentation raise Continuation::Interrupt, "Interrupted #{continuation.description} (#{reason})" end |
#serialize ⇒ Object
:nodoc:
52 53 54 |
# File 'lib/active_job/continuable.rb', line 52 def serialize # :nodoc: super.merge("continuation" => continuation.to_h, "resumptions" => resumptions) end |
#step(step_name, start: nil, isolated: false, &block) ⇒ Object
Start a new continuation step
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/active_job/continuable.rb', line 36 def step(step_name, start: nil, isolated: false, &block) unless block_given? step_method = method(step_name) raise ArgumentError, "Step method '#{step_name}' must accept 0 or 1 arguments" if step_method.arity > 1 if step_method.parameters.any? { |type, name| type == :key || type == :keyreq } raise ArgumentError, "Step method '#{step_name}' must not accept keyword arguments" end block = step_method.arity == 0 ? -> (_) { step_method.call } : step_method end checkpoint! if continuation.advanced? continuation.step(step_name, start: start, isolated: isolated, &block) end |