Class: Massive::Job

Inherits:
Object
  • Object
show all
Includes:
Cancelling, MemoryConsumption, Retry, Status, TimingSupport, Mongoid::Document, Mongoid::Timestamps
Defined in:
lib/massive/job.rb

Direct Known Subclasses

FileJob

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Cancelling

#cancelling

Methods included from Retry

#retrying

Methods included from TimingSupport

#elapsed_time

Methods included from MemoryConsumption

#current_memory_consumption

Methods included from Status

#completed?, #enqueued?, #failed?, #start!, #started?

Class Method Details

.cancel_when_failed(value = nil) ⇒ Object



46
47
48
49
# File 'lib/massive/job.rb', line 46

def self.cancel_when_failed(value=nil)
  @cancel_when_failed = value if !value.nil?
  @cancel_when_failed
end

.perform(process_id, step_id, job_id) ⇒ Object



24
25
26
# File 'lib/massive/job.rb', line 24

def self.perform(process_id, step_id, job_id)
  Massive::Process.find_job(process_id, step_id, job_id).work
end

.queueObject



28
29
30
31
32
33
34
# File 'lib/massive/job.rb', line 28

def self.queue
  if split_jobs
    :"#{queue_prefix}_#{Kernel.rand(split_jobs) + 1}"
  else
    queue_prefix
  end
end

.queue_prefix(value = nil) ⇒ Object



36
37
38
39
# File 'lib/massive/job.rb', line 36

def self.queue_prefix(value=nil)
  @queue_prefix = value if !value.nil?
  @queue_prefix || :massive_job
end

.split_jobs(value = nil) ⇒ Object



41
42
43
44
# File 'lib/massive/job.rb', line 41

def self.split_jobs(value=nil)
  @split_jobs = value if !value.nil?
  @split_jobs.nil? ? Massive.split_jobs : @split_jobs
end

Instance Method Details

#each_item(&block) ⇒ Object



83
84
85
# File 'lib/massive/job.rb', line 83

def each_item(&block)
  # iterate through each item within offset/limit range
end

#enqueueObject



51
52
53
# File 'lib/massive/job.rb', line 51

def enqueue
  Resque.enqueue(self.class, process.id.to_s, step.id.to_s, id.to_s)
end

#finish!Object



77
78
79
80
81
# File 'lib/massive/job.rb', line 77

def finish!
  update_attributes(finished_at: Time.now, memory_consumption: current_memory_consumption)

  step.complete
end

#process_each(item, index) ⇒ Object



87
88
89
# File 'lib/massive/job.rb', line 87

def process_each(item, index)
  # process an item
end

#workObject



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/massive/job.rb', line 55

def work
  handle_errors do
    cancelling do
      start!

      run_callbacks :work do
        each_item do |item, index|
          retrying do
            cancelling do
              process_each(item, index)
              increment_processed
              notify(:progress)
            end
          end
        end
      end

      finish!
    end
  end
end