Module: IOStreams
- Defined in:
- lib/iostreams.rb,
lib/io_streams/s3.rb,
lib/io_streams/pgp.rb,
lib/io_streams/errors.rb,
lib/io_streams/streams.rb,
lib/io_streams/tabular.rb,
lib/io_streams/version.rb,
lib/io_streams/s3/reader.rb,
lib/io_streams/s3/writer.rb,
lib/io_streams/io_streams.rb,
lib/io_streams/pgp/reader.rb,
lib/io_streams/pgp/writer.rb,
lib/io_streams/row/reader.rb,
lib/io_streams/row/writer.rb,
lib/io_streams/zip/reader.rb,
lib/io_streams/zip/writer.rb,
lib/io_streams/file/reader.rb,
lib/io_streams/file/writer.rb,
lib/io_streams/gzip/reader.rb,
lib/io_streams/gzip/writer.rb,
lib/io_streams/line/reader.rb,
lib/io_streams/line/writer.rb,
lib/io_streams/sftp/reader.rb,
lib/io_streams/sftp/writer.rb,
lib/io_streams/xlsx/reader.rb,
lib/io_streams/bzip2/reader.rb,
lib/io_streams/bzip2/writer.rb,
lib/io_streams/encode/reader.rb,
lib/io_streams/encode/writer.rb,
lib/io_streams/record/reader.rb,
lib/io_streams/record/writer.rb,
lib/io_streams/tabular/header.rb,
lib/io_streams/tabular/parser/csv.rb,
lib/io_streams/tabular/parser/psv.rb,
lib/io_streams/tabular/parser/base.rb,
lib/io_streams/tabular/parser/hash.rb,
lib/io_streams/tabular/parser/json.rb,
lib/io_streams/tabular/parser/array.rb,
lib/io_streams/tabular/parser/fixed.rb,
lib/io_streams/tabular/utility/csv_row.rb
Overview
Streaming library for Ruby
Stream types / extensions supported:
  .zip         Zip File                                    [ :zip ]
  .gz, .gzip   GZip File                                   [ :gzip ]
  .enc         File Encrypted using symmetric encryption   [ :enc ]
  etc...
  other        All other extensions will be returned as:   []
When a file is encrypted, it may also be compressed:
  .zip.enc   [ :zip, :enc ]
  .gz.enc    [ :gz, :enc ]
Defined Under Namespace
Modules: Bzip2, Encode, Errors, File, Gzip, Line, Pgp, Record, Row, S3, SFTP, Xlsx, Zip Classes: Extension, StreamStruct, Streams, Tabular
Constant Summary
- VERSION =
'0.16.2'
- UTF8_ENCODING =
Encoding.find('UTF-8').freeze
- BINARY_ENCODING =
Encoding.find('BINARY').freeze
Class Method Summary
-
.blank?(value) ⇒ Boolean
Helper method: Returns [true|false] whether the value is blank.
-
.compressed?(file_name) ⇒ Boolean
Returns [true|false] whether the file is compressed.
-
.copy(source_file_name_or_io, target_file_name_or_io, buffer_size: 65536, source_options: {}, target_options: {}) ⇒ Object
Copies the source file/stream to the target file/stream.
-
.deregister_extension(extension) ⇒ Object
De-Register a file extension.
-
.each_line(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
Iterate over a file / stream returning one line at a time.
-
.each_record(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
Returns [Hash] of every record in a file or stream with support for headers.
-
.each_row(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
Iterate over a file / stream returning one row at a time, each as an Array.
-
.encrypted?(file_name) ⇒ Boolean
Returns [true|false] whether the file is encrypted.
-
.line_reader(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
Iterate over a file / stream returning each record/line one at a time.
- .line_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
-
.reader(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, &block) ⇒ Object
Returns a Reader for reading a file / stream.
-
.reader_stream?(file_name_or_io) ⇒ Boolean
Returns [true|false] whether the supplied file_name_or_io is a reader stream.
-
.record_reader(file_name_or_io, streams: nil, delimiter: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
Iterate over a file / stream returning each line as a hash, one at a time.
- .record_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
-
.register_extension(extension, reader_class, writer_class) ⇒ Object
Register a file extension and the reader and writer streaming classes.
-
.row_reader(file_name_or_io, streams: nil, delimiter: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
Iterate over a file / stream returning each line as an array, one at a time.
- .row_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
-
.streams_for_file_name(file_name) ⇒ Object
Returns [Array] the formats required to process the file by looking at its extension(s).
-
.writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, &block) ⇒ Object
Returns a Writer for writing to a file / stream.
-
.writer_stream?(file_name_or_io) ⇒ Boolean
Returns [true|false] whether the supplied file_name_or_io is a writer stream.
Class Method Details
.blank?(value) ⇒ Boolean
Helper method: Returns [true|false] whether the value is blank.
# File 'lib/io_streams/io_streams.rb', line 405
def self.blank?(value)
  if value.nil?
    true
  elsif value.is_a?(String)
    value !~ /\S/
  else
    value.respond_to?(:empty?) ? value.empty? : !value
  end
end
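The blank check can be tried out without loading the gem. The following is a minimal sketch that copies the logic shown above into a standalone method; the name `blank_value?` is hypothetical and is not part of IOStreams.

```ruby
# Standalone copy of the blank? logic, runnable without the gem.
# `blank_value?` is a hypothetical name used only for this sketch.
def blank_value?(value)
  if value.nil?
    true
  elsif value.is_a?(String)
    # A string is blank when it has no non-whitespace character.
    value !~ /\S/
  else
    # Collections are blank when empty; anything else falls back to !value.
    value.respond_to?(:empty?) ? value.empty? : !value
  end
end

p blank_value?(nil)      # nil
p blank_value?("  \n")   # whitespace-only string
p blank_value?([])       # empty collection
p blank_value?(false)    # no #empty?, so !false is used
p blank_value?(0)        # 0 is truthy in Ruby
```

Note that numbers such as 0 are not treated as blank, since the fallback branch relies on Ruby truthiness.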
.compressed?(file_name) ⇒ Boolean
Returns [true|false] whether the file is compressed. Note: Currently only looks at the file name extension
# File 'lib/io_streams/io_streams.rb', line 280
def self.compressed?(file_name)
  !(file_name =~ /\.(zip|gz|gzip|xls.|)\z/i).nil?
end
.copy(source_file_name_or_io, target_file_name_or_io, buffer_size: 65536, source_options: {}, target_options: {}) ⇒ Object
Copies the source file/stream to the target file/stream. Returns [Integer] the number of bytes copied
Example: Copy between 2 files
IOStreams.copy('a.csv', 'b.csv')
Example: Read content from a Xlsx file and write it out in CSV form.
IOStreams.copy('a.xlsx', 'b.csv')
Example:
# Read content from a JSON file and write it out in CSV form.
#
# The output header for the CSV file is extracted from the first row in the JSON file.
# Columns that are missing from the first JSON row are ignored for the rest of the file.
IOStreams.copy('a.json', 'b.csv')
Example:
# Read a PSV file and write out a CSV file from it.
IOStreams.copy('a.psv', 'b.csv')
Example:
# Copy between 2 files, encrypting the target file with Symmetric Encryption
# Since the target file_name already includes `.enc` in the filename, it is automatically
# encrypted.
IOStreams.copy('a.csv', 'b.csv.enc')
Example:
# Copy between 2 files, encrypting the target file with Symmetric Encryption
# Since the target file_name does not include `.enc` in the filename, to encrypt it
# the encryption stream is added.
IOStreams.copy('a.csv', 'b', target_options: {streams: [:enc]})
Example:
# Copy between 2 files, encrypting the target file with Symmetric Encryption
# Since the target file_name does not include `.enc` in the filename, to encrypt it
# the encryption stream is added, along with the optional compression option.
IOStreams.copy('a.csv', 'b', target_options: {streams: [enc: { compress: true }]})
Example:
# Create a pgp encrypted file.
# For PGP Encryption the recipient's email address is required.
IOStreams.copy('a.xlsx', 'b.csv.pgp', target_options: {streams: [:csv, pgp: { recipient_email: '[email protected]' }]})
Example: Copy between 2 existing streams
IOStreams.reader('a.csv') do |source_stream|
  IOStreams.writer('b.csv.enc') do |target_stream|
    IOStreams.copy(source_stream, target_stream)
  end
end
Example:
# Copy between 2 csv files, reducing the number of columns present and encrypting the
# target file with Symmetric Encryption
output_headers = %w[name address]
IOStreams.copy(
  'a.csv',
  'b.csv.enc',
  target_options: {streams: [csv: {headers: output_headers}, enc: {compress: true}]}
)
Example:
# Copy a locally encrypted file to AWS S3.
# Decrypts the file, then compresses it with gzip as it is being streamed into S3.
# Useful for when the entire bucket is encrypted on S3.
IOStreams.copy('a.csv.enc', 's3://my_bucket/b.csv.gz')
# File 'lib/io_streams/io_streams.rb', line 254
def self.copy(source_file_name_or_io, target_file_name_or_io, buffer_size: 65536, source_options: {}, target_options: {})
  bytes = 0
  reader(source_file_name_or_io, **source_options) do |source_stream|
    writer(target_file_name_or_io, **target_options) do |target_stream|
      while data = source_stream.read(buffer_size)
        break if data.size == 0
        bytes += data.size
        target_stream.write(data)
      end
    end
  end
  bytes
end
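The buffered loop at the heart of `copy` can be illustrated on its own. This is a minimal sketch that assumes plain in-memory `StringIO` objects in place of the reader and writer streams; `copy_stream` is a hypothetical helper, not a method of the library.

```ruby
require 'stringio'

# Hypothetical helper mirroring the read/write loop in the source above.
# Reads at most buffer_size bytes at a time and returns the bytes copied.
def copy_stream(source, target, buffer_size: 65_536)
  bytes = 0
  while (data = source.read(buffer_size))
    break if data.empty?       # guard against streams that return "" at EOF
    bytes += data.bytesize
    target.write(data)
  end
  bytes
end

source = StringIO.new('a' * 150_000)
target = StringIO.new
copied = copy_stream(source, target)

puts copied
puts target.string == source.string
```

Because only one buffer is held at a time, memory use stays bounded by `buffer_size` regardless of how large the source is.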
.deregister_extension(extension) ⇒ Object
De-Register a file extension
Returns the registration that was removed for the extension, or nil if the extension was not registered
Example:
deregister_extension(:xls)
# File 'lib/io_streams/io_streams.rb', line 399
def self.deregister_extension(extension)
  raise(ArgumentError, "Invalid extension #{extension.inspect}") unless extension.to_s =~ /\A\w+\Z/
  @extensions.delete(extension.to_sym)
end
.each_line(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
Iterate over a file / stream returning one line at a time.
# File 'lib/io_streams/io_streams.rb', line 73
def self.each_line(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  line_reader(file_name_or_io, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace, **args) do |line_stream|
    line_stream.each(&block)
  end
end
.each_record(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
Returns [Hash] of every record in a file or stream with support for headers.
Reading a delimited stream and converting to tabular form.
Each record / line is returned one at a time so that very large files can be read without having to load the entire file into memory.
Example:
file_name = 'customer_data.csv.pgp'
IOStreams.each_record(file_name) do |hash|
  p hash
end
# File 'lib/io_streams/io_streams.rb', line 98
def self.each_record(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  record_reader(file_name_or_io, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace, **args) do |record_stream|
    record_stream.each(&block)
  end
end
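To make the "one Hash per record" idea concrete, here is a minimal sketch of header-driven record iteration over an in-memory stream. It is a simplification under stated assumptions: `each_record_sketch` is a hypothetical name, the first line is assumed to be the header, and fields are split naively on the delimiter, so quoted values are not handled the way a real CSV parser would handle them.

```ruby
require 'stringio'

# Hypothetical, simplified stand-in for record iteration.
# Assumes the first line is the header and that no field contains the delimiter.
def each_record_sketch(io, delimiter: ',')
  header = nil
  io.each_line do |line|
    values = line.chomp.split(delimiter, -1)
    if header.nil?
      header = values                   # remember the column names
    else
      yield header.zip(values).to_h     # pair each column name with its value
    end
  end
end

data = StringIO.new("name,address\nJack,1 Main St\nJill,2 Hill Rd\n")
records = []
each_record_sketch(data) { |hash| records << hash }

records.each { |hash| p hash }
```

Only the header and the current line are held at any time, which is what allows very large inputs to be processed one record at a time.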
.each_row(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
Iterate over a file / stream returning one row at a time, each as an Array.
# File 'lib/io_streams/io_streams.rb', line 80
def self.each_row(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  row_reader(file_name_or_io, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace, **args) do |row_stream|
    row_stream.each(&block)
  end
end
.encrypted?(file_name) ⇒ Boolean
Returns [true|false] whether the file is encrypted. Note: Currently only looks at the file name extension
# File 'lib/io_streams/io_streams.rb', line 286
def self.encrypted?(file_name)
  !(file_name =~ /\.(enc|pgp|gpg)\z/i).nil?
end
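Since both `compressed?` and `encrypted?` only inspect the file name, their behavior follows directly from the two regular expressions quoted on this page. The sketch below copies those patterns into local constants (`COMPRESSED`, `ENCRYPTED` and `matches?` are hypothetical names) to show which names match.

```ruby
# Local copies of the patterns quoted in the source listings on this page.
COMPRESSED = /\.(zip|gz|gzip|xls.|)\z/i
ENCRYPTED  = /\.(enc|pgp|gpg)\z/i

# Hypothetical helper: true when the pattern matches the file name.
def matches?(pattern, file_name)
  !(file_name =~ pattern).nil?
end

%w[a.csv.gz a.XLSX a.csv a.csv.gz.enc notes.].each do |name|
  puts "#{name}: compressed=#{matches?(COMPRESSED, name)} encrypted=#{matches?(ENCRYPTED, name)}"
end
```

Two details are worth noting. Only the final extension is examined, so `a.csv.gz.enc` counts as encrypted but not as compressed. The empty alternative at the end of the compressed pattern also matches a name that ends in a bare dot, such as `notes.`.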
.line_reader(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
Iterate over a file / stream returning each record/line one at a time.
# File 'lib/io_streams/io_streams.rb', line 318
def self.line_reader(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Line::Reader) || file_name_or_io.is_a?(Array)

  reader(file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace) do |io|
    IOStreams::Line::Reader.open(io, **args, &block)
  end
end
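The first line of `line_reader` passes an existing line reader or an Array straight to the block, so callers can treat both the same way. A minimal sketch of that pass-through pattern follows; `LineReaderSketch` and `line_reader_sketch` are hypothetical stand-ins and do not reproduce the real `IOStreams::Line::Reader`.

```ruby
require 'stringio'

# Hypothetical stand-in for a line reader: wraps an IO and yields chomped lines.
class LineReaderSketch
  def self.open(io)
    yield new(io)
  end

  def initialize(io)
    @io = io
  end

  def each
    @io.each_line { |line| yield line.chomp }
  end
end

# Mirrors the early return above: anything that is already line-oriented
# (a reader or an Array of lines) is handed to the block untouched.
def line_reader_sketch(source, &block)
  return yield(source) if source.is_a?(LineReaderSketch) || source.is_a?(Array)

  LineReaderSketch.open(source, &block)
end

from_io = []
line_reader_sketch(StringIO.new("one\ntwo\n")) { |lines| lines.each { |l| from_io << l } }

from_array = []
line_reader_sketch(%w[one two]) { |lines| lines.each { |l| from_array << l } }

p from_io
p from_array
```

The block only relies on `#each`, which is why an Array of lines can be substituted for a stream, for example in tests.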
.line_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
# File 'lib/io_streams/io_streams.rb', line 160
def self.line_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Line::Writer) || file_name_or_io.is_a?(Array)

  writer(file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace) do |io|
    IOStreams::Line::Writer.open(io, **args, &block)
  end
end
.reader(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, &block) ⇒ Object
Returns a Reader for reading a file / stream
Parameters
  file_name_or_io [String|IO]
    The file_name of the file to read from, or an IO Stream that implements
    #read.
  streams [Symbol|Array]
    The formats/streams that will be used to convert the data whilst it is
    being read.
    When nil, the file_name will be inspected to try and determine what
    streams should be applied.
    Default: nil
  file_name [String]
    When `streams` is not supplied, `file_name` can be used for determining the streams
    to apply to read the file/stream.
    This is particularly useful when `file_name_or_io` is a stream, or a temporary file name.
    Default: nil
Example: Zip
  IOStreams.reader('myfile.zip') do |stream|
    puts stream.read
  end
Example: Encrypted Zip
  IOStreams.reader('myfile.zip.enc') do |stream|
    puts stream.read
  end
Example: Explicitly set the streams
  IOStreams.reader('myfile.zip.enc', streams: [:zip, :enc]) do |stream|
    puts stream.read
  end
Example: Supply custom options
  # Supply options to the Symmetric Encryption stream
  IOStreams.reader('myfile.csv.enc', streams: [enc: {compress: true}]) do |stream|
    puts stream.read
  end
Note:
-
Passes the file_name_or_io as-is into the block if it is already a reader stream AND no streams are passed in.
# File 'lib/io_streams/io_streams.rb', line 68
def self.reader(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, &block)
  stream(:reader, file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace, &block)
end
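The parameter notes above describe streams as conversions applied while data is being read. The general idea of layering a converting stream over an underlying IO can be sketched with Ruby's standard `Zlib` classes. This is an illustration only: it assumes a single gzip layer over an in-memory `StringIO` and does not use or reproduce the library's own stream classes.

```ruby
require 'stringio'
require 'zlib'

# In-memory stand-in for a file on disk (binary string as backing store).
backing = StringIO.new(String.new)

# Writer side: the gzip layer compresses data on its way into the backing IO.
gz_writer = Zlib::GzipWriter.new(backing)
gz_writer.write("id,name\n1,Jack\n")
gz_writer.finish                 # flush the gzip trailer, keep backing open

# Reader side: the gzip layer decompresses data as it is read back.
backing.rewind
gz_reader = Zlib::GzipReader.new(backing)
plain = gz_reader.read
gz_reader.finish

puts plain
```

The caller of the reader only ever sees the decompressed text; the compressed bytes stay inside the layer beneath it.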
.reader_stream?(file_name_or_io) ⇒ Boolean
Returns [true|false] whether the supplied file_name_or_io is a reader stream
# File 'lib/io_streams/io_streams.rb', line 269
def self.reader_stream?(file_name_or_io)
  file_name_or_io.respond_to?(:read)
end
.record_reader(file_name_or_io, streams: nil, delimiter: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
Iterate over a file / stream returning each line as a hash, one at a time.
# File 'lib/io_streams/io_streams.rb', line 354
def self.record_reader(file_name_or_io,
                       streams: nil,
                       delimiter: nil,
                       file_name: nil,
                       encoding: nil,
                       encode_cleaner: nil,
                       encode_replace: nil,
                       **args,
                       &block)

  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Record::Reader)

  line_reader(
    file_name_or_io,
    streams: streams,
    delimiter: delimiter,
    file_name: file_name,
    encoding: encoding,
    encode_cleaner: encode_cleaner,
    encode_replace: encode_replace
  ) do |io|
    file_name = file_name_or_io if file_name.nil? && file_name_or_io.is_a?(String)
    IOStreams::Record::Reader.open(io, file_name: file_name, **args, &block)
  end
end
.record_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
# File 'lib/io_streams/io_streams.rb', line 178
def self.record_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Record::Writer)

  line_writer(file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace) do |io|
    file_name = file_name_or_io if file_name.nil? && file_name_or_io.is_a?(String)

    IOStreams::Record::Writer.open(io, file_name: file_name, **args, &block)
  end
end
.register_extension(extension, reader_class, writer_class) ⇒ Object
Register a file extension and the reader and writer streaming classes
Example:
# MyXls::Reader and MyXls::Writer must implement .open
register_extension(:xls, MyXls::Reader, MyXls::Writer)
# File 'lib/io_streams/io_streams.rb', line 388
def self.register_extension(extension, reader_class, writer_class)
  raise(ArgumentError, "Invalid extension #{extension.inspect}") unless extension.nil? || extension.to_s =~ /\A\w+\Z/
  @extensions[extension.nil? ? nil : extension.to_sym] = Extension.new(reader_class, writer_class)
end
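How registration and de-registration interact can be seen in a small, self-contained sketch. It assumes the registry is a plain Hash keyed by extension symbol, as the source listings suggest; `MiniRegistry` and `PassThrough` are hypothetical names introduced for illustration, and the reader and writer classes only implement the `.open` method that the example comment above asks for.

```ruby
# Hypothetical stand-in for the module's extension registry.
module MiniRegistry
  Extension = Struct.new(:reader_class, :writer_class)
  @extensions = {}

  class << self
    attr_reader :extensions

    def register_extension(extension, reader_class, writer_class)
      unless extension.nil? || extension.to_s =~ /\A\w+\Z/
        raise ArgumentError, "Invalid extension #{extension.inspect}"
      end
      @extensions[extension.nil? ? nil : extension.to_sym] = Extension.new(reader_class, writer_class)
    end

    def deregister_extension(extension)
      raise ArgumentError, "Invalid extension #{extension.inspect}" unless extension.to_s =~ /\A\w+\Z/
      @extensions.delete(extension.to_sym)
    end
  end
end

# Hypothetical streaming classes; each only needs to implement .open.
module PassThrough
  class Reader
    def self.open(io)
      yield io
    end
  end

  class Writer
    def self.open(io)
      yield io
    end
  end
end

MiniRegistry.register_extension(:raw, PassThrough::Reader, PassThrough::Writer)
p MiniRegistry.extensions.keys

removed = MiniRegistry.deregister_extension(:raw)
p removed.reader_class
p MiniRegistry.deregister_extension(:raw)   # second removal finds nothing
```

In this sketch `Hash#delete` hands back the stored entry rather than the extension name, and returns nil when the extension was never registered.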
.row_reader(file_name_or_io, streams: nil, delimiter: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
Iterate over a file / stream returning each line as an array, one at a time.
# File 'lib/io_streams/io_streams.rb', line 327
def self.row_reader(file_name_or_io,
                    streams: nil,
                    delimiter: nil,
                    file_name: nil,
                    encoding: nil,
                    encode_cleaner: nil,
                    encode_replace: nil,
                    **args,
                    &block)

  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Row::Reader)

  line_reader(
    file_name_or_io,
    streams: streams,
    delimiter: delimiter,
    file_name: file_name,
    encoding: encoding,
    encode_cleaner: encode_cleaner,
    encode_replace: encode_replace
  ) do |io|
    file_name = file_name_or_io if file_name.nil? && file_name_or_io.is_a?(String)
    IOStreams::Row::Reader.open(io, file_name: file_name, **args, &block)
  end
end
.row_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
# File 'lib/io_streams/io_streams.rb', line 168
def self.row_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Row::Writer)

  line_writer(file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace) do |io|
    file_name = file_name_or_io if file_name.nil? && file_name_or_io.is_a?(String)

    IOStreams::Row::Writer.open(io, file_name: file_name, **args, &block)
  end
end
.streams_for_file_name(file_name) ⇒ Object
Returns [Array] the formats required to process the file by looking at its extension(s)
Example Zip file:
  IOStreams.streams_for_file_name('myfile.zip')
  => [ :zip ]
Example Encrypted Gzip file:
  IOStreams.streams_for_file_name('myfile.csv.gz.enc')
  => [ :gz, :enc ]
Example plain text / binary file:
  IOStreams.streams_for_file_name('myfile.csv')
  => [ :file ]
# File 'lib/io_streams/io_streams.rb', line 304
def self.streams_for_file_name(file_name)
  raise ArgumentError.new('File name cannot be nil') if file_name.nil?
  raise ArgumentError.new("File name must be a string: #{file_name.inspect}, class: #{file_name.class}") unless file_name.is_a?(String)
  parts      = file_name.split('.')
  extensions = []
  while extension = parts.pop
    sym = extension.downcase.to_sym
    break unless @extensions[sym]
    extensions.unshift(sym)
  end
  extensions
end
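The extension-peeling loop can be exercised in isolation. The sketch below assumes a small hypothetical registry (`REGISTERED`) in place of the module's internal table, so its results depend on which extensions that stand-in contains; `streams_for` is likewise a hypothetical name.

```ruby
# Hypothetical registry: only these extensions count as known streams here.
REGISTERED = { zip: true, gz: true, gzip: true, enc: true }.freeze

# Walks the extensions from right to left and stops at the first one
# that is not registered, preserving left-to-right order in the result.
def streams_for(file_name, registry = REGISTERED)
  raise ArgumentError, 'File name must be a string' unless file_name.is_a?(String)

  parts      = file_name.split('.')
  extensions = []
  while (extension = parts.pop)
    sym = extension.downcase.to_sym
    break unless registry[sym]

    extensions.unshift(sym)
  end
  extensions
end

p streams_for('myfile.zip')
p streams_for('myfile.csv.gz.enc')
p streams_for('MYFILE.CSV.GZ')
p streams_for('myfile.csv')
```

With this stand-in registry, a name whose last extension is not registered (such as `.csv`) produces an empty list, and matching is case-insensitive because each extension is downcased before lookup.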
.writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, &block) ⇒ Object
Returns a Writer for writing to a file / stream
Parameters
  file_name_or_io [String|IO]
    The file_name of the file to write to, or an IO Stream that implements
    #write.
  streams [Symbol|Array]
    The formats/streams that will be used to convert the data whilst it is
    being written.
    When nil, the file_name will be inspected to try and determine what
    streams should be applied.
    Default: nil
Stream types / extensions supported:
  .zip         Zip File                                    [ :zip ]
  .gz, .gzip   GZip File                                   [ :gzip ]
  .enc         File Encrypted using symmetric encryption   [ :enc ]
  other        All other extensions will be returned as:   [ :file ]
When a file is encrypted, it may also be compressed:
  .zip.enc   [ :zip, :enc ]
  .gz.enc    [ :gz, :enc ]
Example: Zip
  IOStreams.writer('myfile.zip') do |stream|
    stream.write(data)
  end
Example: Encrypted Zip
  IOStreams.writer('myfile.zip.enc') do |stream|
    stream.write(data)
  end
Example: Explicitly set the streams
  IOStreams.writer('myfile.zip.enc', streams: [:zip, :enc]) do |stream|
    stream.write(data)
  end
Example: Supply custom options
  IOStreams.writer('myfile.csv.enc', streams: [enc: { compress: true }]) do |stream|
    stream.write(data)
  end
Example: Set internal filename when creating a zip file
  IOStreams.writer('myfile.csv.zip', streams: [zip: { zip_file_name: 'myfile.csv' }]) do |stream|
    stream.write(data)
  end
Note:
-
Passes the file_name_or_io as-is into the block if it is already a writer stream AND no streams are passed in.
# File 'lib/io_streams/io_streams.rb', line 156
def self.writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, &block)
  stream(:writer, file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace, &block)
end
.writer_stream?(file_name_or_io) ⇒ Boolean
Returns [true|false] whether the supplied file_name_or_io is a writer stream
# File 'lib/io_streams/io_streams.rb', line 274
def self.writer_stream?(file_name_or_io)
  file_name_or_io.respond_to?(:write)
end
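Both stream predicates are duck-type checks: any object that responds to `read` counts as a reader stream, and any object that responds to `write` counts as a writer stream. A minimal sketch, using local copies of the two checks and a hypothetical `ReadOnlySource` class:

```ruby
require 'stringio'

# Local copies of the two duck-type checks, for experimentation.
def reader_stream?(obj)
  obj.respond_to?(:read)
end

def writer_stream?(obj)
  obj.respond_to?(:write)
end

# Hypothetical object that only knows how to be read from.
class ReadOnlySource
  def read(_length = nil)
    nil
  end
end

io = StringIO.new
puts "StringIO: reader=#{reader_stream?(io)} writer=#{writer_stream?(io)}"
puts "String:   reader=#{reader_stream?('a.csv')} writer=#{writer_stream?('a.csv')}"

source = ReadOnlySource.new
puts "Custom:   reader=#{reader_stream?(source)} writer=#{writer_stream?(source)}"
```

A file name given as a String fails both checks, which is how a path can be told apart from an already opened stream.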