Class: Roma::RecoverLost

Inherits:
Object
  • Object
show all
Defined in:
lib/roma/tools/recoverlost_lib.rb

Instance Method Summary collapse

Constructor Details

#initialize(pname, pushv_cmd, argv, alldata = false) ⇒ RecoverLost

Returns a new instance of RecoverLost.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/roma/tools/recoverlost_lib.rb', line 18

# Validates the command-line arguments and stores the recovery settings.
#
# @param pname [String] program name interpolated into the usage message
# @param pushv_cmd [String] command word later sent to a node before data is streamed
# @param argv [Array<String>] address, port, storage-path and (when +alldata+
#   is false) a 14-digit yyyymmddhhmmss timestamp
# @param alldata [Boolean] when truthy, exactly three arguments are required
#
# Prints a message and calls +exit+ whenever an argument is rejected.
#
# NOTE(review): the usage text shows the timestamp in brackets (optional), yet
# fewer than four arguments are rejected when +alldata+ is false -- confirm
# which of the two is intended.
def initialize(pname, pushv_cmd, argv, alldata = false)
  if alldata
    unless argv.length == 3
      puts "usage:#{pname} address port storage-path"
      exit
    end
  elsif alldata == false && argv.length < 4
    puts "usage:#{pname} address port storage-path [yyyymmddhhmmss]"
    exit
  end

  @addr, @port, @strgpath, @ymdhms = argv

  # Any non-digit character makes the port invalid.
  if @port =~ /\D/
    STDERR.puts "port was not numeric."
    exit
  end

  # The timestamp, when given, must be exactly 14 digits.
  if @ymdhms && !(@ymdhms.length == 14 && @ymdhms !~ /\D/)
    STDERR.puts "yyyymmddhhmmss format mismatch."
    exit
  end

  @pushv_cmd = pushv_cmd
  @nodeid = "#{@addr}_#{@port}"
  @stream_copy_wait_param = 0.0001 # seconds slept after each record written
  @alldata = alldata
end

Instance Method Details

#broadcast_cmd(cmd, without_nids = nil) ⇒ Object



321
322
323
324
325
326
327
328
329
330
331
# File 'lib/roma/tools/recoverlost_lib.rb', line 321

# Sends +cmd+ to every node in the routing data except the excluded ones.
#
# @param cmd [String] command passed unchanged to +send_cmd+
# @param without_nids [Array<String>, String, nil] node id(s) to skip. A single
#   id is wrapped into a list so it is compared for equality; calling
#   +include?+ on a bare String would do substring matching and could skip
#   unrelated nodes whose id happens to be contained in it.
# @return [Hash, nil] node id => +send_cmd+ result, or nil if an error was raised
def broadcast_cmd(cmd, without_nids = nil)
  excluded = Array(without_nids)
  res = {}
  @rd.nodes.each do |nid|
    res[nid] = send_cmd(nid, cmd) unless excluded.include?(nid)
  end
  res
rescue => e
  STDERR.puts("#{e}\n#{$@}")
  nil
end

#clk_to_zero(data) ⇒ Object



310
311
312
313
314
315
316
317
318
319
# File 'lib/roma/tools/recoverlost_lib.rb', line 310

# Rebuilds a packed record with its third header field (clk) set to zero.
#
# The record is read as five 32-bit big-endian integers
# (vn, last, clk, expt, klen), then +klen+ key bytes, a 32-bit value length
# and that many value bytes. Bytes beyond the declared value length are not
# copied into the result.
#
# @param data [String] packed record
# @return [String] re-packed record with clk = 0
def clk_to_zero(data)
  vn, last, _clk, expt, klen = data.unpack('N5')
  key, vlen = data[20..-1].unpack("a#{klen}N")
  header = [vn, last, 0, expt, klen, key]

  # A zero value length means there is no value payload to copy.
  return (header << 0).pack("N5a#{klen}N") if vlen == 0

  value = data[(24 + klen)..-1].unpack("a#{vlen}").first
  (header << vlen << value).pack("N5a#{klen}Na#{vlen}")
end

#each_hash(path) ⇒ Object



81
82
83
84
85
86
87
# File 'lib/roma/tools/recoverlost_lib.rb', line 81

# Yields every sub-directory directly under +path+.
#
# @param path [String] directory to scan
# @yield [hname, dir] the sub-directory's last path component and its path as
#   returned by the glob
def each_hash(path)
  Dir.glob("#{path}/*").each do |entry|
    # Plain files are skipped; only directories are reported.
    yield File.basename(entry), entry if File.directory?(entry)
  end
end

#get_history_of_lost(nid, ymdhms) ⇒ Object



102
103
104
105
106
107
108
109
110
111
# File 'lib/roma/tools/recoverlost_lib.rb', line 102

# Sends "history_of_lost <ymdhms>" to node +nid+ and collects the reply.
#
# Each reply line before the "END\r\n" terminator is converted with +to_i+.
#
# @param nid [String] node id used to obtain a pooled connection
# @param ymdhms [String] timestamp appended to the command
# @return [Array<Integer>] one integer per reply line
def get_history_of_lost(nid, ymdhms)
  con = Roma::Messaging::ConPool.instance.get_connection(nid)
  con.write("history_of_lost #{ymdhms}\r\n")

  vnodes = []
  until (line = con.gets) == "END\r\n"
    vnodes << line.chomp.to_i
  end

  Roma::Messaging::ConPool.instance.return_connection(nid, con)
  vnodes
end

#get_lost_vnodes(rd, ymdhms) ⇒ Object



94
95
96
97
98
99
100
# File 'lib/roma/tools/recoverlost_lib.rb', line 94

# Returns the lost vnodes reported by +rd+, merged with the lost-history of
# this tool's own node when a timestamp is supplied.
#
# @param rd [#get_lost_vnodes] routing data object
# @param ymdhms [String, nil] timestamp; when nil the history is not queried
# @return [Array] lost vnodes; the union with the history when +ymdhms+ is given
def get_lost_vnodes(rd, ymdhms)
  lost = rd.get_lost_vnodes
  return lost unless ymdhms

  lost | get_history_of_lost(@nodeid, ymdhms)
end

#get_routing_data(nid) ⇒ Object



89
90
91
92
# File 'lib/roma/tools/recoverlost_lib.rb', line 89

# Requests a route dump from node +nid+ through a new Roma::Client::Sender.
#
# @param nid [String] node id to query
# @return [Object] whatever +send_routedump_command+ returns
def get_routing_data(nid)
  Roma::Client::Sender.new.send_routedump_command(nid)
end

#make_node_hash(keys) ⇒ Object



198
199
200
201
202
203
204
205
206
# File 'lib/roma/tools/recoverlost_lib.rb', line 198

# Groups +keys+ by the nodes listed for each key's vnode.
#
# The vnode is the key's SHA1 digest reduced modulo 2**dgst_bits and then
# masked with +search_mask+; +v_idx+ maps that vnode to its node ids.
#
# @param keys [Array<String>] keys to distribute
# @return [Hash{String => Array<String>}] every node in +@rd.nodes+ mapped to
#   its keys (an empty array when it has none)
def make_node_hash(keys)
  res = @rd.nodes.each_with_object({}) { |nid, acc| acc[nid] = [] }
  keys.each do |key|
    digest = Digest::SHA1.hexdigest(key).hex % (2**@rd.dgst_bits)
    vn = digest & @rd.search_mask
    @rd.v_idx[vn].each { |nid| res[nid] << key }
  end
  res
end

#new_storage(ext) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/roma/tools/recoverlost_lib.rb', line 132

# Instantiates the storage class that corresponds to a file extension.
#
# @param ext [String] 'tc', 'dbm' or 'sql3'
# @return [Object, nil] a new storage instance, or nil for any other extension
def new_storage(ext)
  case ext
  when 'tc'   then ::Roma::Storage::TCStorage.new
  when 'dbm'  then Roma::Storage::DbmStorage.new
  when 'sql3' then Roma::Storage::SQLite3Storage.new
  end
end

#open_storage(path, vn_list) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/roma/tools/recoverlost_lib.rb', line 113

# Opens the storage found in directory +path+.
#
# The storage type is taken from the extension of the first "0.*" file, and
# the number of divided files is the count of files with that extension.
#
# @param path [String] storage directory
# @param vn_list [Array] vnode list assigned to the storage object
# @return [Object, nil] the opened storage, or nil (after writing a message to
#   STDERR) when +path+ is not a directory, holds no "0.*" file, or the file
#   extension is not one +new_storage+ recognises
def open_storage(path, vn_list)
  unless File.directory?(path)
    STDERR.puts "#{path} does not found."
    return nil
  end

  # get a file extension
  first_file = Dir.glob("#{path}/0.*").first
  unless first_file
    # Without this guard File.extname(nil) raises TypeError.
    STDERR.puts "#{path} has no storage file (0.*)."
    return nil
  end
  ext = File.extname(first_file)[1..-1]

  st = new_storage(ext)
  unless st
    # new_storage returns nil for extensions it does not know.
    STDERR.puts "#{path}: unsupported storage type '#{ext}'."
    return nil
  end

  # count a number of divided files
  st.divnum = Dir.glob("#{path}/*.#{ext}").length
  st.vn_list = vn_list
  st.storage_path = path
  st.opendb
  st
end

#push_a_vnode_stream(hname, vn, nid) ⇒ Object



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/roma/tools/recoverlost_lib.rb', line 172

# Streams every stored record of vnode +vn+ to node +nid+.
#
# Sends "<pushv_cmd> <hname> <vn>", waits for "READY", writes each dumped
# record with its clk field zeroed, then writes 20 NUL bytes as terminator and
# reads the final reply.
#
# @param hname [String] hash name placed in the command line
# @param vn [Integer] vnode id to dump and push
# @param nid [String] destination node id
# @return [String, nil] the chomped final reply (e.g. "STORED"), the chomped
#   error line when the node did not answer "READY", or nil when an error was
#   raised (it is printed to STDERR)
def push_a_vnode_stream(hname, vn, nid)
  con = Roma::Messaging::ConPool.instance.get_connection(nid)
  con.write("#{@pushv_cmd} #{hname} #{vn}\r\n")

  ready = con.gets # READY\r\n or error string
  unless ready == "READY\r\n"
    # The connection is closed instead of being handed back to the pool.
    con.close
    return ready.chomp
  end

  @storage.each_vn_dump(vn) do |record|
    con.write(clk_to_zero(record))
    sleep @stream_copy_wait_param
  end
  con.write("\0" * 20) # end of stream

  reply = con.gets # STORED\r\n or error string
  Roma::Messaging::ConPool.instance.return_connection(nid, con)
  reply.chomp! if reply
  reply
rescue => e
  STDERR.puts "#{e}\n#{$@}"
  nil
end

#send_cmd(nid, cmd) ⇒ Object



333
334
335
336
337
338
339
340
341
342
343
344
345
# File 'lib/roma/tools/recoverlost_lib.rb', line 333

# Writes +cmd+ to node +nid+ over a pooled connection and reads one reply line.
#
# @param nid [String] destination node id
# @param cmd [String] raw command, written as given
# @return [String, nil] the reply without its line terminator; nil when
#   nothing was read or when an error was raised (a message goes to STDERR)
def send_cmd(nid, cmd)
  con = Roma::Messaging::ConPool.instance.get_connection(nid)
  con.write(cmd)
  reply = con.gets
  Roma::Messaging::ConPool.instance.return_connection(nid, con)
  reply.chomp! if reply
  reply
rescue
  STDERR.puts("#{__FILE__}:#{__LINE__}:Send command failed that node-id is #{nid},command is #{cmd}.")
  nil
end

#start_recover(hname) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/roma/tools/recoverlost_lib.rb', line 145

# Pushes the stored data of every lost vnode back into the cluster.
#
# For each vnode in +@lost_vnodes+:
# * when the routing data lists no node for it, one node is picked at random,
#   the data is pushed there, and a "setroute" command is sent to that node
#   and then broadcast to all other nodes;
# * otherwise the data is pushed to every node already listed for the vnode.
#
# Calls +exit+ when a push does not answer "STORED" or when the setroute
# command sent to the chosen node fails.
#
# @param hname [String] hash name passed on to +push_a_vnode_stream+
def start_recover(hname)
  total = @lost_vnodes.length
  @lost_vnodes.each_with_index do |vn, idx|
    nodes = @rd.v_idx[vn]
    unassigned = nodes.nil? || nodes.length == 0

    if unassigned
      nids = [@rd.nodes[rand(@rd.nodes.length)]]
      puts "#{idx}/#{total} #{vn} assign to #{nids.inspect}"
    else
      nids = nodes
      puts "#{idx}/#{total} #{vn} was auto assigned at #{nids.inspect}"
    end

    nids.each do |nid|
      next if push_a_vnode_stream(hname, vn, nid) == "STORED"

      STDERR.puts "push_a_vnode_stream aborted in #{vn}"
      exit
    end

    next unless unassigned

    cmd = "setroute #{vn} #{@rd.v_clk[vn]} #{nids[0]}\r\n"
    exit unless send_cmd(nids[0], cmd)
    # Pass the exclusion as a list: broadcast_cmd tests membership with
    # include?, which on a bare String would be a substring match.
    broadcast_cmd(cmd, [nids[0]])
  end
end

#start_recover_width_keys(hname, keys) ⇒ Object



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/roma/tools/recoverlost_lib.rb', line 259

# Uploads each key found in the opened storage to every node listed for the
# key's vnode, printing the per-node result.
#
# Keys for which +@storage.get_raw2+ returns nothing are skipped silently.
#
# @param hname [String] hash name passed on to +upload_data+
# @param keys [Array<String>] keys to look up and upload
def start_recover_width_keys(hname, keys)
  keys.each do |key|
    data = @storage.get_raw2(key)
    next unless data

    puts "hit => #{key}"
    # vnode = SHA1 digest reduced to dgst_bits bits, then masked.
    digest = Digest::SHA1.hexdigest(key).hex % (2**@rd.dgst_bits)
    vn = digest & @rd.search_mask
    @rd.v_idx[vn].each do |nid|
      print "#{nid}=>"
      puts upload_data(hname, vn, nid, key, data)
    end
  end
end

#start_recover_width_keys2(hname, keys) ⇒ Object



208
209
210
211
212
213
214
# File 'lib/roma/tools/recoverlost_lib.rb', line 208

# Groups +keys+ per node with +make_node_hash+ and uploads each group in one
# +upload_data2+ call, printing the node id first.
#
# @param hname [String] hash name passed on to +upload_data2+
# @param keys [Array<String>] keys to upload
def start_recover_width_keys2(hname, keys)
  make_node_hash(keys).each do |nid, node_keys|
    puts nid
    upload_data2(hname, nid, node_keys)
  end
end

#suite ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/roma/tools/recoverlost_lib.rb', line 49

# Runs a vnode-based recovery over every hash directory under the storage path.
#
# Fetches the routing data, decides which vnodes to recover (all vnodes known
# to the routing data when +@alldata+ is set, otherwise the lost ones), then
# for each hash directory opens its storage, recovers it and closes it.
# Calls +exit+ when no lost vnode is reported in the non-alldata mode.
def suite
  @rd = get_routing_data(@nodeid)

  if @alldata
    @lost_vnodes = @rd.v_idx.keys
  else
    @lost_vnodes = get_lost_vnodes(@rd, @ymdhms)
    puts "#{@lost_vnodes.length} vnodes where data was lost."

    # Nothing to recover.
    exit if @lost_vnodes.length.zero?
  end

  each_hash(@strgpath) do |hname, dir|
    puts "#{hname} #{dir}"
    @storage = open_storage(dir, @lost_vnodes)
    start_recover(hname)
    @storage.closedb
  end
end

#suite_with_keys(keys) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/roma/tools/recoverlost_lib.rb', line 68

# Runs a key-based recovery over every hash directory under the storage path.
#
# Every vnode known to the routing data is treated as a recovery target; each
# hash directory's storage is opened, the given keys are uploaded with
# +start_recover_width_keys+, and the storage is closed again.
#
# @param keys [Array<String>] keys to recover
def suite_with_keys(keys)
  @rd = get_routing_data(@nodeid)
  @lost_vnodes = @rd.v_idx.keys

  each_hash(@strgpath) do |hname, dir|
    puts "#{hname} #{dir}"
    @storage = open_storage(dir, @lost_vnodes)
    # Disabled alternative: start_recover_width_keys2(hname, keys)
    start_recover_width_keys(hname, keys)
    @storage.closedb
  end
end

#upload_data(hname, vn, nid, k, data) ⇒ Object



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/roma/tools/recoverlost_lib.rb', line 276

# Uploads a single key/value record to node +nid+ under vnode +vn+.
#
# Sends "<pushv_cmd> <hname> <vn>", waits for "READY", writes one packed
# record (clk forced to 0), writes 20 NUL bytes as terminator and reads the
# final reply.
#
# Key and value widths are measured in bytes (+bytesize+): the pack directive
# "aN" counts bytes, so using the character count would truncate multibyte
# keys or values.
#
# @param hname [String] hash name placed in the command line
# @param vn [Integer] vnode id written into the record and the command
# @param nid [String] destination node id
# @param k [String] key
# @param data [Array] five elements read as (vn, last, clk, expt, value);
#   the stored vn and clk are ignored, a nil value packs a zero length
# @return [String, nil] the chomped final reply, the chomped error line when
#   the node did not answer "READY", or nil when an error was raised
def upload_data(hname, vn, nid, k, data)
  con = Roma::Messaging::ConPool.instance.get_connection(nid)

  cmd = "#{@pushv_cmd} #{hname} #{vn}\r\n"
  con.write(cmd)
  res = con.gets # READY\r\n or error string
  if res != "READY\r\n"
    con.close
    return res.chomp
  end

  _vn_old, last, _clk, expt, val = data
  klen = k.bytesize
  if val
    vlen = val.bytesize
    wd = [vn, last, 0, expt, klen, k, vlen, val].pack("NNNNNa#{klen}Na#{vlen}")
  else
    wd = [vn, last, 0, expt, klen, k, 0].pack("NNNNNa#{klen}N")
  end

  con.write(wd)
  sleep @stream_copy_wait_param

  con.write("\0" * 20) # end of stream

  res = con.gets # STORED\r\n or error string
  Roma::Messaging::ConPool.instance.return_connection(nid, con)
  res.chomp! if res
  res
rescue => e
  STDERR.puts "#{e}\n#{$@}"
  nil
end

#upload_data2(hname, nid, keys) ⇒ Object



216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/roma/tools/recoverlost_lib.rb', line 216

# Uploads many keys to node +nid+ over a single push session.
#
# Sends "<pushv_cmd> <hname> 0", waits for "READY", then for every key found
# in the opened storage writes one packed record (vnode recomputed from the
# key, clk forced to 0). Finishes with 20 NUL bytes and reads the final reply.
# Progress is printed roughly every 1% of the keys.
#
# Key and value widths are measured in bytes (+bytesize+): the pack directive
# "aN" counts bytes, so using the character count would truncate multibyte
# keys or values.
#
# @param hname [String] hash name placed in the command line
# @param nid [String] destination node id
# @param keys [Array<String>] keys to upload; keys missing from the storage
#   are skipped
# @return [String, nil] the chomped final reply, the chomped error line when
#   the node did not answer "READY", or nil when an error was raised
def upload_data2(hname, nid, keys)
  con = Roma::Messaging::ConPool.instance.get_connection(nid)

  cmd = "#{@pushv_cmd} #{hname} 0\r\n"
  con.write(cmd)
  res = con.gets # READY\r\n or error string
  if res != "READY\r\n"
    con.close
    return res.chomp
  end

  n = keys.length
  m = [n / 100, 1].max # progress interval, never below one key
  keys.each_with_index do |k, i|
    print "#{i}/#{n}\r" if i % m == 0
    data = @storage.get_raw2(k)
    next unless data

    d = Digest::SHA1.hexdigest(k).hex % (2**@rd.dgst_bits)
    vn = d & @rd.search_mask

    _vn_old, last, _clk, expt, val = data
    klen = k.bytesize
    if val
      vlen = val.bytesize
      wd = [vn, last, 0, expt, klen, k, vlen, val].pack("NNNNNa#{klen}Na#{vlen}")
    else
      wd = [vn, last, 0, expt, klen, k, 0].pack("NNNNNa#{klen}N")
    end

    con.write(wd)
    sleep @stream_copy_wait_param
  end
  con.write("\0" * 20) # end of stream

  res = con.gets # STORED\r\n or error string
  Roma::Messaging::ConPool.instance.return_connection(nid, con)
  res.chomp! if res
  res
rescue => e
  STDERR.puts "#{e}\n#{$@}"
  nil
end