Class: Resque::Plugins::Stages::StagedJob

Inherits:
Object
  • Object
show all
Includes:
Comparable, RedisAccess
Defined in:
lib/resque/plugins/stages/staged_job.rb

Overview

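A single job tracked by the Stages plugin. Each job is identified by a job ID and keeps its class name, arguments, status, status message, queue time, and owning staged group stage ID in a Redis hash.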

Constant Summary

Constants included from RedisAccess

RedisAccess::NAME_SPACE

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RedisAccess

#redis

Constructor Details

#initialize(job_id) ⇒ StagedJob

Returns a new instance of StagedJob.



39
40
41
# File 'lib/resque/plugins/stages/staged_job.rb', line 39

# Builds a job wrapper around the given identifier.
#
# Only the ID is remembered here; this method reads and writes nothing else.
#
# @param job_id [Object] identifier for this job (create_job passes a
#   SecureRandom UUID)
def initialize(job_id)
  @job_id = job_id
end

Instance Attribute Details

#class_name ⇒ Object



49
50
51
# File 'lib/resque/plugins/stages/staged_job.rb', line 49

# Name of the class that performs this job.
#
# A value already held in memory wins; otherwise +:class_name+ is read from
# +stored_values+ and remembered for later calls.
#
# @return [Object] the class name, or whatever +stored_values+ holds for it
def class_name
  return @class_name if @class_name

  @class_name = stored_values[:class_name]
end

#job_id ⇒ Object (readonly)

Returns the value of attribute job_id.



21
22
23
# File 'lib/resque/plugins/stages/staged_job.rb', line 21

# Identifier this job was constructed with.
#
# @return [Object] the value held in +@job_id+
def job_id
  @job_id
end

Class Method Details

.create_job(staged_group_stage, klass, *args) ⇒ Object

Creates a job to be queued to Resque that has an ID that we can track its status with.



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/resque/plugins/stages/staged_job.rb', line 26

# Builds and saves a new staged job with a freshly generated UUID as its ID.
#
# The stage is assigned first, then the class name and arguments, and finally
# the job is saved.
#
# @param staged_group_stage [Object] stage the new job is assigned to
# @param klass [Class] job class; only its +name+ is recorded
# @param args [Array] arguments recorded for the job
# @return [Resque::Plugins::Stages::StagedJob] the saved job
def create_job(staged_group_stage, klass, *args)
  Resque::Plugins::Stages::StagedJob.new(SecureRandom.uuid).tap do |staged_job|
    staged_job.staged_group_stage = staged_group_stage
    staged_job.class_name         = klass.name
    staged_job.args               = args

    staged_job.save!
  end
end

Instance Method Details

#<=>(other) ⇒ Object



43
44
45
46
47
# File 'lib/resque/plugins/stages/staged_job.rb', line 43

# Orders staged jobs by their job IDs.
#
# @param other [Object] object to compare against
# @return [Integer, nil] the result of comparing the two job IDs, or nil when
#   +other+ is not a StagedJob
def <=>(other)
  if other.is_a?(Resque::Plugins::Stages::StagedJob)
    job_id <=> other.job_id
  end
end

#args ⇒ Object



144
145
146
147
148
149
150
# File 'lib/resque/plugins/stages/staged_job.rb', line 144

# Arguments recorded for this job.
#
# Once +@args+ has been assigned (even to nil) that value is returned as is.
# Otherwise the stored +:args+ entry is decoded, wrapped in an array,
# decompressed, and remembered.
#
# @return [Object] the in-memory arguments
def args
  return @args if defined?(@args)

  @args = decompress_args(Array.wrap(decode_args(stored_values[:args])))
end

#args=(value) ⇒ Object



152
153
154
# File 'lib/resque/plugins/stages/staged_job.rb', line 152

# Replaces the in-memory arguments.
#
# nil becomes an empty array; anything else is wrapped in an array and
# shallow-copied so the caller's array is not shared.
#
# @param value [Object, nil] new argument or list of arguments
def args=(value)
  @args = if value.nil?
            []
          else
            Array.wrap(value).dup
          end
end

#blank? ⇒ Boolean

Returns:

  • (Boolean)


168
169
170
# File 'lib/resque/plugins/stages/staged_job.rb', line 168

# Reports whether this job has no record in Redis.
#
# +exists+ answers with a boolean on older redis-rb releases and with an
# Integer key count on newer ones. Zero is truthy in Ruby, so negating the
# count directly would report every job as present; the count is therefore
# turned into a boolean before it is negated.
#
# @return [Boolean] true when nothing is stored under +job_key+
def blank?
  found = redis.exists(job_key)
  found = found.positive? if found.is_a?(Integer)

  !found
end

#completed? ⇒ Boolean

Returns:

  • (Boolean)


156
157
158
# File 'lib/resque/plugins/stages/staged_job.rb', line 156

# Whether the job has finished, successfully or not.
#
# @return [Boolean] true when the status is +:failed+ or +:successful+
def completed?
  case status
  when :failed, :successful then true
  else false
  end
end

#compressed? ⇒ Boolean

Returns:

  • (Boolean)


179
180
181
# File 'lib/resque/plugins/stages/staged_job.rb', line 179

# Whether the job's arguments are currently in compressed form.
#
# When +compressable?+ is falsy, that falsy value is returned and the class is
# never consulted. Otherwise the answer comes from +described_class.compressed?+.
#
# @return [Boolean, nil] see above
def compressed?
  can_compress = compressable?
  return can_compress unless can_compress

  described_class.compressed?(args)
end

#delete ⇒ Object

Deletes the job's record from Redis and removes the job from its staged group stage.



104
105
106
107
108
109
110
111
# File 'lib/resque/plugins/stages/staged_job.rb', line 104

# Removes the job's Redis record and detaches the job from its stage.
#
# @return [Object, nil] the result of +remove_job+, or nil when the job has no
#   staged group stage
def delete
  # Make sure the job is loaded into memory so we can use it even though we are going to delete it.
  stored_values

  redis.del(job_key)

  # staged_group_stage is nil when the job has no stage ID. The Redis key is
  # already gone at this point, so skip the stage instead of raising on nil.
  staged_group_stage&.remove_job(self)
end

#enqueue_args ⇒ Object



128
129
130
# File 'lib/resque/plugins/stages/staged_job.rb', line 128

# Argument list for enqueueing this job: the job class followed by the
# values from +enqueue_compressed_args+.
#
# @return [Array] +klass+ first, then the enqueue arguments
def enqueue_args
  job_class = klass
  payload   = enqueue_compressed_args

  [job_class, *payload]
end

#enqueue_compressed_args ⇒ Object



132
133
134
135
136
137
138
# File 'lib/resque/plugins/stages/staged_job.rb', line 132

# Arguments to enqueue, led by a hash that carries this job's ID.
#
# The ID hash is prepended before +compressed_args+ runs, and the ID is then
# written into the first element of the result again, so it is present in
# whatever +compressed_args+ returned.
#
# @return [Object] the result of +compressed_args+ with +:staged_job_id+ set on
#   its first element
def enqueue_compressed_args
  id_marker = { staged_job_id: job_id }

  compressed_args([id_marker, *args]).tap do |payload|
    payload[0][:staged_job_id] = job_id
  end
end

#enqueue_job ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/resque/plugins/stages/staged_job.rb', line 113

# Hands the job to Resque according to its current status.
#
# * +:pending+ - the status is changed to +:queued+ and the job is enqueued
#   with +enqueue_args+.
# * +:pending_re_run+ - +Resque.enqueue_delayed_selection+ is called with a
#   block that matches entries whose staged job ID equals this job's ID.
# * any other status - nothing happens.
#
# @return [Object, nil] the result of the Resque call, or nil when the status
#   matched neither case
def enqueue_job
  case status
  when :pending
    self.status = :queued
    Resque.enqueue(*enqueue_args)
  when :pending_re_run
    Resque.enqueue_delayed_selection do |candidate_args|
      # :nocov:
      klass.perform_job(*Array.wrap(candidate_args)).job_id == job_id
      # :nocov:
    end
  end
end

#pending? ⇒ Boolean

Returns:

  • (Boolean)


164
165
166
# File 'lib/resque/plugins/stages/staged_job.rb', line 164

# Whether the job is still waiting to be handed to Resque.
#
# @return [Boolean] true when the status is +:pending+ or +:pending_re_run+
def pending?
  case status
  when :pending, :pending_re_run then true
  else false
  end
end

#queue_time ⇒ Object



53
54
55
# File 'lib/resque/plugins/stages/staged_job.rb', line 53

# Time recorded for the job under +:queue_time+, converted with +to_time+.
#
# A job with no stored queue time yields nil instead of raising
# NoMethodError on nil, matching how the other readers (+class_name+,
# +status_message+) behave when nothing is stored. A non-nil result is
# memoized; nil is looked up again on the next call.
#
# @return [Object, nil] the converted queue time, or nil when none is stored
def queue_time
  @queue_time ||= stored_values[:queue_time]&.to_time
end

#queued? ⇒ Boolean

Returns:

  • (Boolean)


160
161
162
# File 'lib/resque/plugins/stages/staged_job.rb', line 160

# Whether the job counts as queued.
#
# Note that +:running+ and +:pending_re_run+ count as queued here as well.
#
# @return [Boolean] true when the status is +:queued+, +:running+, or
#   +:pending_re_run+
def queued?
  case status
  when :queued, :running, :pending_re_run then true
  else false
  end
end

#save! ⇒ Object

Writes the job's fields to its Redis hash. The queue time is only written when it has not been set before.



93
94
95
96
97
98
99
100
# File 'lib/resque/plugins/stages/staged_job.rb', line 93

# Persists the job's attributes as fields of its Redis hash.
#
# "queue_time" is written with +hsetnx+, so it is only set when the field does
# not exist yet; every other field is overwritten with the current value.
# Arguments are compressed and encoded before they are stored.
#
# @return [Object] the reply to the final +hset+ call
def save!
  store = redis
  key   = job_key

  store.hsetnx(key, "queue_time", Time.now)
  store.hset(key, "class_name", class_name)
  store.hset(key, "args", encode_args(*compressed_args(args)))
  store.hset(key, "staged_group_stage_id", staged_group_stage_id)
  store.hset(key, "status", status)
  store.hset(key, "status_message", status_message)
end

#staged_group_stage ⇒ Object



77
78
79
80
81
# File 'lib/resque/plugins/stages/staged_job.rb', line 77

# Stage this job belongs to.
#
# The stage object is built from +staged_group_stage_id+ the first time it is
# needed and reused afterwards.
#
# @return [Resque::Plugins::Stages::StagedGroupStage, nil] the stage, or nil
#   when the job has no stage ID
def staged_group_stage
  stage_id = staged_group_stage_id
  return if stage_id.blank?

  @staged_group_stage ||= Resque::Plugins::Stages::StagedGroupStage.new(stage_id)
end

#staged_group_stage=(value) ⇒ Object



83
84
85
86
87
88
89
90
# File 'lib/resque/plugins/stages/staged_job.rb', line 83

# Attaches this job to a staged group stage.
#
# Keeps the stage object and its +group_stage_id+ in memory, writes the ID to
# the "staged_group_stage_id" field of this job's Redis hash, and then
# registers the job with the stage through +add_job+.
#
# @param value [Object] the stage; it must respond to +group_stage_id+ and
#   +add_job+ (presumably a StagedGroupStage - confirm against callers)
def staged_group_stage=(value)
  @staged_group_stage    = value
  @staged_group_stage_id = value.group_stage_id

  # Persist the link before telling the stage about the job.
  redis.hset(job_key, "staged_group_stage_id", staged_group_stage_id)

  value.add_job(self)
end

#status ⇒ Object



57
58
59
# File 'lib/resque/plugins/stages/staged_job.rb', line 57

# Current status of the job.
#
# A status already held in memory wins. Otherwise the stored +:status+ value
# is converted to a symbol, or +:pending+ is used when nothing is stored; the
# result is remembered for later calls.
#
# @return [Symbol] the job status
def status
  return @status if @status

  persisted = stored_values[:status]
  @status   = persisted.nil? ? :pending : persisted.to_sym
end

#status=(value) ⇒ Object



61
62
63
64
65
66
# File 'lib/resque/plugins/stages/staged_job.rb', line 61

# Changes the job's status.
#
# The new value is kept in memory, the "status" field of the job's Redis hash
# is updated, and +notify_stage+ is called afterwards.
#
# @param value [Object] the new status (the status predicates compare against
#   symbols such as +:queued+)
def status=(value)
  @status = value
  # Written through the +status+ reader rather than +value+ directly, so a nil
  # or false +value+ is replaced by whatever that reader falls back to.
  redis.hset(job_key, "status", status)

  notify_stage
end

#status_message ⇒ Object



68
69
70
# File 'lib/resque/plugins/stages/staged_job.rb', line 68

# Message that accompanies the job's status.
#
# A message already held in memory wins; otherwise +:status_message+ is read
# from +stored_values+ and remembered for later calls.
#
# @return [Object] the status message, or whatever +stored_values+ holds for it
def status_message
  return @status_message if @status_message

  @status_message = stored_values[:status_message]
end

#status_message=(value) ⇒ Object



72
73
74
75
# File 'lib/resque/plugins/stages/staged_job.rb', line 72

# Changes the message that accompanies the job's status.
#
# The new value is kept in memory and the "status_message" field of the job's
# Redis hash is updated.
#
# @param value [Object] the new status message
def status_message=(value)
  @status_message = value
  # Written through the +status_message+ reader, so a nil or false +value+ is
  # replaced by whatever that reader falls back to.
  redis.hset(job_key, "status_message", status_message)
end

#uncompressed_args ⇒ Object



140
141
142
# File 'lib/resque/plugins/stages/staged_job.rb', line 140

# Passes the job's arguments through +decompress_args+.
#
# @return [Object] whatever +decompress_args+ returns for +args+
def uncompressed_args
  decompress_args(args)
end

#verify ⇒ Object



172
173
174
175
176
177
# File 'lib/resque/plugins/stages/staged_job.rb', line 172

# Checks that the job is still correctly linked to its stage.
#
# When the stage is blank, +build_new_structure+ is called instead. Otherwise
# the stage verifies itself and then verifies this job.
#
# @return [Object] the result of +build_new_structure+ or of the stage's
#   +verify_job+
def verify
  if staged_group_stage.blank?
    build_new_structure
  else
    staged_group_stage.verify
    staged_group_stage.verify_job(self)
  end
end