Class: Avro::DataFile::Writer

Inherits:
Object
  • Object
show all
Defined in:
lib/avro/data_file.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(writer, datum_writer, writers_schema = nil, codec = nil, meta = {}) ⇒ Writer

Returns a new instance of Writer.



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/avro/data_file.rb', line 96

# Set up a data-file writer on top of +writer+.
#
# When +writers_schema+ is given, a new file is started: a sync marker is
# generated, the codec and schema are recorded in +meta+, and the header
# is written. When it is omitted, the writer is assumed to hold an
# existing data file: its sync marker, codec and schema are read back
# through a Reader, and the writer is positioned at the end for appending.
#
# Note that +meta+ is stored as-is (not copied) and gets its 'avro.codec'
# and 'avro.schema' entries assigned here.
def initialize(writer, datum_writer, writers_schema=nil, codec=nil, meta={})
  @writer = writer
  @encoder = IO::BinaryEncoder.new(@writer)
  @datum_writer = datum_writer
  @meta = meta

  # In-memory buffer (binary where the runtime supports encodings) that
  # `<<` encodes datums into.
  @buffer_writer = StringIO.new('', 'w').tap do |staging|
    staging.set_encoding('BINARY') if staging.respond_to?(:set_encoding)
  end
  @buffer_encoder = IO::BinaryEncoder.new(@buffer_writer)
  @block_count = 0

  if writers_schema
    # Fresh file: pick a sync marker and codec, record them, emit the header.
    @sync_marker = Writer.generate_sync_marker
    @codec = DataFile.get_codec(codec)
    @meta['avro.codec'] = @codec.codec_name.to_s
    @meta['avro.schema'] = writers_schema.to_s
    @datum_writer.writers_schema = writers_schema
    write_header
  else
    # Appending: read the existing file's header through a Reader.
    existing = Reader.new(@writer, Avro::IO::DatumReader.new)

    # FIXME(jmhodges): collect arbitrary metadata
    @sync_marker = existing.sync_marker
    codec_name = existing.meta['avro.codec']
    @meta['avro.codec'] = codec_name
    @codec = DataFile.get_codec(codec_name)

    # Reuse the schema the existing file was written with.
    stored_schema = existing.meta['avro.schema']
    @meta['avro.schema'] = stored_schema
    @datum_writer.writers_schema = Schema.parse(stored_schema)

    # Offset 0 with whence 2 (relative to end): position for appending.
    @writer.seek(0, 2)
  end
end

Instance Attribute Details

#block_count ⇒ Object

Returns the value of attribute block_count.



94
95
96
# File 'lib/avro/data_file.rb', line 94

# Counter that the constructor initializes to 0 and that `<<` increments by
# one for every datum appended.
def block_count
  @block_count
end

#buffer_encoder ⇒ Object (readonly)

Returns the value of attribute buffer_encoder.



93
94
95
# File 'lib/avro/data_file.rb', line 93

# The IO::BinaryEncoder the constructor builds around buffer_writer; `<<`
# hands it to datum_writer.write as the encoding target.
def buffer_encoder
  @buffer_encoder
end

#buffer_writer ⇒ Object (readonly)

Returns the value of attribute buffer_writer.



93
94
95
# File 'lib/avro/data_file.rb', line 93

# The in-memory StringIO created by the constructor (set to BINARY encoding
# when the runtime supports it). `<<` compares its position against
# SYNC_INTERVAL to decide when to call write_block.
def buffer_writer
  @buffer_writer
end

#codec ⇒ Object (readonly)

Returns the value of attribute codec.



93
94
95
# File 'lib/avro/data_file.rb', line 93

# The codec returned by DataFile.get_codec in the constructor: looked up from
# the constructor's codec argument for a new file, or from the existing
# file's 'avro.codec' metadata entry when appending.
def codec
  @codec
end

#datum_writer ⇒ Object (readonly)

Returns the value of attribute datum_writer.



93
94
95
# File 'lib/avro/data_file.rb', line 93

# The datum writer object passed to the constructor; `<<` calls its
# write(datum, buffer_encoder) for each appended datum.
def datum_writer
  @datum_writer
end

#encoder ⇒ Object (readonly)

Returns the value of attribute encoder.



93
94
95
# File 'lib/avro/data_file.rb', line 93

# The IO::BinaryEncoder the constructor builds around the underlying writer.
def encoder
  @encoder
end

#meta ⇒ Object (readonly)

Returns the value of attribute meta.



93
94
95
# File 'lib/avro/data_file.rb', line 93

# The metadata hash given to the constructor (the same object, not a copy).
# The constructor assigns its 'avro.codec' and 'avro.schema' entries.
def meta
  @meta
end

#sync_marker ⇒ Object (readonly)

Returns the value of attribute sync_marker.



93
94
95
# File 'lib/avro/data_file.rb', line 93

# The sync marker chosen in the constructor: freshly generated via
# Writer.generate_sync_marker for a new file, or copied from the Reader
# opened on the existing file when appending.
def sync_marker
  @sync_marker
end

#writer ⇒ Object (readonly)

Returns the value of attribute writer.



93
94
95
# File 'lib/avro/data_file.rb', line 93

# The underlying writer object passed to the constructor; close, flush and
# sync call close, flush and tell on it respectively.
def writer
  @writer
end

Class Method Details

.generate_sync_marker ⇒ Object



89
90
91
# File 'lib/avro/data_file.rb', line 89

# Produce a fresh sync marker: 16 random bytes obtained from OpenSSL.
def self.generate_sync_marker
  marker_size = 16
  OpenSSL::Random.random_bytes(marker_size)
end

Instance Method Details

#<<(datum) ⇒ Object

Append a datum to the file



135
136
137
138
139
140
141
142
143
144
# File 'lib/avro/data_file.rb', line 135

# Append a datum: encode it into the in-memory buffer via datum_writer and
# bump block_count. Once the buffer position reaches SYNC_INTERVAL the
# buffered data is handed to write_block.
#
# Returns the result of write_block when one is triggered, otherwise nil.
def <<(datum)
  datum_writer.write(datum, buffer_encoder)
  self.block_count += 1

  # Buffer has grown to (or past) the sync interval: emit the block now.
  write_block if buffer_writer.tell >= SYNC_INTERVAL
end

#close ⇒ Object



160
161
162
163
# File 'lib/avro/data_file.rb', line 160

# Flush any pending data, then close the underlying writer.
#
# The writer is closed even when flush raises, so a failed final flush does
# not leave the underlying handle open; the flush error is re-raised after
# the close. On success the return value is whatever writer.close returns.
def close
  flush
rescue StandardError
  # Flushing failed: still release the underlying writer, then propagate.
  writer.close
  raise
else
  writer.close
end

#flush ⇒ Object

Flush the current state of the file, including metadata



155
156
157
158
# File 'lib/avro/data_file.rb', line 155

# Calls write_block (defined elsewhere in this class; presumably emits the
# currently buffered datums as a block) and then flushes the underlying
# writer. Returns the result of writer.flush.
def flush
  write_block
  writer.flush
end

#sync ⇒ Object

Return the current position as a value that may be passed to DataFileReader.seek(long). Forces the end of the current block, emitting a synchronization marker.



149
150
151
152
# File 'lib/avro/data_file.rb', line 149

# Ends the current block by calling write_block, then returns the underlying
# writer's current position as reported by writer.tell.
def sync
  write_block
  writer.tell
end