Class: RocketJob::Sliced::Slices

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Enumerable, SemanticLogger::Loggable
Defined in:
lib/rocket_job/sliced/slices.rb

Direct Known Subclasses

Input, Output

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(collection_name:, slice_class: Sliced::Slice, slice_size: 100) ⇒ Slices

Parameters

collection_name: [String]
  Name of the collection to create
slice_size: [Integer]
  Number of records to store in each slice
  Default: 100
slice_class: [class]
  Slice class to use to hold records.
  Default: RocketJob::Sliced::Slice


20
21
22
23
24
25
26
27
# File 'lib/rocket_job/sliced/slices.rb', line 20

# Builds the accessor for one collection of slices.
#
# Parameters
#   collection_name: [String]
#     Name of the collection that holds the slices.
#   slice_class: [class]
#     Class used to hold records. Default: Sliced::Slice
#   slice_size: [Integer]
#     Number of records per slice; stored as-is here. Default: 100
def initialize(collection_name:, slice_class: Sliced::Slice, slice_size: 100)
  @collection_name = collection_name
  @slice_size      = slice_size
  @slice_class     = slice_class

  # The scope is always built from `Sliced::Slice`, not from `slice_class`: all slices in a
  # collection are the same type anyway, so this avoids having to add `_type` to an index.
  @all = Sliced::Slice.with_collection(@collection_name)
end

Instance Attribute Details

#all ⇒ Object (readonly)

Returns the value of attribute all.



9
10
11
# File 'lib/rocket_job/sliced/slices.rb', line 9

# Reader for @all, which the constructor sets to
# `Sliced::Slice.with_collection(collection_name)`.
def all
  @all
end

#collection_name ⇒ Object

Returns the value of attribute collection_name.



8
9
10
# File 'lib/rocket_job/sliced/slices.rb', line 8

# Reader for @collection_name, assigned from the `collection_name:` keyword in the constructor.
def collection_name
  @collection_name
end

#slice_class ⇒ Object

Returns the value of attribute slice_class.



8
9
10
# File 'lib/rocket_job/sliced/slices.rb', line 8

# Reader for @slice_class, assigned from the `slice_class:` keyword in the constructor
# (default: Sliced::Slice).
def slice_class
  @slice_class
end

#slice_size ⇒ Object

Returns the value of attribute slice_size.



8
9
10
# File 'lib/rocket_job/sliced/slices.rb', line 8

# Reader for @slice_size, assigned from the `slice_size:` keyword in the constructor
# (default: 100).
def slice_size
  @slice_size
end

Instance Method Details

#completed ⇒ Object

Forwardable generates invalid warnings on these methods.



111
112
113
# File 'lib/rocket_job/sliced/slices.rb', line 111

# Hand-written delegation: returns whatever `completed` returns on the `all` scope.
def completed
  scope = all
  scope.completed
end

#create(params = {}) ⇒ Object



33
34
35
36
37
# File 'lib/rocket_job/sliced/slices.rb', line 33

# Builds a slice via `new(params)`, calls `save` on it, and returns the slice itself
# (not the result of `save`).
def create(params = {})
  new(params).tap(&:save)
end

#create!(params = {}) ⇒ Object



39
40
41
42
43
# File 'lib/rocket_job/sliced/slices.rb', line 39

# Builds a slice via `new(params)`, calls `save!` on it, and returns the slice.
# Any exception raised by `save!` propagates to the caller.
def create!(params = {})
  new(params).tap(&:save!)
end

#create_indexes ⇒ Object

Creates the index used by find_and_modify, but only if it is not already present.



96
97
98
99
100
# File 'lib/rocket_job/sliced/slices.rb', line 96

# Ensures the compound index on (state, _id) exists.
#
# The index is created only when no existing index is named "state_1__id_1".
# If anything in that path raises Mongo::Error::OperationFailure, the index
# creation is attempted once more, unconditionally.
def create_indexes
  indexes         = all.collection.indexes
  already_present = indexes.any? { |index| index["name"] == "state_1__id_1" }

  indexes.create_one(state: 1, _id: 1) unless already_present
rescue Mongo::Error::OperationFailure
  all.collection.indexes.create_one(state: 1, _id: 1)
end

#drop ⇒ Object

Drop this collection when it is no longer needed



106
107
108
# File 'lib/rocket_job/sliced/slices.rb', line 106

# Drops the collection behind the `all` scope; returns whatever the collection's `drop` returns.
def drop
  collection = all.collection
  collection.drop
end

#each ⇒ Object

Returns output slices in the order of their id which is usually the order in which they were written.



47
48
49
# File 'lib/rocket_job/sliced/slices.rb', line 47

# Yields each slice in ascending id order, which is usually the order in which
# the slices were written.
#
# When called without a block, returns an Enumerator over the same sequence
# instead of raising LocalJumpError on the first document.
def each
  return enum_for(:each) unless block_given?

  all.sort(id: 1).each { |document| yield(document) }
end

#failed ⇒ Object



115
116
117
# File 'lib/rocket_job/sliced/slices.rb', line 115

# Hand-written delegation: returns whatever `failed` returns on the `all` scope.
def failed
  scope = all
  scope.failed
end

#first ⇒ Object

Mongoid does not apply ordering, so an explicit sort is added (which is why rubocop's Style/RedundantSort is disabled here).



129
130
131
# File 'lib/rocket_job/sliced/slices.rb', line 129

# Returns the first slice when ordered by ascending "_id".
# The sort is explicit because no ordering is applied otherwise.
def first
  ascending = all.sort("_id" => 1)
  ascending.first
end

#group_exceptions ⇒ Object

Returns [Array<Struct>] grouped exceptions by class name, and unique exception messages by exception class.

Each struct consists of:

class_name: [String]
  Exception class name.

count: [Integer]
  Number of exceptions with this class.

messages: [Array<String>]
  Unique list of error messages.


150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/rocket_job/sliced/slices.rb', line 150

# Summarises the failed slices by exception class using an aggregation on the collection.
#
# Returns one struct per exception class, each with:
#   class_name: the grouped "$exception.class_name" value
#   count:      number of failed slices in that group
#   messages:   set of distinct "$exception.message" values in that group
def group_exceptions
  pipeline = [
    # Only slices whose state is "failed" are considered.
    {"$match" => {state: "failed"}},
    {
      "$group" => {
        _id:      {error_class: "$exception.class_name"},
        messages: {"$addToSet" => "$exception.message"},
        count:    {"$sum" => 1}
      }
    }
  ]
  summary = Struct.new(:class_name, :count, :messages)

  all.collection.aggregate(pipeline).map do |group|
    summary.new(group["_id"]["error_class"], group["count"], group["messages"])
  end
end

#insert(slice, input_slice = nil) ⇒ Object Also known as: <<

Insert a new slice into the collection

Returns [RocketJob::Sliced::Slice] the slice that was written, or the given slice unchanged when it was skipped as a duplicate

Parameters

slice [RocketJob::Sliced::Slice | Array]
  The slice to write to the slices collection
  If slice is an Array, it will be converted to a Slice before inserting
  into the slices collection

input_slice [RocketJob::Sliced::Slice]
  The input slice to which this slice corresponds
  The id of the input slice is copied across
  If the insert results in a duplicate record it is ignored, to support
  restarting of jobs that failed in the middle of processing.
  A warning is logged that the slice has already been processed.

Note:

`slice_size` is not enforced.
However many records are present in the slice will be written as a
single slice to the slices collection


73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/rocket_job/sliced/slices.rb', line 73

# Writes one slice to the collection and returns it.
#
# slice:       a Slice, or anything else (for example an Array of records), which is
#              wrapped in a new slice via `new(records: slice)`.
# input_slice: optional; when given, its `id` and `first_record_number` are copied
#              onto the slice before it is saved.
#
# A duplicate-key failure (message containing "E11000") is logged as a warning and
# otherwise ignored, since it means the job was restarted. Any other
# Mongo::Error::OperationFailure is re-raised.
def insert(slice, input_slice = nil)
  slice = slice.is_a?(Slice) ? slice : new(records: slice)

  # Keep the output slice tied to the input slice it was produced from.
  if input_slice
    slice.id                  = input_slice.id
    slice.first_record_number = input_slice.first_record_number
  end

  begin
    slice.save!
  rescue Mongo::Error::OperationFailure => e
    if e.message.include?("E11000")
      # Already written by an earlier run of this job.
      logger.warn "Skipped already processed slice# #{slice.id}"
    else
      raise e
    end
  end

  slice
end

#last ⇒ Object



133
134
135
# File 'lib/rocket_job/sliced/slices.rb', line 133

# Returns the last slice by "_id": sorts descending and takes the first result.
def last
  descending = all.sort("_id" => -1)
  descending.first
end

#new(params = {}) ⇒ Object



29
30
31
# File 'lib/rocket_job/sliced/slices.rb', line 29

# Instantiates `slice_class` (without saving it) from the supplied params, with
# `collection_name` merged in so that it overrides any value the caller passed.
def new(params = {})
  attributes = params.merge(collection_name: collection_name)
  slice_class.new(attributes)
end

#queued ⇒ Object



119
120
121
# File 'lib/rocket_job/sliced/slices.rb', line 119

# Hand-written delegation: returns whatever `queued` returns on the `all` scope.
def queued
  scope = all
  scope.queued
end

#running ⇒ Object



123
124
125
# File 'lib/rocket_job/sliced/slices.rb', line 123

# Hand-written delegation: returns whatever `running` returns on the `all` scope.
def running
  scope = all
  scope.running
end