Class: PEROBS::FlatFile

Inherits:
Object
  • Object
show all
Defined in:
lib/perobs/FlatFile.rb

Overview

The FlatFile class manages the storage file of the FlatFileDB. It contains a sequence of blobs. Each blob consists of a header and the actual blob data bytes.

Instance Method Summary collapse

Constructor Details

#initialize(dir) ⇒ FlatFile

Create a new FlatFile object for a database in the given path.

Parameters:

  • dir (String)

    Directory path for the database file



44
45
46
47
48
49
# File 'lib/perobs/FlatFile.rb', line 44

# Create a new FlatFile object for a database in the given path.
#
# @param dir [String] Directory path for the database file
def initialize(dir)
  # No blob file handle exists yet; it is assigned by #open.
  @db_dir, @f = dir, nil
  # Both helper structures are created for the same directory.
  @index = IndexTree.new(dir)
  @space_list = FreeSpaceManager.new(dir)
end

Instance Method Details

#check(repair = false) ⇒ Object



396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
# File 'lib/perobs/FlatFile.rb', line 396

# Check the blob file, the index and the free space list for consistency.
# Does nothing when no blob file is open.
#
# Every valid blob is read back, inflated if its header says it is
# compressed, and compared against the CRC stored in the header (when the
# header carries one). A blob that cannot be inflated is treated exactly
# like a blob with a checksum mismatch instead of aborting the whole check.
#
# @param repair [Boolean] True if broken blobs should be deleted and a
#   broken index/space list should be regenerated
def check(repair = false)
  return unless @f

  t = Time.now
  PEROBS.log.info "Checking FlatFile database" +
    "#{repair ? ' in repair mode' : ''}..."

  # First check the database blob file. Each entry should be readable and
  # correct.
  each_blob_header do |pos, header|
    # Only non-deleted entries carry data that can be verified.
    next unless header.is_valid?

    begin
      @f.seek(pos + FlatFileBlobHeader::LENGTH)
      buf = @f.read(header.length)

      failure = nil
      # Uncompress the data if the compression bit is set in the mark
      # byte. Corrupted data makes Zlib raise; record that as a failure of
      # this blob so the remaining blobs still get checked.
      if header.is_compressed?
        begin
          buf = Zlib.inflate(buf)
        rescue Zlib::Error => e
          failure = "Decompression failure (#{e.message})"
        end
      end

      # The checksum is only meaningful for data that could be inflated.
      if failure.nil? && header.crc && checksum(buf) != header.crc
        failure = "Checksum failure"
      end

      if failure
        if repair
          PEROBS.log.error "#{failure} while checking blob " +
            "with ID #{header.id}. Deleting object."
          delete_obj_by_address(pos, header.id)
        else
          PEROBS.log.fatal "#{failure} while checking blob " +
            "with ID #{header.id}"
        end
      end
    rescue IOError => e
      PEROBS.log.fatal "Check of blob with ID #{header.id} failed: " +
        e.message
    end
  end

  # Now we check the index data. It must be correct and the entries must
  # match the blob file. All entries in the index must be in the blob file
  # and vice versa.
  begin
    unless @index.check(self) && @space_list.check(self) &&
           cross_check_entries
      regenerate_index_and_spaces if repair
    end
  rescue PEROBS::FatalError
    regenerate_index_and_spaces if repair
  end

  sync if repair
  PEROBS.log.info "check_db completed in #{Time.now - t} seconds"
end

#clear_all_marks ⇒ Object

Clear all marks.



288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/perobs/FlatFile.rb', line 288

# Clear the mark bit of every valid, marked blob in the file.
#
# The mark byte is the first byte of a blob header; bit 1 of it is the mark
# flag (it is masked out with 0b11111101 here). A summary with the number of
# cleared marks and the total number of visited blobs is logged at the end.
def clear_all_marks
  t = Time.now
  PEROBS.log.info "Clearing all marks..."

  total_blob_count = 0
  marked_blob_count = 0

  each_blob_header do |pos, header|
    total_blob_count += 1
    # Only valid and marked blobs need to be touched.
    next unless header.is_valid? && header.is_marked?

    marked_blob_count += 1
    begin
      @f.seek(pos)
      # Rewrite just the mark byte with the mark bit removed.
      @f.write([ header.mark & 0b11111101 ].pack('C'))
      @f.flush
    rescue IOError => e
      # Report the ID from the header; there is no other ID in scope here.
      PEROBS.log.fatal "Unmarking of FlatFile blob with ID #{header.id} " +
        "failed: #{e.message}"
    end
  end
  PEROBS.log.info "#{marked_blob_count} marks in #{total_blob_count} " +
    "objects cleared in #{Time.now - t} seconds"
end

#close ⇒ Object

Close the flat file. This method must be called to ensure that all data is really written into the filesystem.



74
75
76
77
78
79
80
81
# File 'lib/perobs/FlatFile.rb', line 74

# Close the flat file together with the free space list and the index.
# Buffered data is flushed and the file lock is released before the file
# handle is closed and forgotten.
def close
  # The helper structures are closed first, space list before index.
  [ @space_list, @index ].each(&:close)

  blob_file = @f
  blob_file.flush
  blob_file.flock(File::LOCK_UN)
  blob_file.close
  @f = nil
end

#defragmentize ⇒ Object

Eliminate all the holes in the file. This is an in-place implementation. No additional space will be needed on the file system.



316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
# File 'lib/perobs/FlatFile.rb', line 316

# Eliminate all the holes in the file. This is an in-place implementation.
# No additional space will be needed on the file system.
#
# Valid blobs are moved towards the start of the file by the accumulated
# size of the deleted entries seen so far. Afterwards the file is truncated
# by that amount and the free space list is cleared.
def defragmentize
  distance = 0
  deleted_blobs = 0
  valid_blobs = 0
  t = Time.now
  PEROBS.log.info "Defragmenting FlatFile"
  # Iterate over all entries.
  each_blob_header do |pos, header|
    # Total size of the current entry
    entry_bytes = FlatFileBlobHeader::LENGTH + header.length
    if header.is_valid?
      # We have found a valid entry.
      valid_blobs += 1
      if distance > 0
        begin
          # Read current entry into a buffer
          @f.seek(pos)
          buf = @f.read(entry_bytes)
          # Write the buffer right after the end of the previous entry.
          @f.seek(pos - distance)
          @f.write(buf)
          # Update the index with the new position
          @index.put_value(header.id, pos - distance)
          # Mark the space between the relocated current entry and the
          # next valid entry as deleted space.
          FlatFileBlobHeader.new(0, distance - FlatFileBlobHeader::LENGTH,
                                 0, 0).write(@f)
          @f.flush
        rescue IOError => e
          PEROBS.log.fatal "Error while moving blob for ID #{header.id}: " +
            e.message
        end
      end
    else
      deleted_blobs += 1
      distance += entry_bytes
    end
  end

  # Report against the file size before truncation. An empty file would
  # otherwise yield 0.0 / 0 == NaN for the percentage.
  file_size = @f.size
  reclaimed_percent =
    file_size > 0 ? distance.to_f / file_size * 100.0 : 0.0
  PEROBS.log.info "FlatFile defragmented in #{Time.now - t} seconds"
  # The unit printed is KiB, so divide by 1024.
  PEROBS.log.info "#{distance / 1024} KiB/#{deleted_blobs} blobs of " +
    "#{file_size / 1024} KiB/#{valid_blobs} blobs or " +
    "#{'%.1f' % reclaimed_percent}% reclaimed"

  @f.flush
  @f.truncate(file_size - distance)
  @f.flush
  @space_list.clear

  sync
end

#delete_obj_by_address(addr, id) ⇒ Object

Delete the blob that is stored at the specified address.

Parameters:

  • addr (Integer)

    Address of the blob to delete

  • id (Integer)

    ID of the blob to delete



107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/perobs/FlatFile.rb', line 107

# Delete the blob that is stored at the specified address. The ID is removed
# from the index, the mark byte of the blob header is overwritten with 0 and
# the blob's data area is handed to the free space list.
#
# @param addr [Integer] Address of the blob to delete
# @param id [Integer] ID of the blob to delete
def delete_obj_by_address(addr, id)
  @index.delete_value(id)
  blob_header = FlatFileBlobHeader.read_at(@f, addr, id)
  # A single zero byte; written over the first byte of the header.
  cleared_mark = [ 0 ].pack('C')

  begin
    @f.seek(addr)
    @f.write(cleared_mark)
    @f.flush
    @space_list.add_space(addr, blob_header.length)
  rescue IOError => error
    PEROBS.log.fatal(
      "Cannot erase blob for ID #{blob_header.id}: #{error.message}"
    )
  end
end

#delete_obj_by_id(id) ⇒ Boolean

Delete the blob for the specified ID.

Parameters:

  • id (Integer)

    ID of the object to be deleted

Returns:

  • (Boolean)

    True if object was deleted, false otherwise



95
96
97
98
99
100
101
102
# File 'lib/perobs/FlatFile.rb', line 95

# Delete the blob for the specified ID.
#
# @param id [Integer] ID of the object to be deleted
# @return [Boolean] True if object was deleted, false otherwise
def delete_obj_by_id(id)
  pos = find_obj_addr_by_id(id)
  # Nothing to delete when no address is known for the ID.
  return false unless pos

  delete_obj_by_address(pos, id)
  true
end

#delete_unmarked_objects ⇒ Object

Delete all unmarked objects.



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/perobs/FlatFile.rb', line 121

# Delete all valid blobs that do not carry the mark flag and compact the
# file afterwards.
#
# @return [Array<Integer>] IDs of the deleted objects
def delete_unmarked_objects
  PEROBS.log.info "Deleting unmarked objects..."
  started_at = Time.now

  removed_ids = []
  each_blob_header do |address, blob_header|
    # Skip entries that are already deleted as well as marked ones.
    next if !blob_header.is_valid? || blob_header.is_marked?

    delete_obj_by_address(address, blob_header.id)
    removed_ids.push(blob_header.id)
  end
  defragmentize

  PEROBS.log.info "#{removed_ids.length} unmarked objects deleted " +
    "in #{Time.now - started_at} seconds"
  removed_ids
end

#find_obj_addr_by_id(id) ⇒ Integer

Find the address of the object with the given ID.

Parameters:

  • id (Integer)

    ID of the object

Returns:

  • (Integer)

    Offset in the flat file or nil if not found



206
207
208
# File 'lib/perobs/FlatFile.rb', line 206

# Find the address of the object with the given ID by asking the index.
#
# @param id [Integer] ID of the object
# @return [Integer, nil] Whatever the index returns for the ID; callers in
#   this class treat a falsy result as "not found"
def find_obj_addr_by_id(id)
  @index.get_value(id)
end

#has_id_at?(id, address) ⇒ Boolean

Returns:

  • (Boolean)


469
470
471
472
# File 'lib/perobs/FlatFile.rb', line 469

# Check whether the blob header stored at the given address carries the
# given ID.
#
# @param id [Integer] Expected object ID
# @param address [Integer] Offset of the blob header in the file
# @return [Boolean] True if the header's ID equals id
def has_id_at?(id, address)
  FlatFileBlobHeader.read_at(@f, address).id == id
end

#has_space?(address, size) ⇒ Boolean

Returns:

  • (Boolean)


464
465
466
467
# File 'lib/perobs/FlatFile.rb', line 464

# Check whether the blob header stored at the given address records exactly
# the given length.
#
# @param address [Integer] Offset of the blob header in the file
# @param size [Integer] Expected length value of the header
# @return [Boolean] True if the header's length equals size
def has_space?(address, size)
  FlatFileBlobHeader.read_at(@f, address).length == size
end

#inspect ⇒ Object



474
475
476
477
478
479
480
481
482
483
484
485
486
# File 'lib/perobs/FlatFile.rb', line 474

# Build a human readable dump of all blob headers in the file. For valid
# blobs the next header.length bytes read from the file are included as
# :value.
#
# @return [String] One "{ ... }" line per blob, wrapped in square brackets
def inspect
  entries = []
  each_blob_header do |pos, header|
    fields = [
      ":pos => #{pos}",
      ":mark => #{header.mark}",
      ":length => #{header.length}",
      ":id => #{header.id}",
      ":crc => #{header.crc}"
    ]
    # Reads from the current file position; no seek is done here.
    fields << ":value => #{@f.read(header.length)}" if header.is_valid?
    entries << "{ #{fields.join(', ')} }\n"
  end
  "[#{entries.join}]"
end

#is_marked_by_id?(id) ⇒ Boolean

Return true if the object with the given ID is marked, false otherwise.

Parameters:

  • id (Integer)

    ID of the object

Returns:

  • (Boolean)


278
279
280
281
282
283
284
285
# File 'lib/perobs/FlatFile.rb', line 278

# Return true if the object with the given ID is marked, false otherwise.
#
# @param id [Integer] ID of the object
# @return [Boolean] Mark state from the blob header; false when no address
#   is known for the ID
def is_marked_by_id?(id)
  addr = find_obj_addr_by_id(id)
  return false unless addr

  FlatFileBlobHeader.read_at(@f, addr, id).is_marked?
end

#mark_obj_by_address(addr, id) ⇒ Object

Mark the object at the specified address.

Parameters:

  • addr (Integer)

    Offset in the file

  • id (Integer)

    ID of the object



264
265
266
267
268
269
270
271
272
273
274
# File 'lib/perobs/FlatFile.rb', line 264

# Mark the object at the specified address by setting bit 1 in the mark
# byte, which is the first byte of the blob header.
#
# @param addr [Integer] Offset in the file
# @param id [Integer] ID of the object
def mark_obj_by_address(addr, id)
  blob_header = FlatFileBlobHeader.read_at(@f, addr, id)

  begin
    @f.seek(addr)
    # Keep all other flag bits and switch on the mark bit.
    marked_byte = blob_header.mark | 0b10
    @f.write([ marked_byte ].pack('C'))
    @f.flush
  rescue IOError => error
    PEROBS.log.fatal "Marking of FlatFile blob with ID #{id} " +
      "failed: #{error.message}"
  end
end

#mark_obj_by_id(id) ⇒ Object

Mark the object with the given ID.

Parameters:

  • id (Integer)

    ID of the object



255
256
257
258
259
# File 'lib/perobs/FlatFile.rb', line 255

# Mark the object with the given ID. Nothing happens when no address is
# known for the ID.
#
# @param id [Integer] ID of the object
def mark_obj_by_id(id)
  addr = find_obj_addr_by_id(id)
  mark_obj_by_address(addr, id) if addr
end

#open ⇒ Object

Open the flat file for reading and writing.



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/perobs/FlatFile.rb', line 52

# Open the flat file for reading and writing. The file 'database.blobs' in
# the database directory is created if it does not exist yet. An exclusive,
# non-blocking lock is requested on the file, then the index and the free
# space list are opened.
def open
  file_name = File.join(@db_dir, 'database.blobs')
  begin
    # Existing files are opened without truncation.
    mode = 'rb+'
    unless File.exist?(file_name)
      PEROBS.log.info 'New database.blobs file created'
      mode = 'wb+'
    end
    @f = File.open(file_name, mode)
  rescue IOError => e
    PEROBS.log.fatal "Cannot open flat file database #{file_name}: " +
      e.message
  end

  # flock returns false when a non-blocking lock request cannot be granted.
  locked = @f.flock(File::LOCK_NB | File::LOCK_EX)
  PEROBS.log.fatal 'Database is locked by another process' unless locked

  @index.open
  @space_list.open
end

#read_obj_by_address(addr, id) ⇒ String

Read the object at the specified address.

Parameters:

  • addr (Integer)

    Offset in the flat file

  • id (Integer)

    ID of the data blob

Returns:

  • (String)

    Raw object data



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/perobs/FlatFile.rb', line 225

# Read the object at the specified address. The header found there must
# belong to the given ID; the data is inflated when the header says it is
# compressed and is verified against the CRC stored in the header.
#
# @param addr [Integer] Offset in the flat file
# @param id [Integer] ID of the data blob
# @return [String] Raw object data
def read_obj_by_address(addr, id)
  header = FlatFileBlobHeader.read_at(@f, addr, id)
  unless header.id == id
    PEROBS.log.fatal "Database index corrupted: Index for object " +
      "#{id} points to object with ID #{header.id}"
  end

  raw = nil
  begin
    # The blob data starts right behind the header.
    @f.seek(addr + FlatFileBlobHeader::LENGTH)
    raw = @f.read(header.length)
  rescue IOError => error
    PEROBS.log.fatal "Cannot read blob for ID #{id}: #{error.message}"
  end

  # Uncompress the data if the compression bit is set in the mark byte.
  raw = Zlib.inflate(raw) if header.is_compressed?

  # The stored CRC refers to the uncompressed data.
  unless checksum(raw) == header.crc
    PEROBS.log.fatal "Checksum failure while reading blob ID #{id}"
  end

  raw
end

#read_obj_by_id(id) ⇒ String or nil

Read the object with the given ID.

Parameters:

  • id (Integer)

    ID of the object

Returns:

  • (String or nil)

    Raw object data if found, otherwise nil



213
214
215
216
217
218
219
# File 'lib/perobs/FlatFile.rb', line 213

# Read the object with the given ID.
#
# @param id [Integer] ID of the object
# @return [String, nil] Raw object data if found, otherwise nil
def read_obj_by_id(id)
  addr = find_obj_addr_by_id(id)
  addr ? read_obj_by_address(addr, id) : nil
end

#refresh ⇒ Object

This method iterates over all entries in the FlatFile and removes the entry and inserts it again. This is useful to update all entries in case the storage format has changed.



370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
# File 'lib/perobs/FlatFile.rb', line 370

# Re-write every valid blob: each one is read, deleted and written again
# through write_obj_by_id, then the file is defragmented. Useful when the
# storage format has changed.
def refresh
  # This iteration might look scary as we iterate over the entries while
  # we are rearranging them. Re-inserted items may be inserted
  # before or at the current entry and this is fine. They also may be
  # inserted after the current entry and will be re-read again unless they
  # are inserted after the original file end.
  # Remember the size before any blob is rewritten; it is the cut-off used
  # below.
  file_size = @f.size
  PEROBS.log.info "Refreshing the DB..."
  t = Time.now
  each_blob_header do |pos, header|
    if header.is_valid?
      buf = read_obj_by_address(pos, header.id)
      delete_obj_by_address(pos, header.id)
      write_obj_by_id(header.id, buf)
    end

    # Some re-inserted blobs may be inserted after the original file end.
    # No need to process those blobs again.
    # NOTE(review): the test runs after the entry was processed, so the
    # first entry at or beyond the original end is still rewritten once.
    break if pos >= file_size
  end
  PEROBS.log.info "DB refresh completed in #{Time.now - t} seconds"

  # Reclaim the space saved by compressing entries.
  defragmentize
end

#regenerate_index_and_spaces ⇒ Object

This method clears the index tree and the free space list and regenerates them from the FlatFile.



450
451
452
453
454
455
456
457
458
459
460
461
462
# File 'lib/perobs/FlatFile.rb', line 450

# Clear the index and the free space list and rebuild both from the blob
# headers found in the file. Valid blobs go into the index; deleted entries
# with a length greater than 0 are registered as free space.
def regenerate_index_and_spaces
  PEROBS.log.warn "Re-generating FlatFileDB index and space files"
  @index.clear
  @space_list.clear

  each_blob_header do |address, blob_header|
    if blob_header.is_valid?
      @index.put_value(blob_header.id, address)
      next
    end

    # Entries without a data area are not worth tracking as free space.
    free_bytes = blob_header.length
    @space_list.add_space(address, free_bytes) if free_bytes > 0
  end
end

#sync ⇒ Object

Force outstanding data to be written to the filesystem.



84
85
86
87
88
89
90
# File 'lib/perobs/FlatFile.rb', line 84

# Flush the blob file's buffered data. An IOError during the flush is
# reported through the PEROBS log as fatal.
def sync
  @f.flush
rescue IOError => error
  PEROBS.log.fatal "Cannot sync flat file database: #{error.message}"
end

#write_obj_by_id(id, raw_obj) ⇒ Integer

Write the given object into the file. This method assumes that no other entry with the given ID exists already in the file.

Parameters:

  • id (Integer)

    ID of the object

  • raw_obj (String)

    Raw object as String

Returns:

  • (Integer)

    position of the written blob in the blob file



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/perobs/FlatFile.rb', line 144

# Write the given object into the file. This method assumes that no other
# entry with the given ID exists already in the file.
#
# All size computations use the byte size of the data, not its character
# count, since the header length and the free space bookkeeping describe
# bytes in the file. Strings with multibyte characters would otherwise be
# stored with a too short length.
#
# @param id [Integer] ID of the object
# @param raw_obj [String] Raw object as String
# @return [Integer] position of the written blob in the blob file
def write_obj_by_id(id, raw_obj)
  # The checksum always refers to the uncompressed data.
  crc = checksum(raw_obj)

  # If the raw_obj is larger than 256 bytes we will compress it to
  # save some space in the database file. For smaller strings the
  # performance impact of compression is not compensated by writing
  # less data to the storage.
  compressed = false
  if raw_obj.bytesize > 256
    raw_obj = Zlib.deflate(raw_obj)
    compressed = true
  end
  obj_bytes = raw_obj.bytesize

  # A length of -1 signals that no free blob is reused and addr is used
  # as-is (see the "was not appended" handling below).
  addr, length = find_free_blob(obj_bytes)
  begin
    if length != -1
      # Just a safeguard so we don't overwrite current data.
      header = FlatFileBlobHeader.read_at(@f, addr)
      if header.length != length
        PEROBS.log.fatal "Length in free list (#{length}) and header " +
          "(#{header.length}) don't match."
      end
      if obj_bytes > header.length
        PEROBS.log.fatal "Object (#{obj_bytes}) is longer than " +
          "blob space (#{header.length})."
      end
      if header.is_valid?
        PEROBS.log.fatal "Entry (mark: #{header.mark}) is already used."
      end
    end
    @f.seek(addr)
    # Mark byte: bit 0 flags a used entry, bit 2 flags compressed data.
    FlatFileBlobHeader.new(compressed ? (1 << 2) | 1 : 1, obj_bytes,
                           id, crc).write(@f)
    @f.write(raw_obj)
    if length != -1 && obj_bytes < length
      # The new object was not appended and it did not completely fill the
      # free space. So we have to write a new header to mark the remaining
      # empty space.
      unless length - obj_bytes >= FlatFileBlobHeader::LENGTH
        PEROBS.log.fatal "Not enough space to append the empty space " +
          "header (space: #{length} bytes, object: #{obj_bytes} " +
          "bytes)."
      end
      space_address = @f.pos
      space_length = length - FlatFileBlobHeader::LENGTH - obj_bytes
      FlatFileBlobHeader.new(0, space_length, 0, 0).write(@f)
      # Register the new space with the space list.
      @space_list.add_space(space_address, space_length) if space_length > 0
    end
    @f.flush
    @index.put_value(id, addr)
  rescue IOError => e
    PEROBS.log.fatal "Cannot write blob for ID #{id} to FlatFileDB: " +
      e.message
  end

  addr
end