Module: RocketJob::Plugins::Job::Worker::ClassMethods

Defined in:
lib/rocket_job/plugins/job/worker.rb

Instance Method Details

#perform_later(args, &block) ⇒ Object

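Run this job later.

Saves the job to the database for later processing by workers. When RocketJob::Config.inline_mode is set, the job is run immediately via #perform_now instead of being saved.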



# File 'lib/rocket_job/plugins/job/worker.rb', line 14

def perform_later(args, &block)
  if RocketJob::Config.inline_mode
    perform_now(args, &block)
  else
    job = new(args)
    block.call(job) if block
    job.save!
    job
  end
end
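The branching above can be tried without a database using a stand-in. This is a minimal sketch, not the library's code: `SketchConfig`, `QueuedSketchJob`, and its in-memory `STORE` are hypothetical names invented for illustration, and `save!` appends to an array instead of persisting anything.

```ruby
# Hypothetical stand-in for RocketJob::Config.inline_mode (in-memory only).
module SketchConfig
  class << self
    attr_accessor :inline_mode
  end
  self.inline_mode = false
end

# Hypothetical job class; STORE plays the role of the database.
class QueuedSketchJob
  STORE = []
  attr_accessor :name, :performed

  def initialize(args = {})
    args.each { |key, value| public_send("#{key}=", value) }
  end

  # Assumption: "saving" just records the job in memory.
  def save!
    STORE << self
  end

  def perform_now
    self.performed = true
  end

  # Mirrors the shape of the listing above: run inline, or save for later.
  def self.perform_later(args = {}, &block)
    job = new(args)
    block.call(job) if block
    if SketchConfig.inline_mode
      job.perform_now
    else
      job.save!
    end
    job
  end
end

# The block receives the new job before it is saved or run.
queued = QueuedSketchJob.perform_later(name: "report") { |job| job.name += "-1" }

SketchConfig.inline_mode = true
inline = QueuedSketchJob.perform_later(name: "adhoc")
```

In this sketch only the first job ends up in `STORE`; the second is performed immediately because inline mode was switched on.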

#perform_now(args, &block) ⇒ Object

Run this job now.

The job is not saved to the database since it is processed entirely in memory. As a result, before_save and before_destroy callbacks will not be called. Validations are, however, still run prior to calling #perform.



# File 'lib/rocket_job/plugins/job/worker.rb', line 30

def perform_now(args, &block)
  job = new(args)
  block.call(job) if block
  job.perform_now
  job
end
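To illustrate the callback behaviour described above, here is a minimal sketch using a hypothetical `InMemorySketchJob` class. The `events` array, the `valid?` check, and raising `ArgumentError` on invalid input are all assumptions made for this example; they only imitate the order of operations (validate, then perform, with no save callbacks).

```ruby
# Hypothetical job that records which lifecycle steps ran.
class InMemorySketchJob
  attr_accessor :count
  attr_reader :events

  def initialize(args = {})
    @events = []
    args.each { |key, value| public_send("#{key}=", value) }
  end

  # Assumption: a single hand-written validation.
  def valid?
    events << :validate
    count.is_a?(Integer)
  end

  # Never called by perform_now; present to show what is skipped.
  def save!
    events << :before_save
    events << :save
  end

  def perform
    events << :perform
    self.count += 1
  end

  # Validates first, then performs entirely in memory.
  def perform_now
    raise ArgumentError, "count must be an Integer" unless valid?
    perform
  end

  # Same shape as the class-level listing above.
  def self.perform_now(args = {}, &block)
    job = new(args)
    block.call(job) if block
    job.perform_now
    job
  end
end

job = InMemorySketchJob.perform_now(count: 1) { |j| j.count += 1 }
```

Because the job object is returned, any state set during `perform` can be read straight from it afterwards.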

#requeue_dead_server(server_name) ⇒ Object

Requeues all jobs that were running on a server that died



# File 'lib/rocket_job/plugins/job/worker.rb', line 75

def requeue_dead_server(server_name)
  # Need to requeue paused, failed since user may have transitioned job before it finished
  where(:state.in => [:running, :paused, :failed]).each do |job|
    job.requeue!(server_name) if job.may_requeue?(server_name)
  end
end
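The selection logic above can be sketched over an in-memory list. Everything here is hypothetical: `DeadServerSketchJob`, the `host:pid:worker` naming of `worker_name`, and the rule that a job may be requeued when its worker name starts with the server name are assumptions for illustration, not the library's confirmed behaviour.

```ruby
# Hypothetical in-memory job; real jobs are database documents.
DeadServerSketchJob = Struct.new(:id, :state, :worker_name, keyword_init: true) do
  # Assumption: a job belongs to a server when its worker name is prefixed by the server name.
  def may_requeue?(server_name)
    %i[running paused failed].include?(state) &&
      worker_name.to_s.start_with?("#{server_name}:")
  end

  # Puts the job back in the queue and releases it from its worker.
  def requeue!(_server_name)
    self.state = :queued
    self.worker_name = nil
  end
end

# Same shape as the listing above, with Array#select standing in for the query.
def sketch_requeue_dead_server(jobs, server_name)
  jobs.select { |job| %i[running paused failed].include?(job.state) }.each do |job|
    job.requeue!(server_name) if job.may_requeue?(server_name)
  end
end

jobs = [
  DeadServerSketchJob.new(id: 1, state: :running, worker_name: "host1:100:w1"),
  DeadServerSketchJob.new(id: 2, state: :failed,  worker_name: "host1:100:w2"),
  DeadServerSketchJob.new(id: 3, state: :running, worker_name: "host2:200:w1")
]
sketch_requeue_dead_server(jobs, "host1:100")
```

Jobs 1 and 2 return to the queue, while job 3 is left alone because it runs on a different server.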

#rocket_job_next_job(worker_name, filter = {}) ⇒ Object

Returns the next job to work on, in priority-based order. Returns nil if there are currently no queued jobs and no batch jobs in progress with records that still require processing.

Parameters

worker_name [String]
  Name of the worker that will be processing this job

filter [Hash]
  Criteria applied when looking for the next job. The throttle filter of any job that exceeds its throttle is merged into it, so that job is not retrieved again.

Note:

If a job is in queued state it will be started


# File 'lib/rocket_job/plugins/job/worker.rb', line 50

def rocket_job_next_job(worker_name, filter = {})
  while (job = rocket_job_retrieve(worker_name, filter))
    case
    when job.running?
      # Batch Job
      return job
    when job.expired?
      job.rocket_job_fail_on_exception!(worker_name) { job.destroy }
      logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}"
    when job.throttle_exceeded?
      # Add jobs filter to the current filter
      throttle_merge_filter(filter, job.throttle_filter)
      # Restore retrieved job so that other workers can process it later
      job.set(worker_name: nil, state: :queued)
    else
      job.worker_name = worker_name
      job.rocket_job_fail_on_exception!(worker_name) do
        defined?(RocketJobPro) ? job.start! : job.start
      end
      return job if job.running?
    end
  end
end
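The retrieval loop above can be followed more easily with an in-memory stand-in. This is a rough sketch under stated assumptions: `NextJobSketch`, `sketch_retrieve`, `sketch_next_job`, and the `:skip_ids` filter key are hypothetical names, the `expired` and `throttled` flags replace the real predicate methods, and setting `state` to `:running` stands in for starting the job.

```ruby
# Hypothetical in-memory job; real jobs are database documents.
NextJobSketch = Struct.new(:id, :state, :expired, :throttled, :worker_name,
                           keyword_init: true) do
  def running?
    state == :running
  end
end

# Stand-in for rocket_job_retrieve: first queued or running job not excluded by the filter.
def sketch_retrieve(jobs, filter)
  jobs.find do |job|
    %i[queued running].include?(job.state) && !filter[:skip_ids].include?(job.id)
  end
end

def sketch_next_job(jobs, worker_name, filter = { skip_ids: [] })
  while (job = sketch_retrieve(jobs, filter))
    if job.running?
      return job                    # batch job already in progress
    elsif job.expired
      jobs.delete(job)              # expired jobs are destroyed; keep looking
    elsif job.throttled
      filter[:skip_ids] << job.id   # widen the filter so this job is skipped next time
      job.state = :queued           # leave it queued for other workers
    else
      job.worker_name = worker_name
      job.state = :running          # stands in for starting the job
      return job
    end
  end
  nil                               # nothing left to work on
end

jobs = [
  NextJobSketch.new(id: 1, state: :queued, expired: true),
  NextJobSketch.new(id: 2, state: :queued, throttled: true),
  NextJobSketch.new(id: 3, state: :queued)
]
job = sketch_next_job(jobs, "host1:100:w1")
```

Without merging the throttled job into the filter, the loop would retrieve job 2 again on every pass; widening the filter is what lets it move on to job 3.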