Module: IOStreams

Defined in:
lib/iostreams.rb,
lib/io_streams/s3.rb,
lib/io_streams/pgp.rb,
lib/io_streams/errors.rb,
lib/io_streams/streams.rb,
lib/io_streams/tabular.rb,
lib/io_streams/version.rb,
lib/io_streams/s3/reader.rb,
lib/io_streams/s3/writer.rb,
lib/io_streams/io_streams.rb,
lib/io_streams/pgp/reader.rb,
lib/io_streams/pgp/writer.rb,
lib/io_streams/row/reader.rb,
lib/io_streams/row/writer.rb,
lib/io_streams/zip/reader.rb,
lib/io_streams/zip/writer.rb,
lib/io_streams/file/reader.rb,
lib/io_streams/file/writer.rb,
lib/io_streams/gzip/reader.rb,
lib/io_streams/gzip/writer.rb,
lib/io_streams/line/reader.rb,
lib/io_streams/line/writer.rb,
lib/io_streams/sftp/reader.rb,
lib/io_streams/sftp/writer.rb,
lib/io_streams/xlsx/reader.rb,
lib/io_streams/bzip2/reader.rb,
lib/io_streams/bzip2/writer.rb,
lib/io_streams/encode/reader.rb,
lib/io_streams/encode/writer.rb,
lib/io_streams/record/reader.rb,
lib/io_streams/record/writer.rb,
lib/io_streams/tabular/header.rb,
lib/io_streams/tabular/parser/csv.rb,
lib/io_streams/tabular/parser/psv.rb,
lib/io_streams/tabular/parser/base.rb,
lib/io_streams/tabular/parser/hash.rb,
lib/io_streams/tabular/parser/json.rb,
lib/io_streams/tabular/parser/array.rb,
lib/io_streams/tabular/parser/fixed.rb,
lib/io_streams/tabular/utility/csv_row.rb

Overview

Streaming library for Ruby

Stream types / extensions supported:

.zip       Zip File                                   [ :zip ]
.gz, .gzip GZip File                                  [ :gz ], [ :gzip ]
.enc       File Encrypted using symmetric encryption  [ :enc ]
etc...
other      All other extensions will be returned as:  []

When a file is encrypted, it may also be compressed:

.zip.enc  [ :zip, :enc ]
.gz.enc   [ :gz,  :enc ]

Defined Under Namespace

Modules: Bzip2, Encode, Errors, File, Gzip, Line, Pgp, Record, Row, S3, SFTP, Xlsx, Zip
Classes: Extension, StreamStruct, Streams, Tabular

Constant Summary

VERSION = '0.17.3'
UTF8_ENCODING = Encoding.find('UTF-8').freeze
BINARY_ENCODING = Encoding.find('BINARY').freeze

Class Method Summary

Class Method Details

.blank?(value) ⇒ Boolean

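Helper method: Returns [true|false] whether a value is blank: nil, a String containing only whitespace (or nothing), an object whose #empty? returns true, or false.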

Returns:

  • (Boolean)


# File 'lib/io_streams/io_streams.rb', line 466

def self.blank?(value)
  if value.nil?
    true
  elsif value.is_a?(String)
    value !~ /\S/
  else
    value.respond_to?(:empty?) ? value.empty? : !value
  end
end
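
To see how `blank?` classifies common values, here is a standalone copy of its logic that runs without the gem; `blank_value?` is a hypothetical name used only for this illustration.

```ruby
# Standalone copy of the IOStreams.blank? logic (hypothetical name).
def blank_value?(value)
  if value.nil?
    true
  elsif value.is_a?(String)
    # Blank when the String has no non-whitespace characters.
    value !~ /\S/
  else
    # Collections are blank when empty; other objects only when falsey.
    value.respond_to?(:empty?) ? value.empty? : !value
  end
end

blank_value?(nil)     # => true
blank_value?(" \t\n") # => true
blank_value?([])      # => true
blank_value?(false)   # => true
blank_value?(0)       # => false
blank_value?('a')     # => false
```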

.compressed?(file_name) ⇒ Boolean

Returns [true|false] whether the file is compressed. Note: Currently only looks at the last file name extension.

Returns:

  • (Boolean)


# File 'lib/io_streams/io_streams.rb', line 307

def self.compressed?(file_name)
  !(file_name =~ /\.(zip|gz|gzip|xls.|)\z/i).nil?
end

.copy(source_file_name_or_io, target_file_name_or_io, buffer_size: 65536, source_options: {}, target_options: {}) ⇒ Object

Copies the source file/stream to the target file/stream. Returns [Integer] the number of bytes copied

Example: Copy between 2 files

IOStreams.copy('a.csv', 'b.csv')

Example: Read content from an Xlsx file and write it out in CSV form.

IOStreams.copy('a.xlsx', 'b.csv')

Example:

# Read content from a JSON file and write it out in CSV form.
#
# The output header for the CSV file is extracted from the first row in the JSON file.
# If the first JSON row does not contain all the column names then they will be ignored
# for the rest of the file.
IOStreams.copy('a.json', 'b.csv')

Example:

# Read a PSV file and write out a CSV file from it.
IOStreams.copy('a.psv', 'b.csv')

Example:

# Copy between 2 files, encrypting the target file with Symmetric Encryption
# Since the target file_name already includes `.enc` in the filename, it is automatically
# encrypted.
IOStreams.copy('a.csv', 'b.csv.enc')

Example:

# Copy between 2 files, encrypting the target file with Symmetric Encryption
# Since the target file_name does not include `.enc` in the filename, to encrypt it
# the encryption stream is added.
IOStreams.copy('a.csv', 'b', target_options: { streams: [:enc] })

Example:

# Copy between 2 files, encrypting the target file with Symmetric Encryption
# Since the target file_name does not include `.enc` in the filename, to encrypt it
# the encryption stream is added, along with the optional compression option.
IOStreams.copy('a.csv', 'b', target_options: { streams: [enc: { compress: true }] })

Example:

# Create a pgp encrypted file.
# For PGP Encryption the recipient's email address is required.
IOStreams.copy('a.xlsx', 'b.csv.pgp', target_options: { streams: [:csv, pgp: { recipient_email: '[email protected]' }] })

Example: Copy between 2 existing streams

IOStreams.reader('a.csv') do |source_stream|
  IOStreams.writer('b.csv.enc') do |target_stream|
    IOStreams.copy(source_stream, target_stream)
  end
end

Example:

# Copy between 2 csv files, reducing the number of columns present and encrypting the
# target file with Symmetric Encryption
output_headers = %w[name address]
IOStreams.copy(
  'a.csv',
  'b.csv.enc',
  target_options: { streams: [csv: { headers: output_headers }, enc: { compress: true }] }
)

Example:

# Copy a locally encrypted file to AWS S3.
# Decrypts the file, then compresses it with gzip as it is being streamed into S3.
# Useful for when the entire bucket is encrypted on S3.
IOStreams.copy('a.csv.enc', 's3://my_bucket/b.csv.gz')


# File 'lib/io_streams/io_streams.rb', line 281

def self.copy(source_file_name_or_io, target_file_name_or_io, buffer_size: 65536, source_options: {}, target_options: {})
  bytes = 0
  reader(source_file_name_or_io, **source_options) do |source_stream|
    writer(target_file_name_or_io, **target_options) do |target_stream|
      while data = source_stream.read(buffer_size)
        break if data.size == 0
        bytes += data.size
        target_stream.write(data)
      end
    end
  end
  bytes
end
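
The heart of `copy` is a fixed-size read/write loop. The sketch below runs the same loop between two in-memory `StringIO` objects so it needs neither files nor the gem; `buffered_copy` is a hypothetical name. It counts `bytesize` rather than `size`; the two agree for the binary strings that `IO#read(length)` returns, but differ for multibyte text.

```ruby
require 'stringio'

# Hypothetical helper: copies source to target in fixed-size chunks
# and returns the number of bytes copied.
def buffered_copy(source, target, buffer_size: 65_536)
  bytes = 0
  # read(length) returns nil once the source is exhausted.
  while (data = source.read(buffer_size))
    break if data.empty?

    bytes += data.bytesize
    target.write(data)
  end
  bytes
end

source = StringIO.new('a' * 150_000)
target = StringIO.new
copied = buffered_copy(source, target) # => 150000
```

Ruby's standard library also provides `IO.copy_stream(source, target)`, which performs a similar chunked copy and returns the byte count.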

.deregister_extension(extension) ⇒ Object

Deregister a file extension

Returns the Extension (reader and writer classes) that was removed, or nil if the extension was not registered

Example:

deregister_extension(:xls)

Raises:

  • (ArgumentError)


# File 'lib/io_streams/io_streams.rb', line 450

def self.deregister_extension(extension)
  raise(ArgumentError, "Invalid extension #{extension.inspect}") unless extension.to_s =~ /\A\w+\Z/
  @extensions.delete(extension.to_sym)
end
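
Because `deregister_extension` simply returns the result of `Hash#delete`, the caller receives the stored value (the registered reader/writer pair), not the extension symbol. A minimal illustration with a plain Hash standing in for the registry; the placeholder symbols stand in for real reader and writer classes:

```ruby
# Placeholder registry; real entries hold reader and writer classes.
Extension = Struct.new(:reader_class, :writer_class)
extensions = { zip: Extension.new(:ZipReader, :ZipWriter) }

# Hash#delete returns the stored value, or nil when the key is absent.
removed = extensions.delete(:zip) # => the Extension struct for :zip
missing = extensions.delete(:xls) # => nil
```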

.each_line(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object

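Iterate over a file / stream returning one line at a time. Line breaks embedded within quotes are kept as part of the line, instead of ending it, when:

1. The file name contains .csv (a double quote is used), or
2. The embedded_within argument is set (its value is used as the quote character)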

Example: Supply custom options

IOStreams.each_line(file_name, embedded_within: '"') do |line|
  puts line
end


# File 'lib/io_streams/io_streams.rb', line 82

def self.each_line(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  line_reader(file_name_or_io, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace, **args) do |line_stream|
    line_stream.each(&block)
  end
end
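
The effect of `embedded_within` can be illustrated with a simplified, quote-aware line splitter. This is a sketch under stated assumptions, not the gem's `IOStreams::Line::Reader`: `split_lines` is hypothetical, works on an in-memory String, and treats every quote character as a toggle (so doubled quotes inside a field cancel out).

```ruby
# Hypothetical quote-aware splitter: newlines inside quotes do not end a line.
def split_lines(text, embedded_within: nil)
  return text.split("\n") if embedded_within.nil?

  lines = []
  current = +''
  inside_quotes = false
  text.each_char do |char|
    if char == embedded_within
      # Each quote toggles state, so a doubled quote ("") leaves it unchanged.
      inside_quotes = !inside_quotes
      current << char
    elsif char == "\n" && !inside_quotes
      lines << current
      current = +''
    else
      current << char
    end
  end
  lines << current unless current.empty?
  lines
end

data = %(name,notes\n"Jack","line one\nline two"\n"Jill","single"\n)
split_lines(data).size                       # => 4
split_lines(data, embedded_within: '"').size # => 3
```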

.each_record(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object

Yields every record in a file or stream as a Hash, with support for headers.

Reading a delimited stream and converting to tabular form.

Each record / line is returned one at a time so that very large files can be read without having to load the entire file into memory.

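Line breaks embedded within quotes are kept as part of the record, instead of ending it, when:

1. The file name contains .csv (a double quote is used), or
2. The embedded_within argument is set (its value is used as the quote character)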

Example: Supply custom options

IOStreams.each_record(file_name, embedded_within: '"') do |record|
  p record
end

Example:

file_name = 'customer_data.csv.pgp'
IOStreams.each_record(file_name) do |hash|
  p hash
end


# File 'lib/io_streams/io_streams.rb', line 125

def self.each_record(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  record_reader(file_name_or_io, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace, **args) do |record_stream|
    record_stream.each(&block)
  end
end
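
`each_record` yields one Hash per data row, keyed by the header. A minimal sketch of that mapping over rows that have already been split into arrays; `each_record_from_rows` is hypothetical and assumes the first row is the header:

```ruby
# Hypothetical: maps each data row to a Hash keyed by the header (first) row.
def each_record_from_rows(rows)
  return enum_for(__method__, rows) unless block_given?

  header = nil
  rows.each do |row|
    if header.nil?
      header = row
    else
      yield header.zip(row).to_h
    end
  end
end

records = each_record_from_rows([%w[name age], %w[Jack 21], %w[Jill 35]]).to_a
```

Returning an Enumerator when no block is given lets callers use `first`, `lazy`, and similar methods instead of materialising every record.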

.each_row(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object

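Iterate over a file / stream returning one row (an Array) at a time. Line breaks embedded within quotes are kept as part of the row, instead of ending it, when:

1. The file name contains .csv (a double quote is used), or
2. The embedded_within argument is set (its value is used as the quote character)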

Example: Supply custom options

IOStreams.each_row(file_name, embedded_within: '"') do |row|
  p row
end


# File 'lib/io_streams/io_streams.rb', line 98

def self.each_row(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  row_reader(file_name_or_io, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace, **args) do |row_stream|
    row_stream.each(&block)
  end
end
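
`each_row` yields each line already split into an Array. The sketch below shows the streaming shape for pipe-separated input: one line is read and one Array is yielded at a time. `each_psv_row` is hypothetical and ignores quoting, which the gem's parsers handle; `split('|', -1)` keeps trailing empty fields.

```ruby
require 'stringio'

# Hypothetical: yields one Array per pipe-separated line, reading from io line by line.
def each_psv_row(io)
  return enum_for(__method__, io) unless block_given?

  io.each_line do |line|
    # -1 keeps trailing empty fields, e.g. "Jill|" => ["Jill", ""].
    yield line.chomp.split('|', -1)
  end
end

rows = each_psv_row(StringIO.new("name|age\nJack|21\nJill|\n")).to_a
```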

.encrypted?(file_name) ⇒ Boolean

Returns [true|false] whether the file is encrypted. Note: Currently only looks at the last file name extension.

Returns:

  • (Boolean)


# File 'lib/io_streams/io_streams.rb', line 313

def self.encrypted?(file_name)
  !(file_name =~ /\.(enc|pgp|gpg)\z/i).nil?
end
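
Both `compressed?` and `encrypted?` test only the end of the name. Standalone copies of their regular expressions (hypothetically named `compressed_name?` and `encrypted_name?`) make the consequences visible: `data.csv.gz.enc` counts as encrypted but not as compressed, `xls.` matches `.xlsx`, and the empty alternative in the `compressed?` pattern also matches a name that ends in a bare dot.

```ruby
# Standalone copies of the compressed? and encrypted? patterns (hypothetical names).
def compressed_name?(file_name)
  !(file_name =~ /\.(zip|gz|gzip|xls.|)\z/i).nil?
end

def encrypted_name?(file_name)
  !(file_name =~ /\.(enc|pgp|gpg)\z/i).nil?
end

compressed_name?('data.csv.gz')     # => true
compressed_name?('report.XLSX')     # => true  (xls. plus the /i flag)
compressed_name?('data.csv.gz.enc') # => false (only the last extension counts)
encrypted_name?('data.csv.gz.enc')  # => true
compressed_name?('odd_name.')       # => true  (empty alternative)
```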

.line_reader(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, embedded_within: nil, **args, &block) ⇒ Object

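Iterate over a file / stream returning each line one at a time. When `embedded_within` is not supplied and the file name contains `.csv`, `embedded_within` defaults to a double quote. The file name checked is `file_name_or_io` when it is a String, otherwise the `file_name` argument.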



# File 'lib/io_streams/io_streams.rb', line 357

def self.line_reader(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, embedded_within: nil, **args, &block)

  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Line::Reader) || file_name_or_io.is_a?(Array)

  # TODO: needs to be improved
  if embedded_within.nil? && file_name_or_io.is_a?(String)
    embedded_within = '"' if file_name_or_io.include?('.csv')
  elsif embedded_within.nil? && file_name
    embedded_within = '"' if file_name.include?('.csv')
  end

  reader(file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace) do |io|
    IOStreams::Line::Reader.open(io, embedded_within: embedded_within, **args, &block)
  end
end
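
The defaulting rule for `embedded_within` in `line_reader` can be isolated into a small function for testing. `default_embedded_within` is a hypothetical name; it mirrors the branch in the source above, including the fact that `file_name` is consulted only when `file_name_or_io` is not a String.

```ruby
require 'stringio'

# Hypothetical isolation of line_reader's embedded_within default.
def default_embedded_within(file_name_or_io, file_name: nil, embedded_within: nil)
  return embedded_within unless embedded_within.nil?

  # A String argument is the file name; file_name is only used for IO arguments.
  name = file_name_or_io.is_a?(String) ? file_name_or_io : file_name
  name&.include?('.csv') ? '"' : nil
end

default_embedded_within('data.csv')                               # => '"'
default_embedded_within(StringIO.new, file_name: 'upload.csv.gz') # => '"'
default_embedded_within('data.psv', file_name: 'data.csv')        # => nil
```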

.line_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
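
Writes to a file / stream one line at a time. Passes the file_name_or_io as-is into the block if it is already an IOStreams::Line::Writer or an Array.
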
# File 'lib/io_streams/io_streams.rb', line 187

def self.line_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Line::Writer) || file_name_or_io.is_a?(Array)

  writer(file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace) do |io|
    IOStreams::Line::Writer.open(io, **args, &block)
  end
end
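
A line writer appends a delimiter after every line it is given. A minimal sketch of that idea; `SimpleLineWriter` is hypothetical and does not reproduce the options of `IOStreams::Line::Writer`, and the newline default is an assumption:

```ruby
require 'stringio'

# Hypothetical minimal line writer: writes the delimiter after every line.
class SimpleLineWriter
  def initialize(io, delimiter: "\n")
    @io = io
    @delimiter = delimiter
  end

  # Returns self so calls can be chained.
  def <<(line)
    @io.write(line)
    @io.write(@delimiter)
    self
  end
end

output = StringIO.new
SimpleLineWriter.new(output) << 'first' << 'second'
output.string # => "first\nsecond\n"
```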

.reader(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, &block) ⇒ Object

Returns a Reader for reading a file / stream

Parameters

file_name_or_io [String|IO]
  The file_name of the file to read from, or an IO Stream that implements
  #read.

streams [Symbol|Array]
  The formats/streams that will be used to convert the data whilst it is
  being read.
  When nil, the file_name will be inspected to try and determine what
  streams should be applied.
  Default: nil

file_name [String]
  When `streams` is not supplied, `file_name` can be used for determining the streams
  to apply to read the file/stream.
  This is particularly useful when `file_name_or_io` is a stream, or a temporary file name.
  Default: nil

Example: Zip

IOStreams.reader('myfile.zip') do |stream|
  puts stream.read
end

Example: Encrypted Zip

IOStreams.reader('myfile.zip.enc') do |stream|
  puts stream.read
end

Example: Explicitly set the streams

IOStreams.reader('myfile.zip.enc', streams: [:zip, :enc]) do |stream|
  puts stream.read
end

Example: Supply custom options

# Decrypt the file, passing options through to the Symmetric Encryption (:enc) stream
IOStreams.reader('myfile.csv.enc', streams: [enc: { compress: true }]) do |stream|
  puts stream.read
end

Note:

  • Passes the file_name_or_io as-is into the block if it is already a reader stream AND no streams are passed in.



# File 'lib/io_streams/io_streams.rb', line 68

def self.reader(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, &block)
  stream(:reader, file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace, &block)
end
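
`reader` chooses which streams to layer from the file name. A minimal sketch of that idea for a single gzip layer, using only Ruby's standard `zlib`; `open_reader` is hypothetical and recognises only `.gz`, whereas the gem resolves streams through its registered extensions.

```ruby
require 'stringio'
require 'zlib'

# Hypothetical: wraps io in a gzip decompressor when the name ends in .gz.
def open_reader(io, file_name)
  return yield(io) unless file_name.end_with?('.gz')

  gzip = Zlib::GzipReader.new(io)
  begin
    yield gzip
  ensure
    gzip.close
  end
end

# Build a small gzip payload in memory.
compressed = StringIO.new
writer = Zlib::GzipWriter.new(compressed)
writer.write("hello\n")
writer.finish # writes the gzip footer without closing the StringIO
compressed.rewind

greeting = open_reader(compressed, 'greeting.txt.gz') { |stream| stream.read } # => "hello\n"
```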

.reader_stream?(file_name_or_io) ⇒ Boolean

Returns [true|false] whether the supplied file_name_or_io is a reader stream

Returns:

  • (Boolean)


# File 'lib/io_streams/io_streams.rb', line 296

def self.reader_stream?(file_name_or_io)
  file_name_or_io.respond_to?(:read)
end

.record_reader(file_name_or_io, streams: nil, delimiter: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, embedded_within: nil, **args, &block) ⇒ Object

Iterate over a file / stream returning each line as a hash, one at a time.



# File 'lib/io_streams/io_streams.rb', line 403

def self.record_reader(file_name_or_io,
  streams: nil,
  delimiter: nil,
  file_name: nil,
  encoding: nil,
  encode_cleaner: nil,
  encode_replace: nil,
  embedded_within: nil,
  **args,
  &block)

  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Record::Reader)

  line_reader(file_name_or_io,
              streams:         streams,
              delimiter:       delimiter,
              file_name:       file_name,
              encoding:        encoding,
              encode_cleaner:  encode_cleaner,
              encode_replace:  encode_replace,
              embedded_within: embedded_within
  ) do |io|


    file_name = file_name_or_io if file_name.nil? && file_name_or_io.is_a?(String)
    IOStreams::Record::Reader.open(io, file_name: file_name, **args, &block)
  end
end

.record_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
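
Writes to a file / stream one record (a Hash) at a time. Passes the file_name_or_io as-is into the block if it is already an IOStreams::Record::Writer.
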
# File 'lib/io_streams/io_streams.rb', line 205

def self.record_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Record::Writer)

  line_writer(file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace) do |io|
    file_name = file_name_or_io if file_name.nil? && file_name_or_io.is_a?(String)

    IOStreams::Record::Writer.open(io, file_name: file_name, **args, &block)
  end
end
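
The JSON-to-CSV `copy` example notes that the output header comes from the first row and that columns missing from it are ignored. A minimal sketch of that behaviour for a record writer; `write_records` is hypothetical, writes comma-separated values without any quoting or escaping, and is not the gem's `IOStreams::Record::Writer`:

```ruby
# Hypothetical: header from the first record's keys; later extra keys are dropped.
def write_records(records, out)
  header = nil
  records.each do |record|
    if header.nil?
      header = record.keys
      out << header.join(',') << "\n"
    end
    # Missing keys become nil, which Array#join renders as an empty field.
    out << header.map { |key| record[key] }.join(',') << "\n"
  end
  out
end

csv = write_records([{ 'a' => 1, 'b' => 2 }, { 'a' => 3, 'c' => 4 }], +'')
# 'c' is not in the header, so its value is not written.
```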

.register_extension(extension, reader_class, writer_class) ⇒ Object

Register a file extension and the reader and writer streaming classes

Example:

# MyXls::Reader and MyXls::Writer must implement .open
register_extension(:xls, MyXls::Reader, MyXls::Writer)

Raises:

  • (ArgumentError)


# File 'lib/io_streams/io_streams.rb', line 439

def self.register_extension(extension, reader_class, writer_class)
  raise(ArgumentError, "Invalid extension #{extension.inspect}") unless extension.nil? || extension.to_s =~ /\A\w+\Z/
  @extensions[extension.nil? ? nil : extension.to_sym] = Extension.new(reader_class, writer_class)
end
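
A minimal sketch of an extension registry with the same validation idea as `register_extension`. `ExtensionRegistry` and the `Extension` struct here are hypothetical stand-ins, and the placeholder symbols take the place of real reader/writer classes. The sketch anchors with `\z`, which unlike `\Z` also rejects a trailing newline, and unlike the original it does not accept a nil (default) extension.

```ruby
# Hypothetical stand-ins for the module's registry and Extension struct.
Extension = Struct.new(:reader_class, :writer_class)

class ExtensionRegistry
  def initialize
    @extensions = {}
  end

  # Accepts word characters only, then stores the reader/writer pair.
  def register(extension, reader_class, writer_class)
    unless extension.to_s.match?(/\A\w+\z/)
      raise ArgumentError, "Invalid extension #{extension.inspect}"
    end

    @extensions[extension.to_sym] = Extension.new(reader_class, writer_class)
  end

  def lookup(extension)
    @extensions[extension.to_sym]
  end
end

registry = ExtensionRegistry.new
registry.register(:xls, :XlsReader, :XlsWriter) # placeholder classes
registry.lookup('xls').reader_class             # => :XlsReader
```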

.register_scheme(scheme, reader_class, writer_class) ⇒ Object

Register a URI scheme (such as `s3` in `s3://my_bucket/b.csv.gz`) and the reader and writer streaming classes

Example:

# MyScheme::Reader and MyScheme::Writer are hypothetical classes that must implement .open
register_scheme(:myscheme, MyScheme::Reader, MyScheme::Writer)

Raises:

  • (ArgumentError)


# File 'lib/io_streams/io_streams.rb', line 460

def self.register_scheme(scheme, reader_class, writer_class)
  raise(ArgumentError, "Invalid scheme #{scheme.inspect}") unless scheme.nil? || scheme.to_s =~ /\A\w+\Z/
  @schemes[scheme.nil? ? nil : scheme.to_sym] = Extension.new(reader_class, writer_class)
end

.row_reader(file_name_or_io, streams: nil, delimiter: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, embedded_within: nil, **args, &block) ⇒ Object

Iterate over a file / stream returning each line as an array, one at a time.



# File 'lib/io_streams/io_streams.rb', line 374

def self.row_reader(file_name_or_io,
  streams: nil,
  delimiter: nil,
  file_name: nil,
  encoding: nil,
  encode_cleaner: nil,
  encode_replace: nil,
  embedded_within: nil,
  **args,
  &block)

  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Row::Reader)

  line_reader(
    file_name_or_io,
    streams:         streams,
    delimiter:       delimiter,
    file_name:       file_name,
    encoding:        encoding,
    encode_cleaner:  encode_cleaner,
    encode_replace:  encode_replace,
    embedded_within: embedded_within
  ) do |io|
    file_name = file_name_or_io if file_name.nil? && file_name_or_io.is_a?(String)
    IOStreams::Row::Reader.open(io, file_name: file_name, **args, &block)
  end
end

.row_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object
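
Writes to a file / stream one row (an Array) at a time. Passes the file_name_or_io as-is into the block if it is already an IOStreams::Row::Writer.
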
# File 'lib/io_streams/io_streams.rb', line 195

def self.row_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Row::Writer)

  line_writer(file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace) do |io|
    file_name = file_name_or_io if file_name.nil? && file_name_or_io.is_a?(String)

    IOStreams::Row::Writer.open(io, file_name: file_name, **args, &block)
  end
end

.scheme_for_file_name(file_name) ⇒ Object

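Returns [Symbol] the downcased URI scheme of the file name (for example `:s3` for `s3://my_bucket/b.csv`), or nil if the file name has no scheme.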

Raises:

  • (ArgumentError)


# File 'lib/io_streams/io_streams.rb', line 346

def self.scheme_for_file_name(file_name)
  raise ArgumentError.new('File name cannot be nil') if file_name.nil?
  raise ArgumentError.new("File name must be a string: #{file_name.inspect}, class: #{file_name.class}") unless file_name.is_a?(String)

  if matches = file_name.match(/\A(\w+):\/\//)
    matches[1].downcase.to_sym
  end
end
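
`scheme_for_file_name` only looks for a leading `word://` prefix. A standalone copy of that match (`scheme_of` is a hypothetical name) shows the results for a URI, an upper-case scheme, and a plain path:

```ruby
# Standalone copy of the scheme match (hypothetical name).
def scheme_of(file_name)
  matches = file_name.match(%r{\A(\w+)://})
  matches && matches[1].downcase.to_sym
end

scheme_of('s3://my_bucket/b.csv.gz') # => :s3
scheme_of('SFTP://host/data.csv')    # => :sftp
scheme_of('/tmp/data.csv')           # => nil
```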

.streams_for_file_name(file_name) ⇒ Object

Returns [Array] the formats required to process the file by looking at its extension(s)

Example Zip file:

IOStreams.streams_for_file_name('myfile.zip')
=> [ :zip ]

Example Encrypted Gzip file:

IOStreams.streams_for_file_name('myfile.csv.gz.enc')
=> [ :gz, :enc ]

Example plain text / binary file:

IOStreams.streams_for_file_name('myfile.csv')
=> []

Raises:

  • (ArgumentError)


# File 'lib/io_streams/io_streams.rb', line 331

def self.streams_for_file_name(file_name)
  raise ArgumentError.new('File name cannot be nil') if file_name.nil?
  raise ArgumentError.new("File name must be a string: #{file_name.inspect}, class: #{file_name.class}") unless file_name.is_a?(String)

  parts      = ::File.basename(file_name).split('.')
  extensions = []
  while extension = parts.pop
    sym = extension.downcase.to_sym
    break unless @extensions[sym]
    extensions.unshift(sym)
  end
  extensions
end
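
The extension peeling in `streams_for_file_name` walks the name from right to left and stops at the first extension that is not registered. A standalone version of the loop, assuming a hypothetical registered set of `zip`, `gz`, `gzip`, `enc`, and `pgp` (`streams_for` is also a hypothetical name):

```ruby
require 'set'

# Assumed registered extensions for this illustration only.
REGISTERED = Set[:zip, :gz, :gzip, :enc, :pgp].freeze

# Peels extensions off the end of the base name until one is not registered.
def streams_for(file_name, registered = REGISTERED)
  parts = File.basename(file_name).split('.')
  extensions = []
  while (extension = parts.pop)
    sym = extension.downcase.to_sym
    break unless registered.include?(sym)

    extensions.unshift(sym)
  end
  extensions
end

streams_for('myfile.csv.gz.enc') # => [:gz, :enc]
streams_for('/tmp/myfile.ZIP')   # => [:zip]
streams_for('myfile.csv')        # => []
```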

.writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, &block) ⇒ Object

Returns a Writer for writing to a file / stream

Parameters

file_name_or_io [String|IO]
  The file_name of the file to write to, or an IO Stream that implements
  #write.

streams [Symbol|Array]
  The formats/streams that will be used to convert the data whilst it is
  being written.
  When nil, the file_name will be inspected to try and determine what
  streams should be applied.
  Default: nil

Stream types / extensions supported:

.zip       Zip File                                   [ :zip ]
.gz, .gzip GZip File                                  [ :gz ], [ :gzip ]
.enc       File Encrypted using symmetric encryption  [ :enc ]
other      All other extensions will be returned as:  []

When a file is encrypted, it may also be compressed:

.zip.enc  [ :zip, :enc ]
.gz.enc   [ :gz,  :enc ]

Example: Zip

IOStreams.writer('myfile.zip') do |stream|
  stream.write(data)
end

Example: Encrypted Zip

IOStreams.writer('myfile.zip.enc') do |stream|
  stream.write(data)
end

Example: Explicitly set the streams

IOStreams.writer('myfile.zip.enc', streams: [:zip, :enc]) do |stream|
  stream.write(data)
end

Example: Supply custom options

IOStreams.writer('myfile.csv.enc', streams: [enc: { compress: true }]) do |stream|
  stream.write(data)
end

Example: Set internal filename when creating a zip file

IOStreams.writer('myfile.csv.zip', streams: [zip: { zip_file_name: 'myfile.csv' }]) do |stream|
  stream.write(data)
end

Note:

  • Passes the file_name_or_io as-is into the block if it is already a writer stream AND no streams are passed in.



# File 'lib/io_streams/io_streams.rb', line 183

def self.writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, &block)
  stream(:writer, file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace, &block)
end

.writer_stream?(file_name_or_io) ⇒ Boolean

Returns [true|false] whether the supplied file_name_or_io is a writer stream

Returns:

  • (Boolean)


# File 'lib/io_streams/io_streams.rb', line 301

def self.writer_stream?(file_name_or_io)
  file_name_or_io.respond_to?(:write)
end
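
Both `reader_stream?` and `writer_stream?` are duck-typing checks on method presence. Standalone copies of their bodies show what they report; note that a write-only IO such as `$stdout` still responds to `#read`, so these checks report capability by method, not by the mode in which the stream was opened.

```ruby
require 'stringio'

# Standalone copies of the IOStreams.reader_stream? and IOStreams.writer_stream? bodies.
def reader_stream?(file_name_or_io)
  file_name_or_io.respond_to?(:read)
end

def writer_stream?(file_name_or_io)
  file_name_or_io.respond_to?(:write)
end

reader_stream?(StringIO.new('data')) # => true
writer_stream?(StringIO.new)         # => true
reader_stream?('data.csv')           # => false (a file name, not a stream)
reader_stream?($stdout)              # => true, even though $stdout is write-only
```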