Class: Resque::Plugins::MultiStepTask
- Inherits:
-
Object
- Object
- Resque::Plugins::MultiStepTask
- Extended by:
- AtomicCounters, Constantization
- Includes:
- Constantization
- Defined in:
- lib/resque/plugins/multi_step_task.rb,
lib/resque/plugins/multi_step_task/atomic_counters.rb,
lib/resque/plugins/multi_step_task/constantization.rb,
lib/resque/plugins/multi_step_task/finalization_job.rb,
lib/resque/plugins/multi_step_task/assure_finalization.rb
Defined Under Namespace
Modules: AtomicCounters, Constantization Classes: AssureFinalization, FinalizationAlreadyBegun, FinalizationJob, NoSuchMultiStepTask, NotReadyForFinalization, StdOutLogger
Constant Summary collapse
- NONCE_CHARS =
('a'..'z').to_a + ('A'..'Z').to_a + ('0'..'9').to_a
- @@synchronous =
false
Instance Attribute Summary collapse
-
#completed_count ⇒ Object
readonly
The current value of completed_count.
-
#failed_count ⇒ Object
readonly
The current value of failed_count.
-
#finalize_job_count ⇒ Object
readonly
The current value of finalize_job_count.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#normal_job_count ⇒ Object
readonly
The current value of normal_job_count.
-
#task_id ⇒ Object
readonly
Returns the value of attribute task_id.
Class Method Summary collapse
-
.active?(task_id) ⇒ Boolean
Does a task with the specified id exist?.
-
.create(slug = nil) {|multi_step_task| ... } ⇒ MultiStepTask
Create a brand new multi-step-task.
-
.find(task_id) ⇒ MultiStepTask
Find an existing MultiStepTask.
- .logger ⇒ Object
- .logger=(logger) ⇒ Object
-
.mode=(sync_or_async) ⇒ Object
Normally jobs that are part of a multi-step task are run asynchronously by putting them on a queue.
-
.nonce ⇒ Object
A bit of randomness to ensure tasks are uniquely identified.
-
.perform(task_id, job_module_name, *args) ⇒ Object
Handle job invocation.
- .perform_finalization(task_id, job_module_name, *args) ⇒ Object
- .perform_without_maybe_finalize(task_id, job_module_name, *args) ⇒ Object
-
.redis ⇒ Object
A redis client suitable for storing global mutli-step task info.
- .synchronous? ⇒ Boolean
Instance Method Summary collapse
-
#add_finalization_job(job_type, *args) ⇒ Object
Finalization jobs are performed after all the normal jobs (i.e. the ones registered with #add_job) have been completed.
-
#add_job(job_type, *args) ⇒ Object
Add a job to this task.
- #assure_finalization ⇒ Object
-
#finalizable! ⇒ Object
Make this multi-step task finalizable (see #finalizable?).
-
#finalizable? ⇒ Boolean
A multi-step task is finalizable when all the normal jobs (see #add_job) have been registered.
-
#finalize! ⇒ Object
Finalize this job group.
-
#incomplete_because_of_errors? ⇒ Boolean
If a normal or finalization job fails (i.e. raises an exception) the task as a whole is considered to be incomplete.
-
#initialize(task_id) ⇒ MultiStepTask
constructor
Initialize a newly instantiated parallel job group.
-
#maybe_finalize ⇒ Object
Execute finalization sequence if it is time.
-
#nuke ⇒ Object
Removes all data from redis related to this task.
-
#queue_name ⇒ Object
The name of the queue for jobs what are part of this task.
-
#ready_for_finalization? ⇒ Boolean
Is this task at the point where finalization can occur.
- #redis ⇒ Object
- #sync_finalize! ⇒ Object
- #synchronous? ⇒ Boolean
-
#total_job_count ⇒ Object
The total number of jobs that are part of this task.
- #unfinalized_because_of_errors? ⇒ Boolean
Methods included from Constantization
Methods included from AtomicCounters
Constructor Details
#initialize(task_id) ⇒ MultiStepTask
Initialize a newly instantiated parallel job group.
179 180 181 182 |
# File 'lib/resque/plugins/multi_step_task.rb', line 179 def initialize(task_id) @task_id = task_id redis.setnx 'start-time', Time.now.to_i end |
Instance Attribute Details
#completed_count ⇒ Object (readonly)
Returns the current value of completed_count.
16 17 18 |
# File 'lib/resque/plugins/multi_step_task.rb', line 16 def completed_count @completed_count end |
#failed_count ⇒ Object (readonly)
Returns the current value of failed_count.
16 17 18 |
# File 'lib/resque/plugins/multi_step_task.rb', line 16 def failed_count @failed_count end |
#finalize_job_count ⇒ Object (readonly)
Returns the current value of finalize_job_count.
16 17 18 |
# File 'lib/resque/plugins/multi_step_task.rb', line 16 def finalize_job_count @finalize_job_count end |
#logger ⇒ Object
Returns the value of attribute logger.
163 164 165 |
# File 'lib/resque/plugins/multi_step_task.rb', line 163 def logger @logger end |
#normal_job_count ⇒ Object (readonly)
Returns the current value of normal_job_count.
16 17 18 |
# File 'lib/resque/plugins/multi_step_task.rb', line 16 def normal_job_count @normal_job_count end |
#task_id ⇒ Object (readonly)
Returns the value of attribute task_id.
162 163 164 |
# File 'lib/resque/plugins/multi_step_task.rb', line 162 def task_id @task_id end |
Class Method Details
.active?(task_id) ⇒ Boolean
Does a task with the specified id exist?
42 43 44 |
# File 'lib/resque/plugins/multi_step_task.rb', line 42 def active?(task_id) redis.sismember("active-tasks", task_id) end |
.create(slug = nil) {|multi_step_task| ... } ⇒ MultiStepTask
Create a brand new multi-step-task.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/resque/plugins/multi_step_task.rb', line 56 def create(slug=nil) task_id = if slug.nil? || slug.empty? "multi-step-task" else slug.to_s end task_id << "~" << nonce mst = new(task_id) mst.nuke redis.sadd("active-tasks", task_id) if block_given? yield mst mst.finalizable! end mst end |
.find(task_id) ⇒ MultiStepTask
Find an existing MultiStepTask.
85 86 87 88 89 |
# File 'lib/resque/plugins/multi_step_task.rb', line 85 def find(task_id) raise NoSuchMultiStepTask unless active?(task_id) mst = new(task_id) end |
.logger ⇒ Object
132 133 134 |
# File 'lib/resque/plugins/multi_step_task.rb', line 132 def logger @@logger ||= Logger.new(STDERR) end |
.logger=(logger) ⇒ Object
128 129 130 |
# File 'lib/resque/plugins/multi_step_task.rb', line 128 def logger=(logger) @@logger = logger end |
.mode=(sync_or_async) ⇒ Object
Normally jobs that are part of a multi-step task are run asynchronously by putting them on a queue. However, it is often more convenient to just run the jobs synchronously as they are registered in a development environment. Setting mode to :sync provides a way to do just that.
143 144 145 |
# File 'lib/resque/plugins/multi_step_task.rb', line 143 def mode=(sync_or_async) @@synchronous = (sync_or_async == :sync) end |
.nonce ⇒ Object
A bit of randomness to ensure tasks are uniquely identified.
32 33 34 |
# File 'lib/resque/plugins/multi_step_task.rb', line 32 def nonce 5.times.map{NONCE_CHARS.sample}.join end |
.perform(task_id, job_module_name, *args) ⇒ Object
Handle job invocation
92 93 94 95 |
# File 'lib/resque/plugins/multi_step_task.rb', line 92 def perform(task_id, job_module_name, *args) task = perform_without_maybe_finalize(task_id, job_module_name, *args) task.maybe_finalize end |
.perform_finalization(task_id, job_module_name, *args) ⇒ Object
124 125 126 |
# File 'lib/resque/plugins/multi_step_task.rb', line 124 def perform_finalization(task_id, job_module_name, *args) perform_without_maybe_finalize(task_id, job_module_name, *args) end |
.perform_without_maybe_finalize(task_id, job_module_name, *args) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/resque/plugins/multi_step_task.rb', line 97 def perform_without_maybe_finalize(task_id, job_module_name, *args) task = MultiStepTask.find(task_id) begin start_time = Time.now logger.debug("[Resque Multi-Step-Task] Executing #{job_module_name} job for #{task_id} at #{start_time} (args: #{args})") # perform the task klass = constantize(job_module_name) klass.singleton_class.class_eval "def multi_step_task; MultiStepTask.find(multi_step_task_id); end" unless klass.singleton_class.method_defined? :multi_step_task klass.singleton_class.class_eval "def multi_step_task_id; '#{task_id}'; end" klass.perform(*args) logger.debug("[Resque Multi-Step-Task] Finished executing #{job_module_name} job for #{task_id} at #{Time.now}, taking #{(Time.now - start_time)} seconds.") rescue Exception => e logger.error("[Resque Multi-Step-Task] #{job_module_name} job failed for #{task_id} at #{Time.now} (args: #{args})") logger.info("[Resque Multi-Step-Task] Incrementing failed_count: #{job_module_name} job failed for task id #{task_id} at #{Time.now} (args: #{args})") task.increment_failed_count raise end logger.info("[Resque Multi-Step-Task] Incrementing completed_count: #{job_module_name} job completed for task id #{task_id} at #{Time.now} (args: #{args})") task.increment_completed_count task end |
.redis ⇒ Object
A redis client suitable for storing global mutli-step task info.
37 38 39 |
# File 'lib/resque/plugins/multi_step_task.rb', line 37 def redis @redis ||= Redis::Namespace.new("resque:multisteptask", :redis => Resque.redis) end |
.synchronous? ⇒ Boolean
147 148 149 |
# File 'lib/resque/plugins/multi_step_task.rb', line 147 def synchronous? @@synchronous end |
Instance Method Details
#add_finalization_job(job_type, *args) ⇒ Object
Finalization jobs are performed after all the normal jobs (i.e. the ones registered with #add_job) have been completed. Finalization jobs are performed in the order they are defined.
230 231 232 233 234 235 236 |
# File 'lib/resque/plugins/multi_step_task.rb', line 230 def add_finalization_job(job_type, *args) logger.info("[Resque Multi-Step-Task] Incrementing finalize_job_count: Finalization job #{job_type} for task id #{task_id} at #{Time.now} (args: #{args})") increment_finalize_job_count logger.debug("[Resque Multi-Step-Task] Adding #{job_type} finalization job for #{task_id} (args: #{args})") redis.rpush 'finalize_jobs', Yajl::Encoder.encode([job_type.to_s, *args]) end |
#add_job(job_type, *args) ⇒ Object
Add a job to this task
212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/resque/plugins/multi_step_task.rb', line 212 def add_job(job_type, *args) logger.info("[Resque Multi-Step-Task] Incrementing normal_job_count: #{job_type} job added to task id #{task_id} at #{Time.now} (args: #{args})") increment_normal_job_count logger.debug("[Resque Multi-Step-Task] Adding #{job_type} job for #{task_id} (args: #{args})") if synchronous? self.class.perform(task_id, job_type.to_s, *args) else Resque::Job.create(queue_name, self.class, task_id, job_type.to_s, *args) end end |
#assure_finalization ⇒ Object
261 262 263 |
# File 'lib/resque/plugins/multi_step_task.rb', line 261 def assure_finalization Resque::Job.create(queue_name, AssureFinalization, self.task_id) end |
#finalizable! ⇒ Object
Make this multi-step task finalizable (see #finalizable?).
247 248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/resque/plugins/multi_step_task.rb', line 247 def finalizable! redis.set 'is_finalizable', true if synchronous? maybe_finalize else # finalization happens after normal jobs, but in the wierd case where # there are only finalization jobs, we need to add a fake normal job # that just kicks off the finalization process # # due to race conditions, always assure finalization - DCM assure_finalization #if normal_job_count == 0 end end |
#finalizable? ⇒ Boolean
A multi-step task is finalizable when all the normal jobs (see #add_job) have been registered. Finalization jobs will not be executed until the task becomes finalizable regardless of the number of jobs that have been completed.
242 243 244 |
# File 'lib/resque/plugins/multi_step_task.rb', line 242 def finalizable? redis.exists 'is_finalizable' end |
#finalize! ⇒ Object
Finalize this job group. Finalization entails running all finalization jobs serially in the order they were defined.
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 |
# File 'lib/resque/plugins/multi_step_task.rb', line 273 def finalize! logger.debug("[Resque Multi-Step-Task] Attempting to finalize #{task_id}") raise FinalizationAlreadyBegun unless MultiStepTask.active?(task_id) raise NotReadyForFinalization if !ready_for_finalization? || incomplete_because_of_errors? # Only one process is allowed to start the finalization # process. This setnx acts a global mutex for other processes # that finish about the same time. raise FinalizationAlreadyBegun unless redis.setnx("i_am_the_finalizer", 1) if synchronous? sync_finalize! else if fin_job_info = redis.lpop('finalize_jobs') fin_job_info = Yajl::Parser.parse(fin_job_info) Resque::Job.create(queue_name, FinalizationJob, self.task_id, *fin_job_info) else # There is nothing left to do so cleanup. logger.debug "[Resque Multi-Step-Task] \"#{task_id}\" finalized successfully at #{Time.now}, taking #{(Time.now - redis.get('start-time').to_i).to_i} seconds." nuke end end end |
#incomplete_because_of_errors? ⇒ Boolean
If a normal or finalization job fails (i.e. raises an exception) the task as a whole is considered to be incomplete. The finalization sequence will not be performed. If the failure occurred during finalization any remaining finalization job will not be run.
If the failed job is retried and succeeds finalization will proceed at usual.
330 331 332 |
# File 'lib/resque/plugins/multi_step_task.rb', line 330 def incomplete_because_of_errors? failed_count > 0 && completed_count < normal_job_count end |
#maybe_finalize ⇒ Object
Execute finalization sequence if it is time.
308 309 310 311 312 313 314 315 |
# File 'lib/resque/plugins/multi_step_task.rb', line 308 def maybe_finalize return unless ready_for_finalization? && !incomplete_because_of_errors? finalize! rescue FinalizationAlreadyBegun # Just eat it the exception. Sometimes multiple normal jobs # will try to finalize a task simultaneously. This is # expected behavior because normal jobs run in parallel. end |
#nuke ⇒ Object
Removes all data from redis related to this task.
198 199 200 201 202 |
# File 'lib/resque/plugins/multi_step_task.rb', line 198 def nuke redis.keys('*').each{|k| redis.del k} Resque.remove_queue queue_name self.class.redis.srem('active-tasks', task_id) end |
#queue_name ⇒ Object
The name of the queue for jobs what are part of this task.
205 206 207 |
# File 'lib/resque/plugins/multi_step_task.rb', line 205 def queue_name task_id end |
#ready_for_finalization? ⇒ Boolean
Is this task at the point where finalization can occur.
318 319 320 |
# File 'lib/resque/plugins/multi_step_task.rb', line 318 def ready_for_finalization? finalizable? && completed_count >= normal_job_count end |
#redis ⇒ Object
188 189 190 |
# File 'lib/resque/plugins/multi_step_task.rb', line 188 def redis @redis ||= Redis::Namespace.new("resque:multisteptask:#{task_id}", :redis => Resque.redis) end |
#sync_finalize! ⇒ Object
297 298 299 300 301 302 303 304 305 |
# File 'lib/resque/plugins/multi_step_task.rb', line 297 def sync_finalize! while fin_job_info = redis.lpop('finalize_jobs') job_class_name, *args = Yajl::Parser.parse(fin_job_info) self.class.perform_finalization(task_id, job_class_name, *args) end logger.debug "[Resque Multi-Step-Task] \"#{task_id}\" finalized successfully at #{Time.now}, taking #{(Time.now - redis.get('start-time').to_i).to_i} seconds." nuke end |
#synchronous? ⇒ Boolean
154 155 156 |
# File 'lib/resque/plugins/multi_step_task.rb', line 154 def synchronous? @@synchronous end |
#total_job_count ⇒ Object
The total number of jobs that are part of this task.
193 194 195 |
# File 'lib/resque/plugins/multi_step_task.rb', line 193 def total_job_count normal_job_count + finalize_job_count end |
#unfinalized_because_of_errors? ⇒ Boolean
334 335 336 |
# File 'lib/resque/plugins/multi_step_task.rb', line 334 def unfinalized_because_of_errors? failed_count > 0 && completed_count < (normal_job_count + finalize_job_count) end |