Module: RocketJob::Batch::Worker
- Extended by: ActiveSupport::Concern
- Included in: RocketJob::Batch
- Defined in: lib/rocket_job/batch/worker.rb
Instance Method Summary
- #rocket_job_active_workers(server_name = nil) ⇒ Object
  Returns [Array<ActiveWorker>] All workers actively working on this job.
- #rocket_job_work(worker, re_raise_exceptions = false) ⇒ Object
  Processes records in each available slice for this job.
- #work_first_slice(&block) ⇒ Object
  Prior to a job being made available for processing it can be processed one slice at a time.
Instance Method Details
#rocket_job_active_workers(server_name = nil) ⇒ Object
Returns [Array<ActiveWorker>] All workers actively working on this job. When `server_name` is supplied, only workers running on that server are returned.
# File 'lib/rocket_job/batch/worker.rb', line 100

def rocket_job_active_workers(server_name = nil)
  servers = []
  case sub_state
  when :before, :after
    unless server_name && !worker_on_server?(server_name)
      servers << ActiveWorker.new(worker_name, started_at, self) if running?
    end
  when :processing
    query = input.running
    query = query.where(worker_name: /\A#{server_name}/) if server_name
    query.each do |slice|
      servers << ActiveWorker.new(slice.worker_name, slice.started_at, self)
    end
  end
  servers
end
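In the `:processing` state, the method selects running slices whose `worker_name` matches `/\A#{server_name}/`, which is a prefix match. The following is a minimal sketch of that filter in plain Ruby, without RocketJob or MongoDB. `Slice`, `active_on`, and the sample worker names (including their `host:pid:worker` shape) are hypothetical stand-ins chosen for illustration.

```ruby
# Hypothetical stand-in for a running slice; only the fields the filter uses.
Slice = Struct.new(:worker_name, :started_at)

# Mirrors the :processing branch: keep slices whose worker name starts with
# server_name, or return every slice when no server_name is given.
def active_on(slices, server_name = nil)
  return slices unless server_name

  slices.select { |slice| slice.worker_name.match?(/\A#{server_name}/) }
end

now = Time.now
slices = [
  Slice.new("host1:100:w-1", now),
  Slice.new("host1:100:w-2", now),
  Slice.new("host2:200:w-1", now),
  Slice.new("host10:300:w-1", now)
]

puts active_on(slices, "host1:100").map(&:worker_name).inspect
# => ["host1:100:w-1", "host1:100:w-2"]

# A shorter prefix also matches "host10", because the pattern is only
# anchored at the start of the string.
puts active_on(slices, "host1").size
# => 3
```

Because the server name is interpolated into the pattern as-is, any regular expression metacharacters it contains (for example `.`) are interpreted as pattern syntax rather than literal characters.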
#rocket_job_work(worker, re_raise_exceptions = false) ⇒ Object
Processes records in each available slice for this job. Slices are processed one at a time to allow for concurrent calls to this method to increase throughput. Processing will continue until there are no more slices available for this job.
Returns [true|false] whether any work was performed.
Slices are destroyed after their records are successfully processed.
Results are stored in the output collection if `collect_output?`. `nil` results from workers are kept if `collect_nil_output`.
If an exception is raised, the entire slice of records is marked as failed.
If the mongo_ha gem has been loaded, the connection to Mongo is automatically re-established and the job resumes whenever a Mongo connection failure occurs.
Thread-safe: can be called by multiple threads at the same time.
# File 'lib/rocket_job/batch/worker.rb', line 36

def rocket_job_work(worker, re_raise_exceptions = false)
  raise "Job must be started before calling #rocket_job_work" unless running?
  start_time = Time.now
  if sub_state != :processing
    fail_on_exception!(re_raise_exceptions) { rocket_job_batch_callbacks(worker) }
    return false unless running?
  end

  SemanticLogger.named_tagged(job: id.to_s) do
    until worker.shutdown?
      if slice = input.next_slice(worker.name)
        # Grab a slice before checking the throttle to reduce concurrency race condition.
        return true if slice.fail_on_exception!(re_raise_exceptions) { rocket_job_batch_throttled?(slice, worker) }
        next if slice.failed?

        slice.fail_on_exception!(re_raise_exceptions) { rocket_job_process_slice(slice) }
      elsif record_count && rocket_job_batch_complete?(worker.name)
        return false
      else
        logger.debug "No more work available for this job"
        worker.add_to_current_filter(throttle_filter_id)
        return true
      end

      # Allow new jobs with a higher priority to interrupt this job
      break if (Time.now - start_time) >= Config.re_check_seconds
    end
  end
  false
end
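The core of this method is a loop that takes one slice at a time and stops when the worker shuts down, no slice is left, or a time budget (`Config.re_check_seconds`) has elapsed. The sketch below illustrates only that control flow, assuming an in-memory array in place of the input collection. `FakeWorker`, `work_slices`, and its return value are hypothetical; throttling, exception handling, and batch completion are deliberately left out.

```ruby
# Hypothetical stand-in for a worker that never asks to shut down.
FakeWorker = Struct.new(:name) do
  def shutdown?
    false
  end
end

# Takes slices from the queue until it is empty or the time budget is used up.
# Returns [processed_count, drained], where drained is true when the queue
# ran out of slices and false when the time budget interrupted the loop.
def work_slices(worker, queue, re_check_seconds:)
  start_time = Time.now
  processed  = 0
  until worker.shutdown?
    slice = queue.shift
    return [processed, true] if slice.nil?

    slice.each { |record| yield record }
    processed += 1

    # Give jobs with a higher priority a chance to run.
    break if (Time.now - start_time) >= re_check_seconds
  end
  [processed, false]
end

queue  = [[1, 2], [3, 4], [5]]
total  = 0
result = work_slices(FakeWorker.new("w-1"), queue, re_check_seconds: 60) { |r| total += r }

puts result.inspect # => [3, true]
puts total          # => 15
```

Checking the elapsed time only after a slice completes means a slice is never abandoned halfway; the cost is that the budget can be overrun by the duration of one slice.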
#work_first_slice(&block) ⇒ Object
Prior to a job being made available for processing it can be processed one slice at a time.
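For example, this can be used to extract the header row, which would be in the first slice.
Returns [Integer] the number of records processed in the slice, or 0 if no slice arrived.
Note: This method can only be called from within before_batch callbacks. The slice will be removed from processing when this method completes.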
# File 'lib/rocket_job/batch/worker.rb', line 76

def work_first_slice(&block)
  raise "#work_first_slice can only be called from within before_batch callbacks" unless sub_state == :before

  # TODO: Make these settings configurable
  count        = 0
  wait_seconds = 5
  while input.first.nil?
    break if count > 10

    logger.info "First slice has not arrived yet, sleeping for #{wait_seconds} seconds"
    sleep wait_seconds
    count += 1
  end

  slice = input.first
  # No records processed
  return 0 unless slice

  # TODO: Persist that the first slice is being processed by this worker
  slice.start
  rocket_job_process_slice(slice, &block)
end
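The wait loop in the listing sleeps at most 11 times (for `count` values 0 through 10), so it gives up after roughly 55 seconds. The sketch below isolates that bounded polling pattern in plain Ruby. `wait_for_first` and its `max_retries`, `wait_seconds`, and `sleeper` parameters are hypothetical names introduced here; the injectable `sleeper` exists only so the example runs without real delays.

```ruby
# Polls the block until it returns a non-nil value, sleeping between attempts.
# Gives up once count exceeds max_retries, mirroring `break if count > 10`.
# Returns the value, or nil if it never arrived.
def wait_for_first(max_retries: 10, wait_seconds: 5, sleeper: ->(s) { sleep(s) })
  count = 0
  while (value = yield).nil?
    break if count > max_retries

    sleeper.call(wait_seconds)
    count += 1
  end
  value
end

# The "slice" shows up on the third check, after two simulated sleeps.
attempts = 0
slept    = []
value = wait_for_first(sleeper: ->(s) { slept << s }) do
  attempts += 1
  attempts >= 3 ? [:header_row] : nil
end

puts value.inspect # => [:header_row]
puts slept.sum     # => 10
```

If the value never arrives, the helper returns `nil` after 11 sleeps, which corresponds to the `return 0 unless slice` path in the listing.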