Module: RocketJob::Plugins::Job::Worker::ClassMethods
- Defined in:
- lib/rocket_job/plugins/job/worker.rb
Instance Method Summary collapse
-
#perform_later(args, &block) ⇒ Object
Run this job later.
-
#perform_now(args, &block) ⇒ Object
Run this job now.
-
#requeue_dead_server(server_name) ⇒ Object
Requeues all jobs that were running on a server that died.
-
#rocket_job_next_job(worker_name, filter = {}) ⇒ Object
Returns the next job to work on in priority based order Returns nil if there are currently no queued jobs, or processing batch jobs with records that require processing.
Instance Method Details
#perform_later(args, &block) ⇒ Object
Run this job later
Saves it to the database for processing later by workers
14 15 16 17 18 19 20 21 22 23 |
# File 'lib/rocket_job/plugins/job/worker.rb', line 14 def perform_later(args, &block) if RocketJob::Config.inline_mode perform_now(args, &block) else job = new(args) block.call(job) if block job.save! job end end |
#perform_now(args, &block) ⇒ Object
Run this job now.
The job is not saved to the database since it is processed entriely in memory As a result before_save and before_destroy callbacks will not be called. Validations are still called however prior to calling #perform
30 31 32 33 34 35 |
# File 'lib/rocket_job/plugins/job/worker.rb', line 30 def perform_now(args, &block) job = new(args) block.call(job) if block job.perform_now job end |
#requeue_dead_server(server_name) ⇒ Object
Requeues all jobs that were running on a server that died
75 76 77 78 79 80 |
# File 'lib/rocket_job/plugins/job/worker.rb', line 75 def requeue_dead_server(server_name) # Need to requeue paused, failed since user may have transitioned job before it finished where(:state.in => [:running, :paused, :failed]).each do |job| job.requeue!(server_name) if job.may_requeue?(server_name) end end |
#rocket_job_next_job(worker_name, filter = {}) ⇒ Object
Returns the next job to work on in priority based order Returns nil if there are currently no queued jobs, or processing batch jobs
with records that require processing
Parameters
worker_name [String]
Name of the worker that will be processing this job
skip_job_ids [Array<BSON::ObjectId>]
Job ids to exclude when looking for the next job
Note:
If a job is in queued state it will be started
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/rocket_job/plugins/job/worker.rb', line 50 def rocket_job_next_job(worker_name, filter = {}) while (job = rocket_job_retrieve(worker_name, filter)) case when job.running? # Batch Job return job when job.expired? job.rocket_job_fail_on_exception!(worker_name) { job.destroy } logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}" when job.throttle_exceeded? # Add jobs filter to the current filter throttle_merge_filter(filter, job.throttle_filter) # Restore retrieved job so that other workers can process it later job.set(worker_name: nil, state: :queued) else job.worker_name = worker_name job.rocket_job_fail_on_exception!(worker_name) do defined?(RocketJobPro) ? job.start! : job.start end return job if job.running? end end end |