Class: RocketJob::Sliced::Writer::Input

Inherits:
Object
  • Object
show all
Defined in:
lib/rocket_job/sliced/writer/input.rb

Overview

Internal class for uploading records into input slices

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(data_store, on_first: nil, slice_size: nil, slice_batch_size: nil) ⇒ Input

Returns a new instance of Input.



32
33
34
35
36
37
38
39
40
41
# File 'lib/rocket_job/sliced/writer/input.rb', line 32

# Builds a writer that groups incoming lines into slices for +data_store+.
#
# Parameters
#   data_store
#     Destination for the slices. It is asked for `slice_size` when no
#     explicit size is given, and for a new slice via
#     `new(first_record_number:)`.
#   on_first: [Proc]
#     Called with the first line only, instead of storing that line.
#     Default: nil
#   slice_size: [Integer]
#     Number of lines per slice. Default: `data_store.slice_size`
#   slice_batch_size: [Integer]
#     Number of slices to collect before they are bulk inserted. Default: 20
def initialize(data_store, on_first: nil, slice_size: nil, slice_batch_size: nil)
  @data_store       = data_store
  @on_first         = on_first
  @slice_size       = slice_size || data_store.slice_size
  @slice_batch_size = slice_batch_size || 20
  @record_count     = 0
  @batch_count      = 0
  @batch            = []
  # Needs @data_store and @record_count, so it has to run last.
  new_slice
end

Instance Attribute Details

#record_count ⇒ Object (readonly)

Returns the value of attribute record_count.



6
7
8
# File 'lib/rocket_job/sliced/writer/input.rb', line 6

# Number of lines stored into slices so far.
# Starts at 0 and is incremented once per line appended with `<<`;
# a line handed to the `on_first` callback is not counted.
def record_count
  @record_count
end

Class Method Details

.collect(data_store, **args) ⇒ Object

Batch collection of lines into slices.

Parameters

on_first: [Proc]
  Block to call on the first line only, instead of storing in the slice.
  Useful for extracting the header row
  Default: nil

slice_size: [Integer]
  Override the slice size, for example when uploading ranges, where the slice size
  is the size of the range itself.

slice_batch_size: [Integer]
  The number of slices to batch up and to bulk load.
  For smaller slices this significantly improves upload performance.
  Note: If `slice_batch_size` is too high, it can exceed the maximum BSON block size.


24
25
26
27
28
29
30
# File 'lib/rocket_job/sliced/writer/input.rb', line 24

# Creates a writer for +data_store+, yields it to the caller's block and
# returns the number of records the writer counted.
#
# The writer is flushed when the block finishes, including when the block
# raises. When the writer could not be created there is nothing to flush.
#
# All keyword arguments are passed through to the constructor.
def self.collect(data_store, **args)
  writer = nil
  begin
    writer = new(data_store, **args)
    yield writer
    writer.record_count
  ensure
    writer.flush unless writer.nil?
  end
end

Instance Method Details

#<<(line) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/rocket_job/sliced/writer/input.rb', line 43

# Appends +line+ to the current slice and returns self so calls can be chained.
#
# While an `on_first` callback is pending, the line is passed to that callback
# instead of being stored or counted, and the callback is then cleared.
# Once the current slice holds `slice_size` lines it is saved and a new
# slice is started.
def <<(line)
  if @on_first
    @on_first.call(line)
    # Cleared only after the call, so the callback runs exactly once.
    @on_first = nil
  else
    @slice << line
    @record_count += 1
    slice_full = @slice.size >= @slice_size
    if slice_full
      save_slice
      new_slice
    end
  end
  self
end

#flush ⇒ Object



58
59
60
61
62
63
64
65
66
67
# File 'lib/rocket_job/sliced/writer/input.rb', line 58

# Uploads everything that is still pending to the data store.
#
# In batch mode (`@slice_batch_size` set) the current slice, when it holds
# any lines, is added to the batch and the whole batch is bulk inserted.
# Otherwise the current slice is inserted on its own.
#
# Safe to call when nothing is pending and safe to call repeatedly:
# - no insert is issued for an empty batch,
# - after an upload a fresh slice is started, so the uploaded slice is
#   neither uploaded again nor appended to by later `<<` calls.
def flush
  pending = @slice.size.positive?
  if @slice_batch_size
    @batch << @slice if pending
    # NOTE(review): an empty bulk insert is at best a wasted round trip and
    # some stores reject it (the MongoDB Ruby driver raises on empty bulk
    # writes) -- confirm against the data store in use.
    @data_store.insert_many(@batch) unless @batch.empty?
    new_slice if pending
    @batch       = []
    @batch_count = 0
  elsif pending
    result = @data_store.insert(@slice)
    new_slice
    result
  end
end

#new_slice ⇒ Object



69
70
71
# File 'lib/rocket_job/sliced/writer/input.rb', line 69

# Replaces the current slice with a new one obtained from the data store.
# The new slice is told that its first record is the one following the
# records counted so far. Returns the new slice.
def new_slice
  next_record_number = @record_count + 1
  @slice = @data_store.new(first_record_number: next_record_number)
end

#save_slice ⇒ Object



73
74
75
76
77
78
79
80
# File 'lib/rocket_job/sliced/writer/input.rb', line 73

# Hands the current slice over for storage.
#
# Without a batch size the slice is flushed straight away. With a batch size
# the batch counter is incremented first: once it reaches the batch size
# everything is flushed (the flush picks up the current slice itself),
# otherwise the slice is only added to the pending batch.
def save_slice
  if @slice_batch_size
    @batch_count += 1
    if @batch_count < @slice_batch_size
      @batch << @slice
    else
      flush
    end
  else
    flush
  end
end