Module: Resque::Plugins::BatchedJob

Defined in:
lib/resque/plugins/batched_job.rb,
lib/resque/plugins/batched_job/version.rb

Constant Summary collapse

VERSION =
'1.9.0'

Instance Method Summary collapse

Instance Method Details

#after_dequeue_batch(id, *args) ⇒ Object

After a job is removed, also remove it from the batch.

Parameters:

  • id (Object, #to_s)

    Batch identifier. Any Object that responds to #to_s



58
59
60
# File 'lib/resque/plugins/batched_job.rb', line 58

def after_dequeue_batch(id, *args)
  remove_batched_job(id, *args)
end

#after_enqueue_batch(id, *args) ⇒ Object

Resque hook that handles batching the job. (closes #2)

Parameters:

  • id (Object, #to_s)

    Batch identifier. Any Object that responds to #to_s



34
35
36
37
38
# File 'lib/resque/plugins/batched_job.rb', line 34

def after_enqueue_batch(id, *args)
  mutex(id) do |bid|
    redis.rpush(bid, Resque.encode(:class => self.name, :args => args))
  end
end

#after_perform_batch(id, *args) ⇒ Object

After the job is performed, remove it from the batched job list. If the current job is the last in the batch to be performed, invoke the after_batch hooks.

Parameters:

  • id (Object, #to_s)

    Batch identifier. Any Object that responds to #to_s



45
46
47
48
49
50
51
52
53
# File 'lib/resque/plugins/batched_job.rb', line 45

def after_perform_batch(id, *args)
  if remove_batched_job(id, *args) == 0
    
    after_batch_hooks = Resque::Plugin.after_batch_hooks(self)
    after_batch_hooks.each do |hook|
      send(hook, id, *args)
    end
  end
end

#batch(id) ⇒ String

Helper method used to generate the batch key.

Parameters:

  • id (Object, #to_s)

    Batch identifier. Any Object that responds to #to_s

Returns:

  • (String)

    Used to identify batch Redis List key



27
28
29
# File 'lib/resque/plugins/batched_job.rb', line 27

def batch(id)
  "batch:#{id}"
end

#batch_complete?(id) ⇒ Boolean

Checks the size of the batched job list and returns true if the list is empty or if the key does not exist.

Parameters:

  • id (Object, #to_s)

    Batch identifier. Any Object that responds to #to_s

Returns:

  • (Boolean)


66
67
68
69
70
# File 'lib/resque/plugins/batched_job.rb', line 66

def batch_complete?(id)
  mutex(id) do |bid|
    redis.llen(bid) == 0
  end
end

#batch_exist?(id) ⇒ Boolean

Check to see if the Redis key exists.

Parameters:

  • id (Object, #to_s)

    Batch identifier. Any Object that responds to #to_s

Returns:

  • (Boolean)


75
76
77
78
79
# File 'lib/resque/plugins/batched_job.rb', line 75

def batch_exist?(id)
  mutex(id) do |bid|
    redis.exists(bid)
  end
end

#batched_jobs(id) ⇒ Object

Build a collection of Resque::Job objects that represent each job in a batch of the same class.

Parameters:

  • id (Object, #to_s)

    Batch identifier. Any Object that responds to #to_s



106
107
108
109
110
111
112
113
114
# File 'lib/resque/plugins/batched_job.rb', line 106

def batched_jobs(id)
  bid = batch(id)
  regexp = /\A\{\"class\":\"#{self.name}\",\"args\":\[/
  redis.lrange(bid, 0, redis.llen(bid)-1).grep(regexp).map do |string|
    payload = Resque.decode(string)
    payload['args'].unshift(id)
    Resque::Job.new(@queue, payload)
  end
end

#recreate_batched_jobs(id) ⇒ Object

Collect and recreate all jobs for the given batch.

Parameters:

  • id (Object, #to_s)

    Batch identifier. Any Object that responds to #to_s



120
121
122
123
124
# File 'lib/resque/plugins/batched_job.rb', line 120

def recreate_batched_jobs(id)
  batched_jobs(id).each do |job|
    job.recreate
  end.size
end

#remove_batched_job(id, *args) ⇒ Object

Remove a job from the batch list. (closes #6)

Parameters:

  • id (Object, #to_s)

    Batch identifier. Any Object that responds to #to_s



84
85
86
87
88
89
90
91
92
# File 'lib/resque/plugins/batched_job.rb', line 84

def remove_batched_job(id, *args)
  mutex(id) do |bid|
    removed_count = redis.lrem(bid, 1, Resque.encode(:class => self.name, :args => args))

    raise "Failed to remove batched job, id: #{id}, args: #{args.join(', ')}" if removed_count != 1

    redis.llen(bid)
  end
end

#remove_batched_job!(id, *args) ⇒ Object

Remove a job from the batch list and run after hooks if necessary.

Parameters:

  • id (Object, #to_s)

    Batch identifier. Any Object that responds to #to_s



97
98
99
# File 'lib/resque/plugins/batched_job.rb', line 97

def remove_batched_job!(id, *args)
  after_perform_batch(id, *args)
end