Class: Roma::Routing::ChurnbasedRoutingTable
- Inherits:
-
RoutingTable
- Object
- RoutingTable
- Roma::Routing::ChurnbasedRoutingTable
- Includes:
- RandomPartitioner
- Defined in:
- lib/roma/routing/cb_rttable.rb
Instance Attribute Summary collapse
-
#auto_recover ⇒ Object
Returns the value of attribute auto_recover.
-
#auto_recover_status ⇒ Object
Returns the value of attribute auto_recover_status.
-
#auto_recover_time ⇒ Object
Returns the value of attribute auto_recover_time.
-
#enabled_failover ⇒ Object
Returns the value of attribute enabled_failover.
-
#event ⇒ Object
Returns the value of attribute event.
-
#event_limit_line ⇒ Object
Returns the value of attribute event_limit_line.
-
#fname ⇒ Object
readonly
Returns the value of attribute fname.
-
#leave_proc ⇒ Object
readonly
Returns the value of attribute leave_proc.
-
#log_fd ⇒ Object
readonly
Returns the value of attribute log_fd.
-
#log_name ⇒ Object
readonly
Returns the value of attribute log_name.
-
#logs ⇒ Object
Returns the value of attribute logs.
-
#lost_action ⇒ Object
Returns the value of attribute lost_action.
-
#lost_proc ⇒ Object
readonly
Returns the value of attribute lost_proc.
-
#min_version ⇒ Object
readonly
Returns the value of attribute min_version.
-
#recover_proc ⇒ Object
readonly
Returns the value of attribute recover_proc.
-
#trans ⇒ Object
readonly
transaction.
-
#version_of_nodes ⇒ Object
readonly
Returns the value of attribute version_of_nodes.
Attributes inherited from RoutingTable
#div_bits, #fail_cnt, #fail_cnt_gap, #fail_cnt_threshold, #hbits, #mtree, #rd, #rn, #search_mask, #sub_nid
Instance Method Summary collapse
- #add_node(nid) ⇒ Object
- #can_i_recover? ⇒ Boolean
- #can_i_release?(ap_str, rep_host) ⇒ Boolean
- #close_log ⇒ Object
- #commit(vn) ⇒ Object
-
#delete_old_trans(sec = 3600) ⇒ Object
sec
: elapsed-time. - #each_vnode ⇒ Object
- #find_min_version ⇒ Object
- #get_stat(ap) ⇒ Object
- #has_node?(ap_str) ⇒ Boolean
-
#initialize(rd, fname) ⇒ ChurnbasedRoutingTable
constructor
A new instance of ChurnbasedRoutingTable.
- #leave(nid) ⇒ Object
- #open_log ⇒ Object
- #rollback(vn) ⇒ Object
-
#sample_vnode(without_nodes) ⇒ Object
vnode sampling without
without_nodes
. -
#search_lost_vnodes(t) ⇒ Object
Retuens the list of losted-data vnode newer than argument time.
- #search_nodes_for_write(vn) ⇒ Object
- #search_nodes_with_clk(vn) ⇒ Object
-
#select_a_short_vnodes(exclued_nodes) ⇒ Object
select a vnodes where short of redundancy.
- #set_leave_proc(&block) ⇒ Object
- #set_lost_proc(&block) ⇒ Object
- #set_recover_proc(&block) ⇒ Object
- #set_route(vn, clk, nids) ⇒ Object
- #set_version(nid, ver) ⇒ Object
-
#transaction(vn, nids) ⇒ Object
vn
: vnode-idnids
: node-id list. - #v_idx ⇒ Object
-
#vnode_balance(ap) ⇒ Object
Returns the status of vnode balance.
- #write_log(line) ⇒ Object
- #write_log_setroute(vn, clk, nids) ⇒ Object
Methods included from RandomPartitioner
#exclude_nodes, #exclude_nodes_for_join, #select_node_for_release, #select_vn_for_balance, #select_vn_for_join, #select_vn_for_recover
Methods inherited from RoutingTable
#check_repetition_in_routing, #create_nodes_from_v_idx, #dump, #dump_binary, #dump_json, #dump_yaml, #get_vnode_id, #init_mtree, #nodes, #num_of_vn, #proc_failed, #proc_succeed, #search_nodes, #sub_nid_rd, #vnodes
Constructor Details
#initialize(rd, fname) ⇒ ChurnbasedRoutingTable
Returns a new instance of ChurnbasedRoutingTable.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/roma/routing/cb_rttable.rb', line 29 def initialize(rd,fname) super(rd) @rd.nodes.sort! @trans={} @fname=fname @leave_proc=nil @lost_proc=nil @recover_proc=nil @lost_action=:no_action @auto_recover=false @auto_recover_status="waiting" @auto_recover_time=1800 @event = [] @event_limit_line = 1000 @logs = [] @enabled_failover=false @lock = Mutex.new @version_of_nodes = Hash.new(0) @min_version = nil open_log end |
Instance Attribute Details
#auto_recover ⇒ Object
Returns the value of attribute auto_recover.
19 20 21 |
# File 'lib/roma/routing/cb_rttable.rb', line 19 def auto_recover @auto_recover end |
#auto_recover_status ⇒ Object
Returns the value of attribute auto_recover_status.
20 21 22 |
# File 'lib/roma/routing/cb_rttable.rb', line 20 def auto_recover_status @auto_recover_status end |
#auto_recover_time ⇒ Object
Returns the value of attribute auto_recover_time.
21 22 23 |
# File 'lib/roma/routing/cb_rttable.rb', line 21 def auto_recover_time @auto_recover_time end |
#enabled_failover ⇒ Object
Returns the value of attribute enabled_failover.
25 26 27 |
# File 'lib/roma/routing/cb_rttable.rb', line 25 def enabled_failover @enabled_failover end |
#event ⇒ Object
Returns the value of attribute event.
22 23 24 |
# File 'lib/roma/routing/cb_rttable.rb', line 22 def event @event end |
#event_limit_line ⇒ Object
Returns the value of attribute event_limit_line.
23 24 25 |
# File 'lib/roma/routing/cb_rttable.rb', line 23 def event_limit_line @event_limit_line end |
#fname ⇒ Object (readonly)
Returns the value of attribute fname.
11 12 13 |
# File 'lib/roma/routing/cb_rttable.rb', line 11 def fname @fname end |
#leave_proc ⇒ Object (readonly)
Returns the value of attribute leave_proc.
15 16 17 |
# File 'lib/roma/routing/cb_rttable.rb', line 15 def leave_proc @leave_proc end |
#log_fd ⇒ Object (readonly)
Returns the value of attribute log_fd.
12 13 14 |
# File 'lib/roma/routing/cb_rttable.rb', line 12 def log_fd @log_fd end |
#log_name ⇒ Object (readonly)
Returns the value of attribute log_name.
13 14 15 |
# File 'lib/roma/routing/cb_rttable.rb', line 13 def log_name @log_name end |
#logs ⇒ Object
Returns the value of attribute logs.
24 25 26 |
# File 'lib/roma/routing/cb_rttable.rb', line 24 def logs @logs end |
#lost_action ⇒ Object
Returns the value of attribute lost_action.
18 19 20 |
# File 'lib/roma/routing/cb_rttable.rb', line 18 def lost_action @lost_action end |
#lost_proc ⇒ Object (readonly)
Returns the value of attribute lost_proc.
16 17 18 |
# File 'lib/roma/routing/cb_rttable.rb', line 16 def lost_proc @lost_proc end |
#min_version ⇒ Object (readonly)
Returns the value of attribute min_version.
27 28 29 |
# File 'lib/roma/routing/cb_rttable.rb', line 27 def min_version @min_version end |
#recover_proc ⇒ Object (readonly)
Returns the value of attribute recover_proc.
17 18 19 |
# File 'lib/roma/routing/cb_rttable.rb', line 17 def recover_proc @recover_proc end |
#trans ⇒ Object (readonly)
transaction
14 15 16 |
# File 'lib/roma/routing/cb_rttable.rb', line 14 def trans @trans end |
#version_of_nodes ⇒ Object (readonly)
Returns the value of attribute version_of_nodes.
26 27 28 |
# File 'lib/roma/routing/cb_rttable.rb', line 26 def version_of_nodes @version_of_nodes end |
Instance Method Details
#add_node(nid) ⇒ Object
213 214 215 216 217 218 219 220 |
# File 'lib/roma/routing/cb_rttable.rb', line 213 def add_node(nid) unless @rd.nodes.include?(nid) @rd.nodes << nid @rd.nodes.sort! write_log("join #{nid}") set_event(nid, 'join') end end |
#can_i_recover? ⇒ Boolean
124 125 126 |
# File 'lib/roma/routing/cb_rttable.rb', line 124 def can_i_recover? @rd.nodes.length >= @rd.rn end |
#can_i_release?(ap_str, rep_host) ⇒ Boolean
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/roma/routing/cb_rttable.rb', line 128 def can_i_release?(ap_str, rep_host) buf = self.nodes buf.delete(ap_str) hosts = [] unless rep_host buf.each{ |node| host = node.split(/[:_]/)[0] hosts << host unless hosts.include?(host) } else hosts = buf end hosts.length < @rd.rn end |
#close_log ⇒ Object
120 121 122 |
# File 'lib/roma/routing/cb_rttable.rb', line 120 def close_log @log_fd.close end |
#commit(vn) ⇒ Object
334 335 336 337 338 339 340 341 342 343 344 345 |
# File 'lib/roma/routing/cb_rttable.rb', line 334 def commit(vn) return false unless @trans.key?(vn) @lock.synchronize { @rd.v_idx[vn]=@trans[vn][0] @trans.delete(vn) clk = @rd.v_clk[vn] + 1 @rd.v_clk[vn] = clk @mtree.set(vn, @rd.v_idx[vn]) write_log_setroute(vn, clk, @rd.v_idx[vn]) return clk } end |
#delete_old_trans(sec = 3600) ⇒ Object
sec
: elapsed-time
352 353 354 |
# File 'lib/roma/routing/cb_rttable.rb', line 352 def delete_old_trans(sec=3600) @trans.delete_if{|vn,val| val[1] < Time.now-sec } end |
#each_vnode ⇒ Object
303 304 305 |
# File 'lib/roma/routing/cb_rttable.rb', line 303 def each_vnode @rd.v_idx.each_pair{ |k, v| yield(k, v) } end |
#find_min_version ⇒ Object
70 71 72 73 74 |
# File 'lib/roma/routing/cb_rttable.rb', line 70 def find_min_version ret = 0xffffff @version_of_nodes.each_value{|ver| ret = ver if ret > ver} ret end |
#get_stat(ap) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/roma/routing/cb_rttable.rb', line 51 def get_stat(ap) ret = super(ap) ret['routing.lost_action'] = @lost_action.to_s ret['routing.auto_recover'] = @auto_recover.to_s ret['routing.auto_recover_status'] = @auto_recover_status.to_s ret['routing.auto_recover_time'] = @auto_recover_time ret['routing.event'] = @event ret['routing.event_limit_line'] = @event_limit_line ret['routing.version_of_nodes'] = @version_of_nodes.inspect ret['routing.min_version'] = @min_version ret['routing.enabled_failover'] = @enabled_failover ret end |
#has_node?(ap_str) ⇒ Boolean
382 383 384 385 386 387 |
# File 'lib/roma/routing/cb_rttable.rb', line 382 def has_node?(ap_str) self.each_vnode do |vn, nids| return true if nids.include?(ap_str) end false end |
#leave(nid) ⇒ Object
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
# File 'lib/roma/routing/cb_rttable.rb', line 231 def leave(nid) unless @enabled_failover return end return unless @rd.nodes.include?(nid) @leave_proc.call(nid) if @leave_proc @rd.nodes.delete(nid) @version_of_nodes.delete(nid) @min_version = find_min_version @log.warn("#{nid} just failed.") write_log("leave #{nid}") set_event(nid, __method__) lost_vnodes=[] short_vnodes=[] @lock.synchronize { @rd.v_idx.each_pair{ |vn, nids| buf = nids.clone if buf.delete(nid) set_route_and_inc_clk_inside_sync(vn, buf) if buf.length == 0 lost_vnodes << vn @log.error("Vnode data is lost.(Vnode=#{vn})") elsif buf.length < @rd.rn short_vnodes << vn end end } } if lost_vnodes.length > 0 @lost_proc.call if @lost_proc if @lost_action == :auto_assign lost_vnodes.each{ |vn| set_route_and_inc_clk_inside_sync( vn, next_alive_vnode(vn) ) } end elsif short_vnodes.length > 0 @log.error("Short vnodes exist.") @recover_proc.call('start_auto_recover_process') if @recover_proc end @fail_cnt.delete(nid) end |
#open_log ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/roma/routing/cb_rttable.rb', line 88 def open_log log_list=@rd.get_file_list(@fname) if log_list.length==0 @log_name="#{@fname}.1" else if File::stat("#{@fname}.#{log_list.last[0]}").size == 0 @log_name="#{@fname}.#{log_list.last[0]}" else @log_name="#{@fname}.#{log_list.last[0]+1}" end end @log_fd=File.open(@log_name,"a") end |
#rollback(vn) ⇒ Object
347 348 349 |
# File 'lib/roma/routing/cb_rttable.rb', line 347 def rollback(vn) @trans.delete(vn) end |
#sample_vnode(without_nodes) ⇒ Object
vnode sampling without without_nodes
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/roma/routing/cb_rttable.rb', line 171 def sample_vnode(without_nodes) short_idx = {} idx = {} @rd.v_idx.each_pair{|vn, nids| unless list_include?(nids, without_nodes) idx[vn] = nids short_idx[vn] = nids if nids.length < @rd.rn end } idx = short_idx if short_idx.length > 0 ks = idx.keys return nil if ks.length == 0 vn = ks[rand(ks.length)] nids = idx[vn] [vn, nids] end |
#search_lost_vnodes(t) ⇒ Object
Retuens the list of losted-data vnode newer than argument time.
146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/roma/routing/cb_rttable.rb', line 146 def search_lost_vnodes(t) ret = [] @rd.each_log_all(@fname){|log_t,line| next if t > log_t s = line.split(/ /) if s[0] == 'setroute' && s.length == 3 # vnode has a no pnode. therefor this vnode was lost. ret << s[1].to_i end } ret end |
#search_nodes_for_write(vn) ⇒ Object
311 312 313 314 315 316 |
# File 'lib/roma/routing/cb_rttable.rb', line 311 def search_nodes_for_write(vn) return @trans[vn][0].clone if @trans.key?(vn) @rd.v_idx[vn].clone rescue nil end |
#search_nodes_with_clk(vn) ⇒ Object
318 319 320 321 322 323 324 |
# File 'lib/roma/routing/cb_rttable.rb', line 318 def search_nodes_with_clk(vn) @lock.synchronize { return [@rd.v_clk[vn], @rd.v_idx[vn].clone] } rescue nil end |
#select_a_short_vnodes(exclued_nodes) ⇒ Object
select a vnodes where short of redundancy.
160 161 162 163 164 165 166 167 168 |
# File 'lib/roma/routing/cb_rttable.rb', line 160 def select_a_short_vnodes(exclued_nodes) ret = [] @rd.v_idx.each_pair{|vn, nids| if nids.length < @rd.rn && list_include?(nids,exclued_nodes) == false ret << [vn,nids] end } ret end |
#set_leave_proc(&block) ⇒ Object
76 77 78 |
# File 'lib/roma/routing/cb_rttable.rb', line 76 def set_leave_proc(&block) @leave_proc=block end |
#set_lost_proc(&block) ⇒ Object
80 81 82 |
# File 'lib/roma/routing/cb_rttable.rb', line 80 def set_lost_proc(&block) @lost_proc=block end |
#set_recover_proc(&block) ⇒ Object
84 85 86 |
# File 'lib/roma/routing/cb_rttable.rb', line 84 def set_recover_proc(&block) @recover_proc=block end |
#set_route(vn, clk, nids) ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/roma/routing/cb_rttable.rb', line 197 def set_route(vn, clk, nids) return "#{vn} is not found." unless @rd.v_idx.key?(vn) @lock.synchronize { return "It's old table." if @rd.v_clk[vn] > clk nids.each{ |nid| add_node(nid) unless @rd.nodes.include?(nid) } @rd.v_idx[vn] = nids.clone clk += 1 @rd.v_clk[vn] = clk @mtree.set(vn, nids) write_log_setroute(vn, clk, nids) return clk } end |
#set_version(nid, ver) ⇒ Object
65 66 67 68 |
# File 'lib/roma/routing/cb_rttable.rb', line 65 def set_version(nid,ver) @version_of_nodes[nid] = ver @min_version = find_min_version end |
#transaction(vn, nids) ⇒ Object
vn
: vnode-id nids
: node-id list
328 329 330 331 332 |
# File 'lib/roma/routing/cb_rttable.rb', line 328 def transaction(vn, nids) return false if @trans.key?(vn) @trans[vn]=[nids.clone, Time.now] true end |
#v_idx ⇒ Object
307 308 309 |
# File 'lib/roma/routing/cb_rttable.rb', line 307 def v_idx @rd.v_idx.clone end |
#vnode_balance(ap) ⇒ Object
Returns the status of vnode balance. ap
: my address_port string(ex.“roma0_11211”)
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 |
# File 'lib/roma/routing/cb_rttable.rb', line 358 def vnode_balance(ap) # amount of primary at node = amount of vnode / amount of node n = (2**div_bits) / nodes.length pcount = scount = 0 @rd.v_idx.each_pair{ |vn, nids| next if nids == nil or nids.length == 0 if nids[0] == ap pcount += 1 elsif nids.include?(ap) scount += 1 end } @log.debug("#{__method__}:n=#{n} pcount=#{pcount} scount=#{scount}") if pcount > n return :over elsif pcount < n return :less end :even end |
#write_log(line) ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/roma/routing/cb_rttable.rb', line 108 def write_log(line) # log rotation if File::stat(@log_name).size > 1000 * 1024 close_log open_log end t = Time.now tstr = "#{t.strftime('%Y-%m-%dT%H:%M:%S')}.#{t.usec}" @log_fd.write("#{tstr} #{line}\n") @log_fd.flush end |
#write_log_setroute(vn, clk, nids) ⇒ Object
102 103 104 105 106 |
# File 'lib/roma/routing/cb_rttable.rb', line 102 def write_log_setroute(vn, clk, nids) log="setroute #{vn} #{clk}" nids.each{ |nid| log << " #{nid}" } write_log(log) end |