Class: ZipTricks::Streamer

Inherits:
Object
Defined in:
lib/zip_tricks/streamer.rb

Overview

Used to write streamed ZIP archives into the provided IO-ish object. The output IO is never rewound or seeked, so the output of this object can be coupled directly to, say, a Rack output.

Allows for splicing raw files (for "stored" entries without compression) and splicing of deflated files (for "deflated" storage mode).

For stored entries, you need to know the CRC32 (as a uint) and the filesize upfront, before the writing of the entry body starts.
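When the entry body is available as a file or another IO, Ruby's standard Zlib library can compute the checksum and the size upfront. The sketch below reads the source in chunks and keeps a running CRC32. The helper name `crc32_and_size` is hypothetical and is not part of ZipTricks.

```ruby
require 'zlib'
require 'stringio'

# Computes the CRC32 (as an unsigned integer) and the byte size of an IO-ish
# source by reading it in chunks, so large files never have to fit in memory.
# The 64 KiB chunk size is an arbitrary choice for this sketch.
def crc32_and_size(io, chunk_size: 64 * 1024)
  crc = Zlib.crc32 # the CRC32 of zero bytes, i.e. 0
  size = 0
  while (chunk = io.read(chunk_size))
    crc = Zlib.crc32(chunk, crc) # continue the running checksum
    size += chunk.bytesize
  end
  [crc, size]
end

crc, size = crc32_and_size(StringIO.new('Hello, ZIP!'.b))
# With a Streamer instance `zip`, the values could then be used like this:
# zip.add_stored_entry(filename: 'hello.txt', size: size, crc32: crc)
```

Reading in chunks keeps memory use flat. The running form `Zlib.crc32(chunk, crc)` gives the same result as checksumming the whole body at once.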

Any object that responds to << can be used as the Streamer target - you can use a String, an Array, a Socket or a File, at your leisure.

Using the Streamer with runtime compression

You can use the Streamer with data descriptors (the CRC32 and the sizes will be written after the file data). This allows non-rewinding on-the-fly compression. If you are compressing large files, the Deflater object that the Streamer controls will be regularly flushed to prevent memory inflation.

ZipTricks::Streamer.open(file_socket_or_string) do |zip|
  zip.write_stored_file('mov.mp4') do |sink|
    File.open('mov.mp4', 'rb'){|source| IO.copy_stream(source, sink) }
  end
  zip.write_deflated_file('long-novel.txt') do |sink|
    File.open('novel.txt', 'rb'){|source| IO.copy_stream(source, sink) }
  end
end

The central directory will be written automatically at the end of the block.
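The technique behind on-the-fly deflate with data descriptors can be sketched with Zlib alone. The sketch compresses as data arrives and tracks the CRC32 of the uncompressed bytes and both sizes, so the data descriptor values are known only once the body is finished. The `RawDeflateSink` class is hypothetical and is not ZipTricks' internal writer. It assumes ZIP entries use raw deflate, meaning no zlib header or Adler-32 trailer, which a negative window-bits value selects. It also leaves out the periodic flushing the Streamer performs.

```ruby
require 'zlib'

# Hypothetical pass-through compressor: accepts uncompressed bytes, writes
# raw-deflated bytes to `out`, and tracks data descriptor values.
class RawDeflateSink
  attr_reader :crc32, :uncompressed_size, :compressed_size

  def initialize(out)
    @out = out
    # Negative window bits select a raw deflate stream, as used inside ZIPs
    @deflater = Zlib::Deflate.new(Zlib::DEFAULT_COMPRESSION, -Zlib::MAX_WBITS)
    @crc32 = Zlib.crc32
    @uncompressed_size = 0
    @compressed_size = 0
  end

  # Accepts uncompressed bytes and writes whatever compressed output is ready
  def <<(data)
    @crc32 = Zlib.crc32(data, @crc32)
    @uncompressed_size += data.bytesize
    emit(@deflater.deflate(data))
    self
  end

  # Flushes the remaining compressed output. Afterwards the three readers
  # hold the values that would go into the data descriptor.
  def finish
    emit(@deflater.finish)
    @deflater.close
    self
  end

  private

  def emit(bytes)
    @compressed_size += bytes.bytesize
    @out << bytes
  end
end

out = String.new(encoding: Encoding::BINARY)
sink = RawDeflateSink.new(out)
3.times { sink << ('novel text ' * 1000) }
sink.finish
```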

Using the Streamer with entries of known size and having a known CRC32 checksum

Streamer allows "IO splicing" - in this mode it will only control the metadata output, but you can write the data to the socket/file outside of the Streamer. For example, when using the sendfile gem:

ZipTricks::Streamer.open(socket) do | zip |
  zip.add_stored_entry(filename: "myfile1.bin", size: 9090821, crc32: 12485)
  socket.sendfile(tempfile1)
  zip.simulate_write(tempfile1.size)

  zip.add_stored_entry(filename: "myfile2.bin", size: 458678, crc32: 89568)
  socket.sendfile(tempfile2)
  zip.simulate_write(tempfile2.size)
end

Note that you need to use simulate_write in this case. The Streamer writes absolute offsets into the ZIP (local file header offsets and the like), and it keeps track of those offsets by counting the bytes that pass through it. When using sendfile, the Ruby write methods are bypassed entirely, so the offsets tracked by the Streamer will not be updated, which will result in an invalid ZIP.
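A minimal sketch of why the manual advance is needed. `PositionTracker` is a hypothetical stand-in for the position tracking the Streamer does internally. ZipTricks wraps the output in ZipTricks::WriteAndTell, and its simulate_write calls advance_position_by. The sketch counts bytes that go through `<<` and can be advanced by hand for bytes written around it, as sendfile would.

```ruby
# Hypothetical byte counter modelled on the Streamer's offset bookkeeping
class PositionTracker
  attr_reader :pos

  def initialize(io)
    @io = io
    @pos = 0
  end

  # Bytes written through the tracker are counted automatically
  def <<(bytes)
    @io << bytes
    @pos += bytes.bytesize
    self
  end

  # Similar in spirit to Streamer#simulate_write
  def advance_position_by(num_bytes)
    @pos += num_bytes
  end
end

socket = String.new(encoding: Encoding::BINARY)
tracker = PositionTracker.new(socket)
tracker << 'HEADER'.b            # goes through the tracker, so it is counted
socket << ('x' * 100).b          # bypasses the tracker, like sendfile
tracker.advance_position_by(100) # without this, pos would be 100 bytes short
```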

On-the-fly deflate: using the Streamer with async/suspended writes and data descriptors

If you are unable to use the block versions of write_deflated_file and write_stored_file, you can use a separate writer object instead. It is returned from write_deflated_file and write_stored_file when they are called without a block, and it accepts data writes.

ZipTricks::Streamer.open(socket) do | zip |
  w = zip.write_stored_file('mov.mp4')
  w << data
  w.close
end

The central directory will be written automatically at the end of the open block. If you need to manage the Streamer manually, or defer the central directory write until appropriate, use the constructor instead and call Streamer#close:

zip = ZipTricks::Streamer.new(out_io)
.....
zip.close

Calling #close will not call #close on the underlying IO object.

Defined Under Namespace

Classes: Entry, Writable

Constant Summary

EntryBodySizeMismatch = Class.new(StandardError)
InvalidOutput = Class.new(ArgumentError)
Overflow = Class.new(StandardError)
UnknownMode = Class.new(StandardError)

Constructor Details

#initialize(stream, writer: create_writer) ⇒ Streamer

Creates a new Streamer on top of the given IO-ish object.

Parameters:

  • stream (IO)

    the destination IO for the ZIP. Anything that responds to << can be used.

  • writer (ZipTricks::ZipWriter) (defaults to: create_writer)

    the object to be used as the writer. Defaults to an instance of ZipTricks::ZipWriter; normally you won't need to override it

Raises:

  • (InvalidOutput)

    if the stream does not respond to <<

# File 'lib/zip_tricks/streamer.rb', line 143

def initialize(stream, writer: create_writer)
  raise InvalidOutput, 'The stream must respond to #<<' unless stream.respond_to?(:<<)

  @out = ZipTricks::WriteAndTell.new(stream)
  @files = []
  @local_header_offsets = []
  @filenames_set = Set.new
  @writer = writer
end

Class Method Details

.open(stream, **kwargs_for_new) {|Streamer| ... } ⇒ Object

Creates a new Streamer on top of the given IO-ish object and yields it. Once the given block returns, the Streamer will have its close method called, which writes out the central directory of the archive to the output.

Parameters:

  • stream (IO)

    the destination IO for the ZIP. Anything that responds to << can be used.

  • kwargs_for_new (Hash)

    keyword arguments for Streamer.new

Yields:

  • (Streamer)

    the streamer that can be written to



# File 'lib/zip_tricks/streamer.rb', line 104

def self.open(stream, **kwargs_for_new)
  archive = new(stream, **kwargs_for_new)
  yield(archive)
  archive.close
end

.output_enum(**kwargs_for_new, &zip_streamer_block) ⇒ Enumerator

Creates a new Streamer that writes to a buffer. The buffer can be read from using each, and the creation of the ZIP is in lockstep with the caller calling each on the returned output enumerator object. This can be used when the calling program wants to stream the output of the ZIP archive and throttle that output, or split it into chunks, or use it as a generator.

For example:

# The block given to {output_enum} won't be executed immediately - rather it
# will only start to execute when the caller starts to read from the output
# by calling `each`
body = ZipTricks::Streamer.output_enum(writer: CustomWriter.new) do |zip|
  zip.add_stored_entry(filename: 'large.tif', size: 1289894, crc32: 198210)
  zip << large_file.read(1024*1024) until large_file.eof?
  ...
end

body.each do |bin_string|
  # Send the output somewhere, buffer it in a file etc.
  ...
end

Parameters:

  • kwargs_for_new (Hash)

    keyword arguments for Streamer.new

Returns:

  • (Enumerator)

    the enumerator you can read bytestrings of the ZIP from using each



# File 'lib/zip_tricks/streamer.rb', line 134

def self.output_enum(**kwargs_for_new, &zip_streamer_block)
  ZipTricks::OutputEnumerator.new(**kwargs_for_new, &zip_streamer_block)
end
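The lockstep behaviour described for output_enum can be illustrated with Ruby's own Enumerator. This is a sketch of the general idea, not of ZipTricks::OutputEnumerator, which may buffer or chunk its output differently. It relies on the fact that Enumerator::Yielder responds to <<, so it can serve as an output object. The producer block runs only when the consumer starts iterating. `produce_archive` is a hypothetical stand-in for the block given to output_enum.

```ruby
# Hypothetical producer: it only needs an output object that responds to <<
def produce_archive(out)
  out << 'chunk 1'
  out << 'chunk 2'
  out << 'chunk 3'
end

started = []
body = Enumerator.new do |yielder|
  started << :producer     # records each time the producer block actually runs
  produce_archive(yielder) # Enumerator::Yielder responds to <<
end

runs_before_reading = started.length # 0: creating the enumerator ran nothing
first_chunk = body.first             # runs the producer only until the first chunk
```

Each call to `each` (or `first`, `to_a`) runs the producer block again from the start, so a real archive generator should be iterated only once.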

Instance Method Details

#<<(binary_data) ⇒ Object

Writes a part of a zip entry body (actual binary data of the entry) into the output stream.

Parameters:

  • binary_data (String)

    a String in binary encoding

Returns:

  • self



# File 'lib/zip_tricks/streamer.rb', line 157

def <<(binary_data)
  @out << binary_data
  self
end

#add_deflated_entry(filename:, modification_time: Time.now.utc, compressed_size: 0, uncompressed_size: 0, crc32: 0, use_data_descriptor: false) ⇒ Integer

Also known as: add_compressed_entry

Writes out the local header for an entry (file in the ZIP) that is using the deflated storage model (is compressed). Once this method is called, the << method has to be called to write the actual contents of the body.

Note that the deflated body that is going to be written into the output has to be precompressed (pre-deflated) before writing it into the Streamer, because otherwise it is impossible to know its size upfront.

Parameters:

  • filename (String)

    the name of the file in the entry

  • modification_time (Time) (defaults to: Time.now.utc)

    the modification time of the file in the archive

  • compressed_size (Integer) (defaults to: 0)

    the size of the compressed entry that is going to be written into the archive

  • uncompressed_size (Integer) (defaults to: 0)

    the size of the entry when uncompressed, in bytes

  • crc32 (Integer) (defaults to: 0)

    the CRC32 checksum of the entry when uncompressed

  • use_data_descriptor (Boolean) (defaults to: false)

    whether the entry body will be followed by a data descriptor

Returns:

  • (Integer)

    the offset the output IO is at after writing the entry header



# File 'lib/zip_tricks/streamer.rb', line 201

def add_deflated_entry(filename:, modification_time: Time.now.utc, compressed_size: 0, uncompressed_size: 0, crc32: 0, use_data_descriptor: false)
  add_file_and_write_local_header(filename: filename,
                                  modification_time: modification_time,
                                  crc32: crc32,
                                  storage_mode: DEFLATED,
                                  compressed_size: compressed_size,
                                  uncompressed_size: uncompressed_size,
                                  use_data_descriptor: use_data_descriptor)
  @out.tell
end
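A minimal sketch of preparing a body for add_deflated_entry. It compresses the whole entry in memory with Zlib and collects the three values the method expects. The helper `predeflate` and the filename are hypothetical. The sketch assumes the body must be raw deflate without a zlib header, which is what the negative window bits produce.

```ruby
require 'zlib'

# Hypothetical helper: pre-deflates an entire entry body and gathers the
# metadata that add_deflated_entry takes as keyword arguments
def predeflate(data)
  deflater = Zlib::Deflate.new(Zlib::DEFAULT_COMPRESSION, -Zlib::MAX_WBITS)
  compressed = deflater.deflate(data, Zlib::FINISH) # all output in one go
  deflater.close
  {
    body: compressed,
    compressed_size: compressed.bytesize,
    uncompressed_size: data.bytesize,
    crc32: Zlib.crc32(data) # checksum of the *uncompressed* data
  }
end

entry = predeflate('a' * 10_000)
# With a Streamer instance `zip`, the entry could then be written as:
# zip.add_deflated_entry(filename: 'a.txt',
#                        compressed_size: entry[:compressed_size],
#                        uncompressed_size: entry[:uncompressed_size],
#                        crc32: entry[:crc32])
# zip << entry[:body]
```

This trades memory for simplicity, so it suits small entries. For large bodies, write_deflated_file avoids holding the whole compressed output in memory.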

#add_empty_directory(dirname:, modification_time: Time.now.utc) ⇒ Integer

Adds an empty directory to the archive with a size of 0 and permissions of 755.

Parameters:

  • dirname (String)

    the name of the directory in the archive

  • modification_time (Time) (defaults to: Time.now.utc)

    the modification time of the directory in the archive

Returns:

  • (Integer)

    the offset the output IO is at after writing the entry header



# File 'lib/zip_tricks/streamer.rb', line 242

def add_empty_directory(dirname:, modification_time: Time.now.utc)
  add_file_and_write_local_header(filename: dirname.to_s + '/',
                                  modification_time: modification_time,
                                  crc32: 0,
                                  storage_mode: STORED,
                                  compressed_size: 0,
                                  uncompressed_size: 0,
                                  use_data_descriptor: false)
  @out.tell
end

#add_stored_entry(filename:, modification_time: Time.now.utc, size: 0, crc32: 0, use_data_descriptor: false) ⇒ Integer

Writes out the local header for an entry (file in the ZIP) that is using the stored storage model (is stored as-is). Once this method is called, the << method has to be called one or more times to write the actual contents of the body.

Parameters:

  • filename (String)

    the name of the file in the entry

  • modification_time (Time) (defaults to: Time.now.utc)

    the modification time of the file in the archive

  • size (Integer) (defaults to: 0)

    the size of the file when uncompressed, in bytes

  • crc32 (Integer) (defaults to: 0)

    the CRC32 checksum of the entry when uncompressed

  • use_data_descriptor (Boolean) (defaults to: false)

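    whether the entry body will be followed by a data descriptor. When in use, crc32 and size can be passed as 0 and supplied after the body has been written, using #update_last_entry_and_write_data_descriptor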

Returns:

  • (Integer)

    the offset the output IO is at after writing the entry header



# File 'lib/zip_tricks/streamer.rb', line 226

def add_stored_entry(filename:, modification_time: Time.now.utc,  size: 0, crc32: 0, use_data_descriptor: false)
  add_file_and_write_local_header(filename: filename,
                                  modification_time: modification_time,
                                  crc32: crc32,
                                  storage_mode: STORED,
                                  compressed_size: size,
                                  uncompressed_size: size,
                                  use_data_descriptor: use_data_descriptor)
  @out.tell
end

#close ⇒ Integer

Closes the archive. Writes the central directory, and switches the writer into a state where it can no longer be written to.

Once this method is called, the Streamer should be discarded (the ZIP archive is complete).

Returns:

  • (Integer)

    the offset the output IO is at after closing the archive



# File 'lib/zip_tricks/streamer.rb', line 361

def close
  # Record the central directory offset, so that it can be written into the EOCD record
  cdir_starts_at = @out.tell

  # Write out the central directory entries, one for each file
  @files.each_with_index do |entry, i|
    header_loc = @local_header_offsets.fetch(i)
    @writer.write_central_directory_file_header(io: @out,
                                                local_file_header_location: header_loc,
                                                gp_flags: entry.gp_flags,
                                                storage_mode: entry.storage_mode,
                                                compressed_size: entry.compressed_size,
                                                uncompressed_size: entry.uncompressed_size,
                                                mtime: entry.mtime,
                                                crc32: entry.crc32,
                                                filename: entry.filename)
  end

  # Record the central directory size, for the EOCDR
  cdir_size = @out.tell - cdir_starts_at

  # Write out the EOCDR
  @writer.write_end_of_central_directory(io: @out,
                                         start_of_central_directory_location: cdir_starts_at,
                                         central_directory_size: cdir_size,
                                         num_files_in_archive: @files.length)

  # Clear the files so that GC will not have to trace all the way to here to deallocate them
  @files.clear
  @filenames_set.clear

  # and return the final offset
  @out.tell
end
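The record that close writes after the central directory entries can be illustrated by packing a basic End of Central Directory record. The layout follows PKWARE's APPNOTE.TXT for the non-Zip64 case. This is an illustration, not ZipTricks::ZipWriter#write_end_of_central_directory, which may also emit Zip64 records when offsets or entry counts are large. The helper name `eocd_record` is hypothetical.

```ruby
# Packs a basic (non-Zip64) End of Central Directory record.
# 'V' is a 32-bit little-endian integer, 'v' a 16-bit little-endian one.
def eocd_record(num_files:, cdir_size:, cdir_offset:, comment: ''.b)
  [
    0x06054b50,      # end of central directory signature ("PK\x05\x06")
    0,               # number of this disk
    0,               # disk where the central directory starts
    num_files,       # central directory records on this disk
    num_files,       # total number of central directory records
    cdir_size,       # size of the central directory in bytes
    cdir_offset,     # offset where the central directory starts
    comment.bytesize # length of the archive comment that follows
  ].pack('VvvvvVVv') + comment
end

record = eocd_record(num_files: 2, cdir_size: 120, cdir_offset: 4096)
```

This is why close records `cdir_starts_at` before writing the central directory and computes `cdir_size` afterwards. Both values are fields of this record.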

#create_writer ⇒ ZipTricks::ZipWriter

Sets up the ZipWriter with wrappers if necessary. The method is called once, when the Streamer gets instantiated - the Writer then gets reused. This method is primarily there so that you can override it.

Returns:

  • (ZipTricks::ZipWriter)

    the writer the Streamer will use

# File 'lib/zip_tricks/streamer.rb', line 401

def create_writer
  ZipTricks::ZipWriter.new
end

#simulate_write(num_bytes) ⇒ Integer

Advances the internal IO pointer to keep the offsets of the ZIP file in check. Use this if you are going to use accelerated writes to the socket (like the sendfile() call) after writing the headers, or if you just need to figure out the size of the archive.

Parameters:

  • num_bytes (Integer)

    how many bytes are going to be written bypassing the Streamer

Returns:

  • (Integer)

    position in the output stream / ZIP archive



# File 'lib/zip_tricks/streamer.rb', line 180

def simulate_write(num_bytes)
  @out.advance_position_by(num_bytes)
  @out.tell
end

#update_last_entry_and_write_data_descriptor(crc32:, compressed_size:, uncompressed_size:) ⇒ Integer

Updates the last entry written with the CRC32 checksum and compressed/uncompressed sizes. For stored entries, compressed_size and uncompressed_size are the same. After updating the entry, the method immediately writes the data descriptor bytes to the output.

Parameters:

  • crc32 (Integer)

    the CRC32 checksum of the entry when uncompressed

  • compressed_size (Integer)

    the size of the compressed segment within the ZIP

  • uncompressed_size (Integer)

    the size of the entry once uncompressed

Returns:

  • (Integer)

    the offset the output IO is at after writing the data descriptor



# File 'lib/zip_tricks/streamer.rb', line 414

def update_last_entry_and_write_data_descriptor(crc32:, compressed_size:, uncompressed_size:)
  # Save the information into the entry for when the time comes to write
  # out the central directory
  last_entry = @files.fetch(-1)
  last_entry.crc32 = crc32
  last_entry.compressed_size = compressed_size
  last_entry.uncompressed_size = uncompressed_size

  @writer.write_data_descriptor(io: @out,
                                crc32: last_entry.crc32,
                                compressed_size: last_entry.compressed_size,
                                uncompressed_size: last_entry.uncompressed_size)
  @out.tell
end
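A sketch of the bytes a data descriptor carries, following PKWARE's APPNOTE.TXT for entries whose sizes fit into 32 bits. The descriptor holds an optional signature, followed by the CRC32 and both sizes. Zip64 entries use 8-byte sizes instead, and this sketch does not cover that case or claim which variant ZipTricks writes. The helper name `data_descriptor` is hypothetical.

```ruby
require 'zlib'

# Packs a 32-bit data descriptor; 'V' is a 32-bit little-endian integer
def data_descriptor(crc32:, compressed_size:, uncompressed_size:)
  [
    0x08074b50,       # data descriptor signature ("PK\x07\x08"), optional per the spec
    crc32,            # CRC32 of the uncompressed data
    compressed_size,  # size of the entry body as stored in the archive
    uncompressed_size # size of the entry once uncompressed
  ].pack('VVVV')
end

body = '123456789'
# For a stored entry, compressed and uncompressed sizes are the same
dd = data_descriptor(crc32: Zlib.crc32(body),
                     compressed_size: body.bytesize,
                     uncompressed_size: body.bytesize)
```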

#write(binary_data) ⇒ Integer

Writes a part of a zip entry body (actual binary data of the entry) into the output stream, and returns the number of bytes written. Returning the byte count makes the Streamer usable as the destination of IO.copy_stream(from, to).

Parameters:

  • binary_data (String)

    a String in binary encoding

Returns:

  • (Integer)

    the number of bytes written



# File 'lib/zip_tricks/streamer.rb', line 168

def write(binary_data)
  @out << binary_data
  binary_data.bytesize
end
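A minimal sketch of why the byte count matters. IO.copy_stream accepts any destination object that has a write method and uses the integer that write returns to total up the bytes copied. `CollectingSink` is a hypothetical class, not part of ZipTricks.

```ruby
require 'stringio'

# Hypothetical destination that only implements #write
class CollectingSink
  attr_reader :data

  def initialize
    @data = String.new(encoding: Encoding::BINARY)
  end

  def write(bytes)
    @data << bytes   # copy the chunk, the caller may reuse its buffer
    bytes.bytesize   # like Streamer#write, report how many bytes were taken
  end
end

sink = CollectingSink.new
copied = IO.copy_stream(StringIO.new('x' * 100_000), sink) # total bytes copied
```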

#write_deflated_file(filename, modification_time: Time.now.utc) {|#<<, #write| ... } ⇒ Object

Opens the stream for a deflated file in the archive, and yields a writer for that file to the block. Once the write completes, a data descriptor will be written with the actual compressed/uncompressed sizes and the CRC32 checksum.

When a block is given, the data descriptor is written as soon as the block returns:

zip.write_deflated_file("foo.txt") do |sink|
  IO.copy_stream(source_file, sink)
end

If deferred writes are desired (for example, to integrate with an API that does not support blocks, or to work with non-blocking environments), call the method without a block. In that case it returns the sink instead, so that it can be written to in a deferred fashion. When close is called on the sink, any remaining compressed output is flushed and the data descriptor is written.

Note that even though it does not have to happen within the same call stack, call sequencing still must be observed. It is therefore not possible to do this:

writer_for_file1 = zip.write_deflated_file("somefile.jpg")
writer_for_file2 = zip.write_deflated_file("another.tif")
writer_for_file1 << data
writer_for_file2 << data
writer_for_file1.close
writer_for_file2.close

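because the entry bodies would end up interleaved in the output, and the sizes and CRC32 reported when a writer is closed are recorded against the most recently opened entry rather than the writer's own, which results in an invalid ZIP file. Using this facility in async scenarios is certainly possible, but each writer has to be closed before the next entry is opened.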
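The bookkeeping side of the sequencing problem can be reproduced with a toy model. It assumes that closing a writer ends up updating whichever entry was added last, as the @files.fetch(-1) lookup in #update_last_entry_and_write_data_descriptor suggests. ToyArchive, ToyWriter and ToyEntry are hypothetical classes that model only the metadata, not the bytes written.

```ruby
# Toy model, not ZipTricks code: sizes reported on close go to the last entry
ToyEntry = Struct.new(:filename, :size)

class ToyArchive
  attr_reader :entries

  def initialize
    @entries = []
  end

  # Registers a new entry and hands out a writer for it
  def open_entry(filename)
    @entries << ToyEntry.new(filename, 0)
    ToyWriter.new(self)
  end

  # Mirrors the @files.fetch(-1) lookup in the Streamer
  def update_last_entry(size)
    @entries.fetch(-1).size = size
  end
end

class ToyWriter
  def initialize(archive)
    @archive = archive
    @written = 0
  end

  def <<(data)
    @written += data.bytesize
    self
  end

  # Reports the byte count to whichever entry is currently the last one
  def close
    @archive.update_last_entry(@written)
  end
end

archive = ToyArchive.new
w1 = archive.open_entry('somefile.jpg')
w2 = archive.open_entry('another.tif')
w1 << ('a' * 10)
w2 << ('b' * 20)
w1.close # lands on another.tif, so somefile.jpg keeps size 0
w2.close # overwrites another.tif with 20
```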

Parameters:

  • filename (String)

    the name of the file in the archive

  • modification_time (Time) (defaults to: Time.now.utc)

    the modification time of the file in the archive

Yields:

  • (#<<, #write)

    an object that the file contents must be written to



# File 'lib/zip_tricks/streamer.rb', line 339

def write_deflated_file(filename, modification_time: Time.now.utc)
  add_deflated_entry(filename: filename,
                     modification_time: modification_time,
                     use_data_descriptor: true,
                     crc32: 0,
                     compressed_size: 0,
                     uncompressed_size: 0)

  writable = Writable.new(self, DeflatedWriter.new(@out))
  if block_given?
    yield(writable)
    writable.close
  end
  writable
end

#write_stored_file(filename, modification_time: Time.now.utc) {|#<<, #write| ... } ⇒ #<<, ...

Opens the stream for a stored file in the archive, and yields a writer for that file to the block. Once the write completes, a data descriptor will be written with the actual compressed/uncompressed sizes and the CRC32 checksum.

When a block is given, the data descriptor is written as soon as the block returns:

zip.write_stored_file("foo.txt") do |sink|
  IO.copy_stream(source_file, sink)
end

If deferred writes are desired (for example, to integrate with an API that does not support blocks, or to work with non-blocking environments), call the method without a block. In that case it returns the sink instead, so that it can be written to in a deferred fashion. When close is called on the sink, any remaining output is flushed and the data descriptor is written.

Note that even though it does not have to happen within the same call stack, call sequencing still must be observed. It is therefore not possible to do this:

writer_for_file1 = zip.write_stored_file("somefile.jpg")
writer_for_file2 = zip.write_stored_file("another.tif")
writer_for_file1 << data
writer_for_file2 << data
writer_for_file1.close
writer_for_file2.close

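because the entry bodies would end up interleaved in the output, and the sizes and CRC32 reported when a writer is closed are recorded against the most recently opened entry rather than the writer's own, which results in an invalid ZIP file. Using this facility in async scenarios is certainly possible, but each writer has to be closed before the next entry is opened.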

Parameters:

  • filename (String)

    the name of the file in the archive

  • modification_time (Time) (defaults to: Time.now.utc)

    the modification time of the file in the archive

Yields:

  • (#<<, #write)

    an object that the file contents must be written to that will be automatically closed

Returns:

  • (#<<, #write, #close)

    an object that the file contents must be written to, has to be closed manually



# File 'lib/zip_tricks/streamer.rb', line 288

def write_stored_file(filename, modification_time: Time.now.utc)
  add_stored_entry(filename: filename,
                   modification_time: modification_time,
                   use_data_descriptor: true,
                   crc32: 0,
                   size: 0)

  writable = Writable.new(self, StoredWriter.new(@out))
  if block_given?
    yield(writable)
    writable.close
  end
  writable
end