Module: RocketJob::Batch::IO

Extended by:
ActiveSupport::Concern
Included in:
RocketJob::Batch
Defined in:
lib/rocket_job/batch/io.rb

Overview

IO methods for sliced jobs

Instance Method Details

#download(stream = nil, category: :main, header_line: nil, **args, &block) ⇒ Object

Download the output data into the supplied file, io, IOStreams::Path, or IOStreams::Stream. Returns [Integer] the number of records / lines downloaded.

Parameters

stream [String | IO | IOStreams::Path | IOStreams::Stream]
  Full path and file name to stream the output into,
  Or, an IO stream that responds to: :write
  Or, an IOStreams path such as IOStreams::Paths::File, or IOStreams::Paths::S3

Example: Zip

# Since csv is not known to RocketJob, it is ignored
job.download('myfile.csv.zip')

Example: Encrypted Zip

job.download('myfile.csv.zip.enc')

Example: Explicitly set the streams

path = IOStreams.path('myfile.ze').stream(:zip).stream(:enc)
job.download(path)

Example: Supply custom options

path = IOStreams.path('myfile.csv.enc').option(:enc, compress: false)
job.download(path)

Example: Supply custom options. Set the file name within the zip file.

path = IOStreams.path('myfile.csv.zip').option(:zip, zip_file_name: 'myfile.csv')
job.download(path)

Example: Download into a tempfile, or stream, using the original file name to determine the streams to apply:

tempfile = Tempfile.new('my_project')
stream = IOStreams.stream(tempfile).file_name('myfile.gz.enc')
job.download(stream)

Example: Add a header and/or trailer record to the downloaded file:

IOStreams.path('/tmp/file.txt.gz').writer do |writer|
  writer << "Header\n"
  job.download do |line|
    writer << line + "\n"
  end
  writer << "Trailer\n"
end

Example: Add a header and/or trailer record to the downloaded file, letting the line writer add the line breaks:

IOStreams.path('/tmp/file.txt.gz').writer(:line) do |writer|
  writer << "Header"
  job.download do |line|
    writer << line
  end
  writer << "Trailer"
end
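
Example: Download a non-default output category (an illustrative sketch; assumes an :errors output category is registered on the job):

job.download('errors.csv', category: :errors)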

Notes:

  • The records are returned in '_id' order. Usually this is the order in which the records were originally loaded.



# File 'lib/rocket_job/batch/io.rb', line 353

def download(stream = nil, category: :main, header_line: nil, **args, &block)
  raise "Cannot download incomplete job: #{id}. Currently in state: #{state}-#{sub_state}" if rocket_job_processing?

  return output(category).download(header_line: header_line, &block) if block

  IOStreams.new(stream).writer(:line, **args) do |io|
    output(category).download(header_line: header_line) { |record| io << record }
  end
end

#input(category = :main) ⇒ Object

Returns [RocketJob::Sliced::Input] input collection for holding input slices

Parameters:

category [Symbol]
  The name of the category to access or upload data into
  Default: None (Uses the single default input collection for this job)
  Validates: This value must be one of those listed in #input_categories
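
Example: Access the default input collection directly (an illustrative sketch; assumes records have already been uploaded into the job and that the sliced collection responds to #count):

job.upload('users.csv')
job.input.count # => number of input slices stored (hypothetical usage)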


# File 'lib/rocket_job/batch/io.rb', line 16

def input(category = :main)
  unless input_categories.include?(category) || (category == :main)
    raise "Category #{category.inspect}, must be registered in input_categories: #{input_categories.inspect}"
  end

  (@inputs ||= {})[category] ||= RocketJob::Sliced::Input.new(rocket_job_io_slice_arguments("inputs", category))
end

#output(category = :main) ⇒ Object

Returns [RocketJob::Sliced::Output] output collection for holding output slices. Returns nil if no output is being collected.

Parameters:

category [Symbol]
  The name of the category to access or download data from
  Default: None (Uses the single default output collection for this job)
  Validates: This value must be one of those listed in #output_categories
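
Example: Iterate over the collected output (an illustrative sketch; assumes the job collects output and has completed processing):

job.output.each do |slice|
  slice.each { |record| puts record }
end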


# File 'lib/rocket_job/batch/io.rb', line 32

def output(category = :main)
  unless output_categories.include?(category) || (category == :main)
    raise "Category #{category.inspect}, must be registered in output_categories: #{output_categories.inspect}"
  end

  (@outputs ||= {})[category] ||= RocketJob::Sliced::Output.new(rocket_job_io_slice_arguments("outputs", category))
end

#upload(stream = nil, file_name: nil, category: :main, stream_mode: :line, on_first: nil, **args, &block) ⇒ Object

Upload the supplied file, io, IOStreams::Path, or IOStreams::Stream.

Returns [Integer] the number of records uploaded.

Parameters

stream [String | IO | IOStreams::Path | IOStreams::Stream]
  Full path and file name to stream into the job,
  Or, an IO Stream that responds to: :read
  Or, an IOStreams path such as IOStreams::Paths::File, or IOStreams::Paths::S3

delimiter [String]
  Line / Record delimiter to use to break the stream up into records
    Any string to break the stream up by
    The records when saved will not include this delimiter
  Default: nil
    Automatically detect line endings and break up by line
    Searches for the first "\r\n" or "\n" and then uses that as the
    delimiter for all subsequent records

stream_mode: [:line | :array | :hash]
  :line
    Uploads the file a line (String) at a time for processing by workers.
  :array
    Parses each line from the file as an Array and uploads each array for processing by workers.
  :hash
    Parses each line from the file into a Hash and uploads each hash for processing by workers.
  See IOStreams::Stream#each.

Example:

# Load plain text records from a file
job.upload('hello.csv')

Example:

# Load plain text records from a file, stripping all non-printable characters,
# as well as any characters that cannot be converted to UTF-8
path = IOStreams.path('hello.csv').option(:encode, cleaner: :printable, replace: '')
job.upload(path)

Example: Zip

# Since csv is not known to RocketJob, it is ignored
job.upload('myfile.csv.zip')

Example: Encrypted Zip

job.upload('myfile.csv.zip.enc')

Example: Explicitly set the streams

path = IOStreams.path('myfile.ze').stream(:encode, encoding: 'UTF-8').stream(:zip).stream(:enc)
job.upload(path)

Example: Supply custom options

path = IOStreams.path('myfile.csv.enc').option(:enc, compress: false).option(:encode, encoding: 'UTF-8')
job.upload(path)

Example: Read from a tempfile and use the original file name to determine which streams to apply

temp_file = Tempfile.new('my_project')
temp_file.write(gzip_and_encrypted_data)
stream = IOStreams.stream(temp_file).file_name('myfile.gz.enc')
job.upload(stream)

Example: Upload by writing records one at a time to the upload stream

job.upload do |writer|
  10.times { |i| writer << i }
end
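
Example: Parse each line into a Hash before uploading (an illustrative sketch; assumes 'users.csv' contains a header row from which the hash keys are derived):

job.upload('users.csv', stream_mode: :hash)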

Notes:

  • Only call from one thread at a time against a single instance of this job.

  • The record_count for the job is incremented by the number of records uploaded.

  • If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.

  • By default all data read from the file/stream is converted into UTF-8 before being persisted. This is recommended since Mongo only supports UTF-8 strings.

  • When using the zip format, the zip file/stream must contain only one file; the first file found will be loaded into the job.

  • If an io stream is supplied, it is read until it returns nil.

  • Only use this method for UTF-8 data, for binary data use #input_slice or #input_records.

  • CSV parsing is slow, so it is usually left for the workers to do.

Raises:

  • (ArgumentError)


# File 'lib/rocket_job/batch/io.rb', line 116

def upload(stream = nil, file_name: nil, category: :main, stream_mode: :line, on_first: nil, **args, &block)
  raise(ArgumentError, "Either stream, or a block must be supplied") unless stream || block

  stream_mode = stream_mode.to_sym
  # Backward compatibility with existing v4 jobs
  stream_mode = :array if stream_mode == :row
  stream_mode = :hash if stream_mode == :record

  count             =
    if block
      input(category).upload(on_first: on_first, &block)
    else
      path                  = IOStreams.new(stream)
      path.file_name        = file_name if file_name
      self.upload_file_name = path.file_name
      input(category).upload(on_first: on_first) do |io|
        path.each(stream_mode, **args) { |line| io << line }
      end
    end
  self.record_count = (record_count || 0) + count
  count
end

#upload_arel(arel, *column_names, category: :main, &block) ⇒ Object

Upload results from an Arel into RocketJob::SlicedJob.

Parameters

column_names
  When a block is not supplied, supply the names of the columns to be returned
  and uploaded into the job
  These columns are automatically added to the select list to reduce overhead

If a Block is supplied, it is passed the model returned from the database and should return the work item to be uploaded into the job (see the block example below).

Returns [Integer] the number of records uploaded

Example: Upload ids for all users

arel = User.all
job.upload_arel(arel)

Example: Upload selected user ids

arel = User.where(country_code: 'US')
job.upload_arel(arel)

Example: Upload user_name and zip_code

arel = User.where(country_code: 'US')
job.upload_arel(arel, :user_name, :zip_code)
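
Example: Supply a block to transform each model before it is uploaded (an illustrative sketch; the composed string is hypothetical):

arel = User.where(country_code: 'US')
job.upload_arel(arel) do |user|
  "#{user.user_name}:#{user.zip_code}"
end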

Notes:

  • Only call from one thread at a time against a single instance of this job.

  • The record_count for the job is incremented by the number of records returned by the arel.

  • If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.



# File 'lib/rocket_job/batch/io.rb', line 169

def upload_arel(arel, *column_names, category: :main, &block)
  count             = input(category).upload_arel(arel, *column_names, &block)
  self.record_count = (record_count || 0) + count
  count
end

#upload_integer_range(start_id, last_id, category: :main) ⇒ Object

Upload a sliced range of integers as arrays of start and end ids.

Returns [Integer] last_id - start_id + 1.

Uploads one range per slice so that the response can return multiple records for each slice processed.

Example

job.slice_size = 100
job.upload_integer_range(200, 421)

# Equivalent to calling:
job.input.insert([200,299])
job.input.insert([300,399])
job.input.insert([400,421])

Notes:

  • Only call from one thread at a time against a single instance of this job.

  • The record_count for the job is incremented by: last_id - start_id + 1.

  • If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.



# File 'lib/rocket_job/batch/io.rb', line 237

def upload_integer_range(start_id, last_id, category: :main)
  input(category).upload_integer_range(start_id, last_id)
  count             = last_id - start_id + 1
  self.record_count = (record_count || 0) + count
  count
end

#upload_integer_range_in_reverse_order(start_id, last_id, category: :main) ⇒ Object

Upload a sliced range of integers as arrays of start and end ids, starting with the last range first.

Returns [Integer] last_id - start_id + 1.

Uploads one range per slice so that the response can return multiple records for each slice processed. Useful when the highest integer values should be processed before the lower ranges, for example when processing every record in a database based on the id column.

Example

job.slice_size = 100
job.upload_integer_range_in_reverse_order(200, 421)

# Equivalent to calling:
job.input.insert([400,421])
job.input.insert([300,399])
job.input.insert([200,299])

Notes:

  • Only call from one thread at a time against a single instance of this job.

  • The record_count for the job is incremented by: last_id - start_id + 1.

  • If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.



# File 'lib/rocket_job/batch/io.rb', line 269

def upload_integer_range_in_reverse_order(start_id, last_id, category: :main)
  input(category).upload_integer_range_in_reverse_order(start_id, last_id)
  count             = last_id - start_id + 1
  self.record_count = (record_count || 0) + count
  count
end

#upload_mongo_query(criteria, *column_names, category: :main, &block) ⇒ Object

Upload the result of a MongoDB query to the input collection for processing. Useful when an entire MongoDB collection, or part thereof, needs to be processed by a job.

Returns [Integer] the number of records uploaded

If a Block is supplied, it is passed the document returned from the database and should return a record for processing (see the block example below).

If no Block is supplied, the record will be the :fields returned from MongoDB.

Note:

This method uses the collection and not the MongoMapper document to
avoid the overhead of constructing a Model with every document returned
by the query

Note:

The Block must return types that can be serialized to BSON.
Valid Types: Hash | Array | String | Integer | Float | Symbol | Regexp | Time
Invalid: Date, etc.

Example: Upload document ids

criteria = User.where(state: 'FL')
job.upload_mongo_query(criteria)

Example: Upload just the supplied column

criteria = User.where(state: 'FL')
job.upload_mongo_query(criteria, :zip_code)
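
Example: Supply a block to build each record from the returned document (an illustrative sketch; the Hash layout is hypothetical and must contain only BSON-serializable types):

criteria = User.where(state: 'FL')
job.upload_mongo_query(criteria, :zip_code) do |document|
  { zip: document['zip_code'] }
end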

Notes:

  • Only call from one thread at a time against a single instance of this job.

  • The record_count for the job is incremented by the number of records returned by the mongo query.

  • If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.



# File 'lib/rocket_job/batch/io.rb', line 210

def upload_mongo_query(criteria, *column_names, category: :main, &block)
  count             = input(category).upload_mongo_query(criteria, *column_names, &block)
  self.record_count = (record_count || 0) + count
  count
end

#upload_slice(slice) ⇒ Object

Upload the supplied slice for processing by workers.

Updates the record_count after adding the records

Returns [Integer] the number of records uploaded

Parameters

`slice` [ Array<Hash | Array | String | Integer | Float | Symbol | Regexp | Time> ]
  All elements in `slice` must be serializable to BSON
  For example the following types are not supported: Date

Note:

The caller should honor `:slice_size`; the entire slice is loaded as-is.

Note:

Not thread-safe. Only call from one thread at a time.
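
Example: Upload one pre-built slice (an illustrative sketch; the integers are placeholder work items):

job.upload_slice([1, 2, 3, 4, 5])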


# File 'lib/rocket_job/batch/io.rb', line 292

def upload_slice(slice)
  input.insert(slice)
  count             = slice.size
  self.record_count = (record_count || 0) + count
  count
end

#write_output(result = nil, input_slice = nil, &block) ⇒ Object

Writes the supplied result (a plain value, Batch::Result, or Batch::Results) to the relevant output collections.

If a block is supplied, it is passed a writer that should be used to accumulate the results.

Examples

job.write_output('hello world')

job.write_output do |writer|
  writer << 'hello world'
end

job.write_output do |writer|
  result = RocketJob::Batch::Results.new
  result << RocketJob::Batch::Result.new(:main, 'hello world')
  result << RocketJob::Batch::Result.new(:errors, 'errors')
  writer << result
end

result = RocketJob::Batch::Results.new
result << RocketJob::Batch::Result.new(:main, 'hello world')
result << RocketJob::Batch::Result.new(:errors, 'errors')
job.write_output(result)



# File 'lib/rocket_job/batch/io.rb', line 387

def write_output(result = nil, input_slice = nil, &block)
  if block
    RocketJob::Sliced::Writer::Output.collect(self, input_slice, &block)
  else
    raise(ArgumentError, "result parameter is required when no block is supplied") unless result

    RocketJob::Sliced::Writer::Output.collect(self, input_slice) { |writer| writer << result }
  end
end