Class: RocketJob::Sliced::Input

Inherits:
Slices
  • Object
show all
Defined in:
lib/rocket_job/sliced/input.rb

Instance Attribute Summary

Attributes inherited from Slices

#all, #collection_name, #slice_class, #slice_size

Instance Method Summary

Methods inherited from Slices

#completed, #create, #create!, #create_indexes, #drop, #each, #failed, #first, #group_exceptions, #initialize, #insert, #last, #new, #queued, #running

Constructor Details

This class inherits a constructor from RocketJob::Sliced::Slices

Instance Method Details

#each_failed_record ⇒ Object

Iterates over each failed record, if any. Since each slice can contain at most one failed record, only that failed record is yielded, together with the slice that holds the exception details.

Example:

job.each_failed_record do |record, slice|
  ap slice
end


109
110
111
112
113
114
# File 'lib/rocket_job/sliced/input.rb', line 109

# Yields every failed record along with the slice that contains it.
# Each failed slice carries at most one failed record; slices whose
# failed_record is nil are skipped.
#
# Example:
#   input.each_failed_record { |record, slice| ap slice }
def each_failed_record
  failed.each do |failed_slice|
    failed_record = failed_slice.failed_record
    next if failed_record.nil?

    yield(failed_record, failed_slice)
  end
end

#next_slice(worker_name) ⇒ Object

Returns the next slice to work on, in id order. Returns nil if there are currently no queued slices.

If a slice is in the queued state, it is started and assigned to this worker.



136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/rocket_job/sliced/input.rb', line 136

# Claims the next queued slice for worker_name, lowest _id first.
#
# A single find_one_and_update marks the slice as running and records
# worker_name and started_at; the updated document is returned
# (return_document: :after). The returned slice then has its
# collection_name set to this input's collection_name.
#
# Returns the claimed slice, or nil when there is no queued slice.
def next_slice(worker_name)
  # TODO: Would plain FIFO processing (no _id sort) perform faster?
  claim = {"$set" => {worker_name: worker_name, state: :running, started_at: Time.now}}
  slice = all.queued.sort("_id" => 1).find_one_and_update(claim, return_document: :after)
  slice.collection_name = collection_name if slice
  slice
end

#requeue_failed ⇒ Object

Requeue all failed slices



117
118
119
120
121
122
# File 'lib/rocket_job/sliced/input.rb', line 117

# Puts every failed slice back into the queued state so it can be retried,
# clearing the worker_name and started_at recorded when it last ran.
#
# Returns whatever update_all returns for the failed slices.
def requeue_failed
  clear_assignment = {worker_name: nil, started_at: nil}
  back_to_queue    = {state: :queued}
  failed.update_all("$unset" => clear_assignment, "$set" => back_to_queue)
end

#requeue_running(worker_name) ⇒ Object

Requeue all running slices whose worker name starts with the supplied name, for example for a server or worker that is no longer available.



125
126
127
128
129
130
# File 'lib/rocket_job/sliced/input.rb', line 125

# Requeues every running slice whose worker_name begins with the supplied
# worker_name. The supplied name is treated as a literal prefix, for example
# for a server or worker that is no longer available. Matching slices get
# worker_name and started_at unset and their state set back to queued.
#
# Note: a nil or empty worker_name is a prefix of every name, so it requeues
# all running slices.
#
# Returns whatever update_all returns for the matching slices.
def requeue_running(worker_name)
  # Escape the name so it is matched literally. Names containing regex
  # metacharacters would otherwise mis-match: a "." (e.g. in a dotted host
  # name) matches any character. An unbalanced "(" or "[" would raise
  # RegexpError.
  prefix = /\A#{Regexp.escape(worker_name.to_s)}/
  running.where(worker_name: prefix).update_all(
    "$unset" => {worker_name: nil, started_at: nil},
    "$set"   => {state: :queued}
  )
end

#upload(on_first: nil, &block) ⇒ Object



4
5
6
7
8
9
10
11
# File 'lib/rocket_job/sliced/input.rb', line 4

# Creates this input's indexes, then streams records into it through
# Writer::Input.collect, passing this input, the on_first option and the
# caller's block.
#
# If a StandardError escapes, the input is dropped (via drop) and the same
# error is re-raised. Otherwise returns whatever Writer::Input.collect returns.
def upload(on_first: nil, &block)
  create_indexes # indexes are created up front, before any records are written
  Writer::Input.collect(self, on_first: on_first, &block)
rescue StandardError => upload_error
  drop
  raise(upload_error)
end

#upload_arel(arel, *column_names, &block) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/rocket_job/sliced/input.rb', line 45

# Uploads one record per model returned by arel.find_each.
#
# With a block, each model is passed to the block and its result is uploaded.
# Without a block, the record is built from column_names (default [:id]):
# one column uploads the bare value, several upload an Array of values.
# In that case the query is narrowed with select to those columns plus :id.
def upload_arel(arel, *column_names, &block)
  extractor = block
  unless extractor
    columns = column_names.empty? ? [:id] : column_names.map(&:to_sym)
    extractor =
      if columns.length == 1
        attribute = columns.first
        ->(model) { model.send(attribute) }
      else
        ->(model) { columns.map { |name| model.send(name) } }
      end
    # find_each requires the :id column in the query
    arel = arel.select(columns.include?(:id) ? columns : columns + [:id])
  end

  upload do |records|
    arel.find_each { |model| records << extractor.call(model) }
  end
end

#upload_integer_range(start_id, last_id) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/rocket_job/sliced/input.rb', line 64

# Uploads the inclusive range start_id..last_id as slices in ascending order.
# Each slice holds a single [first, last] pair covering at most slice_size
# consecutive ids; the final pair is clamped to last_id.
#
# Returns the number of slices created (0 when start_id > last_id).
# On any StandardError the input is dropped and the error re-raised.
def upload_integer_range(start_id, last_id)
  # Indexes must exist before the first slice is written.
  create_indexes

  slice_count = 0
  lower = start_id
  while lower <= last_id
    candidate = lower + slice_size - 1
    upper     = candidate > last_id ? last_id : candidate
    create!(records: [[lower, upper]])
    slice_count += 1
    lower       += slice_size
  end
  slice_count
rescue StandardError => upload_error
  drop
  raise(upload_error)
end

#upload_integer_range_in_reverse_order(start_id, last_id) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/rocket_job/sliced/input.rb', line 81

# Uploads the inclusive range start_id..last_id as slices in descending
# order: the slice covering the highest ids is created first. Each slice
# holds a single [first, last] pair covering at most slice_size consecutive
# ids; the final pair is clamped to start_id.
#
# Returns the number of slices created (0 when start_id > last_id).
# On any StandardError the input is dropped and the error re-raised.
def upload_integer_range_in_reverse_order(start_id, last_id)
  # Create indexes before uploading
  create_indexes
  end_id = last_id
  count  = 0
  while end_id >= start_id
    # Clamp only against start_id. A separate "negative" clamp would widen
    # slices past slice_size and produce overlapping ranges whenever the
    # range starts below zero. For start_id >= 0 it is implied by this check.
    first_id = [end_id - slice_size + 1, start_id].max
    create!(records: [[first_id, end_id]])
    end_id -= slice_size
    count  += 1
  end
  count
rescue StandardError => e
  drop
  raise(e)
end

#upload_mongo_query(criteria, *column_names, &block) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/rocket_job/sliced/input.rb', line 13

# Uploads the documents matched by criteria. Documents are read directly from
# criteria.klass.collection using criteria.selector and criteria.options, so
# no model instance is built per document.
#
# criteria     - supplies klass.collection, selector and options for the query.
# column_names - fields to extract when no block is given; defaults to "_id".
#                One column uploads the bare value; several upload an Array of
#                values in column order. The columns are added to any existing
#                :fields projection.
# block        - optional; receives each raw document and returns the record
#                to upload. When given, column_names is not used.
#
# The criteria's options and field projection are copied before being
# modified, so the caller's criteria is left unchanged.
def upload_mongo_query(criteria, *column_names, &block)
  # Copy the options: deleting :fields and assigning :projection below must not
  # leak back into the caller's criteria.
  options = criteria.options.dup

  # Without a block extract the fields from the supplied criteria
  if block
    # Criteria is returning old school :fields instead of :projections
    options[:projection] = options.delete(:fields) if options.key?(:fields)
  else
    column_names = column_names.map(&:to_s)
    column_names << "_id" if column_names.empty?

    # Copy the existing projection too, since the columns are added to it.
    fields = (options.delete(:fields) || {}).dup
    column_names.each { |col| fields[col] = 1 }
    options[:projection] = fields

    block =
      if column_names.size == 1
        column = column_names.first
        ->(document) { document[column] }
      else
        ->(document) { column_names.map { |c| document[c] } }
      end
  end

  upload do |records|
    # Drop down to the mongo driver level to avoid constructing a Model for each document returned
    criteria.klass.collection.find(criteria.selector, options).each do |document|
      records << block.call(document)
    end
  end
end