Class: Roma::Routing::ChurnbasedRoutingTable

Inherits:
RoutingTable show all
Includes:
RandomPartitioner
Defined in:
lib/roma/routing/cb_rttable.rb

Instance Attribute Summary collapse

Attributes inherited from RoutingTable

#div_bits, #fail_cnt, #fail_cnt_gap, #fail_cnt_threshold, #hbits, #mtree, #rd, #rn, #search_mask, #sub_nid

Instance Method Summary collapse

Methods included from RandomPartitioner

#exclude_nodes, #exclude_nodes_for_join, #select_node_for_release, #select_vn_for_balance, #select_vn_for_join, #select_vn_for_recover

Methods inherited from RoutingTable

#check_repetition_in_routing, #create_nodes_from_v_idx, #dump, #dump_binary, #dump_json, #dump_yaml, #get_vnode_id, #init_mtree, #nodes, #num_of_vn, #proc_failed, #proc_succeed, #search_nodes, #sub_nid_rd, #vnodes

Constructor Details

#initialize(rd, fname) ⇒ ChurnbasedRoutingTable

Returns a new instance of ChurnbasedRoutingTable.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/roma/routing/cb_rttable.rb', line 29

def initialize(rd,fname)
  super(rd)
  @rd.nodes.sort!
  @trans={}
  @fname=fname
  @leave_proc=nil
  @lost_proc=nil
  @recover_proc=nil
  @lost_action=:no_action
  @auto_recover=false
  @auto_recover_status="waiting"
  @auto_recover_time=1800
  @event = []
  @event_limit_line = 1000
  @logs = []
  @enabled_failover=false
  @lock = Mutex.new
  @version_of_nodes = Hash.new(0)
  @min_version = nil
  open_log
end

Instance Attribute Details

#auto_recoverObject

Returns the value of attribute auto_recover.



19
20
21
# File 'lib/roma/routing/cb_rttable.rb', line 19

def auto_recover
  @auto_recover
end

#auto_recover_statusObject

Returns the value of attribute auto_recover_status.



20
21
22
# File 'lib/roma/routing/cb_rttable.rb', line 20

def auto_recover_status
  @auto_recover_status
end

#auto_recover_timeObject

Returns the value of attribute auto_recover_time.



21
22
23
# File 'lib/roma/routing/cb_rttable.rb', line 21

def auto_recover_time
  @auto_recover_time
end

#enabled_failoverObject

Returns the value of attribute enabled_failover.



25
26
27
# File 'lib/roma/routing/cb_rttable.rb', line 25

def enabled_failover
  @enabled_failover
end

#eventObject

Returns the value of attribute event.



22
23
24
# File 'lib/roma/routing/cb_rttable.rb', line 22

def event
  @event
end

#event_limit_lineObject

Returns the value of attribute event_limit_line.



23
24
25
# File 'lib/roma/routing/cb_rttable.rb', line 23

def event_limit_line
  @event_limit_line
end

#fnameObject (readonly)

Returns the value of attribute fname.



11
12
13
# File 'lib/roma/routing/cb_rttable.rb', line 11

def fname
  @fname
end

#leave_procObject (readonly)

Returns the value of attribute leave_proc.



15
16
17
# File 'lib/roma/routing/cb_rttable.rb', line 15

def leave_proc
  @leave_proc
end

#log_fdObject (readonly)

Returns the value of attribute log_fd.



12
13
14
# File 'lib/roma/routing/cb_rttable.rb', line 12

def log_fd
  @log_fd
end

#log_nameObject (readonly)

Returns the value of attribute log_name.



13
14
15
# File 'lib/roma/routing/cb_rttable.rb', line 13

def log_name
  @log_name
end

#logsObject

Returns the value of attribute logs.



24
25
26
# File 'lib/roma/routing/cb_rttable.rb', line 24

def logs
  @logs
end

#lost_actionObject

Returns the value of attribute lost_action.



18
19
20
# File 'lib/roma/routing/cb_rttable.rb', line 18

def lost_action
  @lost_action
end

#lost_procObject (readonly)

Returns the value of attribute lost_proc.



16
17
18
# File 'lib/roma/routing/cb_rttable.rb', line 16

def lost_proc
  @lost_proc
end

#min_versionObject (readonly)

Returns the value of attribute min_version.



27
28
29
# File 'lib/roma/routing/cb_rttable.rb', line 27

def min_version
  @min_version
end

#recover_procObject (readonly)

Returns the value of attribute recover_proc.



17
18
19
# File 'lib/roma/routing/cb_rttable.rb', line 17

def recover_proc
  @recover_proc
end

#transObject (readonly)

transaction



14
15
16
# File 'lib/roma/routing/cb_rttable.rb', line 14

def trans
  @trans
end

#version_of_nodesObject (readonly)

Returns the value of attribute version_of_nodes.



26
27
28
# File 'lib/roma/routing/cb_rttable.rb', line 26

def version_of_nodes
  @version_of_nodes
end

Instance Method Details

#add_node(nid) ⇒ Object



213
214
215
216
217
218
219
220
# File 'lib/roma/routing/cb_rttable.rb', line 213

def add_node(nid)
  unless @rd.nodes.include?(nid)
    @rd.nodes << nid
    @rd.nodes.sort!
    write_log("join #{nid}")
    set_event(nid, 'join')
  end
end

#can_i_recover?Boolean

Returns:

  • (Boolean)


124
125
126
# File 'lib/roma/routing/cb_rttable.rb', line 124

def can_i_recover?
  @rd.nodes.length >= @rd.rn
end

#can_i_release?(ap_str, rep_host) ⇒ Boolean

Returns:

  • (Boolean)


128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/roma/routing/cb_rttable.rb', line 128

def can_i_release?(ap_str, rep_host)
  buf = self.nodes
  buf.delete(ap_str)
  hosts = []

  unless rep_host
    buf.each{ |node|
      host = node.split(/[:_]/)[0]
      hosts << host unless hosts.include?(host)
    }
  else
    hosts = buf
  end

  hosts.length < @rd.rn
end

#close_logObject



120
121
122
# File 'lib/roma/routing/cb_rttable.rb', line 120

def close_log
  @log_fd.close
end

#commit(vn) ⇒ Object



334
335
336
337
338
339
340
341
342
343
344
345
# File 'lib/roma/routing/cb_rttable.rb', line 334

def commit(vn)
  return false unless @trans.key?(vn)
  @lock.synchronize {
    @rd.v_idx[vn]=@trans[vn][0]
    @trans.delete(vn)
    clk = @rd.v_clk[vn] + 1
    @rd.v_clk[vn] = clk
    @mtree.set(vn, @rd.v_idx[vn])
    write_log_setroute(vn, clk, @rd.v_idx[vn])
    return clk
  }
end

#delete_old_trans(sec = 3600) ⇒ Object

sec: elapsed-time



352
353
354
# File 'lib/roma/routing/cb_rttable.rb', line 352

def delete_old_trans(sec=3600)
  @trans.delete_if{|vn,val| val[1] < Time.now-sec }
end

#each_vnodeObject



303
304
305
# File 'lib/roma/routing/cb_rttable.rb', line 303

def each_vnode
  @rd.v_idx.each_pair{ |k, v| yield(k, v) }
end

#find_min_versionObject



70
71
72
73
74
# File 'lib/roma/routing/cb_rttable.rb', line 70

def find_min_version
  ret = 0xffffff
  @version_of_nodes.each_value{|ver| ret = ver if ret > ver}
  ret
end

#get_stat(ap) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/roma/routing/cb_rttable.rb', line 51

def get_stat(ap)
  ret = super(ap)
  ret['routing.lost_action'] = @lost_action.to_s
  ret['routing.auto_recover'] = @auto_recover.to_s
  ret['routing.auto_recover_status'] = @auto_recover_status.to_s
  ret['routing.auto_recover_time'] = @auto_recover_time
  ret['routing.event'] = @event
  ret['routing.event_limit_line'] = @event_limit_line
  ret['routing.version_of_nodes'] = @version_of_nodes.inspect
  ret['routing.min_version'] = @min_version
  ret['routing.enabled_failover'] = @enabled_failover
  ret
end

#has_node?(ap_str) ⇒ Boolean

Returns:

  • (Boolean)


382
383
384
385
386
387
# File 'lib/roma/routing/cb_rttable.rb', line 382

def has_node?(ap_str)
  self.each_vnode do |vn, nids|
    return true if nids.include?(ap_str)
  end
  false
end

#leave(nid) ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/roma/routing/cb_rttable.rb', line 231

def leave(nid)
  unless @enabled_failover
    return
  end
  return unless @rd.nodes.include?(nid)

  @leave_proc.call(nid) if @leave_proc
  @rd.nodes.delete(nid)
  @version_of_nodes.delete(nid)
  @min_version = find_min_version

  @log.warn("#{nid} just failed.")
  write_log("leave #{nid}")
  set_event(nid, __method__)

  lost_vnodes=[]
  short_vnodes=[]
  @lock.synchronize {
    @rd.v_idx.each_pair{ |vn, nids|
      buf = nids.clone
      if buf.delete(nid)
        set_route_and_inc_clk_inside_sync(vn, buf)
        if buf.length == 0
          lost_vnodes << vn
          @log.error("Vnode data is lost.(Vnode=#{vn})")
        elsif buf.length < @rd.rn
          short_vnodes << vn
        end
      end
    }
  }
  if lost_vnodes.length > 0
    @lost_proc.call if @lost_proc
    if @lost_action == :auto_assign
      lost_vnodes.each{ |vn|
        set_route_and_inc_clk_inside_sync( vn, next_alive_vnode(vn) )
      }
    end
  elsif short_vnodes.length > 0
    @log.error("Short vnodes exist.")
    @recover_proc.call('start_auto_recover_process') if @recover_proc
  end
  @fail_cnt.delete(nid)
end

#open_logObject



88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/roma/routing/cb_rttable.rb', line 88

def open_log
  log_list=@rd.get_file_list(@fname)
  if log_list.length==0
    @log_name="#{@fname}.1"
  else
    if File::stat("#{@fname}.#{log_list.last[0]}").size == 0
      @log_name="#{@fname}.#{log_list.last[0]}"
    else
      @log_name="#{@fname}.#{log_list.last[0]+1}"
    end
  end
  @log_fd=File.open(@log_name,"a")
end

#rollback(vn) ⇒ Object



347
348
349
# File 'lib/roma/routing/cb_rttable.rb', line 347

def rollback(vn)
  @trans.delete(vn)
end

#sample_vnode(without_nodes) ⇒ Object

vnode sampling without without_nodes



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/roma/routing/cb_rttable.rb', line 171

def sample_vnode(without_nodes)
  short_idx = {}
  idx = {}
  @rd.v_idx.each_pair{|vn, nids|
    unless list_include?(nids, without_nodes)
      idx[vn] = nids
      short_idx[vn] = nids if nids.length < @rd.rn
    end
  }
  idx = short_idx if short_idx.length > 0

  ks = idx.keys
  return nil if ks.length == 0
  vn = ks[rand(ks.length)]
  nids = idx[vn]
  [vn, nids]
end

#search_lost_vnodes(t) ⇒ Object

Retuens the list of losted-data vnode newer than argument time.



146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/roma/routing/cb_rttable.rb', line 146

def search_lost_vnodes(t)
  ret = []
  @rd.each_log_all(@fname){|log_t,line|
    next if t > log_t
    s = line.split(/ /)
    if s[0] == 'setroute' && s.length == 3
      # vnode has a no pnode. therefor this vnode was lost.
      ret << s[1].to_i
    end
  }
  ret
end

#search_nodes_for_write(vn) ⇒ Object



311
312
313
314
315
316
# File 'lib/roma/routing/cb_rttable.rb', line 311

def search_nodes_for_write(vn)
  return @trans[vn][0].clone if @trans.key?(vn)
  @rd.v_idx[vn].clone
rescue
  nil
end

#search_nodes_with_clk(vn) ⇒ Object



318
319
320
321
322
323
324
# File 'lib/roma/routing/cb_rttable.rb', line 318

def search_nodes_with_clk(vn)
  @lock.synchronize {
    return [@rd.v_clk[vn], @rd.v_idx[vn].clone]
  }
rescue
  nil
end

#select_a_short_vnodes(exclued_nodes) ⇒ Object

select a vnodes where short of redundancy.



160
161
162
163
164
165
166
167
168
# File 'lib/roma/routing/cb_rttable.rb', line 160

def select_a_short_vnodes(exclued_nodes)
  ret = []
  @rd.v_idx.each_pair{|vn, nids|
    if nids.length < @rd.rn && list_include?(nids,exclued_nodes) == false
      ret << [vn,nids]
    end
  }
  ret
end

#set_leave_proc(&block) ⇒ Object



76
77
78
# File 'lib/roma/routing/cb_rttable.rb', line 76

def set_leave_proc(&block)
  @leave_proc=block
end

#set_lost_proc(&block) ⇒ Object



80
81
82
# File 'lib/roma/routing/cb_rttable.rb', line 80

def set_lost_proc(&block)
  @lost_proc=block
end

#set_recover_proc(&block) ⇒ Object



84
85
86
# File 'lib/roma/routing/cb_rttable.rb', line 84

def set_recover_proc(&block)
  @recover_proc=block
end

#set_route(vn, clk, nids) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/roma/routing/cb_rttable.rb', line 197

def set_route(vn, clk, nids)
  return "#{vn} is not found." unless @rd.v_idx.key?(vn)
  @lock.synchronize {
    return "It's old table." if @rd.v_clk[vn] > clk
    nids.each{ |nid|
      add_node(nid) unless @rd.nodes.include?(nid)
    }
    @rd.v_idx[vn] = nids.clone
    clk += 1
    @rd.v_clk[vn] = clk
    @mtree.set(vn, nids)
    write_log_setroute(vn, clk, nids)
    return clk
  }
end

#set_version(nid, ver) ⇒ Object



65
66
67
68
# File 'lib/roma/routing/cb_rttable.rb', line 65

def set_version(nid,ver)
  @version_of_nodes[nid] = ver
  @min_version = find_min_version
end

#transaction(vn, nids) ⇒ Object

vn: vnode-id nids: node-id list



328
329
330
331
332
# File 'lib/roma/routing/cb_rttable.rb', line 328

def transaction(vn, nids)
  return false if @trans.key?(vn)
  @trans[vn]=[nids.clone, Time.now]
  true
end

#v_idxObject



307
308
309
# File 'lib/roma/routing/cb_rttable.rb', line 307

def v_idx
  @rd.v_idx.clone
end

#vnode_balance(ap) ⇒ Object

Returns the status of vnode balance. ap: my address_port string(ex.“roma0_11211”)



358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
# File 'lib/roma/routing/cb_rttable.rb', line 358

def vnode_balance(ap)
  # amount of primary at node = amount of vnode / amount of node
  n = (2**div_bits) / nodes.length

  pcount = scount = 0
  @rd.v_idx.each_pair{ |vn, nids|
    next if nids == nil or nids.length == 0
    if nids[0] == ap
      pcount += 1
    elsif nids.include?(ap)
      scount += 1
    end
  }

  @log.debug("#{__method__}:n=#{n} pcount=#{pcount} scount=#{scount}")

  if pcount > n
    return :over
  elsif pcount < n
    return :less
  end
  :even
end

#write_log(line) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
# File 'lib/roma/routing/cb_rttable.rb', line 108

def write_log(line)
  # log rotation
  if File::stat(@log_name).size > 1000 * 1024
    close_log
    open_log
  end
  t = Time.now
  tstr = "#{t.strftime('%Y-%m-%dT%H:%M:%S')}.#{t.usec}"
  @log_fd.write("#{tstr} #{line}\n")
  @log_fd.flush
end

#write_log_setroute(vn, clk, nids) ⇒ Object



102
103
104
105
106
# File 'lib/roma/routing/cb_rttable.rb', line 102

def write_log_setroute(vn, clk, nids)
  log="setroute #{vn} #{clk}"
  nids.each{ |nid| log << " #{nid}" }
  write_log(log)
end