Module: RocketJob::Batch::IO

Extended by:
ActiveSupport::Concern
Included in:
RocketJob::Batch
Defined in:
lib/rocket_job/batch/io.rb

Overview

IO methods for sliced jobs

Instance Method Summary

Instance Method Details

#download(file_name_or_io = nil, category: :main, **args, &block) ⇒ Object

Download the output data into the supplied file_name or stream

Parameters

file_name_or_io [String|IO]
  The file_name of the file to write to, or an IO Stream that implements #write.

options:
  category [Symbol]
    The category of output to download
    Default: :main

See RocketJob::Sliced::Output#download for remaining options

Returns [Integer] the number of records downloaded



# File 'lib/rocket_job/batch/io.rb', line 343

def download(file_name_or_io = nil, category: :main, **args, &block)
  raise "Cannot download incomplete job: #{id}. Currently in state: #{state}-#{sub_state}" if rocket_job_processing?

  output(category).download(file_name_or_io, **args, &block)
end

#input(category = :main) ⇒ Object

Returns [RocketJob::Sliced::Input] input collection for holding input slices

Parameters:

category [Symbol]
  The name of the category to access or upload data into
  Default: None ( Uses the single default input collection for this job )
  Validates: This value must be one of those listed in #input_categories


# File 'lib/rocket_job/batch/io.rb', line 16

def input(category = :main)
  raise "Category #{category.inspect}, must be registered in input_categories: #{input_categories.inspect}" unless input_categories.include?(category) || (category == :main)

  collection_name = "rocket_job.inputs.#{id}"
  collection_name << ".#{category}" unless category == :main

  (@inputs ||= {})[category] ||= RocketJob::Sliced::Input.new(slice_arguments(collection_name))
end

#output(category = :main) ⇒ Object

Returns [RocketJob::Sliced::Output] the output collection holding output slices.
Returns nil if no output is being collected.

Parameters:

category [Symbol]
  The name of the category to access or download data from
  Default: None ( Uses the single default output collection for this job )
  Validates: This value must be one of those listed in #output_categories


# File 'lib/rocket_job/batch/io.rb', line 33

def output(category = :main)
  raise "Category #{category.inspect}, must be registered in output_categories: #{output_categories.inspect}" unless output_categories.include?(category) || (category == :main)

  collection_name = "rocket_job.outputs.#{id}"
  collection_name << ".#{category}" unless category == :main

  (@outputs ||= {})[category] ||= RocketJob::Sliced::Output.new(slice_arguments(collection_name))
end
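
The #input and #output methods above derive their MongoDB collection names from the job id and the category. A minimal sketch of that naming convention, based on the code shown above (the `collection_name_for` helper is hypothetical, for illustration only):

```ruby
# Hypothetical helper illustrating the collection naming used by #input and #output:
# the :main category maps to the bare collection name; any other category is appended.
def collection_name_for(kind, job_id, category = :main)
  name = "rocket_job.#{kind}.#{job_id}"
  name += ".#{category}" unless category == :main
  name
end

collection_name_for(:inputs, "5f3c")            # => "rocket_job.inputs.5f3c"
collection_name_for(:outputs, "5f3c", :errors)  # => "rocket_job.outputs.5f3c.errors"
```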

#upload(file_name_or_io = nil, file_name: nil, category: :main, **args, &block) ⇒ Object

Upload the supplied file_name or stream.

Returns [Integer] the number of records uploaded.

Parameters

file_name_or_io [String | IO]
  Full path and file name to stream into the job,
  Or, an IO Stream that responds to: :read

streams [Symbol|Array]
  Streams to convert the data whilst it is being read.
  When nil, the file_name extensions will be inspected to determine what
  streams should be applied.
  Default: nil

delimiter [String]
  Line / Record delimiter to use to break the stream up into records
    Any string to break the stream up by
    The records when saved will not include this delimiter
  Default: nil
    Automatically detect line endings and break up by line
    Searches for the first "\r\n" or "\n" and then uses that as the
    delimiter for all subsequent records

buffer_size [Integer]
  Size of the blocks when reading from the input file / stream.
  Default: 65536 ( 64K )

encoding: [String|Encoding]
  Encode returned data with this encoding.
  'US-ASCII':   Original 7 bit ASCII Format
  'ASCII-8BIT': 8-bit ASCII Format
  'UTF-8':      UTF-8 Format
  Etc.
  Default: 'UTF-8'

encode_replace: [String]
  The character to replace with when a character cannot be converted to the target encoding.
  nil: Don't replace any invalid characters. Encoding::UndefinedConversionError is raised.
  Default: nil

encode_cleaner: [nil|symbol|Proc]
  Cleanse data read from the input stream.
  nil:           No cleansing
  :printable     Cleanse all non-printable characters except \r and \n
  Proc/lambda    Proc to call after every read to cleanse the data
  Default: :printable

stream_mode: [:line | :row | :record]
  :line
    Uploads the file a line (String) at a time for processing by workers.
  :row
    Parses each line from the file as an Array and uploads each array for processing by workers.
  :record
    Parses each line from the file into a Hash and uploads each hash for processing by workers.
  See IOStream#each_line, IOStream#each_row, and IOStream#each_record.
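
The automatic delimiter detection described above can be sketched in plain Ruby (a simplified illustration, not the actual IOStreams implementation):

```ruby
# Simplified sketch of automatic line-ending detection: find the first
# "\r\n" or "\n" in the initial buffer and use it as the delimiter for
# all subsequent records. Returns nil when no line ending is present.
def detect_delimiter(buffer)
  index = buffer.index("\n")
  return nil unless index

  index.positive? && buffer[index - 1] == "\r" ? "\r\n" : "\n"
end

detect_delimiter("line1\r\nline2\r\n")  # => "\r\n"
detect_delimiter("line1\nline2\n")      # => "\n"
```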

Example:

# Load plain text records from a file
job.input.upload('hello.csv')

Example:

# Load plain text records from a file, stripping all non-printable characters,
# as well as any characters that cannot be converted to UTF-8
job.input.upload('hello.csv', encode_cleaner: :printable, encode_replace: '')

Example: Zip

# Since csv is not known to RocketJob, it is ignored
job.input.upload('myfile.csv.zip')

Example: Encrypted Zip

job.input.upload('myfile.csv.zip.enc')

Example: Explicitly set the streams

job.input.upload('myfile.ze', streams: [:zip, :enc])

Example: Supply custom options

job.input.upload('myfile.csv.enc', streams: :enc)

Example: Extract streams from filename but write to a temp file

streams = IOStreams.streams_for_file_name('myfile.gz.enc')
t = Tempfile.new('my_project')
job.input.upload(t.to_path, streams: streams)

Example: Upload by writing records one at a time to the upload stream

job.upload do |writer|
  10.times { |i| writer << i }
end

Notes:

  • Only call from one thread at a time against a single instance of this job.

  • The record_count for the job is incremented by the number of records uploaded.

  • If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.

  • By default all data read from the file/stream is converted into UTF-8 before being persisted. This is recommended since Mongo only supports UTF-8 strings.

  • When using the zip format, the zip file/stream must contain only one file; the first file found will be loaded into the job.

  • If an io stream is supplied, it is read until it returns nil.

  • Only use this method for UTF-8 data, for binary data use #input_slice or #input_records.

  • CSV parsing is slow, so it is usually left for the workers to do.



# File 'lib/rocket_job/batch/io.rb', line 143

def upload(file_name_or_io = nil, file_name: nil, category: :main, **args, &block)
  if file_name
    self.upload_file_name = file_name
  elsif file_name_or_io.is_a?(String)
    self.upload_file_name = file_name_or_io
  end
  count             = input(category).upload(file_name_or_io, file_name: file_name, **args, &block)
  self.record_count = (record_count || 0) + count
  count
rescue StandardError => exc
  input(category).delete_all
  raise(exc)
end

#upload_arel(arel, *column_names, category: :main, &block) ⇒ Object

Upload results from an Arel into RocketJob::SlicedJob.

Parameters

column_names
  When a block is not supplied, the names of the columns to be returned
  and uploaded into the job.
  These columns are automatically added to the select list to reduce overhead.

If a Block is supplied it is passed the model returned from the database and should return the work item to be uploaded into the job.

Returns [Integer] the number of records uploaded

Example: Upload ids for all users

arel = User.all
job.upload_arel(arel)

Example: Upload selected user ids

arel = User.where(country_code: 'US')
job.upload_arel(arel)

Example: Upload user_name and zip_code

arel = User.where(country_code: 'US')
job.upload_arel(arel, :user_name, :zip_code)

Notes:

  • Only call from one thread at a time against a single instance of this job.

  • The record_count for the job is set to the number of records returned by the arel.

  • If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.



# File 'lib/rocket_job/batch/io.rb', line 187

def upload_arel(arel, *column_names, category: :main, &block)
  count             = input(category).upload_arel(arel, *column_names, &block)
  self.record_count = (record_count || 0) + count
  count
rescue StandardError => exc
  input(category).delete_all
  raise(exc)
end

#upload_integer_range(start_id, last_id, category: :main) ⇒ Object

Upload a sliced range of integers as arrays of start and end ids.

Returns [Integer] last_id - start_id + 1.

Uploads one range per slice so that the response can return multiple records for each slice processed

Example

job.slice_size = 100
job.upload_integer_range(200, 421)

# Equivalent to calling:
job.input.insert([200,299])
job.input.insert([300,399])
job.input.insert([400,421])
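
The slicing in the example above can be sketched in plain Ruby (illustration only; the actual work happens in RocketJob::Sliced::Input#upload_integer_range, and the `integer_range_slices` helper is hypothetical):

```ruby
# Build [start, end] pairs covering at most slice_size ids each,
# mirroring the example above: slice_size 100 over ids 200..421.
def integer_range_slices(start_id, last_id, slice_size)
  (start_id..last_id).step(slice_size).map do |first|
    [first, [first + slice_size - 1, last_id].min]
  end
end

integer_range_slices(200, 421, 100)
# => [[200, 299], [300, 399], [400, 421]]
```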

Notes:

  • Only call from one thread at a time against a single instance of this job.

  • The record_count for the job is set to: last_id - start_id + 1.

  • If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.



# File 'lib/rocket_job/batch/io.rb', line 261

def upload_integer_range(start_id, last_id, category: :main)
  input(category).upload_integer_range(start_id, last_id)
  count             = last_id - start_id + 1
  self.record_count = (record_count || 0) + count
  count
rescue StandardError => exc
  input(category).delete_all
  raise(exc)
end

#upload_integer_range_in_reverse_order(start_id, last_id, category: :main) ⇒ Object

Upload a sliced range of integers as arrays of start and end ids, starting with the last range first.

Returns [Integer] last_id - start_id + 1.

Uploads one range per slice so that the response can return multiple records for each slice processed. Useful when the highest integer values should be processed before the lower ranges, for example when processing every record in a database by the id column.

Example

job.slice_size = 100
job.upload_integer_range_in_reverse_order(200, 421)

# Equivalent to calling:
job.input.insert([400,421])
job.input.insert([300,399])
job.input.insert([200,299])

Notes:

  • Only call from one thread at a time against a single instance of this job.

  • The record_count for the job is set to: last_id - start_id + 1.

  • If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.



# File 'lib/rocket_job/batch/io.rb', line 296

def upload_integer_range_in_reverse_order(start_id, last_id, category: :main)
  input(category).upload_integer_range_in_reverse_order(start_id, last_id)
  count             = last_id - start_id + 1
  self.record_count = (record_count || 0) + count
  count
rescue StandardError => exc
  input(category).delete_all
  raise(exc)
end

#upload_mongo_query(criteria, *column_names, category: :main, &block) ⇒ Object

Upload the result of a MongoDB query to the input collection for processing. Useful when an entire MongoDB collection, or part thereof, needs to be processed by a job.

Returns [Integer] the number of records uploaded

If a Block is supplied it is passed the document returned from the database and should return a record for processing

If no Block is supplied then the record will be the :fields returned from MongoDB

Note:

This method uses the collection and not the MongoMapper document to
avoid the overhead of constructing a Model with every document returned
by the query

Note:

The Block must return types that can be serialized to BSON.
Valid Types: Hash | Array | String | Integer | Float | Symbol | Regexp | Time
Invalid: Date, etc.

Example: Upload document ids

criteria = User.where(state: 'FL')
job.record_count = job.upload_mongo_query(criteria)

Example: Upload just the supplied column

criteria = User.where(state: 'FL')
job.record_count = job.upload_mongo_query(criteria, :zip_code)

Notes:

  • Only call from one thread at a time against a single instance of this job.

  • The record_count for the job is set to the number of records returned by the mongo query.

  • If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.



# File 'lib/rocket_job/batch/io.rb', line 231

def upload_mongo_query(criteria, *column_names, category: :main, &block)
  count             = input(category).upload_mongo_query(criteria, *column_names, &block)
  self.record_count = (record_count || 0) + count
  count
rescue StandardError => exc
  input(category).delete_all
  raise(exc)
end

#upload_slice(slice) ⇒ Object

Upload the supplied slices for processing by workers

Updates the record_count after adding the records

Returns [Integer] the number of records uploaded

Parameters

`slice` [ Array<Hash | Array | String | Integer | Float | Symbol | Regexp | Time> ]
  All elements in `array` must be serializable to BSON
  For example the following types are not supported: Date

Note:

The caller should honor `:slice_size`; the entire slice is loaded as-is.

Note:

Not thread-safe. Only call from one thread at a time.


# File 'lib/rocket_job/batch/io.rb', line 322

def upload_slice(slice)
  input.insert(slice)
  count             = slice.size
  self.record_count = (record_count || 0) + count
  count
end
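
The record_count bookkeeping in #upload_slice can be illustrated with a minimal in-memory stand-in (a sketch only; `FakeBatchJob` is hypothetical, and the real method inserts into the sliced input collection rather than an array):

```ruby
# Minimal in-memory stand-in mirroring the record_count accumulation
# performed by #upload_slice. Each call appends one slice and adds its
# size to the running record_count, returning the slice size.
class FakeBatchJob
  attr_accessor :record_count

  def initialize
    @slices = []
  end

  def upload_slice(slice)
    @slices << slice
    count             = slice.size
    self.record_count = (record_count || 0) + count
    count
  end
end

job = FakeBatchJob.new
job.upload_slice([1, 2, 3])  # => 3
job.upload_slice([4, 5])     # => 2
job.record_count             # => 5
```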

#write_output(result = nil, input_slice = nil, &block) ⇒ Object

Writes the supplied result, Batch::Result or Batch::Results to the relevant collections.

If a block is supplied, the block is supplied with a writer that should be used to accumulate the results.

Examples

job.write_output('hello world')

job.write_output do |writer|
  writer << 'hello world'
end

job.write_output do |writer|
  result = RocketJob::Batch::Results.new
  result << RocketJob::Batch::Result.new(:main, 'hello world')
  result << RocketJob::Batch::Result.new(:errors, 'errors')
  writer << result
end

result = RocketJob::Batch::Results.new
result << RocketJob::Batch::Result.new(:main, 'hello world')
result << RocketJob::Batch::Result.new(:errors, 'errors')
job.write_output(result)



# File 'lib/rocket_job/batch/io.rb', line 373

def write_output(result = nil, input_slice = nil, &block)
  if block
    RocketJob::Sliced::Writer::Output.collect(self, input_slice, &block)
  else
    raise(ArgumentError, 'result parameter is required when no block is supplied') unless result
    RocketJob::Sliced::Writer::Output.collect(self, input_slice) { |writer| writer << result }
  end
end