Class: BulkProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/bulk_processor.rb,
lib/bulk_processor/config.rb,
lib/bulk_processor/s3_file.rb,
lib/bulk_processor/version.rb,
lib/bulk_processor/back_end.rb,
lib/bulk_processor/split_csv.rb,
lib/bulk_processor/process_csv.rb,
lib/bulk_processor/csv_processor.rb,
lib/bulk_processor/file_splitter.rb,
lib/bulk_processor/validated_csv.rb,
lib/bulk_processor/stream_encoder.rb,
lib/bulk_processor/back_end/dynosaur.rb,
lib/bulk_processor/payload_serializer.rb,
lib/bulk_processor/back_end/active_job.rb,
lib/bulk_processor/csv_processor/result.rb,
lib/bulk_processor/row_chunker/balanced.rb,
lib/bulk_processor/row_chunker/boundary.rb,
lib/bulk_processor/csv_processor/no_op_handler.rb,
lib/bulk_processor/csv_processor/row_processor.rb,
lib/bulk_processor/back_end/dynosaur/split_csv_task.rb,
lib/bulk_processor/back_end/active_job/split_csv_job.rb,
lib/bulk_processor/back_end/dynosaur/process_csv_task.rb,
lib/bulk_processor/csv_processor/no_op_post_processor.rb,
lib/bulk_processor/back_end/active_job/process_csv_job.rb

Overview

Process large CSV files in the background.

Defined Under Namespace

Modules: BackEnd, PayloadSerializer, RowChunker Classes: CSVProcessor, Config, FileSplitter, ProcessCSV, S3File, SplitCSV, StreamEncoder, ValidatedCSV

Constant Summary collapse

VERSION =
'0.8.0'.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(key:, stream:, processor_class:, payload: {}) ⇒ BulkProcessor

Returns a new instance of BulkProcessor.



28
29
30
31
32
33
34
# File 'lib/bulk_processor.rb', line 28

# Captures the collaborators for one processing run and starts with an empty
# error list.
#
# @param key identifier for the upload; #start hands it to the configured
#   file class and interpolates it into the "already processing" message
# @param stream source of the CSV contents; #start wraps it in a StreamEncoder
# @param processor_class object that #start asks for required_columns and
#   optional_columns
# @param payload extra data stored as-is (defaults to an empty Hash)
def initialize(key:, stream:, processor_class:, payload: {})
  # Errors accumulate here; exposed through the #errors reader.
  @errors = []
  @payload = payload
  @processor_class = processor_class
  @stream = stream
  @key = key
end

Instance Attribute Details

#errorsObject (readonly)

Returns the value of attribute errors.



26
27
28
# File 'lib/bulk_processor.rb', line 26

# Returns the error list held in @errors. The constructor initializes it to
# an empty Array, and #start appends messages to it when processing cannot
# begin.
def errors
  @errors
end

Class Method Details

.configObject



17
18
19
# File 'lib/bulk_processor.rb', line 17

# Returns the memoized Config instance, creating it on the first call and
# handing back the same object on every later call.
def config
  return @config if @config

  @config = Config.new
end

.configure {|config| ... } ⇒ Object

Yields:



21
22
23
# File 'lib/bulk_processor.rb', line 21

# Yields the object returned by +config+ to the given block so the caller can
# adjust settings in place. Returns whatever the block returns.
#
# @yield [config] the current configuration object
def configure
  settings = config
  yield settings
end

Instance Method Details

#start(num_processes = 1) ⇒ Object

Validate the CSV and enqueue it for processing in the background.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/bulk_processor.rb', line 37

# Validates the CSV and, when it is valid, hands the encoded contents to
# start_backend.
#
# Nothing is started when the configured file class reports that a file for
# +key+ already exists; an "already processing" message is recorded instead.
# Validation errors from ValidatedCSV are appended to +errors+.
#
# @param num_processes passed straight through to start_backend (defaults to 1)
# @return [Boolean] true when +errors+ is empty after the attempt, false
#   otherwise
def start(num_processes = 1)
  existing_file = BulkProcessor.config.file_class.new(key)
  if existing_file.exists?
    errors << "Already processing #{key}, please wait for it to finish"
    return false
  end

  contents = StreamEncoder.new(stream).encoded
  columns = [processor_class.required_columns, processor_class.optional_columns]
  csv = ValidatedCSV.new(contents, *columns)

  # Invalid CSV: record why and report based on the accumulated errors.
  unless csv.valid?
    errors.concat(csv.errors)
    return errors.empty?
  end

  start_backend(contents, num_processes)
  errors.empty?
end