Class: Snapsync::LocalSync

Inherits:
Object
  • Object
show all
Defined in:
lib/snapsync/local_sync.rb

Overview

Synchronization between local file systems

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, target) ⇒ LocalSync

Returns a new instance of LocalSync.



13
14
15
# File 'lib/snapsync/local_sync.rb', line 13

def initialize(config, target)
    @config, @target = config, target
end

Instance Attribute Details

#configSnapperConfig (readonly)

The snapper configuration we should synchronize

Returns:



7
8
9
# File 'lib/snapsync/local_sync.rb', line 7

def config
  @config
end

#targetLocalTarget (readonly)

The target directory into which to synchronize

Returns:



11
12
13
# File 'lib/snapsync/local_sync.rb', line 11

def target
  @target
end

Instance Method Details

#copy_snapshot(target_snapshot_dir, src, parent: nil) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/snapsync/local_sync.rb', line 93

def copy_snapshot(target_snapshot_dir, src, parent: nil)
    # This variable is used in the 'ensure' block. Make sure it is
    # initialized properly
    success = false

    File.open(target_snapshot_dir + "info.xml", 'w') do |io|
        io.write (src.snapshot_dir + "info.xml").read
    end

    if parent
        parent_opt = ['-p', parent.subvolume_dir.to_s]
        estimated_size = src.size_diff_from(parent)
    else
        parent_opt = []
        estimated_size = src.size
    end

    Snapsync.info "Estimating transfer for #{src.snapshot_dir} to be #{human_readable_size(estimated_size)}"

    start = Time.now
    bytes_transferred = nil
    bytes_transferred =
        Btrfs.popen('send', *parent_opt, src.subvolume_dir.to_s) do |send_io|
            Btrfs.popen('receive', target_snapshot_dir.to_s, mode: 'w', out: '/dev/null') do |receive_io|
                receive_io.sync = true
                copy_stream(send_io, receive_io, estimated_size: estimated_size)
            end
        end

    Snapsync.info "Flushing data to disk"
    Btrfs.run("filesystem", "sync", target_snapshot_dir.to_s)
    duration = Time.now - start
    rate = bytes_transferred / duration
    Snapsync.info "Transferred #{human_readable_size(bytes_transferred)} in #{human_readable_time(duration)} (#{human_readable_size(rate)}/s)"
    Snapsync.info "Successfully synchronized #{src.snapshot_dir}"
    true

rescue Exception => e
    Snapsync.warn "Failed to synchronize #{src.snapshot_dir}, deleting target directory"
    subvolume_dir = target_snapshot_dir + "snapshot"
    if subvolume_dir.directory?
        Btrfs.run("subvolume", "delete", subvolume_dir.to_s)
    end
    if target_snapshot_dir.directory?
        target_snapshot_dir.rmtree
    end

    raise
end

#copy_stream(send_io, receive_io, chunk_length: (1 << 20), estimated_size: 0) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/snapsync/local_sync.rb', line 36

def copy_stream(send_io, receive_io, chunk_length: (1 << 20), estimated_size: 0)
    longest_message_length = 0
    counter = 0
    start = Time.now
    while !send_io.eof?
        if buffer = send_io.read(chunk_length) # 1MB buffer
            receive_io.write(buffer)

            counter += buffer.size
            rate = counter / (Time.now - start)
            remaining =
                if estimated_size > counter
                    human_readable_time((estimated_size - counter) / rate)
                elsif counter - estimated_size < 100 * 1024**2
                    human_readable_time(0)
                else
                    '?'
                end

            msg = "#{human_readable_size(counter)} (#{human_readable_size(rate)}/s), #{remaining} remaining"
            longest_message_length = [longest_message_length, msg.length].max
            print "\r%-#{longest_message_length}s" % [msg]
        end
    end
    print "\r#{" " * longest_message_length}\r"
    counter
end

#create_synchronization_pointObject



17
18
19
20
21
# File 'lib/snapsync/local_sync.rb', line 17

def create_synchronization_point
    config.create(
        description: "synchronization snapshot for snapsync",
        user_data: Hash['important' => 'yes', 'snapsync' => target.uuid])
end

#human_readable_size(size, digits: 1) ⇒ Object



202
203
204
205
206
207
208
209
210
# File 'lib/snapsync/local_sync.rb', line 202

def human_readable_size(size, digits: 1)
    order = ['B', 'kB', 'MB', 'GB']
    magnitude =
        if size > 0
            Integer(Math.log2(size) / 10)
        else 0
        end
    "%.#{digits}f#{order[magnitude]}" % [Float(size) / (1024 ** magnitude)]
end

#human_readable_time(time) ⇒ Object



195
196
197
198
199
200
# File 'lib/snapsync/local_sync.rb', line 195

def human_readable_time(time)
    hrs = time / 3600
    min = (time / 60) % 60
    sec = time % 60
    "%02i:%02i:%02i" % [hrs, min, sec]
end

#remove_synchronization_points(except_last: true) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/snapsync/local_sync.rb', line 23

def remove_synchronization_points(except_last: true)
    synchronization_points = config.each_snapshot.find_all do |snapshot|
        snapshot.synchronization_point_for?(target)
    end
    if except_last
        synchronization_points = synchronization_points.sort_by(&:num)
        synchronization_points.pop
    end
    synchronization_points.each do |snapshot|
        config.delete(snapshot)
    end
end

#syncObject



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/snapsync/local_sync.rb', line 143

def sync
    STDOUT.sync = true

    # First, create a snapshot and protect it against cleanup, to use as
    # synchronization point
    #
    # We remove old synchronization points on successful synchronization
    source_snapshots = config.each_snapshot.sort_by(&:num)
    sync_snapshot = source_snapshots.reverse.find do |snapshot|
        if snapshot.synchronization_point_for?(target)
            true
        elsif !snapshot.synchronization_point?
            break
        end
    end
    sync_snapshot ||= create_synchronization_point

    target_snapshots = target.each_snapshot.sort_by(&:num)
    nums_on_target = target_snapshots.map(&:num).to_set

    last_common_snapshot = source_snapshots.find do |s|
        nums_on_target.include?(s.num)
    end
    if !last_common_snapshot
        Snapsync.warn "no common snapshot found, will have to synchronize the first snapshot fully"
    end

    # Merge source and target snapshots to find out which are needed on
    # the target, and then remove the ones that are already present.
    all_snapshots = source_snapshots.find_all { |s| !nums_on_target.include?(s.num) } +
        target_snapshots
    nums_required = target.sync_policy.filter_snapshots(all_snapshots).
        map(&:num).to_set
    source_snapshots.each do |src|
        if !nums_required.include?(src.num)
            if nums_on_target.include?(src.num)
                last_common_snapshot = src
            end
            next
        elsif synchronize_snapshot(target.dir + src.num.to_s, src, parent: last_common_snapshot)
            last_common_snapshot = src
        end
    end

    if synchronize_snapshot(target.dir + sync_snapshot.num.to_s, sync_snapshot, parent: last_common_snapshot)
        Snapsync.debug "successfully copied last synchronization point #{sync_snapshot.num}, removing old ones"
        remove_synchronization_points
    end

    last_common_snapshot
end

#synchronize_snapshot(target_snapshot_dir, src, parent: nil) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/snapsync/local_sync.rb', line 64

def synchronize_snapshot(target_snapshot_dir, src, parent: nil)
    partial_marker_path = Snapshot.partial_marker_path(target_snapshot_dir)

    # Verify first if the snapshot is already present and/or partially
    # synchronized
    begin
        snapshot = Snapshot.new(target_snapshot_dir)
        if snapshot.partial?
            Snapsync.warn "target snapshot directory #{target_snapshot_dir} looks like an aborted snapsync synchronization, I will attempt to refresh it"
        else
            return true
        end
    rescue InvalidSnapshot
        if target_snapshot_dir.exist?
            Snapsync.warn "target snapshot directory #{target_snapshot_dir} already exists, but does not seem to be a valid snapper snapshot. I will attempt to refresh it"
        else
            target_snapshot_dir.mkdir
        end
        FileUtils.touch(partial_marker_path.to_s)
    end

    if copy_snapshot(target_snapshot_dir, src, parent: parent)
        partial_marker_path.unlink
        IO.popen(["btrfs", "filesystem", "sync", target_snapshot_dir.to_s, err: '/dev/null']).read
        Snapsync.info "Successfully synchronized #{src.snapshot_dir}"
        true
    end
end