Class: RocketJob::Sliced::Input

Inherits:
Slices
  • Object
show all
Defined in:
lib/rocket_job/sliced/input.rb

Instance Attribute Summary

Attributes inherited from Slices

#all, #collection_name, #slice_class, #slice_size

Instance Method Summary collapse

Methods inherited from Slices

#append, #completed, #create, #create!, #create_indexes, #drop, #each, #failed, #first, #group_exceptions, #initialize, #insert, #insert_many, #last, #new, #queued, #running

Constructor Details

This class inherits a constructor from RocketJob::Sliced::Slices

Instance Method Details

#each_failed_record ⇒ Object

Iterate over each failed record, if any. Since each slice can contain only one failed record, only that failed record is returned, along with the slice containing the exception details.

Example:

# Usage example: the block receives each failed record and the slice it came from.
# NOTE(review): `ap` is presumably awesome_print's pretty-printer — confirm it is loaded.
job.each_failed_record do |record, slice|
  ap slice
end


97
98
99
100
101
102
# File 'lib/rocket_job/sliced/input.rb', line 97

# Yields every failed record together with the slice it belongs to.
#
# Walks the slices returned by `failed`; for each slice whose
# `failed_record` is not nil the block is called with (record, slice).
# Slices whose `failed_record` is nil are skipped.
#
# A block is required, since the method yields without checking for one.
# Returns whatever `failed.each` returns.
def each_failed_record
  failed.each do |slice|
    failed_record = slice.failed_record
    next if failed_record.nil?

    yield(failed_record, slice)
  end
end

#next_slice(worker_name) ⇒ Object

Returns the next slice to work on, in id order. Returns nil if there are currently no queued slices.

If a slice is in queued state it will be started and assigned to this worker



124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/rocket_job/sliced/input.rb', line 124

# Claims the next queued slice for the given worker.
#
# Takes the queued slices sorted by ascending "_id" and issues a single
# find_one_and_update that sets worker_name, state "running" and
# started_at (current time), asking for the document as it looks after
# the update.
#
# When a document comes back, its collection_name is set to this
# collection's name before it is returned. When nothing comes back, that
# result (nil when no slice is queued) is returned unchanged.
def next_slice(worker_name)
  # TODO: Will it perform faster without the id sort?
  # I.e. Just process on a FIFO basis?
  oldest_first = all.queued.sort("_id" => 1)
  slice        = oldest_first.find_one_and_update(
    {"$set" => {worker_name: worker_name, state: "running", started_at: Time.now}},
    return_document: :after
  )

  if slice
    slice.collection_name = collection_name
  end
  slice
end

#requeue_failed ⇒ Object

Requeue all failed slices



105
106
107
108
109
110
# File 'lib/rocket_job/sliced/input.rb', line 105

# Puts every failed slice back into the queued state.
#
# Issues one update_all against the `failed` scope that unsets
# worker_name and started_at and sets state to "queued".
# Returns whatever update_all returns.
def requeue_failed
  ownership_fields = {worker_name: nil, started_at: nil}

  failed.update_all("$unset" => ownership_fields, "$set" => {state: "queued"})
end

#requeue_running(worker_name) ⇒ Object

Requeue all running slices for a server or worker that is no longer available



113
114
115
116
117
118
# File 'lib/rocket_job/sliced/input.rb', line 113

# Requeues all running slices owned by a server or worker that is no
# longer available.
#
# worker_name is matched as a literal prefix of each slice's worker_name.
# It is escaped before being placed in the pattern, so regex
# metacharacters that occur in real names (for example the "." in a host
# name) match only themselves and cannot select other workers' slices or
# produce an invalid pattern.
#
# Matching slices have worker_name and started_at unset and state set to
# "queued". Returns whatever update_all returns.
def requeue_running(worker_name)
  owner_prefix = /\A#{Regexp.escape(worker_name.to_s)}/

  running.where(worker_name: owner_prefix).update_all(
    "$unset" => {worker_name: nil, started_at: nil},
    "$set"   => {state: "queued"}
  )
end

#upload(**args, &block) ⇒ Object



4
5
6
7
8
9
10
11
# File 'lib/rocket_job/sliced/input.rb', line 4

# Uploads records into this set of input slices.
#
# Calls create_indexes first, then hands self, all keyword arguments and
# the block to Writer::Input.collect and returns its result.
#
# If anything is raised during either step, `drop` is called and the same
# exception is raised again. The rescue covers Exception rather than only
# StandardError, so the cleanup also runs for non-StandardError failures.
def upload(**args, &block)
  # Create indexes before uploading
  create_indexes
  Writer::Input.collect(self, **args, &block)
rescue Exception
  drop
  raise
end

#upload_arel(arel, columns: nil, slice_batch_size: nil, &block) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/rocket_job/sliced/input.rb', line 43

# Uploads one record per model returned by arel.find_each.
#
# With a block: each model is passed to the block and the block's result
# is uploaded. `columns` is ignored and the relation is used as given.
#
# Without a block: `columns` (default [:id] when blank) are converted to
# symbols. A single column uploads that attribute's value; several
# columns upload an array of values in the given order. The relation is
# narrowed with select to those columns, with :id added when absent.
#
# slice_batch_size is forwarded to upload; upload's result is returned.
def upload_arel(arel, columns: nil, slice_batch_size: nil, &block)
  if block.nil?
    attributes = columns.blank? ? [:id] : columns.map(&:to_sym)

    if attributes.size == 1
      attribute = attributes.first
      block     = ->(model) { model.public_send(attribute) }
    else
      block = ->(model) { attributes.map { |name| model.public_send(name) } }
    end

    # find_each requires the :id column in the query
    selected = attributes.include?(:id) ? attributes : attributes + [:id]
    arel     = arel.select(selected)
  end

  upload(slice_batch_size: slice_batch_size) do |records|
    arel.find_each do |model|
      records << block.call(model)
    end
  end
end

#upload_integer_range(start_id, last_id, slice_batch_size: 1_000) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
# File 'lib/rocket_job/sliced/input.rb', line 62

# Uploads consecutive [first_id, last_id_of_range] pairs that cover
# start_id..last_id in ascending order.
#
# Each pair spans at most `slice_size` ids (this object's slice_size);
# the final pair is cut short so it never goes past last_id. Nothing is
# uploaded when start_id is greater than last_id.
#
# upload is called with slice_size: 1 together with slice_batch_size;
# its result is returned.
def upload_integer_range(start_id, last_id, slice_batch_size: 1_000)
  # Each "record" is actually a range of Integers which makes up each slice
  upload(slice_size: 1, slice_batch_size: slice_batch_size) do |records|
    range_start = start_id
    while range_start <= last_id
      range_end = [range_start + slice_size - 1, last_id].min
      records << [range_start, range_end]
      range_start += slice_size
    end
  end
end

#upload_integer_range_in_reverse_order(start_id, last_id, slice_batch_size: 1_000) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/rocket_job/sliced/input.rb', line 74

# Uploads consecutive [first_id, end_id] pairs that cover
# start_id..last_id, starting from last_id and working downwards.
#
# Each pair spans at most `slice_size` ids (this object's slice_size);
# the final pair is cut short so it never goes below start_id. Nothing is
# uploaded when last_id is less than start_id.
#
# upload is called with slice_size: 1 together with slice_batch_size;
# its result is returned.
def upload_integer_range_in_reverse_order(start_id, last_id, slice_batch_size: 1_000)
  # Each "record" is actually a range of Integers which makes up each slice
  upload(slice_size: 1, slice_batch_size: slice_batch_size) do |records|
    end_id = last_id
    while end_id >= start_id
      # Clamp against start_id only. Also clamping every negative lower
      # bound to start_id made ranges overlap when the ids are negative.
      first_id = [end_id - slice_size + 1, start_id].max
      records << [first_id, end_id]
      end_id -= slice_size
    end
  end
end

#upload_mongo_query(criteria, columns: [], slice_batch_size: nil, &block) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/rocket_job/sliced/input.rb', line 13

# Uploads one record per document matched by a Mongoid criteria.
#
# With a block: each raw document is passed to the block and the block's
# result is uploaded. Any :fields entry in the criteria's options is
# passed to the driver as :projection.
#
# Without a block: `columns` (default ["_id"] when blank) are converted
# to strings and added to the projection alongside any fields the
# criteria already selects. A single column uploads that field's value;
# several columns upload an array of values in the given order.
#
# The criteria passed in is not modified: options and the fields hash are
# copied before being adjusted, so the caller can keep using the criteria.
#
# slice_batch_size is forwarded to upload; upload's result is returned.
def upload_mongo_query(criteria, columns: [], slice_batch_size: nil, &block)
  # Copy so that the changes below do not leak into the caller's criteria
  options = criteria.options.dup

  # Without a block extract the fields from the supplied criteria
  if block
    # Criteria is returning old school :fields instead of :projections
    options[:projection] = options.delete(:fields) if options.key?(:fields)
  else
    columns = columns.blank? ? ["_id"] : columns.collect(&:to_s)
    # The fields hash still belongs to the criteria, so copy it before adding columns
    fields  = (options.delete(:fields) || {}).dup
    columns.each { |col| fields[col] = 1 }
    options[:projection] = fields

    block =
      if columns.size == 1
        column = columns.first
        ->(document) { document[column] }
      else
        ->(document) { columns.collect { |c| document[c] } }
      end
  end

  upload(slice_batch_size: slice_batch_size) do |records|
    # Drop down to the mongo driver level to avoid constructing a Model for each document returned
    criteria.klass.collection.find(criteria.selector, options).each do |document|
      records << block.call(document)
    end
  end
end