Module: RocketJob::Batch::Worker

Extended by:
ActiveSupport::Concern
Included in:
RocketJob::Batch
Defined in:
lib/rocket_job/batch/worker.rb

Instance Method Summary

Instance Method Details

#rocket_job_active_workers(server_name = nil) ⇒ Object

Returns [Array<ActiveWorker>] all workers actively working on this job, optionally limited to workers on `server_name`.



# File 'lib/rocket_job/batch/worker.rb', line 100

def rocket_job_active_workers(server_name = nil)
  servers = []
  case sub_state
  when :before, :after
    unless server_name && !worker_on_server?(server_name)
      servers << ActiveWorker.new(worker_name, started_at, self) if running?
    end
  when :processing
    query = input.running
    query = query.where(worker_name: /\A#{server_name}/) if server_name
    query.each do |slice|
      servers << ActiveWorker.new(slice.worker_name, slice.started_at, self)
    end
  end
  servers
end
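The `:processing` branch above narrows the running slices to one server by prefix-matching each slice's `worker_name`. A minimal plain-Ruby sketch of that filtering, assuming hypothetical `SliceSketch` and `ActiveWorkerSketch` structs in place of RocketJob's slice documents and `ActiveWorker`. It uses `String#start_with?` instead of the interpolated regex, so characters such as `.` in a server name are matched literally. The worker names are illustrative only.

```ruby
# Hypothetical stand-ins: these are not RocketJob classes.
SliceSketch        = Struct.new(:worker_name, :started_at)
ActiveWorkerSketch = Struct.new(:name, :started_at)

# Mirrors the :processing branch: one active worker per running slice,
# optionally restricted to workers whose name starts with server_name.
def active_workers_for(running_slices, server_name = nil)
  slices = running_slices
  slices = slices.select { |s| s.worker_name.start_with?(server_name) } if server_name
  slices.map { |s| ActiveWorkerSketch.new(s.worker_name, s.started_at) }
end

started = Time.at(0)
running = [
  SliceSketch.new("host-a:1234:1", started),
  SliceSketch.new("host-a:1234:2", started),
  SliceSketch.new("host-b:5678:1", started)
]

active_workers_for(running).size                  # => 3
active_workers_for(running, "host-b").map(&:name) # => ["host-b:5678:1"]
```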

#rocket_job_work(worker, re_raise_exceptions = false) ⇒ Object

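Processes records in each available slice for this job. Each call processes one slice at a time, so this method can be called concurrently to increase throughput. Processing continues until there are no more slices available for this job, the worker is shutting down, the job is throttled, or `Config.re_check_seconds` have elapsed so that higher-priority jobs can interrupt it.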

Returns [true|false] whether any work was performed.

Slices are destroyed after their records are successfully processed.

Results are stored in the output collection if `collect_output?` is true. `nil` results returned by workers are kept only if `collect_nil_output` is enabled.

If an exception is raised, the entire slice of records is marked as failed.
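The three notes above describe what happens to a single slice. The following plain-Ruby sketch models those rules under stated assumptions: `process_slice_sketch`, its `collect_output:` and `collect_nil_output:` keywords, and the returned hash are hypothetical, not RocketJob's API. The real implementation persists slices and output in MongoDB instead of returning them.

```ruby
# Hypothetical model of per-slice processing; not RocketJob code.
# Every record in the slice is passed to the block. Results are kept only
# when collect_output is true, and nil results only when collect_nil_output
# is true. In this sketch any exception fails the whole slice and discards
# its partial output.
def process_slice_sketch(records, collect_output: true, collect_nil_output: false)
  output = []
  records.each do |record|
    result = yield(record)
    next unless collect_output
    next if result.nil? && !collect_nil_output

    output << result
  end
  { state: :completed, output: output } # a completed slice would then be destroyed
rescue StandardError => e
  { state: :failed, error: e.message, output: [] }
end

result = process_slice_sketch([1, 2, 3]) { |n| n.even? ? nil : n * 10 }
result[:output] # => [10, 30] (the nil returned for 2 is dropped)

failed = process_slice_sketch([1, 0]) { |n| 10 / n }
failed[:state]  # => :failed
```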

If the mongo_ha gem has been loaded, the connection to MongoDB is automatically re-established and the job resumes whenever a MongoDB connection failure occurs.

Thread-safe: can be called by multiple threads at the same time.
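Because each call claims one slice at a time, several threads can call `#rocket_job_work` on the same job and divide its slices between them. A minimal sketch of that pattern in plain Ruby, assuming a thread-safe `Queue` stands in for the job's input collection and a hypothetical `work_sketch` method stands in for the worker loop. RocketJob itself claims the next slice from MongoDB through `input.next_slice`.

```ruby
# Hypothetical worker loop: claims one slice at a time from a shared,
# thread-safe Queue until none remain, recording each slice's sum.
def work_sketch(slices, totals)
  processed = 0
  loop do
    slice = slices.pop(true) # non-blocking pop; raises ThreadError when empty
    totals << slice.sum
    processed += 1
  end
rescue ThreadError
  processed # number of slices this thread handled
end

# A Queue stands in for the job's input collection of slices.
slices = Queue.new
[[1, 2], [3, 4], [5, 6], [7, 8]].each { |slice| slices << slice }
totals = Queue.new

threads    = Array.new(3) { Thread.new { work_sketch(slices, totals) } }
per_thread = threads.map(&:value)

per_thread.sum # => 4, each slice was claimed by exactly one thread
```

How the four slices are split across the three threads varies from run to run; only the total is fixed.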



# File 'lib/rocket_job/batch/worker.rb', line 36

def rocket_job_work(worker, re_raise_exceptions = false)
  raise "Job must be started before calling #rocket_job_work" unless running?

  start_time = Time.now
  if sub_state != :processing
    fail_on_exception!(re_raise_exceptions) { rocket_job_batch_callbacks(worker) }
    return false unless running?
  end

  SemanticLogger.named_tagged(job: id.to_s) do
    until worker.shutdown?
      if slice = input.next_slice(worker.name)
        # Grab a slice before checking the throttle to reduce concurrency race condition.
        return true if slice.fail_on_exception!(re_raise_exceptions) { rocket_job_batch_throttled?(slice, worker) }
        next if slice.failed?

        slice.fail_on_exception!(re_raise_exceptions) { rocket_job_process_slice(slice) }
      elsif record_count && rocket_job_batch_complete?(worker.name)
        return false
      else
        logger.debug "No more work available for this job"
        worker.add_to_current_filter(throttle_filter_id)
        return true
      end

      # Allow new jobs with a higher priority to interrupt this job
      break if (Time.now - start_time) >= Config.re_check_seconds
    end
  end
  false
end
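The `break` at the end of the loop bounds how long one call keeps working on a job, so the worker can re-check for higher-priority jobs. A minimal sketch of that time-budget pattern, where `run_with_budget`, the injectable `clock` lambda, and the `re_check_seconds:` keyword are hypothetical stand-ins for `Time.now` and `Config.re_check_seconds`. The sketch swaps in a monotonic clock, which is not affected by wall-clock adjustments.

```ruby
MONOTONIC_CLOCK = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

# Hypothetical sketch of the time-budget loop in #rocket_job_work.
# Takes items until none remain or the budget is used up.
# Returns [status, items_processed]; :budget_exhausted means the caller
# should re-check for higher-priority work and then call again.
def run_with_budget(items, re_check_seconds:, clock: MONOTONIC_CLOCK)
  start     = clock.call
  processed = 0
  until items.empty?
    yield items.shift
    processed += 1
    # Same check as the source: stop once elapsed time reaches the budget.
    return [:budget_exhausted, processed] if clock.call - start >= re_check_seconds
  end
  [:no_more_work, processed]
end

# A fake clock that advances one "second" per call keeps the example deterministic.
ticks      = 0
fake_clock = -> { ticks += 1 }
queue      = [1, 2, 3, 4, 5]
run_with_budget(queue, re_check_seconds: 3, clock: fake_clock) { |_item| }
# => [:budget_exhausted, 3]; queue is now [4, 5]
```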

#work_first_slice(&block) ⇒ Object

Before a job is made available for processing, its first slice can be processed on its own. This method can only be called from within `before_batch` callbacks.

For example, to extract the header row, which would be in the first slice.

Returns [Integer] the number of records processed in the slice.

Note: The slice is removed from processing when this method completes.
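To illustrate the header-row use case, a plain-Ruby sketch assuming the first slice is an array of raw CSV lines with the header first. `split_header` is hypothetical and independent of RocketJob; the naive comma split does not handle quoted fields.

```ruby
# Hypothetical: the first slice holds raw CSV lines, header row first.
# Returns the column names and the data rows as hashes keyed by column.
# Splitting on "," is naive and does not handle quoted fields.
def split_header(first_slice)
  return [[], []] if first_slice.empty?

  header, *rows = first_slice
  columns = header.split(",").map(&:strip)
  data    = rows.map { |line| columns.zip(line.split(",").map(&:strip)).to_h }
  [columns, data]
end

columns, data = split_header(["name, age", "alice, 30", "bob, 41"])
columns            # => ["name", "age"]
data.size          # => 2
data.first["name"] # => "alice"
```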



# File 'lib/rocket_job/batch/worker.rb', line 76

def work_first_slice(&block)
  raise "#work_first_slice can only be called from within before_batch callbacks" unless sub_state == :before

  # TODO: Make these settings configurable
  count        = 0
  wait_seconds = 5
  while input.first.nil?
    break if count > 10

    logger.info "First slice has not arrived yet, sleeping for #{wait_seconds} seconds"
    sleep wait_seconds
    count += 1
  end

  slice = input.first
  # No records processed
  return 0 unless slice

  # TODO: Persist that the first slice is being processed by this worker
  slice.start
  rocket_job_process_slice(slice, &block)
end
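The loop at the top of `#work_first_slice` waits for the first slice with a fixed retry limit and a fixed sleep, and its TODO notes these settings should become configurable. A sketch of the same bounded-polling pattern with those settings as parameters; `wait_for_first`, `max_sleeps:`, `wait_seconds:`, and the injectable `sleeper:` are hypothetical. The defaults match the source: 5-second sleeps, and at most 11 of them because the loop breaks once `count > 10`.

```ruby
# Hypothetical bounded polling, modeled on the wait loop in #work_first_slice.
# Yields until the block returns non-nil or max_sleeps sleeps have been spent.
# Returns the value from the block, or nil if it never arrived.
def wait_for_first(max_sleeps: 11, wait_seconds: 5, sleeper: ->(s) { sleep(s) })
  sleeps = 0
  loop do
    value = yield
    return value unless value.nil?
    return nil if sleeps >= max_sleeps

    sleeper.call(wait_seconds)
    sleeps += 1
  end
end

# Record sleeps instead of sleeping; the "slice" arrives on the third check.
attempts = 0
slept    = []
result = wait_for_first(wait_seconds: 5, sleeper: ->(s) { slept << s }) do
  attempts += 1
  attempts >= 3 ? :first_slice : nil
end
result # => :first_slice
slept  # => [5, 5]
```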