Class: Resque::Plugins::Stages::StagedJob
- Inherits: Object
- Object
- Resque::Plugins::Stages::StagedJob
- Includes:
- Comparable, RedisAccess
- Defined in:
- lib/resque/plugins/stages/staged_job.rb
Overview
rubocop:disable Metrics/ClassLength
Constant Summary
Constants included from RedisAccess
Instance Attribute Summary collapse
- #class_name ⇒ Object
- #job_id ⇒ Object (readonly): Returns the value of attribute job_id.
Class Method Summary collapse
- .create_job(staged_group_stage, klass, *args) ⇒ Object: Creates a job to be queued to Resque, giving it an ID that can be used to track its status.
Instance Method Summary collapse
- #<=>(other) ⇒ Object
- #args ⇒ Object
- #args=(value) ⇒ Object
- #blank? ⇒ Boolean
- #completed? ⇒ Boolean
- #compressed? ⇒ Boolean
-
#delete ⇒ Object
rubocop:enable Metrics/AbcSize.
- #enqueue_args ⇒ Object
- #enqueue_compressed_args ⇒ Object
- #enqueue_job ⇒ Object
- #initialize(job_id) ⇒ StagedJob (constructor): Returns a new instance of StagedJob.
- #pending? ⇒ Boolean
- #queue_time ⇒ Object
- #queued? ⇒ Boolean
-
#save! ⇒ Object
rubocop:disable Metrics/AbcSize.
- #staged_group_stage ⇒ Object
- #staged_group_stage=(value) ⇒ Object
- #status ⇒ Object
- #status=(value) ⇒ Object
- #status_message ⇒ Object
- #status_message=(value) ⇒ Object
- #uncompressed_args ⇒ Object
- #verify ⇒ Object
Methods included from RedisAccess
Constructor Details
#initialize(job_id) ⇒ StagedJob
Returns a new instance of StagedJob.
39 40 41 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 39 def initialize(job_id) @job_id = job_id end |
Instance Attribute Details
#class_name ⇒ Object
49 50 51 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 49 def class_name @class_name ||= stored_values[:class_name] end |
#job_id ⇒ Object (readonly)
Returns the value of attribute job_id.
21 22 23 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 21 def job_id @job_id end |
Class Method Details
.create_job(staged_group_stage, klass, *args) ⇒ Object
Creates a job to be queued to Resque, giving it an ID that can be used to track its status.
26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 26 def create_job(staged_group_stage, klass, *args) job = Resque::Plugins::Stages::StagedJob.new(SecureRandom.uuid) job.staged_group_stage = staged_group_stage job.class_name = klass.name job.args = args job.save! job end |
Instance Method Details
#<=>(other) ⇒ Object
43 44 45 46 47 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 43 def <=>(other) return nil unless other.is_a?(Resque::Plugins::Stages::StagedJob) job_id <=> other.job_id end |
#args ⇒ Object
144 145 146 147 148 149 150 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 144 def args @args = if defined?(@args) @args else decompress_args(Array.wrap(decode_args(stored_values[:args]))) end end |
#args=(value) ⇒ Object
152 153 154 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 152 def args=(value) @args = value.nil? ? [] : Array.wrap(value).dup end |
#blank? ⇒ Boolean
168 169 170 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 168 def blank? !redis.exists(job_key) end |
#completed? ⇒ Boolean
156 157 158 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 156 def completed? %i[failed successful].include? status end |
#compressed? ⇒ Boolean
179 180 181 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 179 def compressed? compressable? && described_class.compressed?(args) end |
#delete ⇒ Object
rubocop:enable Metrics/AbcSize
104 105 106 107 108 109 110 111 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 104 def delete # Make sure the job is loaded into memory so we can use it even though we are going to delete it. stored_values redis.del(job_key) staged_group_stage.remove_job(self) end |
#enqueue_args ⇒ Object
128 129 130 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 128 def enqueue_args [klass, *enqueue_compressed_args] end |
#enqueue_compressed_args ⇒ Object
132 133 134 135 136 137 138 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 132 def enqueue_compressed_args new_args = compressed_args([{ staged_job_id: job_id }, *args]) new_args[0][:staged_job_id] = job_id new_args end |
#enqueue_job ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 113 def enqueue_job case status when :pending self.status = :queued Resque.enqueue(*enqueue_args) when :pending_re_run Resque.enqueue_delayed_selection do |args| # :nocov: klass.perform_job(*Array.wrap(args)).job_id == job_id # :nocov: end end end |
#pending? ⇒ Boolean
164 165 166 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 164 def pending? %i[pending pending_re_run].include? status end |
#queue_time ⇒ Object
53 54 55 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 53 def queue_time @queue_time ||= stored_values[:queue_time].to_time end |
#queued? ⇒ Boolean
160 161 162 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 160 def queued? %i[queued running pending_re_run].include? status end |
#save! ⇒ Object
rubocop:disable Metrics/AbcSize
93 94 95 96 97 98 99 100 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 93 def save! redis.hsetnx(job_key, "queue_time", Time.now) redis.hset(job_key, "class_name", class_name) redis.hset(job_key, "args", encode_args(*compressed_args(args))) redis.hset(job_key, "staged_group_stage_id", staged_group_stage_id) redis.hset(job_key, "status", status) redis.hset(job_key, "status_message", status_message) end |
#staged_group_stage ⇒ Object
77 78 79 80 81 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 77 def staged_group_stage return nil if staged_group_stage_id.blank? @staged_group_stage ||= Resque::Plugins::Stages::StagedGroupStage.new(staged_group_stage_id) end |
#staged_group_stage=(value) ⇒ Object
83 84 85 86 87 88 89 90 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 83 def staged_group_stage=(value) @staged_group_stage = value @staged_group_stage_id = value.group_stage_id redis.hset(job_key, "staged_group_stage_id", staged_group_stage_id) value.add_job(self) end |
#status ⇒ Object
57 58 59 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 57 def status @status ||= stored_values[:status]&.to_sym || :pending end |
#status=(value) ⇒ Object
61 62 63 64 65 66 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 61 def status=(value) @status = value redis.hset(job_key, "status", status) notify_stage end |
#status_message ⇒ Object
68 69 70 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 68 def status_message @status_message ||= stored_values[:status_message] end |
#status_message=(value) ⇒ Object
72 73 74 75 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 72 def status_message=(value) @status_message = value redis.hset(job_key, "status_message", status_message) end |
#uncompressed_args ⇒ Object
140 141 142 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 140 def uncompressed_args decompress_args(args) end |
#verify ⇒ Object
172 173 174 175 176 177 |
# File 'lib/resque/plugins/stages/staged_job.rb', line 172 def verify return build_new_structure if staged_group_stage.blank? staged_group_stage.verify staged_group_stage.verify_job(self) end |