Class: RocketJob::Sliced::Slices
- Inherits:
- Object
- Extended by:
- Forwardable
- Includes:
- Enumerable, SemanticLogger::Loggable
- Defined in:
- lib/rocket_job/sliced/slices.rb
Instance Attribute Summary
-
#all ⇒ Object
readonly
Returns the value of attribute all.
-
#collection_name ⇒ Object
Returns the value of attribute collection_name.
-
#slice_class ⇒ Object
Returns the value of attribute slice_class.
-
#slice_size ⇒ Object
Returns the value of attribute slice_size.
Instance Method Summary
-
#completed ⇒ Object
Forwards to all.completed. Defined explicitly because Forwardable generates invalid warnings on these methods (#completed, #failed, #queued, #running).
- #create(params = {}) ⇒ Object
- #create!(params = {}) ⇒ Object
-
#create_indexes ⇒ Object
Index for find_and_modify only if it is not already present.
-
#drop ⇒ Object
Drop this collection when it is no longer needed.
-
#each ⇒ Object
Returns output slices in the order of their id which is usually the order in which they were written.
- #failed ⇒ Object
-
#first ⇒ Object
Returns the first slice by ascending _id. Mongoid does not apply an ordering, so an explicit sort is added.
-
#group_exceptions ⇒ Object
Returns [Array<Struct>] grouped exceptions by class name, and unique exception messages by exception class.
-
#initialize(collection_name:, slice_class: Sliced::Slice, slice_size: 100) ⇒ Slices
constructor
Parameters: collection_name: [String] Name of the collection to create. slice_size: [Integer] Number of records to store in each slice (default: 100). slice_class: [Class] Slice class to use to hold records (default: RocketJob::Sliced::Slice).
-
#insert(slice, input_slice = nil) ⇒ Object
(also: #<<)
Insert a new slice into the collection.
- #last ⇒ Object
- #new(params = {}) ⇒ Object
- #queued ⇒ Object
- #running ⇒ Object
Constructor Details
#initialize(collection_name:, slice_class: Sliced::Slice, slice_size: 100) ⇒ Slices
Parameters
collection_name: [String]
Name of the collection to create
slice_size: [Integer]
Number of records to store in each slice
Default: 100
slice_class: [class]
Slice class to use to hold records.
Default: RocketJob::Sliced::Slice
    # File 'lib/rocket_job/sliced/slices.rb', line 20
    def initialize(collection_name:, slice_class: Sliced::Slice, slice_size: 100)
      @slice_class     = slice_class
      @slice_size      = slice_size
      @collection_name = collection_name

      # Using `Sliced::Slice` avoids having to add `_type` as an index when all slices are the same type anyway.
      @all = Sliced::Slice.with_collection(collection_name)
    end
Instance Attribute Details
#all ⇒ Object (readonly)
Returns the value of attribute all.
    # File 'lib/rocket_job/sliced/slices.rb', line 9
    def all
      @all
    end
#collection_name ⇒ Object
Returns the value of attribute collection_name.
    # File 'lib/rocket_job/sliced/slices.rb', line 8
    def collection_name
      @collection_name
    end
#slice_class ⇒ Object
Returns the value of attribute slice_class.
    # File 'lib/rocket_job/sliced/slices.rb', line 8
    def slice_class
      @slice_class
    end
#slice_size ⇒ Object
Returns the value of attribute slice_size.
    # File 'lib/rocket_job/sliced/slices.rb', line 8
    def slice_size
      @slice_size
    end
Instance Method Details
#completed ⇒ Object
Forwards to all.completed. Defined explicitly because Forwardable generates invalid warnings on these methods (#completed, #failed, #queued, #running).
    # File 'lib/rocket_job/sliced/slices.rb', line 111
    def completed
      all.completed
    end
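To illustrate the difference between delegating through Forwardable and forwarding explicitly, here is a minimal sketch. `FakeCriteria` and `SlicesSketch` are hypothetical stand-ins, and the choice of `count` as the delegated method is an assumption made for illustration only; it is not taken from the source.

```ruby
require "forwardable"

# Hypothetical stand-in for the criteria object held in `all`.
class FakeCriteria
  def initialize(slices)
    @slices = slices
  end

  def completed
    @slices.select { |slice| slice[:state] == :completed }
  end

  def count
    @slices.size
  end
end

# Hypothetical sketch, not the real RocketJob::Sliced::Slices class.
class SlicesSketch
  extend Forwardable

  attr_reader :all

  # Delegation generated by Forwardable (assumed example method).
  def_delegators :all, :count

  def initialize(all)
    @all = all
  end

  # Explicit forwarding, in the same style as #completed above.
  def completed
    all.completed
  end
end
```

Both styles end up calling the same method on `all`; the explicit form simply avoids going through Forwardable.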
#create(params = {}) ⇒ Object
    # File 'lib/rocket_job/sliced/slices.rb', line 33
    def create(params = {})
      slice = new(params)
      slice.save
      slice
    end
#create!(params = {}) ⇒ Object
    # File 'lib/rocket_job/sliced/slices.rb', line 39
    def create!(params = {})
      slice = new(params)
      slice.save!
      slice
    end
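The only difference between #create and #create! is whether the slice is persisted with `save` or `save!`. The sketch below shows that pattern with an in-memory slice. `FakeSlice`, `InvalidSliceError`, `SliceFactorySketch`, and the "records must not be empty" rule are all hypothetical and exist only to make the example runnable without MongoDB.

```ruby
# Hypothetical error raised by the stand-in save!.
class InvalidSliceError < StandardError; end

# Hypothetical in-memory slice; the validity rule is an assumption.
class FakeSlice
  attr_reader :records, :collection_name

  def initialize(records: [], collection_name: nil)
    @records         = records
    @collection_name = collection_name
    @persisted       = false
  end

  def persisted?
    @persisted
  end

  # Returns false instead of raising when the slice is invalid.
  def save
    return false if records.empty?

    @persisted = true
  end

  # Raises when the slice cannot be saved.
  def save!
    save || raise(InvalidSliceError, "records must not be empty")
  end
end

# Hypothetical factory mirroring #new, #create and #create!.
class SliceFactorySketch
  def initialize(collection_name:, slice_class: FakeSlice)
    @collection_name = collection_name
    @slice_class     = slice_class
  end

  def new(params = {})
    @slice_class.new(**params.merge(collection_name: @collection_name))
  end

  def create(params = {})
    slice = new(params)
    slice.save
    slice
  end

  def create!(params = {})
    slice = new(params)
    slice.save!
    slice
  end
end
```

With this sketch, `create` always hands back a slice and leaves it to the caller to check `persisted?`, while `create!` raises on failure.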
#create_indexes ⇒ Object
Index for find_and_modify only if it is not already present
    # File 'lib/rocket_job/sliced/slices.rb', line 96
    def create_indexes
      all.collection.indexes.create_one(state: 1, _id: 1) if all.collection.indexes.none? { |i| i["name"] == "state_1__id_1" }
    rescue Mongo::Error::OperationFailure
      all.collection.indexes.create_one(state: 1, _id: 1)
    end
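The check-then-create step can be sketched without a database. `FakeIndexes` and `ensure_state_index` are hypothetical names, and the stand-in builds the index name by joining each field and direction with underscores, which yields the `state_1__id_1` name that the method above looks for. The rescue branch of the real method is left out of this sketch.

```ruby
# Hypothetical in-memory stand-in for a collection's index view.
class FakeIndexes
  include Enumerable

  attr_reader :create_calls

  def initialize
    @indexes      = [{"name" => "_id_"}]
    @create_calls = 0
  end

  def each(&block)
    @indexes.each(&block)
  end

  # Records the index under a name derived from its keys.
  def create_one(keys)
    @create_calls += 1
    name = keys.map { |field, direction| "#{field}_#{direction}" }.join("_")
    @indexes << {"name" => name}
  end
end

# Create the index only when no index with that name exists yet.
def ensure_state_index(indexes)
  indexes.create_one(state: 1, _id: 1) if indexes.none? { |i| i["name"] == "state_1__id_1" }
end
```

Calling `ensure_state_index` repeatedly on the same `FakeIndexes` instance creates the index once and then does nothing.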
#drop ⇒ Object
Drop this collection when it is no longer needed
    # File 'lib/rocket_job/sliced/slices.rb', line 106
    def drop
      all.collection.drop
    end
#each ⇒ Object
Returns output slices in the order of their id which is usually the order in which they were written.
    # File 'lib/rocket_job/sliced/slices.rb', line 47
    def each
      all.sort(id: 1).each { |document| yield(document) }
    end
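Because the class includes Enumerable, defining #each with an explicit sort is enough for methods such as `map` to see slices in id order. A minimal sketch with an in-memory array follows; `OrderedSlicesSketch` and the hash-based documents are hypothetical.

```ruby
# Hypothetical stand-in: documents are plain hashes with an :id key.
class OrderedSlicesSketch
  include Enumerable

  def initialize(documents)
    @documents = documents
  end

  # Yield documents in ascending id order, mirroring the explicit sort in #each.
  def each(&block)
    @documents.sort_by { |document| document[:id] }.each(&block)
  end
end

slices = OrderedSlicesSketch.new(
  [{id: 3, records: ["c"]}, {id: 1, records: ["a"]}, {id: 2, records: ["b"]}]
)

# Enumerable methods are built on #each, so they inherit the ordering.
ordered_ids = slices.map { |document| document[:id] }
```

Note that the real class defines its own #first and #last rather than relying on Enumerable for them.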
#failed ⇒ Object
    # File 'lib/rocket_job/sliced/slices.rb', line 115
    def failed
      all.failed
    end
#first ⇒ Object
Returns the first slice by ascending _id. Mongoid does not apply an ordering, so an explicit sort is added.
    # File 'lib/rocket_job/sliced/slices.rb', line 129
    def first
      all.sort("_id" => 1).first
    end
#group_exceptions ⇒ Object
Returns [Array<Struct>] grouped exceptions by class name, and unique exception messages by exception class.
Each struct consists of:
class_name: [String]
Exception class name.
count: [Integer]
Number of exceptions with this class.
messages: [Array<String>]
Unique list of error messages.
    # File 'lib/rocket_job/sliced/slices.rb', line 150
    def group_exceptions
      result_struct = Struct.new(:class_name, :count, :messages)
      result        = all.collection.aggregate(
        [
          {
            "$match" => {state: "failed"}
          },
          {
            "$group" => {
              _id:      {error_class: "$exception.class_name"},
              messages: {"$addToSet" => "$exception.message"},
              count:    {"$sum" => 1}
            }
          }
        ]
      )
      result.collect do |errors|
        result_struct.new(errors["_id"]["error_class"], errors["count"], errors["messages"])
      end
    end
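The aggregation above matches failed slices, groups them by exception class name, collects the distinct messages, and counts the slices in each group. The same grouping can be sketched in plain Ruby over an array of hashes. `ExceptionGroup` and `group_exceptions_sketch` are hypothetical names, and the hash layout of each slice is an assumption made for illustration.

```ruby
# Same three fields as the struct built in #group_exceptions.
ExceptionGroup = Struct.new(:class_name, :count, :messages)

# In-memory equivalent of the $match / $group stages (a sketch, not the real query).
def group_exceptions_sketch(slices)
  failed = slices.select { |slice| slice[:state] == "failed" }
  failed.group_by { |slice| slice[:exception][:class_name] }.map do |class_name, group|
    messages = group.map { |slice| slice[:exception][:message] }.uniq
    ExceptionGroup.new(class_name, group.size, messages)
  end
end

sample_slices = [
  {state: "failed",    exception: {class_name: "ArgumentError", message: "bad input"}},
  {state: "failed",    exception: {class_name: "ArgumentError", message: "bad input"}},
  {state: "failed",    exception: {class_name: "IOError",       message: "disk full"}},
  {state: "completed", exception: nil}
]

groups = group_exceptions_sketch(sample_slices)
```

Here `count` is the number of failed slices per exception class, while `messages` holds each distinct message only once.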
#insert(slice, input_slice = nil) ⇒ Object Also known as: <<
Insert a new slice into the collection
Returns [RocketJob::Sliced::Slice] the slice that was inserted
Parameters
slice [RocketJob::Sliced::Slice | Array]
The slice to write to the slices collection
If slice is an Array, it will be converted to a Slice before inserting
into the slices collection
input_slice [RocketJob::Sliced::Slice]
The input slice to which this slice corresponds
The id of the input slice is copied across
If the insert results in a duplicate record it is ignored, to support
restarting of jobs that failed in the middle of processing.
A warning is logged that the slice has already been processed.
Note:
`slice_size` is not enforced.
However many records are present in the slice will be written as a
single slice to the slices collection
    # File 'lib/rocket_job/sliced/slices.rb', line 73
    def insert(slice, input_slice = nil)
      slice = new(records: slice) unless slice.is_a?(Slice)

      # Retain input_slice id in the new output slice
      if input_slice
        slice.id                  = input_slice.id
        slice.first_record_number = input_slice.first_record_number
      end

      begin
        slice.save!
      rescue Mongo::Error::OperationFailure => e
        # Ignore duplicates since it means the job was restarted
        raise(e) unless e.message.include?("E11000")

        logger.warn "Skipped already processed slice# #{slice.id}"
      end
      slice
    end
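The duplicate handling makes inserts safe to repeat after a restart: a second insert with the same input slice id is skipped with a warning instead of failing. The sketch below reproduces that behavior with an in-memory store. `FakeSliceStore` and `DuplicateKeyError` are hypothetical stand-ins for the slices collection and for `Mongo::Error::OperationFailure`, and warnings are collected in an array rather than logged.

```ruby
# Hypothetical stand-in for Mongo::Error::OperationFailure.
class DuplicateKeyError < StandardError; end

# Hypothetical in-memory slices collection keyed by slice id.
class FakeSliceStore
  attr_reader :documents, :warnings

  def initialize
    @documents = {}
    @warnings  = []
  end

  # Rejects a slice whose id is already stored, like a unique _id index would.
  def save!(slice)
    if @documents.key?(slice[:id])
      raise DuplicateKeyError, "E11000 duplicate key error, id: #{slice[:id]}"
    end

    @documents[slice[:id]] = slice
  end

  def insert(records, input_slice = nil)
    # Retain the input slice id when one is given, otherwise pick the next free id.
    id    = input_slice ? input_slice[:id] : (@documents.keys.max || 0) + 1
    slice = {id: id, records: records}

    begin
      save!(slice)
    rescue DuplicateKeyError => e
      # Ignore duplicates since it means the job was restarted.
      raise unless e.message.include?("E11000")

      @warnings << "Skipped already processed slice# #{slice[:id]}"
    end
    slice
  end
end
```

Inserting the same records twice with the same `input_slice` leaves one stored document and one warning in `warnings`.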
#last ⇒ Object
    # File 'lib/rocket_job/sliced/slices.rb', line 133
    def last
      all.sort("_id" => -1).first
    end
#new(params = {}) ⇒ Object
    # File 'lib/rocket_job/sliced/slices.rb', line 29
    def new(params = {})
      slice_class.new(params.merge(collection_name: collection_name))
    end
#queued ⇒ Object
    # File 'lib/rocket_job/sliced/slices.rb', line 119
    def queued
      all.queued
    end
#running ⇒ Object
    # File 'lib/rocket_job/sliced/slices.rb', line 123
    def running
      all.running
    end