Class: JobIteration::EnumeratorBuilder

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/job-iteration/enumerator_builder.rb

Defined Under Namespace

Classes: Wrapper

Instance Method Summary collapse

Constructor Details

#initialize(job, wrapper: Wrapper) ⇒ EnumeratorBuilder

Returns a new instance of EnumeratorBuilder.



27
28
29
30
# File 'lib/job-iteration/enumerator_builder.rb', line 27

def initialize(job, wrapper: Wrapper)
  @job = job
  @wrapper = wrapper
end

Instance Method Details

#build_active_record_enumerator_on_batches(scope, cursor:, columns: nil, batch_size: nil) ⇒ Object Also known as: active_record_on_batches

Builds Enumerator from Active Record Relation and enumerates on batches. Each Enumerator tick moves the cursor +batch_size+ rows forward.

+batch_size:+ sets how many records will be fetched in one batch. Defaults to 100.

For the rest of arguments, see documentation for #build_active_record_enumerator_on_records



113
114
115
116
117
118
119
120
121
# File 'lib/job-iteration/enumerator_builder.rb', line 113

def build_active_record_enumerator_on_batches(scope, cursor:, columns: nil, batch_size: nil)
  enum = build_active_record_enumerator(
    scope,
    cursor: cursor,
    columns: columns,
    batch_size: batch_size,
  ).batches
  wrap(self, enum)
end

#build_active_record_enumerator_on_records(scope, cursor:, columns: nil, batch_size: nil) ⇒ Object Also known as: active_record_on_records

Builds Enumerator from Active Record Relation. Each Enumerator tick moves the cursor one row forward.

+columns:+ argument is used to build the actual query for iteration. +columns+: defaults to primary key:

1) SELECT * FROM users ORDER BY id LIMIT 100

When iteration is resumed, +cursor:+ and +columns:+ values will be used to continue from the point where iteration stopped:

2) SELECT * FROM users WHERE id > $CURSOR ORDER BY id LIMIT 100

+columns:+ can also take more than one column. In that case, +cursor+ will contain serialized values of all columns at the point where iteration stopped.

Consider this example with +columns: [:created_at, :id]+. Here's the query will use on the first iteration:

1) SELECT * FROM products ORDER BY created_at, id LIMIT 100

And the query on the next iteration:

2) SELECT * FROM products WHERE (created_at > '$LAST_CREATED_AT_CURSOR' OR (created_at = '$LAST_CREATED_AT_CURSOR' AND (id > '$LAST_ID_CURSOR'))) ORDER BY created_at, id LIMIT 100



97
98
99
100
101
102
103
104
105
# File 'lib/job-iteration/enumerator_builder.rb', line 97

def build_active_record_enumerator_on_records(scope, cursor:, columns: nil, batch_size: nil)
  enum = build_active_record_enumerator(
    scope,
    cursor: cursor,
    columns: columns,
    batch_size: batch_size,
  ).records
  wrap(self, enum)
end

#build_array_enumerator(enumerable, cursor:) ⇒ Object Also known as: array

Builds Enumerator object from a given array, using +cursor+ as an offset.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/job-iteration/enumerator_builder.rb', line 46

def build_array_enumerator(enumerable, cursor:)
  unless enumerable.is_a?(Array)
    raise ArgumentError, "enumerable must be an Array"
  end
  if enumerable.any? { |i| defined?(ActiveRecord) && i.is_a?(ActiveRecord::Base) }
    raise ArgumentError, "array cannot contain ActiveRecord objects"
  end
  drop =
    if cursor.nil?
      0
    else
      cursor + 1
    end

  wrap(self, enumerable.each_with_index.drop(drop).to_enum { enumerable.size })
end

#build_lock_queue_enumerator(lock_queue, at_most_once:) ⇒ Object

Builds Enumerator from a lock queue instance that belongs to a job. The helper is only to be used from jobs that use LockQueue module.



65
66
67
68
69
70
71
# File 'lib/job-iteration/enumerator_builder.rb', line 65

def build_lock_queue_enumerator(lock_queue, at_most_once:)
  unless lock_queue.is_a?(BackgroundQueue::LockQueue::RedisQueue) ||
      lock_queue.is_a?(BackgroundQueue::LockQueue::RolloutRedisQueue)
    raise ArgumentError, "an argument to #build_lock_queue_enumerator must be a LockQueue"
  end
  wrap(self, BackgroundQueue::LockQueueEnumerator.new(lock_queue, at_most_once: at_most_once).to_enum)
end

#build_once_enumerator(cursor:) ⇒ Object Also known as: once

Builds Enumerator objects that iterates once.



35
36
37
# File 'lib/job-iteration/enumerator_builder.rb', line 35

def build_once_enumerator(cursor:)
  wrap(self, build_times_enumerator(1, cursor: cursor))
end

#build_times_enumerator(number, cursor:) ⇒ Object Also known as: times

Builds Enumerator objects that iterates N times and yields number starting from zero.

Raises:

  • (ArgumentError)


40
41
42
43
# File 'lib/job-iteration/enumerator_builder.rb', line 40

def build_times_enumerator(number, cursor:)
  raise ArgumentError, "First argument must be an Integer" unless number.is_a?(Integer)
  wrap(self, build_array_enumerator(number.times.to_a, cursor: cursor))
end