Module: RocketJob::Batch::IO
- Extended by:
- ActiveSupport::Concern
- Included in:
- RocketJob::Batch
- Defined in:
- lib/rocket_job/batch/io.rb
Overview
IO methods for sliced jobs
Instance Method Summary collapse
-
#download(file_name_or_io = nil, category: :main, **args, &block) ⇒ Object
Download the output data into the supplied file_name or stream.
-
#input(category = :main) ⇒ Object
Returns [RocketJob::Sliced::Input] input collection for holding input slices.
-
#output(category = :main) ⇒ Object
Returns [RocketJob::Sliced::Output] output collection for holding output slices. Returns nil if no output is being collected.
-
#upload(file_name_or_io = nil, file_name: nil, category: :main, **args, &block) ⇒ Object
Upload the supplied file_name or stream.
-
#upload_arel(arel, *column_names, category: :main, &block) ⇒ Object
Upload results from an Arel into RocketJob::SlicedJob.
-
#upload_integer_range(start_id, last_id, category: :main) ⇒ Object
Upload a sliced range of integer requests as arrays of start and end ids.
-
#upload_integer_range_in_reverse_order(start_id, last_id, category: :main) ⇒ Object
Upload a sliced range of integer requests as arrays of start and end ids, starting with the last range first.
-
#upload_mongo_query(criteria, *column_names, category: :main, &block) ⇒ Object
Upload the result of a MongoDB query to the input collection for processing. Useful when an entire MongoDB collection, or part thereof, needs to be processed by a job.
-
#upload_slice(slice) ⇒ Object
Upload the supplied slices for processing by workers.
-
#write_output(result = nil, input_slice = nil, &block) ⇒ Object
Writes the supplied result, Batch::Result or Batch::Results to the relevant collections.
Instance Method Details
#download(file_name_or_io = nil, category: :main, **args, &block) ⇒ Object
Download the output data into the supplied file_name or stream.
Parameters
file_name_or_io [String|IO]
The file_name of the file to write to, or an IO Stream that implements #write.
options:
category [Symbol]
The category of output to download
Default: :main
See RocketJob::Sliced::Output#download for remaining options
Returns [Integer] the number of records downloaded
```ruby
# File 'lib/rocket_job/batch/io.rb', line 343

def download(file_name_or_io = nil, category: :main, **args, &block)
  raise "Cannot download incomplete job: #{id}. Currently in state: #{state}-#{sub_state}" if rocket_job_processing?

  output(category).download(file_name_or_io, **args, &block)
end
```
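A hedged usage sketch: #download writes to any object that responds to #write, so a StringIO works as an in-memory target. The job instance in the comments is hypothetical and assumed to be complete; only the StringIO lines are runnable on their own:

```ruby
require "stringio"

# Hypothetical completed job instance:
# job.download("results.csv")   # stream formats inferred from the file extension
# count = job.download(io)      # or write into any object responding to #write

# The #write contract itself, demonstrated with a plain StringIO:
io = StringIO.new
io.write("record_1\n")
io.write("record_2\n")
io.string.lines.size # => 2
```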
#input(category = :main) ⇒ Object
Returns [RocketJob::Sliced::Input] input collection for holding input slices
Parameters:
category [Symbol]
The name of the category to access or upload data into
Default: :main (uses the single default input collection for this job)
Validates: This value must be one of those listed in #input_categories
```ruby
# File 'lib/rocket_job/batch/io.rb', line 16

def input(category = :main)
  raise "Category #{category.inspect}, must be registered in input_categories: #{input_categories.inspect}" unless input_categories.include?(category) || (category == :main)

  collection_name = "rocket_job.inputs.#{id}"
  collection_name << ".#{category}" unless category == :main

  (@inputs ||= {})[category] ||= RocketJob::Sliced::Input.new(slice_arguments(collection_name))
end
```
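The collection naming rule used by #input (and, with `outputs`, by #output) can be sketched in plain Ruby. `collection_name_for` and the id value below are illustrative only, not part of the library's API:

```ruby
# Mirrors the naming logic above: the :main category maps to the bare
# collection name; any other category is appended as a suffix.
def collection_name_for(kind, id, category = :main)
  name = "rocket_job.#{kind}.#{id}"
  name << ".#{category}" unless category == :main
  name
end

collection_name_for(:inputs, "abc123")           # => "rocket_job.inputs.abc123"
collection_name_for(:outputs, "abc123", :errors) # => "rocket_job.outputs.abc123.errors"
```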
#output(category = :main) ⇒ Object
Returns [RocketJob::Sliced::Output] output collection for holding output slices. Returns nil if no output is being collected.
Parameters:
category [Symbol]
The name of the category to access or download data from
Default: :main (uses the single default output collection for this job)
Validates: This value must be one of those listed in #output_categories
```ruby
# File 'lib/rocket_job/batch/io.rb', line 33

def output(category = :main)
  raise "Category #{category.inspect}, must be registered in output_categories: #{output_categories.inspect}" unless output_categories.include?(category) || (category == :main)

  collection_name = "rocket_job.outputs.#{id}"
  collection_name << ".#{category}" unless category == :main

  (@outputs ||= {})[category] ||= RocketJob::Sliced::Output.new(slice_arguments(collection_name))
end
```
#upload(file_name_or_io = nil, file_name: nil, category: :main, **args, &block) ⇒ Object
Upload the supplied file_name or stream.
Returns [Integer] the number of records uploaded.
Parameters
file_name_or_io [String | IO]
Full path and file name to stream into the job,
Or, an IO Stream that responds to: :read
streams [Symbol|Array]
Streams to convert the data whilst it is being read.
When nil, the file_name extensions will be inspected to determine what
streams should be applied.
Default: nil
delimiter[String]
Line / Record delimiter to use to break the stream up into records
Any string to break the stream up by
The records when saved will not include this delimiter
Default: nil
Automatically detect line endings and break up by line
Searches for the first "\r\n" or "\n" and then uses that as the
delimiter for all subsequent records
buffer_size [Integer]
Size of the blocks when reading from the input file / stream.
Default: 65536 ( 64K )
encoding: [String|Encoding]
Encode returned data with this encoding.
'US-ASCII': Original 7 bit ASCII Format
'ASCII-8BIT': 8-bit ASCII Format
'UTF-8': UTF-8 Format
Etc.
Default: 'UTF-8'
encode_replace: [String]
The character to replace with when a character cannot be converted to the target encoding.
nil: Don't replace any invalid characters. Encoding::UndefinedConversionError is raised.
Default: nil
encode_cleaner: [nil|symbol|Proc]
Cleanse data read from the input stream.
nil: No cleansing
:printable Cleanse all non-printable characters except \r and \n
Proc/lambda Proc to call after every read to cleanse the data
Default: :printable
stream_mode: [:line | :row | :record]
:line
Uploads the file a line (String) at a time for processing by workers.
:row
Parses each line from the file as an Array and uploads each array for processing by workers.
:record
Parses each line from the file into a Hash and uploads each hash for processing by workers.
See IOStream#each_line, IOStream#each_row, and IOStream#each_record.
Example:
# Load plain text records from a file
job.input.upload('hello.csv')
Example:
# Load plain text records from a file, stripping all non-printable characters,
# as well as any characters that cannot be converted to UTF-8
job.input.upload('hello.csv', encode_cleaner: :printable, encode_replace: '')
Example: Zip
# Since csv is not known to RocketJob it is ignored
job.input.upload('myfile.csv.zip')
Example: Encrypted Zip
job.input.upload('myfile.csv.zip.enc')
Example: Explicitly set the streams
job.input.upload('myfile.ze', streams: [:zip, :enc])
Example: Supply custom options
job.input.upload('myfile.csv.enc', streams: :enc)
Example: Extract streams from filename but write to a temp file
streams = IOStreams.streams_for_file_name('myfile.gz.enc')
t = Tempfile.new('my_project')
job.input.upload(t.to_path, streams: streams)
Example: Upload by writing records one at a time to the upload stream
job.upload do |writer|
10.times { |i| writer << i }
end
Notes:
-
Only call from one thread at a time against a single instance of this job.
-
The record_count for the job is incremented by the number of records uploaded.
-
If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.
-
By default all data read from the file/stream is converted into UTF-8 before being persisted. This is recommended since Mongo only supports UTF-8 strings.
-
When using the Zip format, the Zip file/stream must contain only one file; the first file found will be loaded into the job.
-
If an IO stream is supplied, it is read until it returns nil.
-
Only use this method for UTF-8 data, for binary data use #input_slice or #input_records.
-
CSV parsing is slow, so it is usually left for the workers to do.
```ruby
# File 'lib/rocket_job/batch/io.rb', line 143

def upload(file_name_or_io = nil, file_name: nil, category: :main, **args, &block)
  if file_name
    self.upload_file_name = file_name
  elsif file_name_or_io.is_a?(String)
    self.upload_file_name = file_name_or_io
  end

  count             = input(category).upload(file_name_or_io, file_name: file_name, **args, &block)
  self.record_count = (record_count || 0) + count
  count
rescue StandardError => exc
  input(category).delete_all
  raise(exc)
end
```
#upload_arel(arel, *column_names, category: :main, &block) ⇒ Object
Upload results from an Arel into RocketJob::SlicedJob.
Params
column_names
When a block is not supplied, supply the names of the columns to be returned
and uploaded into the job
These columns are automatically added to the select list to reduce overhead
If a Block is supplied it is passed the model returned from the database and should return the work item to be uploaded into the job.
Returns [Integer] the number of records uploaded
Example: Upload ids for all users
arel = User.all
job.upload_arel(arel)
Example: Upload selected user ids
arel = User.where(country_code: 'US')
job.upload_arel(arel)
Example: Upload user_name and zip_code
arel = User.where(country_code: 'US')
job.upload_arel(arel, :user_name, :zip_code)
Notes:
-
Only call from one thread at a time against a single instance of this job.
-
The record_count for the job is set to the number of records returned by the arel.
-
If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.
```ruby
# File 'lib/rocket_job/batch/io.rb', line 187

def upload_arel(arel, *column_names, category: :main, &block)
  count             = input(category).upload_arel(arel, *column_names, &block)
  self.record_count = (record_count || 0) + count
  count
rescue StandardError => exc
  input(category).delete_all
  raise(exc)
end
```
#upload_integer_range(start_id, last_id, category: :main) ⇒ Object
Upload a sliced range of integer requests as arrays of start and end ids.
Returns [Integer] last_id - start_id + 1.
Uploads one range per slice so that the response can return multiple records for each slice processed.
Example
job.slice_size = 100
job.upload_integer_range(200, 421)
# Equivalent to calling:
job.input.insert([200,299])
job.input.insert([300,399])
job.input.insert([400,421])
Notes:
-
Only call from one thread at a time against a single instance of this job.
-
The record_count for the job is set to: last_id - start_id + 1.
-
If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.
```ruby
# File 'lib/rocket_job/batch/io.rb', line 261

def upload_integer_range(start_id, last_id, category: :main)
  input(category).upload_integer_range(start_id, last_id)
  count             = last_id - start_id + 1
  self.record_count = (record_count || 0) + count
  count
rescue StandardError => exc
  input(category).delete_all
  raise(exc)
end
```
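The slicing behaviour shown in the example above can be reproduced in plain Ruby. `integer_ranges` below is an illustrative sketch of the splitting rule, not the library's implementation:

```ruby
# Break [start_id, last_id] into consecutive sub-ranges of at most
# slice_size ids each; one sub-range is uploaded per slice.
def integer_ranges(start_id, last_id, slice_size)
  ranges = []
  while start_id <= last_id
    ranges << [start_id, [start_id + slice_size - 1, last_id].min]
    start_id += slice_size
  end
  ranges
end

integer_ranges(200, 421, 100) # => [[200, 299], [300, 399], [400, 421]]
```

Reversing this array gives the order used by #upload_integer_range_in_reverse_order.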
#upload_integer_range_in_reverse_order(start_id, last_id, category: :main) ⇒ Object
Upload a sliced range of integer requests as arrays of start and end ids, starting with the last range first.
Returns [Integer] last_id - start_id + 1.
Uploads one range per slice so that the response can return multiple records for each slice processed. Useful when the highest integer values should be processed before the lower ranges, for example when processing every record in a database based on the id column.
Example
job.slice_size = 100
job.upload_integer_range_in_reverse_order(200, 421)
# Equivalent to calling:
job.input.insert([400,421])
job.input.insert([300,399])
job.input.insert([200,299])
Notes:
-
Only call from one thread at a time against a single instance of this job.
-
The record_count for the job is set to: last_id - start_id + 1.
-
If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.
```ruby
# File 'lib/rocket_job/batch/io.rb', line 296

def upload_integer_range_in_reverse_order(start_id, last_id, category: :main)
  input(category).upload_integer_range_in_reverse_order(start_id, last_id)
  count             = last_id - start_id + 1
  self.record_count = (record_count || 0) + count
  count
rescue StandardError => exc
  input(category).delete_all
  raise(exc)
end
```
#upload_mongo_query(criteria, *column_names, category: :main, &block) ⇒ Object
Upload the result of a MongoDB query to the input collection for processing. Useful when an entire MongoDB collection, or part thereof, needs to be processed by a job.
Returns [Integer] the number of records uploaded
If a Block is supplied it is passed the document returned from the database and should return a record for processing.
If no Block is supplied then the record will be the :fields returned from MongoDB.
Note:
This method uses the collection and not the MongoMapper document to
avoid the overhead of constructing a Model with every document returned
by the query
Note:
The Block must return types that can be serialized to BSON.
Valid Types: Hash | Array | String | Integer | Float | Symbol | Regexp | Time
Invalid: Date, etc.
Example: Upload document ids
criteria = User.where(state: 'FL')
job.record_count = job.upload_mongo_query(criteria)
Example: Upload just the supplied column
criteria = User.where(state: 'FL')
job.record_count = job.upload_mongo_query(criteria, :zip_code)
Notes:
-
Only call from one thread at a time against a single instance of this job.
-
The record_count for the job is set to the number of records returned by the mongo query.
-
If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.
```ruby
# File 'lib/rocket_job/batch/io.rb', line 231

def upload_mongo_query(criteria, *column_names, category: :main, &block)
  count             = input(category).upload_mongo_query(criteria, *column_names, &block)
  self.record_count = (record_count || 0) + count
  count
rescue StandardError => exc
  input(category).delete_all
  raise(exc)
end
```
#upload_slice(slice) ⇒ Object
Upload the supplied slices for processing by workers
Updates the record_count after adding the records
Returns [Integer] the number of records uploaded
Parameters
`slice` [ Array<Hash | Array | String | Integer | Float | Symbol | Regexp | Time> ]
All elements in `slice` must be serializable to BSON
For example the following types are not supported: Date
Note:
The caller should honor `:slice_size`, the entire slice is loaded as-is.
Note:
Not thread-safe. Only call from one thread at a time
```ruby
# File 'lib/rocket_job/batch/io.rb', line 322

def upload_slice(slice)
  input.insert(slice)
  count             = slice.size
  self.record_count = (record_count || 0) + count
  count
end
```
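Since #upload_slice loads the supplied array as a single slice, the caller is expected to chunk records to slice_size first. A sketch using Ruby's Enumerable#each_slice, where `job` is a hypothetical batch job instance:

```ruby
records    = (1..250).to_a
slice_size = 100

# Chunk the records to the job's slice_size before uploading.
slices = records.each_slice(slice_size).to_a
slices.map(&:size) # => [100, 100, 50]

# Each chunk is then uploaded as its own slice:
# slices.each { |slice| job.upload_slice(slice) }
```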
#write_output(result = nil, input_slice = nil, &block) ⇒ Object
Writes the supplied result, Batch::Result or Batch::Results to the relevant collections.
If a block is supplied, the block is supplied with a writer that should be used to accumulate the results.
Examples
job.write_output('hello world')
job.write_output do |writer|
writer << 'hello world'
end
job.write_output do |writer|
result = RocketJob::Batch::Results.new
result << RocketJob::Batch::Result.new(:main, 'hello world')
result << RocketJob::Batch::Result.new(:errors, 'errors')
writer << result
end
result = RocketJob::Batch::Results.new
result << RocketJob::Batch::Result.new(:main, 'hello world')
result << RocketJob::Batch::Result.new(:errors, 'errors')
job.write_output(result)
```ruby
# File 'lib/rocket_job/batch/io.rb', line 373

def write_output(result = nil, input_slice = nil, &block)
  if block
    RocketJob::Sliced::Writer::Output.collect(self, input_slice, &block)
  else
    raise(ArgumentError, 'result parameter is required when no block is supplied') unless result

    RocketJob::Sliced::Writer::Output.collect(self, input_slice) { |writer| writer << result }
  end
end
```