Class: Resque::Plugins::MultiStepTask

Inherits:
Object
  • Object
show all
Extended by:
AtomicCounters, Constantization
Includes:
Constantization
Defined in:
lib/resque/plugins/multi_step_task.rb,
lib/resque/plugins/multi_step_task/atomic_counters.rb,
lib/resque/plugins/multi_step_task/constantization.rb,
lib/resque/plugins/multi_step_task/finalization_job.rb,
lib/resque/plugins/multi_step_task/assure_finalization.rb

Defined Under Namespace

Modules: AtomicCounters, Constantization
Classes: AssureFinalization, FinalizationAlreadyBegun, FinalizationJob, NoSuchMultiStepTask, NotReadyForFinalization, StdOutLogger

Constant Summary collapse

# Alphabet used for task-id nonces: lowercase letters, uppercase letters and
# digits (62 characters). Frozen so the shared constant cannot be mutated.
NONCE_CHARS = (('a'..'z').to_a + ('A'..'Z').to_a + ('0'..'9').to_a).freeze
# Process-wide execution mode flag. Set by .mode=; when true, jobs are run
# inline as they are added instead of being enqueued (see #add_job).
@@synchronous =
false

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Constantization

constantize

Methods included from AtomicCounters

counter

Constructor Details

#initialize(task_id) ⇒ MultiStepTask

Initialize a newly instantiated parallel job group.

Parameters:

  • task_id (String)

    The identifier of the task group of interest (built by .create as a slug, "~", and a random nonce).



179
180
181
182
# File 'lib/resque/plugins/multi_step_task.rb', line 179

# Wrap the task identified by task_id. The task's start time is recorded in
# redis only the first time any instance is built for this id, because
# SETNX never overwrites an existing key.
#
# @param task_id [String] identifier of the task group of interest
def initialize(task_id)
  @task_id = task_id
  started_at = Time.now.to_i
  redis.setnx('start-time', started_at)
end

Instance Attribute Details

#completed_countObject (readonly)

Returns the current value of completed_count.

Returns:

  • (Object)

    the current value of completed_count



16
17
18
# File 'lib/resque/plugins/multi_step_task.rb', line 16

# Reader for @completed_count.
# @return [Object] the current value of completed_count
def completed_count; @completed_count; end

#failed_countObject (readonly)

Returns the current value of failed_count.

Returns:

  • (Object)

    the current value of failed_count



16
17
18
# File 'lib/resque/plugins/multi_step_task.rb', line 16

# Reader for @failed_count.
# @return [Object] the current value of failed_count
def failed_count; @failed_count; end

#finalize_job_countObject (readonly)

Returns the current value of finalize_job_count.

Returns:

  • (Object)

    the current value of finalize_job_count



16
17
18
# File 'lib/resque/plugins/multi_step_task.rb', line 16

# Reader for @finalize_job_count.
# @return [Object] the current value of finalize_job_count
def finalize_job_count; @finalize_job_count; end

#loggerObject

Returns the value of attribute logger.



163
164
165
# File 'lib/resque/plugins/multi_step_task.rb', line 163

# Logger used by this task's instance methods (#add_job, #finalize!, ...).
#
# #initialize never assigns @logger, so without a fallback every logging
# call on a freshly created or found task would be sent to nil. When no
# logger has been assigned to this instance, use the class-level logger.
#
# @return [Logger]
def logger
  @logger || self.class.logger
end

#normal_job_countObject (readonly)

Returns the current value of normal_job_count.

Returns:

  • (Object)

    the current value of normal_job_count



16
17
18
# File 'lib/resque/plugins/multi_step_task.rb', line 16

# Reader for @normal_job_count.
# @return [Object] the current value of normal_job_count
def normal_job_count; @normal_job_count; end

#task_idObject (readonly)

Returns the value of attribute task_id.



162
163
164
# File 'lib/resque/plugins/multi_step_task.rb', line 162

# Reader for @task_id, the identifier this task was built with.
# @return [String]
def task_id; @task_id; end

Class Method Details

.active?(task_id) ⇒ Boolean

Does a task with the specified id exist?

Returns:

  • (Boolean)


42
43
44
# File 'lib/resque/plugins/multi_step_task.rb', line 42

# Is task_id a member of the global "active-tasks" set?
#
# @param task_id [#to_s]
# @return [Boolean]
def active?(task_id)
  members_key = "active-tasks"
  redis.sismember(members_key, task_id)
end

.create(slug = nil) {|multi_step_task| ... } ⇒ MultiStepTask

Create a brand new multi-step-task.

Parameters:

  • slug (#to_s) (defaults to: nil)

    The descriptive slug of the new task; "~" and a random five-character nonce are appended to it. Default: "multi-step-task".

Yields:

  • (multi_step_task)

    A block to define the work to take place in parallel

Yield Parameters:

Returns:



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/resque/plugins/multi_step_task.rb', line 56

# Create a brand new multi-step task.
#
# The task id is the slug ("multi-step-task" when the slug is nil or empty)
# followed by "~" and a random nonce. Any stale redis data under that id is
# removed and the id is added to the global "active-tasks" set. When a block
# is given, it is yielded the task (to add jobs) and the task is then marked
# finalizable.
#
# @param slug [#to_s, nil] descriptive slug for the task
# @yieldparam multi_step_task [MultiStepTask] the new task
# @return [MultiStepTask]
def create(slug=nil)
  # Build a new string instead of appending to the slug: String#to_s returns
  # the receiver itself, so `<<` would mutate the caller's argument (and
  # raise on a frozen one). Converting first also accepts any #to_s object
  # (e.g. an Integer), as documented.
  base = slug.to_s
  base = "multi-step-task" if base.empty?
  task_id = "#{base}~#{nonce}"

  mst = new(task_id)
  mst.nuke
  redis.sadd("active-tasks", task_id)
  if block_given?
    yield mst
    mst.finalizable!
  end

  mst
end

.find(task_id) ⇒ MultiStepTask

Find an existing MultiStepTask.

Parameters:

  • task_id (#to_s)

    The unique key for the job group of interest.

Returns:

Raises:



85
86
87
88
89
# File 'lib/resque/plugins/multi_step_task.rb', line 85

# Find an existing multi-step task.
#
# @param task_id [#to_s] the unique key for the task group of interest
# @return [MultiStepTask]
# @raise [NoSuchMultiStepTask] if task_id is not in the "active-tasks" set
def find(task_id)
  raise NoSuchMultiStepTask unless active?(task_id)

  new(task_id)
end

.loggerObject



132
133
134
# File 'lib/resque/plugins/multi_step_task.rb', line 132

# Class-wide logger. Created lazily, logging to STDERR, unless one was
# assigned with .logger=.
#
# @return [Logger]
def logger
  return @@logger if defined?(@@logger) && @@logger
  @@logger = Logger.new(STDERR)
end

.logger=(logger) ⇒ Object



128
129
130
# File 'lib/resque/plugins/multi_step_task.rb', line 128

# Replace the class-wide logger returned by .logger.
# @param logger [Logger]
def logger=(logger); @@logger = logger; end

.mode=(sync_or_async) ⇒ Object

Normally jobs that are part of a multi-step task are run asynchronously by putting them on a queue. In a development environment, however, it is often more convenient to run the jobs synchronously as they are registered. Setting mode to :sync provides a way to do just that.

Parameters:

  • sync_or_async (:sync, :async)


143
144
145
# File 'lib/resque/plugins/multi_step_task.rb', line 143

# Choose how jobs are run: :sync runs them inline as they are added; any
# other value (e.g. :async) enqueues them.
#
# @param sync_or_async [Symbol] :sync or :async
def mode=(sync_or_async)
  run_inline = sync_or_async == :sync
  @@synchronous = run_inline
end

.nonceObject

A bit of randomness to ensure tasks are uniquely identified.



32
33
34
# File 'lib/resque/plugins/multi_step_task.rb', line 32

# Five random characters drawn from NONCE_CHARS, appended to task ids so
# that tasks created with the same slug get distinct ids.
#
# @return [String]
def nonce
  Array.new(5) { NONCE_CHARS.sample }.join
end

.perform(task_id, job_module_name, *args) ⇒ Object

Handle job invocation



92
93
94
95
# File 'lib/resque/plugins/multi_step_task.rb', line 92

# Resque entry point for a normal job: run it (recording success or
# failure), then try to finalize the task.
def perform(task_id, job_module_name, *args)
  perform_without_maybe_finalize(task_id, job_module_name, *args).maybe_finalize
end

.perform_finalization(task_id, job_module_name, *args) ⇒ Object



124
125
126
# File 'lib/resque/plugins/multi_step_task.rb', line 124

# Resque entry point for a finalization job. Unlike .perform, it does not
# call #maybe_finalize afterwards; it returns the task.
def perform_finalization(task_id, job_module_name, *args)
  job_args = [task_id, job_module_name, *args]
  perform_without_maybe_finalize(*job_args)
end

.perform_without_maybe_finalize(task_id, job_module_name, *args) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/resque/plugins/multi_step_task.rb', line 97

# Run one job of a multi-step task and record the outcome.
#
# Resolves job_module_name to a class/module and gives it singleton
# `multi_step_task_id` / `multi_step_task` helpers for this task. It then
# calls the job's `perform` with args. On success the task's completed_count
# is incremented. On any exception failed_count is incremented and the
# exception is re-raised.
#
# @param task_id [String] id of an active task
# @param job_module_name [String] name of the job class/module
# @param args [Array] arguments forwarded to the job's perform
# @return [MultiStepTask] the task the job belongs to
# @raise [NoSuchMultiStepTask] if the task is not active
def perform_without_maybe_finalize(task_id, job_module_name, *args)
  task = MultiStepTask.find(task_id)
  begin
    start_time = Time.now
    logger.debug("[Resque Multi-Step-Task] Executing #{job_module_name} job for #{task_id} at #{start_time} (args: #{args})")

    # perform the task
    klass = constantize(job_module_name)

    # Define the helpers with blocks instead of eval'ing generated source:
    # task_id may contain an arbitrary user-supplied slug, and interpolating
    # it into Ruby code breaks (or injects code) when it contains a quote.
    unless klass.singleton_class.method_defined?(:multi_step_task)
      klass.define_singleton_method(:multi_step_task) { MultiStepTask.find(multi_step_task_id) }
    end
    klass.define_singleton_method(:multi_step_task_id) { task_id }

    klass.perform(*args)

    logger.debug("[Resque Multi-Step-Task] Finished executing #{job_module_name} job for #{task_id} at #{Time.now}, taking #{(Time.now - start_time)} seconds.")
  rescue Exception
    # Deliberately broad so that every kind of job failure is counted;
    # nothing is swallowed because the exception is re-raised below.
    logger.error("[Resque Multi-Step-Task] #{job_module_name} job failed for #{task_id} at #{Time.now} (args: #{args})")
    logger.info("[Resque Multi-Step-Task] Incrementing failed_count: #{job_module_name} job failed for task id #{task_id} at #{Time.now} (args: #{args})")
    task.increment_failed_count
    raise
  end
  logger.info("[Resque Multi-Step-Task] Incrementing completed_count: #{job_module_name} job completed for task id #{task_id} at #{Time.now} (args: #{args})")
  task.increment_completed_count
  task
end

.redisObject

A redis client suitable for storing global multi-step task info.



37
38
39
# File 'lib/resque/plugins/multi_step_task.rb', line 37

# Redis handle for global (cross-task) bookkeeping such as the
# "active-tasks" set. Keys live under the "resque:multisteptask" namespace
# on Resque's connection.
#
# @return [Redis::Namespace]
def redis
  return @redis if @redis
  @redis = Redis::Namespace.new("resque:multisteptask", :redis => Resque.redis)
end

.synchronous?Boolean

Returns:

  • (Boolean)


147
148
149
# File 'lib/resque/plugins/multi_step_task.rb', line 147

# Are jobs currently run inline rather than enqueued (see .mode=)?
# @return [Boolean]
def synchronous?; @@synchronous; end

Instance Method Details

#add_finalization_job(job_type, *args) ⇒ Object

Finalization jobs are performed after all the normal jobs (i.e. the ones registered with #add_job) have been completed. Finalization jobs are performed in the order they are defined.

Parameters:

  • job_type (Class, Module)

    The type of job to be performed.



230
231
232
233
234
235
236
# File 'lib/resque/plugins/multi_step_task.rb', line 230

# Queue a finalization job to run after all normal jobs have completed.
# Finalization jobs are stored in a redis list and run one after another in
# the order they were added.
#
# @param job_type [Class, Module] the type of job to be performed
# @param args [Array] arguments for the job's perform, stored JSON-encoded
def add_finalization_job(job_type, *args)
  logger.info("[Resque Multi-Step-Task] Incrementing finalize_job_count: Finalization job #{job_type} for task id #{task_id} at #{Time.now} (args: #{args})")
  increment_finalize_job_count
  logger.debug("[Resque Multi-Step-Task] Adding #{job_type} finalization job for #{task_id} (args: #{args})")

  payload = Yajl::Encoder.encode([job_type.to_s, *args])
  redis.rpush('finalize_jobs', payload)
end

#add_job(job_type, *args) ⇒ Object

Add a job to this task

Parameters:

  • job_type (Class, Module)

    The type of the job to be performed.



212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/resque/plugins/multi_step_task.rb', line 212

# Add a normal job to this task. In synchronous mode the job is run
# immediately; otherwise it is enqueued on the task's queue.
#
# @param job_type [Class, Module] the type of the job to be performed
# @param args [Array] arguments forwarded to the job's perform
def add_job(job_type, *args)
  logger.info("[Resque Multi-Step-Task] Incrementing normal_job_count: #{job_type} job added to task id #{task_id} at #{Time.now} (args: #{args})")

  increment_normal_job_count
  logger.debug("[Resque Multi-Step-Task] Adding #{job_type} job for #{task_id} (args: #{args})")

  job_name = job_type.to_s
  return self.class.perform(task_id, job_name, *args) if synchronous?

  Resque::Job.create(queue_name, self.class, task_id, job_name, *args)
end

#assure_finalizationObject



261
262
263
# File 'lib/resque/plugins/multi_step_task.rb', line 261

# Enqueue an AssureFinalization job for this task on the task's own queue.
def assure_finalization
  Resque::Job.create(queue_name, AssureFinalization, task_id)
end

#finalizable!Object

Make this multi-step task finalizable (see #finalizable?).



247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/resque/plugins/multi_step_task.rb', line 247

# Mark this task finalizable (see #finalizable?): all normal jobs have been
# registered, so finalization may run once they have completed.
#
# In synchronous mode finalization is attempted immediately. Otherwise an
# AssureFinalization job is always enqueued. There are two reasons:
# - A task with only finalization jobs has no normal job to kick off
#   finalization.
# - The last normal job may already have finished before this flag was set
#   (a race).
def finalizable!
  # Store an explicit string rather than a Boolean: redis-rb 5 rejects
  # Boolean arguments, while older versions stored true.to_s ("true").
  # #finalizable? only checks that the key exists.
  redis.set 'is_finalizable', 'true'
  if synchronous?
    maybe_finalize
  else
    assure_finalization
  end
end

#finalizable?Boolean

A multi-step task is finalizable when all the normal jobs (see #add_job) have been registered. Finalization jobs will not be executed until the task becomes finalizable regardless of the number of jobs that have been completed.

Returns:

  • (Boolean)


242
243
244
# File 'lib/resque/plugins/multi_step_task.rb', line 242

# Has this task been marked finalizable (see #finalizable!)?
#
# redis-rb before 4.2 returns a Boolean from EXISTS, while newer versions
# return the number of matching keys. Because 0 is truthy in Ruby, the raw
# result cannot be used as a predicate, so it is normalized here.
#
# @return [Boolean]
def finalizable?
  found = redis.exists('is_finalizable')
  found.is_a?(Integer) ? found > 0 : !!found
end

#finalize!Object

Finalize this job group. Finalization entails running all finalization jobs serially in the order they were defined.

Raises:



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/resque/plugins/multi_step_task.rb', line 273

# Finalize this task: run its finalization jobs one at a time, in the order
# they were added, then remove the task's data.
#
# In synchronous mode every finalization job runs inline (#sync_finalize!).
# Otherwise only the next queued finalization job is enqueued here. If none
# remain, the task is cleaned up with #nuke.
#
# @raise [FinalizationAlreadyBegun] if the task is no longer active, or if
#   another process already claimed finalization
# @raise [NotReadyForFinalization] if the task is not ready for
#   finalization or is incomplete because of errors
def finalize!
  logger.debug("[Resque Multi-Step-Task] Attempting to finalize #{task_id}")
  raise FinalizationAlreadyBegun unless MultiStepTask.active?(task_id)

  ready = ready_for_finalization? && !incomplete_because_of_errors?
  raise NotReadyForFinalization unless ready

  # SETNX succeeds for exactly one caller, so it acts as a global mutex for
  # other processes that finish their jobs at about the same time.
  claimed = redis.setnx("i_am_the_finalizer", 1)
  raise FinalizationAlreadyBegun unless claimed

  return sync_finalize! if synchronous?

  next_job = redis.lpop('finalize_jobs')
  if next_job
    Resque::Job.create(queue_name, FinalizationJob, task_id, *Yajl::Parser.parse(next_job))
  else
    # There is nothing left to do so cleanup.
    logger.debug "[Resque Multi-Step-Task] \"#{task_id}\" finalized successfully at #{Time.now}, taking #{(Time.now - redis.get('start-time').to_i).to_i} seconds."
    nuke
  end
end

#incomplete_because_of_errors?Boolean

If a normal or finalization job fails (i.e. raises an exception) the task as a whole is considered to be incomplete. The finalization sequence will not be performed. If the failure occurred during finalization, any remaining finalization jobs will not be run.

If the failed job is retried and succeeds, finalization will proceed as usual.

Returns:

  • (Boolean)


330
331
332
# File 'lib/resque/plugins/multi_step_task.rb', line 330

# True when at least one job has failed and fewer jobs have completed than
# there are normal jobs. While this holds, finalization is not performed.
#
# @return [Boolean]
def incomplete_because_of_errors?
  return false unless failed_count > 0
  completed_count < normal_job_count
end

#maybe_finalizeObject

Execute finalization sequence if it is time.



308
309
310
311
312
313
314
315
# File 'lib/resque/plugins/multi_step_task.rb', line 308

# Run #finalize! if the task is ready and not incomplete because of errors;
# otherwise do nothing.
#
# Normal jobs run in parallel and each calls this when it finishes, so
# several may try to finalize at once. Losing that race raises
# FinalizationAlreadyBegun, which is expected and therefore ignored.
def maybe_finalize
  finalize! if ready_for_finalization? && !incomplete_because_of_errors?
rescue FinalizationAlreadyBegun
  nil
end

#nukeObject

Removes all data from redis related to this task.



198
199
200
201
202
# File 'lib/resque/plugins/multi_step_task.rb', line 198

# Delete everything this task stored in redis:
# - every key under the task's namespace,
# - the task's Resque queue,
# - the task's id in the global "active-tasks" set.
def nuke
  stale_keys = redis.keys('*')
  stale_keys.each { |key| redis.del(key) }

  Resque.remove_queue(queue_name)

  global_redis = self.class.redis
  global_redis.srem('active-tasks', task_id)
end

#queue_nameObject

The name of the queue for jobs that are part of this task.



205
206
207
# File 'lib/resque/plugins/multi_step_task.rb', line 205

# Jobs belonging to this task go on a Resque queue named after the task id.
# @return [String]
def queue_name; task_id; end

#ready_for_finalization?Boolean

Is this task at the point where finalization can occur.

Returns:

  • (Boolean)


318
319
320
# File 'lib/resque/plugins/multi_step_task.rb', line 318

# Can finalization occur now? That requires two things:
# - the task has been marked finalizable;
# - at least as many jobs have completed as there are normal jobs.
#
# @return [Boolean]
def ready_for_finalization?
  flagged = finalizable?
  flagged ? completed_count >= normal_job_count : flagged
end

#redisObject



188
189
190
# File 'lib/resque/plugins/multi_step_task.rb', line 188

# Redis handle scoped to this task. Keys live under the
# "resque:multisteptask:<task_id>" namespace on Resque's connection.
#
# @return [Redis::Namespace]
def redis
  @redis ||= begin
    namespace = "resque:multisteptask:#{task_id}"
    Redis::Namespace.new(namespace, :redis => Resque.redis)
  end
end

#sync_finalize!Object



297
298
299
300
301
302
303
304
305
# File 'lib/resque/plugins/multi_step_task.rb', line 297

# Run every queued finalization job inline, oldest first. Then log the
# elapsed time since the task started and remove the task's data.
def sync_finalize!
  while (raw_job = redis.lpop('finalize_jobs'))
    finalizer_name, *finalizer_args = Yajl::Parser.parse(raw_job)
    self.class.perform_finalization(task_id, finalizer_name, *finalizer_args)
  end

  logger.debug "[Resque Multi-Step-Task] \"#{task_id}\" finalized successfully at #{Time.now}, taking #{(Time.now - redis.get('start-time').to_i).to_i} seconds."
  nuke
end

#synchronous?Boolean

Returns:

  • (Boolean)


154
155
156
# File 'lib/resque/plugins/multi_step_task.rb', line 154

# Instance-side view of the process-wide mode set by .mode=.
# @return [Boolean]
def synchronous?; @@synchronous; end

#total_job_countObject

The total number of jobs that are part of this task.



193
194
195
# File 'lib/resque/plugins/multi_step_task.rb', line 193

# Number of jobs in this task: normal jobs plus finalization jobs.
def total_job_count
  normal = normal_job_count
  finalizers = finalize_job_count
  normal + finalizers
end

#unfinalized_because_of_errors?Boolean

Returns:

  • (Boolean)


334
335
336
# File 'lib/resque/plugins/multi_step_task.rb', line 334

# True when at least one job has failed and fewer jobs have completed than
# the total of normal plus finalization jobs.
#
# @return [Boolean]
def unfinalized_because_of_errors?
  return false unless failed_count > 0
  done = completed_count
  done < normal_job_count + finalize_job_count
end