Module: IOStreams

Defined in:
lib/iostreams.rb,
lib/io_streams/s3.rb,
lib/io_streams/pgp.rb,
lib/io_streams/errors.rb,
lib/io_streams/streams.rb,
lib/io_streams/tabular.rb,
lib/io_streams/version.rb,
lib/io_streams/s3/reader.rb,
lib/io_streams/s3/writer.rb,
lib/io_streams/io_streams.rb,
lib/io_streams/pgp/reader.rb,
lib/io_streams/pgp/writer.rb,
lib/io_streams/row/reader.rb,
lib/io_streams/row/writer.rb,
lib/io_streams/zip/reader.rb,
lib/io_streams/zip/writer.rb,
lib/io_streams/file/reader.rb,
lib/io_streams/file/writer.rb,
lib/io_streams/gzip/reader.rb,
lib/io_streams/gzip/writer.rb,
lib/io_streams/line/reader.rb,
lib/io_streams/line/writer.rb,
lib/io_streams/sftp/reader.rb,
lib/io_streams/sftp/writer.rb,
lib/io_streams/xlsx/reader.rb,
lib/io_streams/bzip2/reader.rb,
lib/io_streams/bzip2/writer.rb,
lib/io_streams/encode/reader.rb,
lib/io_streams/encode/writer.rb,
lib/io_streams/record/reader.rb,
lib/io_streams/record/writer.rb,
lib/io_streams/tabular/header.rb,
lib/io_streams/tabular/parser/csv.rb,
lib/io_streams/tabular/parser/psv.rb,
lib/io_streams/tabular/parser/base.rb,
lib/io_streams/tabular/parser/hash.rb,
lib/io_streams/tabular/parser/json.rb,
lib/io_streams/tabular/parser/array.rb,
lib/io_streams/tabular/parser/fixed.rb,
lib/io_streams/tabular/utility/csv_row.rb

Overview

Streaming library for Ruby

Stream types / extensions supported:

.zip       Zip File                                   [ :zip ]
.gz, .gzip GZip File                                  [ :gz ] or [ :gzip ]
.enc       File Encrypted using symmetric encryption  [ :enc ]
etc...
other      All other extensions will be returned as:  []

When a file is encrypted, it may also be compressed:

.zip.enc  [ :zip, :enc ]
.gz.enc   [ :gz,  :enc ]

Defined Under Namespace

Modules: Bzip2, Encode, Errors, File, Gzip, Line, Pgp, Record, Row, S3, SFTP, Xlsx, Zip
Classes: Extension, StreamStruct, Streams, Tabular

Constant Summary

VERSION = '0.16.2'
UTF8_ENCODING = Encoding.find('UTF-8').freeze
BINARY_ENCODING = Encoding.find('BINARY').freeze


Class Method Details

.blank?(value) ⇒ Boolean

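Helper method: Returns [true|false] whether a value is blank: nil, a String containing only whitespace, an object whose #empty? returns true, or any other falsey value.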

Returns:

  • (Boolean)


# File 'lib/io_streams/io_streams.rb', line 405

def self.blank?(value)
  if value.nil?
    true
  elsif value.is_a?(String)
    value !~ /\S/
  else
    value.respond_to?(:empty?) ? value.empty? : !value
  end
end
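The branches above can be checked in isolation. The sketch below copies the method body into a hypothetical standalone module (`BlankCheck`, not part of IOStreams) so it runs without the gem installed.

```ruby
# Hypothetical standalone copy of IOStreams.blank?, for illustration only.
module BlankCheck
  def self.blank?(value)
    if value.nil?
      true
    elsif value.is_a?(String)
      # A String is blank when it has no non-whitespace character.
      value !~ /\S/
    else
      # Collections use #empty?; anything else is blank only if falsey.
      value.respond_to?(:empty?) ? value.empty? : !value
    end
  end
end

BlankCheck.blank?(nil)      # => true
BlankCheck.blank?("  \t")   # => true
BlankCheck.blank?("a")      # => false
BlankCheck.blank?([])       # => true
BlankCheck.blank?({ a: 1 }) # => false
BlankCheck.blank?(false)    # => true
BlankCheck.blank?(0)        # => false
```

Note that `0` is not blank, because Integer does not respond to `#empty?` and is truthy in Ruby.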

.compressed?(file_name) ⇒ Boolean

Returns [true|false] whether the file is compressed. Note: Currently only looks at the file name extension

Returns:

  • (Boolean)


# File 'lib/io_streams/io_streams.rb', line 280

def self.compressed?(file_name)
  !(file_name =~ /\.(zip|gz|gzip|xls.|)\z/i).nil?
end
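The pattern is matched against the name only. Because the last alternative in the group is empty, any name ending in a bare `.` also matches, while a plain `.xls` name does not (the `xls.` branch needs a fourth character, as in `.xlsx`). The sketch below reproduces the check as a hypothetical standalone method to show this behavior.

```ruby
# Hypothetical standalone copy of the IOStreams.compressed? check.
def compressed?(file_name)
  !(file_name =~ /\.(zip|gz|gzip|xls.|)\z/i).nil?
end

compressed?('data.csv.gz') # => true
compressed?('DATA.ZIP')    # => true  (case-insensitive)
compressed?('book.xlsx')   # => true  (matched by the xls. branch)
compressed?('book.xls')    # => false
compressed?('data.csv')    # => false
compressed?('data.')       # => true  (matched by the empty alternative)
```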

.copy(source_file_name_or_io, target_file_name_or_io, buffer_size: 65536, source_options: {}, target_options: {}) ⇒ Object

Copies the source file/stream to the target file/stream. Returns [Integer] the number of bytes copied

Example: Copy between 2 files

IOStreams.copy('a.csv', 'b.csv')

Example: Read content from an Xlsx file and write it out in CSV form.

IOStreams.copy('a.xlsx', 'b.csv')

Example:

# Read content from a JSON file and write it out in CSV form.
#
# The output header for the CSV file is extracted from the first row in the JSON file.
# If the first JSON row does not contain all the column names then they will be ignored
# for the rest of the file.
IOStreams.copy('a.json', 'b.csv')

Example:

# Read a PSV file and write out a CSV file from it.
IOStreams.copy('a.psv', 'b.csv')

Example:

# Copy between 2 files, encrypting the target file with Symmetric Encryption
# Since the target file_name already includes `.enc` in the filename, it is automatically
# encrypted.
IOStreams.copy('a.csv', 'b.csv.enc')

Example:

# Copy between 2 files, encrypting the target file with Symmetric Encryption
# Since the target file_name does not include `.enc` in the filename, the
# encryption stream must be added explicitly.
IOStreams.copy('a.csv', 'b', target_options: {streams: [:enc]})

Example:

# Copy between 2 files, encrypting the target file with Symmetric Encryption
# Since the target file_name does not include `.enc` in the filename, the
# encryption stream must be added explicitly, along with the optional compression option.
IOStreams.copy('a.csv', 'b', target_options: {streams: [enc: { compress: true }]})

Example:

# Create a pgp encrypted file.
# For PGP Encryption the recipient's email address is required.
IOStreams.copy('a.xlsx', 'b.csv.pgp', target_options: {streams: [:csv, pgp: { recipient_email: '[email protected]' }]})

Example: Copy between 2 existing streams

IOStreams.reader('a.csv') do |source_stream|
  IOStreams.writer('b.csv.enc') do |target_stream|
    IOStreams.copy(source_stream, target_stream)
  end
end

Example:

# Copy between 2 csv files, reducing the number of columns present and encrypting the
# target file with Symmetric Encryption
output_headers = %w[name address]
IOStreams.copy(
  'a.csv',
  'b.csv.enc',
  target_options: {streams: [csv: {headers: output_headers}, enc: {compress: true}]}
)

Example:

# Copy a locally encrypted file to AWS S3.
# Decrypts the file, then compresses it with gzip as it is being streamed into S3.
# Useful for when the entire bucket is encrypted on S3.
IOStreams.copy('a.csv.enc', 's3://my_bucket/b.csv.gz')


# File 'lib/io_streams/io_streams.rb', line 254

def self.copy(source_file_name_or_io, target_file_name_or_io, buffer_size: 65536, source_options: {}, target_options: {})
  bytes = 0
  reader(source_file_name_or_io, **source_options) do |source_stream|
    writer(target_file_name_or_io, **target_options) do |target_stream|
      while data = source_stream.read(buffer_size)
        break if data.size == 0
        bytes += data.size
        target_stream.write(data)
      end
    end
  end
  bytes
end
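The core of `copy` is a fixed-size buffered read/write loop. The sketch below runs the same loop against two in-memory `StringIO` objects instead of IOStreams readers and writers; the method name `buffered_copy` is hypothetical. It shows that the return value is the total byte count and that at most `buffer_size` bytes are held in memory per iteration.

```ruby
require 'stringio'

# Hypothetical helper mirroring the loop inside IOStreams.copy.
def buffered_copy(source, target, buffer_size: 65_536)
  bytes = 0
  # IO#read(length) returns nil at end of stream, which ends the loop.
  while (data = source.read(buffer_size))
    break if data.empty?
    bytes += data.bytesize
    target.write(data)
  end
  bytes
end

source = StringIO.new('a' * 150_000)
target = StringIO.new
copied = buffered_copy(source, target, buffer_size: 65_536)
# copied is 150_000 and target.string holds the same content as the source.
```

`bytesize` is used here instead of `size`; for the binary strings returned by `read(length)` the two agree.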

.deregister_extension(extension) ⇒ Object

Deregister a file extension

Returns the Extension (reader and writer classes) that was removed, or nil if the extension was not registered

Example:

deregister_extension(:xls)

Raises:

  • (ArgumentError)


# File 'lib/io_streams/io_streams.rb', line 399

def self.deregister_extension(extension)
  raise(ArgumentError, "Invalid extension #{extension.inspect}") unless extension.to_s =~ /\A\w+\Z/
  @extensions.delete(extension.to_sym)
end

.each_line(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object

Iterate over a file / stream returning one line at a time.



# File 'lib/io_streams/io_streams.rb', line 73

def self.each_line(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  line_reader(file_name_or_io, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace, **args) do |line_stream|
    line_stream.each(&block)
  end
end

.each_record(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object

Iterates over every record in a file or stream, yielding each one as a [Hash], with support for headers.

Reads a delimited stream and converts it to tabular form.

Each record / line is returned one at a time so that very large files can be read without having to load the entire file into memory.

Example:

file_name = 'customer_data.csv.pgp'
IOStreams.each_record(file_name) do |hash|
  p hash
end


# File 'lib/io_streams/io_streams.rb', line 98

def self.each_record(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  record_reader(file_name_or_io, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace, **args) do |record_stream|
    record_stream.each(&block)
  end
end

.each_row(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object

Iterate over a file / stream returning each line as an array (row), one at a time.



# File 'lib/io_streams/io_streams.rb', line 80

def self.each_row(file_name_or_io, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  row_reader(file_name_or_io, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace, **args) do |row_stream|
    row_stream.each(&block)
  end
end

.encrypted?(file_name) ⇒ Boolean

Returns [true|false] whether the file is encrypted. Note: Currently only looks at the file name extension

Returns:

  • (Boolean)


# File 'lib/io_streams/io_streams.rb', line 286

def self.encrypted?(file_name)
  !(file_name =~ /\.(enc|pgp|gpg)\z/i).nil?
end
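Only the final extension is checked, so an encrypted file that was later compressed is not reported as encrypted. A hypothetical standalone copy of the check illustrates this.

```ruby
# Hypothetical standalone copy of the IOStreams.encrypted? check.
def encrypted?(file_name)
  !(file_name =~ /\.(enc|pgp|gpg)\z/i).nil?
end

encrypted?('data.csv.enc') # => true
encrypted?('data.csv.PGP') # => true  (case-insensitive)
encrypted?('data.enc.gz')  # => false, only the last extension is examined
```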

.line_reader(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object

Iterate over a file / stream returning each record/line one at a time.



# File 'lib/io_streams/io_streams.rb', line 318

def self.line_reader(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Line::Reader) || file_name_or_io.is_a?(Array)

  reader(file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace) do |io|
    IOStreams::Line::Reader.open(io, **args, &block)
  end
end

.line_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object

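Writes to a file / stream one line at a time. The block receives an IOStreams::Line::Writer, or file_name_or_io itself when it is already a Line::Writer or an Array.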

# File 'lib/io_streams/io_streams.rb', line 160

def self.line_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Line::Writer) || file_name_or_io.is_a?(Array)

  writer(file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace) do |io|
    IOStreams::Line::Writer.open(io, **args, &block)
  end
end

.reader(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, &block) ⇒ Object

Returns a Reader for reading a file / stream

Parameters

file_name_or_io [String|IO]
  The file_name of the file to read from, or an IO Stream that implements
  #read.

streams [Symbol|Array]
  The formats/streams that will be used to convert the data whilst it is
  being read.
  When nil, the file_name will be inspected to try and determine what
  streams should be applied.
  Default: nil

file_name [String]
  When `streams` is not supplied, `file_name` can be used for determining the streams
  to apply to read the file/stream.
  This is particularly useful when `file_name_or_io` is a stream, or a temporary file name.
  Default: nil

Example: Zip

IOStreams.reader('myfile.zip') do |stream|
  puts stream.read
end

Example: Encrypted Zip

IOStreams.reader('myfile.zip.enc') do |stream|
  puts stream.read
end

Example: Explicitly set the streams

IOStreams.reader('myfile.zip.enc', streams: [:zip, :enc]) do |stream|
  puts stream.read
end

Example: Supply custom options

# Explicitly pass options through to the :enc (Symmetric Encryption) stream
IOStreams.reader('myfile.csv.enc', streams: [enc: {compress: true}]) do |stream|
  puts stream.read
end

Note:

  • Passes the file_name_or_io as-is into the block if it is already a reader stream AND no streams are passed in.



# File 'lib/io_streams/io_streams.rb', line 68

def self.reader(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, &block)
  stream(:reader, file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace, &block)
end

.reader_stream?(file_name_or_io) ⇒ Boolean

Returns [true|false] whether the supplied file_name_or_io is a reader stream

Returns:

  • (Boolean)


# File 'lib/io_streams/io_streams.rb', line 269

def self.reader_stream?(file_name_or_io)
  file_name_or_io.respond_to?(:read)
end

.record_reader(file_name_or_io, streams: nil, delimiter: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object

Iterate over a file / stream returning each line as a hash, one at a time.



# File 'lib/io_streams/io_streams.rb', line 354

def self.record_reader(file_name_or_io,
  streams: nil,
  delimiter: nil,
  file_name: nil,
  encoding: nil,
  encode_cleaner: nil,
  encode_replace: nil,
  **args,
  &block)

  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Record::Reader)

  line_reader(
    file_name_or_io,
    streams:        streams,
    delimiter:      delimiter,
    file_name:      file_name,
    encoding:       encoding,
    encode_cleaner: encode_cleaner,
    encode_replace: encode_replace
  ) do |io|

    file_name = file_name_or_io if file_name.nil? && file_name_or_io.is_a?(String)
    IOStreams::Record::Reader.open(io, file_name: file_name, **args, &block)
  end
end
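`record_reader` builds on `line_reader`: each line is parsed into a row and, for delimited input with a header line, the header supplies the keys for every later hash. The simplified standalone sketch below illustrates only that idea; `each_record_sketch` is hypothetical, is not how IOStreams::Record::Reader is implemented, and assumes plain comma-delimited lines with no quoted fields.

```ruby
require 'stringio'

# Hypothetical, simplified record reader: the first line is the header,
# and each later line becomes a Hash keyed by the header columns.
# Assumes comma-delimited lines without quoted fields.
def each_record_sketch(io)
  header = nil
  io.each_line do |line|
    row = line.chomp.split(',', -1)
    if header.nil?
      header = row
    else
      yield header.zip(row).to_h
    end
  end
end

input   = StringIO.new("name,address\nJack,Main St\nJill,High St\n")
records = []
each_record_sketch(input) { |record| records << record }
# records: [{"name"=>"Jack", "address"=>"Main St"},
#           {"name"=>"Jill", "address"=>"High St"}]
```

Yielding one hash at a time keeps memory flat regardless of file size, which is the same reason `each_record` streams rather than returning an array.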

.record_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object

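Writes to a file / stream one record (Hash) at a time. The block receives an IOStreams::Record::Writer, or file_name_or_io itself when it is already a Record::Writer.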

# File 'lib/io_streams/io_streams.rb', line 178

def self.record_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Record::Writer)

  line_writer(file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace) do |io|
    file_name = file_name_or_io if file_name.nil? && file_name_or_io.is_a?(String)

    IOStreams::Record::Writer.open(io, file_name: file_name, **args, &block)
  end
end
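In the other direction, writing records means choosing a header and emitting each hash's values in header order, as in the column-reducing `copy` example with `output_headers`. The sketch below is a hypothetical, simplified illustration (`write_records_sketch` is not an IOStreams method and does not reflect Record::Writer internals): the header comes from `headers:` or, when nil, from the keys of the first hash.

```ruby
require 'stringio'

# Hypothetical, simplified record writer.
# Assumes values contain no commas, quotes, or newlines.
def write_records_sketch(io, records, headers: nil)
  header_written = false
  records.each do |record|
    unless header_written
      headers ||= record.keys
      io.write("#{headers.join(',')}\n")
      header_written = true
    end
    # Values follow header order; extra keys are dropped and
    # missing keys become empty fields.
    io.write("#{headers.map { |name| record[name] }.join(',')}\n")
  end
end

out = StringIO.new
write_records_sketch(
  out,
  [{ 'name' => 'Jack', 'address' => 'Main St', 'age' => 30 }, { 'name' => 'Jill' }],
  headers: %w[name address]
)
# out.string: "name,address\nJack,Main St\nJill,\n"
```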

.register_extension(extension, reader_class, writer_class) ⇒ Object

Register a file extension along with its reader and writer streaming classes

Example:

# MyXls::Reader and MyXls::Writer must implement .open
register_extension(:xls, MyXls::Reader, MyXls::Writer)

Raises:

  • (ArgumentError)


# File 'lib/io_streams/io_streams.rb', line 388

def self.register_extension(extension, reader_class, writer_class)
  raise(ArgumentError, "Invalid extension #{extension.inspect}") unless extension.nil? || extension.to_s =~ /\A\w+\Z/
  @extensions[extension.nil? ? nil : extension.to_sym] = Extension.new(reader_class, writer_class)
end
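Registration is a Hash keyed by the extension Symbol. The sketch below mirrors `register_extension` and `deregister_extension` in a hypothetical `ExtensionRegistry` module (not part of IOStreams) with placeholder reader/writer classes, to show the name validation and that deregistering returns the removed Extension rather than the Symbol.

```ruby
# Hypothetical registry mirroring register_extension / deregister_extension.
module ExtensionRegistry
  Extension = Struct.new(:reader_class, :writer_class)
  @extensions = {}

  def self.register_extension(extension, reader_class, writer_class)
    unless extension.nil? || extension.to_s =~ /\A\w+\Z/
      raise(ArgumentError, "Invalid extension #{extension.inspect}")
    end
    @extensions[extension.nil? ? nil : extension.to_sym] = Extension.new(reader_class, writer_class)
  end

  def self.deregister_extension(extension)
    raise(ArgumentError, "Invalid extension #{extension.inspect}") unless extension.to_s =~ /\A\w+\Z/
    # Hash#delete returns the removed Extension, or nil when absent.
    @extensions.delete(extension.to_sym)
  end

  def self.registered?(extension)
    @extensions.key?(extension.to_sym)
  end
end

# Placeholder classes standing in for real reader/writer implementations.
class MyXlsReader; end
class MyXlsWriter; end

ExtensionRegistry.register_extension(:xls, MyXlsReader, MyXlsWriter)
ExtensionRegistry.registered?(:xls)          # => true
removed = ExtensionRegistry.deregister_extension('xls')
removed.reader_class                         # => MyXlsReader
ExtensionRegistry.deregister_extension(:xls) # => nil
```

Names containing non-word characters, such as `'tar.gz'`, are rejected with ArgumentError.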

.row_reader(file_name_or_io, streams: nil, delimiter: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object

Iterate over a file / stream returning each line as an array, one at a time.



# File 'lib/io_streams/io_streams.rb', line 327

def self.row_reader(file_name_or_io,
  streams: nil,
  delimiter: nil,
  file_name: nil,
  encoding: nil,
  encode_cleaner: nil,
  encode_replace: nil,
  **args,
  &block)

  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Row::Reader)

  line_reader(
    file_name_or_io,
    streams:        streams,
    delimiter:      delimiter,
    file_name:      file_name,
    encoding:       encoding,
    encode_cleaner: encode_cleaner,
    encode_replace: encode_replace
  ) do |io|
    file_name = file_name_or_io if file_name.nil? && file_name_or_io.is_a?(String)
    IOStreams::Row::Reader.open(io, file_name: file_name, **args, &block)
  end
end

.row_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block) ⇒ Object

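Writes to a file / stream one row (Array) at a time. The block receives an IOStreams::Row::Writer, or file_name_or_io itself when it is already a Row::Writer.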

# File 'lib/io_streams/io_streams.rb', line 168

def self.row_writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, **args, &block)
  return yield(file_name_or_io) if file_name_or_io.is_a?(IOStreams::Row::Writer)

  line_writer(file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace) do |io|
    file_name = file_name_or_io if file_name.nil? && file_name_or_io.is_a?(String)

    IOStreams::Row::Writer.open(io, file_name: file_name, **args, &block)
  end
end

.streams_for_file_name(file_name) ⇒ Object

Returns [Array] the formats required to process the file by looking at its extension(s)

Example Zip file:

IOStreams.streams_for_file_name('myfile.zip')
=> [ :zip ]

Example Encrypted Gzip file:

IOStreams.streams_for_file_name('myfile.csv.gz.enc')
=> [ :gz, :enc ]

Example plain text / binary file:

IOStreams.streams_for_file_name('myfile.csv')
=> []

Raises:

  • (ArgumentError)


# File 'lib/io_streams/io_streams.rb', line 304

def self.streams_for_file_name(file_name)
  raise ArgumentError.new('File name cannot be nil') if file_name.nil?
  raise ArgumentError.new("File name must be a string: #{file_name.inspect}, class: #{file_name.class}") unless file_name.is_a?(String)
  parts      = file_name.split('.')
  extensions = []
  while extension = parts.pop
    sym = extension.downcase.to_sym
    break unless @extensions[sym]
    extensions.unshift(sym)
  end
  extensions
end
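The loop walks the extensions from right to left and stops at the first one that is not registered. The standalone sketch below reproduces it with a hypothetical `REGISTERED` list standing in for the registry; which extensions IOStreams actually registers is not shown on this page, so that list is an assumption.

```ruby
# Assumed set of registered extensions, for illustration only.
REGISTERED = %i[bz2 enc gz gzip pgp gpg zip].freeze

# Hypothetical standalone version of streams_for_file_name.
def streams_for_file_name(file_name)
  raise ArgumentError, 'File name cannot be nil' if file_name.nil?
  parts      = file_name.split('.')
  extensions = []
  # Walk right to left, stopping at the first unregistered extension.
  while (extension = parts.pop)
    sym = extension.downcase.to_sym
    break unless REGISTERED.include?(sym)
    extensions.unshift(sym)
  end
  extensions
end

streams_for_file_name('myfile.zip')        # => [:zip]
streams_for_file_name('myfile.csv.gz.enc') # => [:gz, :enc]
streams_for_file_name('myfile.csv')        # => []
streams_for_file_name('myfile.enc.csv')    # => []
```

The last case shows why ordering matters: an unregistered final extension hides any registered ones before it.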

.writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, &block) ⇒ Object

Returns a Writer for writing to a file / stream

Parameters

file_name_or_io [String|IO]
  The file_name of the file to write to, or an IO Stream that implements
  #write.

streams [Symbol|Array]
  The formats/streams that will be used to convert the data whilst it is
  being written.
  When nil, the file_name will be inspected to try and determine what
  streams should be applied.
  Default: nil

Stream types / extensions supported:

.zip       Zip File                                   [ :zip ]
.gz, .gzip GZip File                                  [ :gz ] or [ :gzip ]
.enc       File Encrypted using symmetric encryption  [ :enc ]
other      All other extensions will be returned as:  []

When a file is encrypted, it may also be compressed:

.zip.enc  [ :zip, :enc ]
.gz.enc   [ :gz,  :enc ]

Example: Zip

IOStreams.writer('myfile.zip') do |stream|
  stream.write(data)
end

Example: Encrypted Zip

IOStreams.writer('myfile.zip.enc') do |stream|
  stream.write(data)
end

Example: Explicitly set the streams

IOStreams.writer('myfile.zip.enc', streams: [:zip, :enc]) do |stream|
  stream.write(data)
end

Example: Supply custom options

IOStreams.writer('myfile.csv.enc', streams: [enc: { compress: true }]) do |stream|
  stream.write(data)
end

Example: Set internal filename when creating a zip file

IOStreams.writer('myfile.csv.zip', streams: [zip: { zip_file_name: 'myfile.csv' }]) do |stream|
  stream.write(data)
end

Note:

  • Passes the file_name_or_io as-is into the block if it is already a writer stream AND no streams are passed in.



# File 'lib/io_streams/io_streams.rb', line 156

def self.writer(file_name_or_io, streams: nil, file_name: nil, encoding: nil, encode_cleaner: nil, encode_replace: nil, &block)
  stream(:writer, file_name_or_io, streams: streams, file_name: file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace, &block)
end

.writer_stream?(file_name_or_io) ⇒ Boolean

Returns [true|false] whether the supplied file_name_or_io is a writer stream

Returns:

  • (Boolean)


# File 'lib/io_streams/io_streams.rb', line 274

def self.writer_stream?(file_name_or_io)
  file_name_or_io.respond_to?(:write)
end
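Both `reader_stream?` and `writer_stream?` are pure duck-typing checks: any object that responds to `#read` (or `#write`) qualifies, whatever its class. The hypothetical standalone copies below show this with `StringIO`, `$stdout`, and a plain file name String.

```ruby
require 'stringio'

# Hypothetical standalone copies of reader_stream? / writer_stream?.
def reader_stream?(file_name_or_io)
  file_name_or_io.respond_to?(:read)
end

def writer_stream?(file_name_or_io)
  file_name_or_io.respond_to?(:write)
end

reader_stream?(StringIO.new) # => true
writer_stream?(StringIO.new) # => true
reader_stream?('myfile.csv') # => false, a file name is not a stream
writer_stream?($stdout)      # => true
reader_stream?([])           # => false
```

A custom class therefore only needs to define `read` (or `write`) to be treated as a stream.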