Class: RocketJob::Sliced::Input
- Defined in:
- lib/rocket_job/sliced/input.rb
Instance Attribute Summary
Attributes inherited from Slices
#all, #collection_name, #slice_class, #slice_size
Instance Method Summary collapse
-
#each_failed_record ⇒ Object
Iterate over each failed record, if any Since each slice can only contain 1 failed record, only the failed record is returned along with the slice containing the exception details.
-
#next_slice(worker_name) ⇒ Object
Returns the next slice to work on in id order Returns nil if there are currently no queued slices.
-
#requeue_failed ⇒ Object
Requeue all failed slices.
-
#requeue_running(worker_name) ⇒ Object
Requeue all running slices for a server or worker that is no longer available.
- #upload(on_first: nil, &block) ⇒ Object
- #upload_arel(arel, *column_names, &block) ⇒ Object
- #upload_integer_range(start_id, last_id) ⇒ Object
- #upload_integer_range_in_reverse_order(start_id, last_id) ⇒ Object
- #upload_mongo_query(criteria, *column_names, &block) ⇒ Object
Methods inherited from Slices
#completed, #create, #create!, #create_indexes, #drop, #each, #failed, #first, #group_exceptions, #initialize, #insert, #last, #new, #queued, #running
Constructor Details
This class inherits a constructor from RocketJob::Sliced::Slices
Instance Method Details
#each_failed_record ⇒ Object
Iterate over each failed record, if any Since each slice can only contain 1 failed record, only the failed record is returned along with the slice containing the exception details
Example:
job.each_failed_record do |record, slice|
ap slice
end
109 110 111 112 113 114 |
# File 'lib/rocket_job/sliced/input.rb', line 109 def each_failed_record failed.each do |slice| record = slice.failed_record yield(record, slice) unless record.nil? end end |
#next_slice(worker_name) ⇒ Object
Returns the next slice to work on in id order Returns nil if there are currently no queued slices
If a slice is in queued state it will be started and assigned to this worker
136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/rocket_job/sliced/input.rb', line 136 def next_slice(worker_name) # TODO: Will it perform faster without the id sort? # I.e. Just process on a FIFO basis? document = all.queued. sort("_id" => 1). find_one_and_update( {"$set" => {worker_name: worker_name, state: :running, started_at: Time.now}}, return_document: :after ) document.collection_name = collection_name if document document end |
#requeue_failed ⇒ Object
Requeue all failed slices
117 118 119 120 121 122 |
# File 'lib/rocket_job/sliced/input.rb', line 117 def requeue_failed failed.update_all( "$unset" => {worker_name: nil, started_at: nil}, "$set" => {state: :queued} ) end |
#requeue_running(worker_name) ⇒ Object
Requeue all running slices for a server or worker that is no longer available
125 126 127 128 129 130 |
# File 'lib/rocket_job/sliced/input.rb', line 125 def requeue_running(worker_name) running.where(worker_name: /\A#{worker_name}/).update_all( "$unset" => {worker_name: nil, started_at: nil}, "$set" => {state: :queued} ) end |
#upload(on_first: nil, &block) ⇒ Object
4 5 6 7 8 9 10 11 |
# File 'lib/rocket_job/sliced/input.rb', line 4 def upload(on_first: nil, &block) # Create indexes before uploading create_indexes Writer::Input.collect(self, on_first: on_first, &block) rescue StandardError => e drop raise(e) end |
#upload_arel(arel, *column_names, &block) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/rocket_job/sliced/input.rb', line 45 def upload_arel(arel, *column_names, &block) unless block column_names = column_names.empty? ? [:id] : column_names.collect(&:to_sym) block = if column_names.size == 1 column = column_names.first ->(model) { model.send(column) } else ->(model) { column_names.collect { |c| model.send(c) } } end # find_each requires the :id column in the query selection = column_names.include?(:id) ? column_names : column_names + [:id] arel = arel.select(selection) end upload { |records| arel.find_each { |model| records << block.call(model) } } end |
#upload_integer_range(start_id, last_id) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/rocket_job/sliced/input.rb', line 64 def upload_integer_range(start_id, last_id) # Create indexes before uploading create_indexes count = 0 while start_id <= last_id end_id = start_id + slice_size - 1 end_id = last_id if end_id > last_id create!(records: [[start_id, end_id]]) start_id += slice_size count += 1 end count rescue StandardError => e drop raise(e) end |
#upload_integer_range_in_reverse_order(start_id, last_id) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/rocket_job/sliced/input.rb', line 81 def upload_integer_range_in_reverse_order(start_id, last_id) # Create indexes before uploading create_indexes end_id = last_id count = 0 while end_id >= start_id first_id = end_id - slice_size + 1 first_id = start_id if first_id.negative? || (first_id < start_id) create!(records: [[first_id, end_id]]) end_id -= slice_size count += 1 end count rescue StandardError => e drop raise(e) end |
#upload_mongo_query(criteria, *column_names, &block) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/rocket_job/sliced/input.rb', line 13 def upload_mongo_query(criteria, *column_names, &block) = criteria. # Without a block extract the fields from the supplied criteria if block # Criteria is returning old school :fields instead of :projections [:projection] = .delete(:fields) if .key?(:fields) else column_names = column_names.collect(&:to_s) column_names << "_id" if column_names.size.zero? fields = .delete(:fields) || {} column_names.each { |col| fields[col] = 1 } [:projection] = fields block = if column_names.size == 1 column = column_names.first ->(document) { document[column] } else ->(document) { column_names.collect { |c| document[c] } } end end upload do |records| # Drop down to the mongo driver level to avoid constructing a Model for each document returned criteria.klass.collection.find(criteria.selector, ).each do |document| records << block.call(document) end end end |