Class: PEROBS::FlatFile

Inherits:
Object
  • Object
show all
Defined in:
lib/perobs/FlatFile.rb

Overview

The FlatFile class manages the storage file of the FlatFileDB. It contains a sequence of blobs. Each blob consists of a 21 byte header and the actual blob data bytes. The header has the following structure:

1 Byte: Mark byte.

Bit 0: 0 deleted entry, 1 valid entry
Bit 1: 0 unmarked, 1 marked
Bit 2 - 7: reserved, must be 0

8 bytes: Length of the data blob in bytes; 8 bytes: ID of the value in the data blob; 4 bytes: CRC32 checksum of the data blob.

If bit 0 of the mark byte is 0, only the length is valid and the blob is empty. The entry is valid only if bit 0 is set.

Defined Under Namespace

Classes: Header

Constant Summary collapse

BLOB_HEADER_FORMAT =

The ‘pack()’ format of the header.

'CQQL'
BLOB_HEADER_LENGTH =

The length of the header in bytes.

21

Instance Method Summary collapse

Constructor Details

#initialize(dir) ⇒ FlatFile

Create a new FlatFile object for a database in the given path.

Parameters:

  • dir (String)

    Directory path for the database file



63
64
65
66
67
68
# File 'lib/perobs/FlatFile.rb', line 63

# Create a new FlatFile object for a database in the given path. The blob
# file itself is not touched here.
#
# @param dir [String] Directory path for the database file
def initialize(dir)
  @db_dir = dir
  # No file handle yet; #open assigns it.
  @f = nil
  @index = IndexTree.new(dir)
  @space_list = FreeSpaceManager.new(dir)
end

Instance Method Details

#check(repair = false) ⇒ Object



339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
# File 'lib/perobs/FlatFile.rb', line 339

# Verify the blob file and the index and free-space data that describe it.
#
# Every valid blob (bit 0 of the mark byte set) is read back and its checksum
# is compared with the CRC stored in its header. Afterwards the index and the
# space list are checked against the blob file. Nothing is done when the
# file is not open.
#
# @param repair [Boolean] true to delete blobs with a checksum mismatch and
#   to regenerate the index and space list when they are inconsistent
def check(repair = false)
  return unless @f

  # First check the database blob file. Each entry should be readable and
  # correct.
  each_blob_header do |pos, mark, length, blob_id, crc|
    if (mark & 1 == 1)
      # We have a non-deleted entry.
      begin
        @f.seek(pos + BLOB_HEADER_LENGTH)
        buf = @f.read(length)
        if crc && checksum(buf) != crc
          # Report the ID yielded for this entry (blob_id). There is no local
          # named 'id' in this method.
          if repair
            PEROBS.log.error "Checksum failure while checking blob " +
              "with ID #{blob_id}. Deleting object."
            delete_obj_by_address(pos, blob_id)
          else
            PEROBS.log.fatal "Checksum failure while checking blob " +
              "with ID #{blob_id}"
          end
        end
      rescue => e
        PEROBS.log.fatal "Check of blob with ID #{blob_id} failed: " +
          e.message
      end
    end
  end

  # Now we check the index data. It must be correct and the entries must
  # match the blob file. All entries in the index must be in the blob file
  # and vice versa.
  begin
    unless @index.check(self) && @space_list.check(self) &&
      cross_check_entries
      return unless repair

      regenerate_index_and_spaces
    end
  rescue PEROBS::FatalError
    regenerate_index_and_spaces
  end

  sync
end

#clear_all_marksObject

Clear all marks.



276
277
278
279
280
281
282
283
284
285
286
287
288
289
# File 'lib/perobs/FlatFile.rb', line 276

# Reset the mark bit (bit 1 of the mark byte) of every valid entry in the
# file. Deleted entries are left untouched.
def clear_all_marks
  each_blob_header do |offset, flags, _length, blob_id, _crc|
    # Bit 0 not set: deleted entry, nothing to unmark.
    next if (flags & 1).zero?

    begin
      @f.seek(offset)
      # 0xFD keeps every bit except bit 1.
      @f.write([flags & 0xFD].pack('C'))
      @f.flush
    rescue StandardError => err
      PEROBS.log.fatal "Unmarking of FlatFile blob with ID #{blob_id} " \
        "failed: #{err.message}"
    end
  end
end

#closeObject

Close the flat file. This method must be called to ensure that all data is really written into the filesystem.



90
91
92
93
94
95
96
# File 'lib/perobs/FlatFile.rb', line 90

# Close the flat file. This method must be called to ensure that all data is
# really written into the filesystem.
#
# The space list and the index are always closed. The blob file is only
# flushed and closed when it is currently open, so calling this method on a
# FlatFile that was never opened, or calling it twice, no longer fails with
# a NoMethodError on nil.
def close
  @space_list.close
  @index.close
  return unless @f

  @f.flush
  @f.close
  @f = nil
end

#defragmentizeObject

Eliminate all the holes in the file. This is an in-place implementation. No additional space will be needed on the file system.



293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/perobs/FlatFile.rb', line 293

# Eliminate all the holes in the file. This is an in-place implementation:
# valid entries are copied towards the start of the file over the space of
# previously seen deleted entries, the index is updated with the new
# positions and the file is finally shortened by the reclaimed byte count.
def defragmentize
  # Total bytes (header + data) of all deleted entries seen so far. Every
  # valid entry found later is moved towards the file start by this amount.
  distance = 0
  t = Time.now
  PEROBS.log.debug "Defragmenting FlatFile"
  # Iterate over all entries.
  each_blob_header do |pos, mark, length, blob_id, crc|
    # Total size of the current entry
    entry_bytes = BLOB_HEADER_LENGTH + length
    if (mark & 1 == 1)
      # We have found a valid entry.
      if distance > 0
        begin
          # Read current entry into a buffer
          @f.seek(pos)
          buf = @f.read(entry_bytes)
          # Write the buffer right after the end of the previous entry.
          @f.seek(pos - distance)
          @f.write(buf)
          # Update the index with the new position
          @index.put_value(blob_id, pos - distance)
          # Mark the space between the relocated current entry and the
          # next valid entry as deleted space. The header itself plus the
          # (distance - BLOB_HEADER_LENGTH) data bytes it announces cover
          # exactly 'distance' bytes.
          @f.write([ 0, distance - BLOB_HEADER_LENGTH, 0, 0 ].
                   pack(BLOB_HEADER_FORMAT))
          @f.flush
        rescue => e
          PEROBS.log.fatal "Error while moving blob for ID #{blob_id}: " +
            e.message
        end
      end
    else
      # Deleted entry: its whole size becomes reclaimable.
      distance += entry_bytes
    end
  end
  PEROBS.log.debug "FlatFile defragmented in #{Time.now - t} seconds"
  # The percentage is relative to the file size before truncation.
  PEROBS.log.debug "#{distance} bytes or " +
    "#{'%.1f' % (distance.to_f / @f.size * 100.0)}% reclaimed"

  @f.flush
  # Cut the reclaimed bytes off the end of the file.
  @f.truncate(@f.size - distance)
  @f.flush
  # The reclaimed holes no longer exist, so the free-space records are
  # cleared.
  @space_list.clear

  sync
end

#delete_obj_by_address(addr, id) ⇒ Object

Delete the blob that is stored at the specified address.

Parameters:

  • addr (Integer)

    Address of the blob to delete

  • id (Integer)

    ID of the blob to delete



122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/perobs/FlatFile.rb', line 122

# Delete the blob that is stored at the specified address. The ID is removed
# from the index, the mark byte of the entry is overwritten with 0 and the
# blob's data length is handed to the space list as free space.
#
# @param addr [Integer] Address of the blob to delete
# @param id [Integer] ID of the blob to delete
def delete_obj_by_address(addr, id)
  @index.delete_value(id)
  blob_header = read_blob_header(addr, id)
  deleted_mark = [0].pack('C')
  begin
    @f.seek(addr)
    @f.write(deleted_mark)
    @f.flush
    @space_list.add_space(addr, blob_header.length)
  rescue StandardError => err
    PEROBS.log.fatal "Cannot erase blob for ID #{blob_header.id}: " \
      "#{err.message}"
  end
end

#delete_obj_by_id(id) ⇒ Boolean

Delete the blob for the specified ID.

Parameters:

  • id (Integer)

    ID of the object to be deleted

Returns:

  • (Boolean)

    True if object was deleted, false otherwise



110
111
112
113
114
115
116
117
# File 'lib/perobs/FlatFile.rb', line 110

# Delete the blob for the specified ID.
#
# @param id [Integer] ID of the object to be deleted
# @return [Boolean] True if object was deleted, false otherwise
def delete_obj_by_id(id)
  addr = find_obj_addr_by_id(id)
  # Unknown ID: nothing to delete.
  return false unless addr

  delete_obj_by_address(addr, id)
  true
end

#delete_unmarked_objectsObject

Delete all unmarked objects.



136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/perobs/FlatFile.rb', line 136

# Delete all unmarked objects and defragment the file afterwards.
#
# @return [Array<Integer>] IDs of the deleted objects
def delete_unmarked_objects
  removed_ids = []
  each_blob_header do |offset, flags, _length, blob_id, _crc|
    # Only entries that are valid (bit 0 set) but not marked (bit 1 clear).
    next unless (flags & 0b11) == 0b01

    delete_obj_by_address(offset, blob_id)
    removed_ids.push(blob_id)
  end
  defragmentize

  removed_ids
end

#find_obj_addr_by_id(id) ⇒ Integer

Find the address of the object with the given ID.

Parameters:

  • id (Integer)

    ID of the object

Returns:

  • (Integer)

    Offset in the flat file or nil if not found



204
205
206
# File 'lib/perobs/FlatFile.rb', line 204

# Find the address of the object with the given ID. The lookup is delegated
# to the index.
#
# @param id [Integer] ID of the object
# @return [Integer, nil] Offset in the flat file or nil if not found
def find_obj_addr_by_id(id)
  @index.get_value(id)
end

#has_id_at?(id, address) ⇒ Boolean

Returns:

  • (Boolean)


405
406
407
408
# File 'lib/perobs/FlatFile.rb', line 405

# Check whether the blob header stored at the given address carries the
# given ID.
#
# @param id [Integer] expected blob ID
# @param address [Integer] offset of the blob header in the file
# @return [Boolean]
def has_id_at?(id, address)
  read_blob_header(address).id == id
end

#has_space?(address, size) ⇒ Boolean

Returns:

  • (Boolean)


400
401
402
403
# File 'lib/perobs/FlatFile.rb', line 400

# Check whether the blob header stored at the given address announces
# exactly the given data length.
#
# @param address [Integer] offset of the blob header in the file
# @param size [Integer] expected length
# @return [Boolean]
def has_space?(address, size)
  read_blob_header(address).length == size
end

#inspectObject



410
411
412
413
414
415
416
417
418
419
420
421
# File 'lib/perobs/FlatFile.rb', line 410

# Build a textual dump of all entries in the file: one "{ ... }" line per
# blob header, with the data read from the file appended for every entry
# whose mark byte is not 0.
#
# @return [String]
def inspect
  dump = String.new('[')
  each_blob_header do |pos, mark, length, blob_id, crc|
    entry = "{ :pos => #{pos}, :mark => #{mark}, " \
            ":length => #{length}, :id => #{blob_id}, :crc => #{crc}"
    # The value is read from the current file position.
    entry << ", :value => #{@f.read(length)}" unless mark == 0
    dump << entry << " }\n"
  end
  dump + ']'
end

#is_marked_by_id?(id) ⇒ Boolean

Return true if the object with the given ID is marked, false otherwise.

Parameters:

  • id (Integer)

    ID of the object

Returns:

  • (Boolean)


266
267
268
269
270
271
272
273
# File 'lib/perobs/FlatFile.rb', line 266

# Return true if the object with the given ID is marked, false otherwise.
# An ID that is not in the index counts as not marked.
#
# @param id [Integer] ID of the object
# @return [Boolean]
def is_marked_by_id?(id)
  addr = find_obj_addr_by_id(id)
  return false unless addr

  # Bit 1 of the mark byte is the mark flag.
  (read_blob_header(addr, id).mark & 2) != 0
end

#mark_obj_by_address(addr, id) ⇒ Object

Mark the object at the specified address.

Parameters:

  • addr (Integer)

    Offset in the file

  • id (Integer)

    ID of the object



252
253
254
255
256
257
258
259
260
261
262
# File 'lib/perobs/FlatFile.rb', line 252

# Mark the object at the specified address by setting bit 1 of its mark
# byte.
#
# @param addr [Integer] Offset in the file
# @param id [Integer] ID of the object
def mark_obj_by_address(addr, id)
  blob_header = read_blob_header(addr, id)
  marked_byte = [blob_header.mark | 2].pack('C')
  begin
    @f.seek(addr)
    @f.write(marked_byte)
    @f.flush
  rescue StandardError => err
    PEROBS.log.fatal "Marking of FlatFile blob with ID #{id} " \
      "failed: #{err.message}"
  end
end

#mark_obj_by_id(id) ⇒ Object

Mark the object with the given ID.

Parameters:

  • id (Integer)

    ID of the object



243
244
245
246
247
# File 'lib/perobs/FlatFile.rb', line 243

# Mark the object with the given ID. IDs that are not in the index are
# ignored.
#
# @param id [Integer] ID of the object
def mark_obj_by_id(id)
  addr = find_obj_addr_by_id(id)
  mark_obj_by_address(addr, id) if addr
end

#openObject

Open the flat file for reading and writing.



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/perobs/FlatFile.rb', line 71

# Open the flat file for reading and writing. An existing 'database.blobs'
# file in the database directory is opened in place; otherwise a new one is
# created. The index and the space list are opened afterwards.
#
# Failures to open the file are reported through PEROBS.log.fatal. This
# covers IOError as well as the Errno::* errors (SystemCallError) that
# File.open raises for problems such as a missing directory or insufficient
# permissions.
def open
  file_name = File.join(@db_dir, 'database.blobs')
  begin
    if File.exist?(file_name)
      @f = File.open(file_name, 'rb+')
    else
      PEROBS.log.info 'New database.blobs file created'
      @f = File.open(file_name, 'wb+')
    end
  rescue IOError, SystemCallError => e
    PEROBS.log.fatal "Cannot open flat file database #{file_name}: " +
      e.message
  end
  @index.open
  @space_list.open
end

#read_obj_by_address(addr, id) ⇒ String

Read the object at the specified address.

Parameters:

  • addr (Integer)

    Offset in the flat file

  • id (Integer)

    ID of the data blob

Returns:

  • (String)

    Raw object data



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/perobs/FlatFile.rb', line 223

# Read the object at the specified address. The ID found in the blob header
# is compared with the expected ID and the data is verified against the
# checksum stored in the header; mismatches are reported via
# PEROBS.log.fatal.
#
# @param addr [Integer] Offset in the flat file
# @param id [Integer] ID of the data blob
# @return [String] Raw object data
def read_obj_by_address(addr, id)
  blob_header = read_blob_header(addr, id)
  unless blob_header.id == id
    PEROBS.log.fatal "Database index corrupted: Index for object " \
      "#{id} points to object with ID #{blob_header.id}"
  end
  begin
    # The data starts right behind the header.
    @f.seek(addr + BLOB_HEADER_LENGTH)
    data = @f.read(blob_header.length)
    unless checksum(data) == blob_header.crc
      PEROBS.log.fatal "Checksum failure while reading blob ID #{id}"
    end
    data
  rescue StandardError => err
    PEROBS.log.fatal "Cannot read blob for ID #{id}: #{err.message}"
  end
end

#read_obj_by_id(id) ⇒ String or nil

Read the object with the given ID.

Parameters:

  • id (Integer)

    ID of the object

Returns:

  • (String or nil)

    Raw object data if found, otherwise nil



211
212
213
214
215
216
217
# File 'lib/perobs/FlatFile.rb', line 211

# Read the object with the given ID.
#
# @param id [Integer] ID of the object
# @return [String, nil] Raw object data if found, otherwise nil
def read_obj_by_id(id)
  addr = find_obj_addr_by_id(id)
  return nil unless addr

  read_obj_by_address(addr, id)
end

#regenerate_index_and_spacesObject

This method clears the index tree and the free space list and regenerates them from the FlatFile.



386
387
388
389
390
391
392
393
394
395
396
397
398
# File 'lib/perobs/FlatFile.rb', line 386

# This method clears the index tree and the free space list and regenerates
# them from the FlatFile: entries with a mark byte of 0 are registered as
# free space (when their length is greater than 0), all other entries are
# put into the index.
def regenerate_index_and_spaces
  PEROBS.log.warn "Re-generating FlatFileDB index and space files"
  @index.clear
  @space_list.clear

  each_blob_header do |offset, flags, size, blob_id, _crc|
    if !flags.zero?
      @index.put_value(blob_id, offset)
    elsif size.positive?
      @space_list.add_space(offset, size)
    end
  end
end

#syncObject

Force outstanding data to be written to the filesystem.



99
100
101
102
103
104
105
# File 'lib/perobs/FlatFile.rb', line 99

# Force outstanding data to be written to the filesystem.
#
# Flush failures are reported through PEROBS.log.fatal. Besides IOError this
# includes the Errno::* errors (SystemCallError) the operating system
# reports while writing, e.g. when the disk is full.
def sync
  @f.flush
rescue IOError, SystemCallError => e
  PEROBS.log.fatal "Cannot sync flat file database: #{e.message}"
end

#write_obj_by_id(id, raw_obj) ⇒ Integer

Write the given object into the file. This method assumes that no other entry with the given ID exists already in the file.

Parameters:

  • id (Integer)

    ID of the object

  • raw_obj (String)

    Raw object as String

Returns:

  • (Integer)

    position of the written blob in the blob file



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/perobs/FlatFile.rb', line 154

# Write the given object into the file. This method assumes that no other
# entry with the given ID exists already in the file.
#
# All size computations use the byte size of raw_obj, not its character
# count. The header length field and the space taken in the file are
# measured in bytes, so a String with multi-byte characters would otherwise
# be recorded too short.
#
# @param id [Integer] ID of the object
# @param raw_obj [String] Raw object as String
# @return [Integer] position of the written blob in the blob file
def write_obj_by_id(id, raw_obj)
  obj_size = raw_obj.bytesize
  addr, length = find_free_blob(obj_size)
  begin
    # A length of -1 is treated as "no free blob is being reused"; every
    # other value is validated against the header found at addr.
    reuses_space = length != -1
    # True when the object leaves part of the reused space unoccupied.
    leaves_rest = reuses_space && obj_size < length
    if reuses_space
      # Just a safeguard so we don't overwrite current data.
      header = read_blob_header(addr)
      if header.length != length
        PEROBS.log.fatal "Length in free list (#{length}) and header " +
          "(#{header.length}) don't match."
      end
      if obj_size > header.length
        PEROBS.log.fatal "Object (#{obj_size}) is longer than " +
          "blob space (#{header.length})."
      end
      if header.mark != 0
        PEROBS.log.fatal "Mark (#{header.mark}) is not 0."
      end
      # The remaining space must be able to hold a header of its own. This
      # is verified before anything is written, so a failure is reported
      # while the file is still untouched.
      if leaves_rest && length - obj_size < BLOB_HEADER_LENGTH
        PEROBS.log.fatal "Not enough space to append the empty space " +
          "header (space: #{length} bytes, object: #{obj_size} " +
          "bytes)."
      end
    end
    @f.seek(addr)
    @f.write([ 1, obj_size, id, checksum(raw_obj) ].
             pack(BLOB_HEADER_FORMAT))
    @f.write(raw_obj)
    if leaves_rest
      # The new object was not appended and it did not completely fill the
      # free space. So we have to write a new header to mark the remaining
      # empty space.
      space_address = @f.pos
      space_length = length - BLOB_HEADER_LENGTH - obj_size
      @f.write([ 0, space_length, 0, 0 ].pack(BLOB_HEADER_FORMAT))
      # Register the new space with the space list.
      @space_list.add_space(space_address, space_length) if space_length > 0
    end
    @f.flush
    @index.put_value(id, addr)
  rescue IOError, SystemCallError => e
    # Write failures reported by the OS are Errno::* (SystemCallError).
    PEROBS.log.fatal "Cannot write blob for ID #{id} to FlatFileDB: " +
      e.message
  end

  addr
end