Class: Avro::DataFile::Writer

Inherits:
Object
  • Object
show all
Defined in:
lib/avro/data_file.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(writer, datum_writer, writers_schema = nil, codec = nil) ⇒ Writer

Returns a new instance of Writer.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/avro/data_file.rb', line 95

# Build a data-file writer on top of +writer+.
#
# Two modes are supported, selected by whether +writers_schema+ is given:
#
# * New file: a fresh sync marker is generated, the codec and schema are
#   recorded in the metadata, and the header is written straight away.
# * Append: the existing file is opened with a Reader so that its sync marker,
#   codec and schema can be reused, then the stream is moved to its end.
#
# @param writer the output stream; it is wrapped in an IO::BinaryEncoder and,
#   in append mode, must also be readable by Reader and support +seek+
# @param datum_writer object whose +writers_schema=+ is assigned here and
#   which #<< later uses to encode each datum
# @param writers_schema schema for a new file, or nil/false to append
# @param codec codec selector passed to DataFile.get_codec (new files only)
def initialize(writer, datum_writer, writers_schema=nil, codec=nil)
  @writer = writer
  @datum_writer = datum_writer
  @encoder = IO::BinaryEncoder.new(@writer)

  # Datums are encoded into this in-memory buffer by #<<.
  @buffer_writer = StringIO.new('', 'w')
  @buffer_encoder = IO::BinaryEncoder.new(@buffer_writer)
  @block_count = 0
  @meta = {}

  if writers_schema
    # Starting a brand new file.
    @sync_marker = Writer.generate_sync_marker
    @codec = DataFile.get_codec(codec)
    meta['avro.codec'] = @codec.codec_name.to_s
    meta['avro.schema'] = writers_schema.to_s
    @datum_writer.writers_schema = writers_schema
    write_header
    return
  end

  # No schema supplied, so presume we are appending: read the existing
  # file's header to recover the settings it was written with.
  existing = Reader.new(writer, Avro::IO::DatumReader.new)

  # FIXME(jmhodges): collect arbitrary metadata
  @sync_marker = existing.sync_marker

  codec_name = existing.meta['avro.codec']
  meta['avro.codec'] = codec_name
  @codec = DataFile.get_codec(codec_name)

  # Reuse the schema the existing file was written with.
  schema_json = existing.meta['avro.schema']
  meta['avro.schema'] = schema_json
  @datum_writer.writers_schema = Schema.parse(schema_json)

  # Whence 2 positions relative to the end, so new blocks land after the
  # existing data.
  writer.seek(0, 2)
end

Instance Attribute Details

#block_count ⇒ Object

Returns the value of attribute block_count.



93
94
95
# File 'lib/avro/data_file.rb', line 93

# Counter of datums appended so far. The constructor sets it to 0 and #<<
# adds one per datum.
# NOTE(review): presumably reset when a block is written out; write_block is
# not visible here, so confirm.
def block_count
  @block_count
end

#buffer_encoder ⇒ Object (readonly)

Returns the value of attribute buffer_encoder.



92
93
94
# File 'lib/avro/data_file.rb', line 92

# The IO::BinaryEncoder that wraps buffer_writer. #<< hands it to the datum
# writer, so each datum is encoded into the in-memory buffer.
def buffer_encoder
  @buffer_encoder
end

#buffer_writer ⇒ Object (readonly)

Returns the value of attribute buffer_writer.



92
93
94
# File 'lib/avro/data_file.rb', line 92

# The in-memory StringIO (opened in 'w' mode by the constructor) behind
# buffer_encoder. #<< compares its position (+tell+) with SYNC_INTERVAL to
# decide when to write a block.
def buffer_writer
  @buffer_writer
end

#codec ⇒ Object (readonly)

Returns the value of attribute codec.



92
93
94
# File 'lib/avro/data_file.rb', line 92

# The codec object returned by DataFile.get_codec in the constructor: looked
# up from the +codec+ argument for a new file, or from the existing file's
# 'avro.codec' metadata when appending.
def codec
  @codec
end

#datum_writer ⇒ Object (readonly)

Returns the value of attribute datum_writer.



92
93
94
# File 'lib/avro/data_file.rb', line 92

# The datum writer passed to the constructor (which also assigns its
# writers_schema). #<< calls its +write+ method for every appended datum.
def datum_writer
  @datum_writer
end

#encoder ⇒ Object (readonly)

Returns the value of attribute encoder.



92
93
94
# File 'lib/avro/data_file.rb', line 92

# The IO::BinaryEncoder that the constructor creates around the underlying
# output stream (+writer+).
def encoder
  @encoder
end

#meta ⇒ Object (readonly)

Returns the value of attribute meta.



92
93
94
# File 'lib/avro/data_file.rb', line 92

# Hash of file metadata. The constructor fills in the 'avro.codec' and
# 'avro.schema' entries, either from its arguments (new file) or from the
# existing file's metadata (append).
def meta
  @meta
end

#sync_marker ⇒ Object (readonly)

Returns the value of attribute sync_marker.



92
93
94
# File 'lib/avro/data_file.rb', line 92

# The sync marker used by this writer: freshly produced by
# Writer.generate_sync_marker for a new file, or copied from the existing
# file's Reader when appending.
def sync_marker
  @sync_marker
end

#writer ⇒ Object (readonly)

Returns the value of attribute writer.



92
93
94
# File 'lib/avro/data_file.rb', line 92

# The underlying output stream handed to the constructor. #flush flushes it,
# #sync reports its position, and #close closes it.
def writer
  @writer
end

Class Method Details

.generate_sync_marker ⇒ Object



88
89
90
# File 'lib/avro/data_file.rb', line 88

# Produce a fresh random sync marker for a new data file.
#
# @return [String] 16 random bytes obtained from OpenSSL::Random
def self.generate_sync_marker
  marker_length = 16
  OpenSSL::Random.random_bytes(marker_length)
end

Instance Method Details

#<<(datum) ⇒ Object

Append a datum to the file



134
135
136
137
138
139
140
141
142
143
# File 'lib/avro/data_file.rb', line 134

# Append a datum to the file.
#
# The datum is encoded into the in-memory buffer and counted. Once the
# buffer position has reached SYNC_INTERVAL, the pending block is written.
#
# @param datum the value to encode with the datum writer
# @return the result of write_block when a block was written, otherwise nil
def <<(datum)
  datum_writer.write(datum, buffer_encoder)
  self.block_count += 1

  # Enough data is buffered: write it out as a block.
  write_block if buffer_writer.tell >= SYNC_INTERVAL
end

#close ⇒ Object



159
160
161
162
# File 'lib/avro/data_file.rb', line 159

# Flush any pending data and close the underlying writer.
#
# The writer is closed even when flushing raises, so a failed flush does not
# leak the underlying stream; the flush error still propagates to the caller.
#
# @return whatever the underlying writer's +close+ returns
def close
  begin
    flush
  ensure
    # Runs on both the success and the failure path of flush.
    close_result = writer.close
  end
  close_result
end

#flush ⇒ Object

Flush the current state of the file, including metadata



154
155
156
157
# File 'lib/avro/data_file.rb', line 154

# Flush the current state of the file: write out the pending block, then
# flush the underlying writer.
#
# @return whatever the underlying writer's +flush+ returns
def flush
  write_block
  sink = writer
  sink.flush
end

#sync ⇒ Object

Return the current position as a value that may be passed to DataFileReader.seek(long). Forces the end of the current block, emitting a synchronization marker.



148
149
150
151
# File 'lib/avro/data_file.rb', line 148

# Force the end of the current block by writing it out, then report where
# the underlying writer is positioned.
#
# @return the value of the underlying writer's +tell+ after the block write
def sync
  write_block
  position = writer.tell
  position
end