Module: MongoOplogBackup::Oplog

Defined in:
lib/mongo_oplog_backup/oplog.rb

Constant Summary collapse

FILENAME_RE =
/\/oplog-(\d+):(\d+)-(\d+):(\d+)\.bson(?:\.gz)?\z/

Class Method Summary collapse

Class Method Details

.each_document(filename) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
# File 'lib/mongo_oplog_backup/oplog.rb', line 4

# Yields each BSON document in the given oplog dump file, transparently
# decompressing gzipped dumps (detected via gzip_fingerprint).
#
# @param filename [String] path to a .bson or .bson.gz oplog dump
# @yield [BSON::Document] each document, in file order
def self.each_document(filename)
  read_documents = lambda do |io|
    # FIXME: Since bson4, from_bson takes a ByteArray instead of a StringIO
    yield BSON::Document.from_bson(io) until io.eof?
  end

  if gzip_fingerprint(filename)
    Zlib::GzipReader.open(filename, &read_documents)
  else
    File.open(filename, 'rb', &read_documents)
  end
end

.find_oplogs(dir) ⇒ Object



113
114
115
116
117
118
# File 'lib/mongo_oplog_backup/oplog.rb', line 113

# Returns the oplog dump files in +dir+ whose names match FILENAME_RE
# (oplog-<t1>:<i1>-<t2>:<i2>.bson, optionally .gz), sorted ascending by
# the first timestamp encoded in the filename.
#
# @param dir [String] directory to scan
# @return [Array<String>] matching paths in timestamp order
def self.find_oplogs(dir)
  files = Dir.glob(File.join(dir, 'oplog-*.bson*'))
  files.keep_if { |name| name =~ FILENAME_RE }
  # sort_by parses each filename once; the previous sort block re-ran
  # timestamps_from_filename on both operands of every comparison.
  files.sort_by { |name| timestamps_from_filename(name)[:first] }
end

.gzip_fingerprint(filename) ⇒ Object



128
129
130
131
# File 'lib/mongo_oplog_backup/oplog.rb', line 128

# Returns true when the file begins with the two-byte gzip magic number
# (0x1f 0x8b), false otherwise.
#
# Reads at most two bytes. Files shorter than two bytes (including empty
# files, where File.read returns nil) are reported as not gzipped instead
# of raising NoMethodError as the previous implementation did.
#
# @param filename [String] path of the file to inspect
# @return [Boolean]
def self.gzip_fingerprint(filename)
  bytes = File.read(filename, 2, 0)
  # File.read with a length returns a BINARY-encoded string (or nil at EOF),
  # so a direct comparison against a binary literal is sufficient.
  !bytes.nil? && bytes.bytesize == 2 && bytes == "\x1f\x8b".b
end

.merge(target, source_files, options = {}) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/mongo_oplog_backup/oplog.rb', line 46

# Merges the given oplog dump files into a single oplog at +target+,
# de-duplicating the entries where consecutive dumps overlap.
#
# +source_files+ must be ordered by first timestamp (as produced by
# find_oplogs). Unless :force is given, consecutive files must overlap by
# exactly one entry; filenames that encode their timestamp range are
# verified against the actual file contents.
#
# @param target [String] path of the merged oplog file to write
# @param source_files [Array<String>] oplog dumps in ascending timestamp order
# @param options [Hash] :limit (accepted but currently unused),
#   :force (skip the exactly-one-entry overlap check),
#   :gzip (write gzip-compressed output)
# @raise [RuntimeError] on out-of-order timestamps, a filename/content
#   timestamp mismatch, or an overlap other than exactly one entry
def self.merge(target, source_files, options={})
  limit = options[:limit] # TODO: use
  force = options[:force]
  compress = !!options[:gzip]

  process_output = Proc.new do |output|
    last_timestamp = nil # newest timestamp written so far, across all files
    first = true         # true while processing the first source file

    source_files.each do |filename|
      # Expected range parsed from the filename, used as a sanity check below
      # (nil when the filename does not encode timestamps).
      timestamps = timestamps_from_filename(filename)
      if timestamps
        expected_first = timestamps[:first]
        expected_last = timestamps[:last]
      else
        expected_first = nil
        expected_last = nil
      end

      # Optimize:
      # We can assume that the timestamps are in order.
      # This means we only need to find the first non-overlapping point,
      # and the rest we can pass through directly.
      MongoOplogBackup.log.debug "Reading #{filename}"
      last_file_timestamp = nil
      skipped = 0
      wrote = 0
      first_file_timestamp = nil
      Oplog.each_document(filename) do |doc|
        timestamp = doc['ts']
        first_file_timestamp = timestamp if first_file_timestamp.nil?

        # gzip stores the mtime in the header, so we set it explicitly for
        # consistency between runs. Only Zlib::GzipWriter responds to mtime=;
        # for a plain File output the freshly-created file's mtime is already
        # non-zero, so the guard keeps this assignment from running.
        output.mtime = first_file_timestamp.seconds if output.mtime.to_i == 0

        if !last_timestamp.nil? && timestamp <= last_timestamp
          # Already written from a previous (overlapping) file.
          skipped += 1
        elsif !last_file_timestamp.nil? && timestamp <= last_file_timestamp
          raise "Timestamps out of order in #{filename}"
        else
          output.write(doc.to_bson)
          wrote += 1
          last_timestamp = timestamp
        end
        last_file_timestamp = timestamp
      end

      if expected_first && first_file_timestamp != expected_first
        raise "#{expected_first} was not the first timestamp in #{filename}"
      end

      if expected_last && last_file_timestamp != expected_last
        raise "#{expected_last} was not the last timestamp in #{filename}"
      end

      MongoOplogBackup.log.info "Wrote #{wrote} and skipped #{skipped} oplog entries from #{filename}"
      # Consecutive dumps must share exactly one entry unless forced.
      raise "Overlap must be exactly 1" unless first || skipped == 1 || force
      first = false
    end
  end
  if (compress)
    Zlib::GzipWriter.open(target, &process_output)
  else
    File.open(target, 'wb', &process_output)
  end
end

.merge_backup(dir) ⇒ Object



120
121
122
123
124
125
126
# File 'lib/mongo_oplog_backup/oplog.rb', line 120

# Merges every oplog dump found in +dir+ into <dir>/dump/oplog.bson,
# creating the dump directory if needed.
#
# @param dir [String] backup directory containing oplog-*.bson* files
def self.merge_backup(dir)
  dump_dir = File.join(dir, 'dump')
  sources = find_oplogs(dir)
  # Keep the merged output gzipped when any input dump is gzipped.
  gzip_output = sources.any? { |path| path.end_with?('.gz') }
  FileUtils.mkdir_p(dump_dir)
  # Mongorestore expects this filename, without a gzip suffix.
  merge(File.join(dump_dir, 'oplog.bson'), sources, {gzip: gzip_output})
end

.oplog_timestamps(filename) ⇒ Object



19
20
21
22
23
24
25
26
27
# File 'lib/mongo_oplog_backup/oplog.rb', line 19

# Returns the 'ts' (timestamp) field of every document in the given
# oplog dump file, in file order.
#
# @param filename [String] path to a .bson or .bson.gz oplog dump
# @return [Array] one timestamp per document
def self.oplog_timestamps(filename)
  collected = []
  # This can be optimized by only decoding the timestamp
  # (first field), instead of decoding the entire document.
  each_document(filename) { |doc| collected.push(doc['ts']) }
  collected
end

.timestamps_from_filename(filename) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/mongo_oplog_backup/oplog.rb', line 31

# Parses the first/last oplog timestamps encoded in an oplog dump
# filename (per FILENAME_RE: oplog-<t1>:<i1>-<t2>:<i2>.bson[.gz]).
#
# @param filename [String] path or name of an oplog dump file
# @return [Hash, nil] {first:, last:} as BSON::Timestamp values, or nil
#   when the filename does not match the expected pattern
def self.timestamps_from_filename(filename)
  match = FILENAME_RE.match(filename)
  return nil if match.nil?
  s1, i1, s2, i2 = match.captures.map(&:to_i)
  {
    first: BSON::Timestamp.new(s1, i1),
    last: BSON::Timestamp.new(s2, i2)
  }
end