Class: Etna::Clients::Metis::MetisUploadWorkflow::StreamingIOUpload

Inherits:
Upload
  • Object
show all
Defined in:
lib/etna/clients/metis/workflows/metis_upload_workflow.rb

Constant Summary

Constants inherited from Upload

Upload::INITIAL_BLOB_SIZE, Upload::MAX_BLOB_SIZE, Upload::ZERO_HASH

Instance Attribute Summary

Attributes inherited from Upload

#current_byte_position, #next_blob_size, #source_file

Instance Method Summary collapse

Methods inherited from Upload

#advance_position!, #complete?, #next_blob_hash, #resume_from!

Constructor Details

#initialize(readable_io:, size_hint: 0, **args) ⇒ StreamingIOUpload

Returns a new instance of StreamingIOUpload.



156
157
158
159
160
161
162
# File 'lib/etna/clients/metis/workflows/metis_upload_workflow.rb', line 156

# Sets up an upload whose bytes are pulled from +readable_io+ instead of a
# file on disk.
#
# @param readable_io [#read] stream the blob data is read from
# @param size_hint [Integer] value later reported by #file_size; 0 unless
#   the caller supplies one
# @param args [Hash] any remaining keyword arguments, handed unchanged to
#   the superclass initializer
def initialize(readable_io:, size_hint: 0, **args)
  # Stream bookkeeping: nothing has been consumed or cached yet.
  @last_bytes = ""
  @read_position = 0

  # Caller-supplied collaborators.
  @size_hint = size_hint
  @readable_io = readable_io

  # Run the parent initializer last, once every ivar above is in place.
  super(**args)
end

Instance Method Details

#file_size ⇒ Object



164
165
166
# File 'lib/etna/clients/metis/workflows/metis_upload_workflow.rb', line 164

# Reports the size used for this upload. The value is the +size_hint+ stored
# by the constructor (0 unless the caller supplied one); nothing here
# measures the underlying stream.
#
# @return [Integer] the stored size hint (presumably a byte count — confirm
#   against callers)
def file_size
  @size_hint
end

#next_blob_bytes ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/etna/clients/metis/workflows/metis_upload_workflow.rb', line 168

# Returns the bytes of the blob covering the half-open byte range
# [current_byte_position, current_byte_position + next_blob_size).
#
# The source is only ever read forward via +@readable_io.read+, so:
# * bytes lying before the requested range are read and discarded;
# * the most recently read blob is kept in +@last_bytes+, so asking again
#   for that blob (or for a tail of it, including a zero-length request at
#   the current read position) is answered without touching the stream;
# * a request that ends before the current read position cannot be served
#   and raises.
#
# @return [String] the requested bytes; "" when no bytes are requested
# @raise [StreamingUploadError] when the request would need to rewind the
#   stream, when the request does not line up with the stream position, or
#   when the stream ends before the requested bytes were read
def next_blob_bytes
  next_left = current_byte_position
  next_right = next_left + next_blob_size

  if next_right < @read_position
    raise StreamingUploadError, "Upload needs restart, but source is streaming and ephemeral. #{next_right} #{@read_position} You need to restart the source stream and create a new upload."
  end

  if @read_position < next_left
    # Skipping ahead: the cached blob will no longer end at the read
    # position, so it must not be handed out again.
    @last_bytes = ""
    # Discard in bounded chunks so a large gap is never held in memory at
    # once, and keep reading until the gap is closed (a read may be short).
    discard_chunk_size = 1 << 20
    while @read_position < next_left
      read_stream_chunk([next_left - @read_position, discard_chunk_size].min)
    end
  end

  if next_right == @read_position
    # Everything up to next_right is already consumed; answer from the
    # cache, which always ends at @read_position.
    cached_left = @read_position - @last_bytes.bytesize
    return @last_bytes if next_left == cached_left

    # Tail of the cached blob; this is "" when no bytes are requested.
    return @last_bytes.byteslice(next_left - cached_left, next_right - next_left) if next_left > cached_left
    # Otherwise the request starts before the cached data: fall through to
    # the alignment error below.
  end

  if @read_position != next_left
    raise StreamingUploadError, "Alignment error, source data does not match expected upload resume. #{@read_position} #{next_left} Restart the upload to address."
  end

  # Drop the old cache before reading so a failure part-way through cannot
  # leave a blob cached that no longer ends at @read_position.
  @last_bytes = ""
  blob = String.new # mutable binary buffer; safe under frozen_string_literal
  blob << read_stream_chunk(next_right - @read_position) while @read_position < next_right
  @last_bytes = blob
end

# Reads up to +max_bytes+ from the stream, advances +@read_position+ by the
# number of bytes actually returned, and returns them. A nil or empty result
# is treated as end of stream so callers looping on this cannot spin forever.
private def read_stream_chunk(max_bytes)
  data = @readable_io.read(max_bytes)
  raise StreamingUploadError, "Unexpected EOF in read stream" if data.nil? || data.empty?

  @read_position += data.bytesize
  data
end