Class: Snapsync::SnapshotTransfer

Inherits:
Object
  • Object
show all
Defined in:
lib/snapsync/snapshot_transfer.rb

Overview

Snapshot transfer between two btrfs filesystems

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, target) ⇒ SnapshotTransfer

Returns a new instance of SnapshotTransfer.



19
20
21
22
23
24
# File 'lib/snapsync/snapshot_transfer.rb', line 19

def initialize(config, target)
    @config, @target = config, target

    @btrfs_src = Btrfs.get(config.subvolume)
    @btrfs_dest = Btrfs.get(@target.dir)
end

Instance Attribute Details

#btrfs_destBtrfs (readonly)

Returns dest filesystem.

Returns:

  • (Btrfs)

    dest filesystem



17
18
19
# File 'lib/snapsync/snapshot_transfer.rb', line 17

def btrfs_dest
  @btrfs_dest
end

#btrfs_srcBtrfs (readonly)

Returns src filesystem.

Returns:

  • (Btrfs)

    src filesystem



14
15
16
# File 'lib/snapsync/snapshot_transfer.rb', line 14

def btrfs_src
  @btrfs_src
end

#configSnapperConfig (readonly)

The snapper configuration we should synchronize

Returns:



7
8
9
# File 'lib/snapsync/snapshot_transfer.rb', line 7

def config
  @config
end

#targetSyncTarget (readonly)

The target directory into which to synchronize

Returns:



11
12
13
# File 'lib/snapsync/snapshot_transfer.rb', line 11

def target
  @target
end

Instance Method Details

#copy_snapshot(target_snapshot_dir, src, parent: nil) ⇒ Object

Parameters:



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/snapsync/snapshot_transfer.rb', line 106

def copy_snapshot(target_snapshot_dir, src, parent: nil)
    # This variable is used in the 'ensure' block. Make sure it is
    # initialized properly
    success = false

    (target_snapshot_dir + "info.xml").open('w') do |io|
        io.write (src.snapshot_dir + "info.xml").read
    end

    if parent
        parent_opt = ['-p', parent.subvolume_dir.to_s]
        estimated_size = src.size_diff_from(parent)
    else
        parent_opt = []
        estimated_size = src.size
    end

    Snapsync.info "Estimating transfer for #{src.snapshot_dir} to be #{human_readable_size(estimated_size)}"

    start = Time.now
    bytes_transferred = nil
    bytes_transferred =
        btrfs_src.popen('send', *parent_opt, src.subvolume_dir.to_s) do |send_io|
            btrfs_dest.popen('receive', target_snapshot_dir.path_part, mode: 'w', out: '/dev/null') do |receive_io|
                receive_io.sync = true
                copy_stream(send_io, receive_io, estimated_size: estimated_size)
            end
        end

    Snapsync.info "Flushing data to disk"
    btrfs_dest.run("filesystem", "sync", target_snapshot_dir.path_part)
    duration = Time.now - start
    rate = bytes_transferred / duration
    Snapsync.info "Transferred #{human_readable_size(bytes_transferred)} in #{human_readable_time(duration)} (#{human_readable_size(rate)}/s)"
    Snapsync.info "Successfully synchronized #{src.snapshot_dir}"
    true

rescue Exception => e
    subvolume_dir = target_snapshot_dir + "snapshot"
    Snapsync.warn "Failed to synchronize #{src.snapshot_dir}, deleting target directory #{subvolume_dir}"
    if subvolume_dir.directory?
        btrfs_dest.run("subvolume", "delete", subvolume_dir.path_part)
    end
    if target_snapshot_dir.directory?
        target_snapshot_dir.rmtree
    end

    raise
end

#copy_stream(send_io, receive_io, chunk_length: (1 << 20), estimated_size: 0) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/snapsync/snapshot_transfer.rb', line 45

def copy_stream(send_io, receive_io, chunk_length: (1 << 20), estimated_size: 0)
    longest_message_length = 0
    counter = 0
    start = Time.now
    while !send_io.eof?
        if buffer = send_io.read(chunk_length) # 1MB buffer
            receive_io.write(buffer)

            counter += buffer.size
            rate = counter / (Time.now - start)
            remaining =
                if estimated_size > counter
                    human_readable_time((estimated_size - counter) / rate)
                elsif counter - estimated_size < 100 * 1024**2
                    human_readable_time(0)
                else
                    '?'
                end

            msg = "#{human_readable_size(counter)} (#{human_readable_size(rate)}/s), #{remaining} remaining"
            longest_message_length = [longest_message_length, msg.length].max
            print "\r%-#{longest_message_length}s" % [msg]
        end
    end
    print "\r#{" " * longest_message_length}\r"
    counter
end

#create_synchronization_pointObject



26
27
28
29
30
# File 'lib/snapsync/snapshot_transfer.rb', line 26

def create_synchronization_point
    config.create(
        description: "synchronization snapshot for snapsync",
        user_data: Hash['important' => 'yes', 'snapsync-description' => target.description, 'snapsync' => target.uuid])
end

#remove_synchronization_points(except_last: true) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/snapsync/snapshot_transfer.rb', line 32

def remove_synchronization_points(except_last: true)
    synchronization_points = config.each_snapshot.find_all do |snapshot|
        snapshot.synchronization_point_for?(target)
    end
    if except_last
        synchronization_points = synchronization_points.sort_by(&:num)
        synchronization_points.pop
    end
    synchronization_points.each do |snapshot|
        config.delete(snapshot)
    end
end

#syncObject



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/snapsync/snapshot_transfer.rb', line 156

def sync
    STDOUT.sync = true

    # Do a snapper cleanup before syncing
    config.cleanup

    # First, create a snapshot and protect it against cleanup, to use as
    # synchronization point
    #
    # We remove old synchronization points on successful synchronization
    source_snapshots = config.each_snapshot.sort_by(&:num)
    sync_snapshot = source_snapshots.reverse.find do |snapshot|
        if snapshot.synchronization_point_for?(target)
            true
        elsif !snapshot.synchronization_point?
            break
        end
    end
    sync_snapshot ||= create_synchronization_point

    target_snapshots = target.each_snapshot.sort_by(&:num)
    nums_on_target = target_snapshots.map(&:num).to_set

    last_common_snapshot = source_snapshots.find do |s|
        nums_on_target.include?(s.num)
    end
    if !last_common_snapshot
        Snapsync.warn "no common snapshot found, will have to synchronize the first snapshot fully"
    end

    # Merge source and target snapshots to find out which are needed on
    # the target, and then remove the ones that are already present.
    all_snapshots = source_snapshots.find_all { |s| !nums_on_target.include?(s.num) } +
        target_snapshots
    nums_required = target.sync_policy.filter_snapshots(all_snapshots).
        map(&:num).to_set
    source_snapshots.each do |src|
        if !nums_required.include?(src.num)
            if nums_on_target.include?(src.num)
                last_common_snapshot = src
            end
            next
        elsif synchronize_snapshot(target.dir + src.num.to_s, src, parent: last_common_snapshot)
            last_common_snapshot = src
        end
    end

    if synchronize_snapshot(target.dir + sync_snapshot.num.to_s, sync_snapshot, parent: last_common_snapshot)
        Snapsync.debug "successfully copied last synchronization point #{sync_snapshot.num}, removing old ones"
        remove_synchronization_points
    end

    last_common_snapshot
end

#synchronize_snapshot(target_snapshot_dir, src, parent: nil) ⇒ Object

Parameters:



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/snapsync/snapshot_transfer.rb', line 74

def synchronize_snapshot(target_snapshot_dir, src, parent: nil)
    partial_marker_path = Snapshot.partial_marker_path(target_snapshot_dir)

    # Verify first if the snapshot is already present and/or partially
    # synchronized
    begin
        snapshot = Snapshot.new(target_snapshot_dir)
        if snapshot.partial?
            Snapsync.warn "target snapshot directory #{target_snapshot_dir} looks like an aborted snapsync synchronization, I will attempt to refresh it"
        else
            return true
        end
    rescue InvalidSnapshot
        if target_snapshot_dir.exist?
            Snapsync.warn "target snapshot directory #{target_snapshot_dir} already exists, but does not seem to be a valid snapper snapshot. I will attempt to refresh it"
        else
            target_snapshot_dir.mkdir
        end
        partial_marker_path.touch
    end

    if copy_snapshot(target_snapshot_dir, src, parent: parent)
        partial_marker_path.unlink
        btrfs_dest.run("filesystem", "sync", target_snapshot_dir.path_part)
        Snapsync.info "Successfully synchronized #{src.snapshot_dir}"
        true
    end
end