Class: Roma::Storage::BasicStorage

Inherits:
Object
  • Object
show all
Defined in:
lib/roma/storage/basic_storage.rb

Constant Summary collapse

PACK_HEADER_TEMPLATE =
0.. 3

vn

4.. 7

physical clock (unix time)

8..11

logical clock

12..15

exptime(unix time)

16..

value data

'NNNN'
PACK_TEMPLATE =
PACK_HEADER_TEMPLATE+'a*'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeBasicStorage

Returns a new instance of BasicStorage.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/roma/storage/basic_storage.rb', line 31

def initialize
  # database handler
  @hdb = []
  # database cache handler
  @hdbc = []
  # status of a database
  @dbs = []
  @log_fd = nil
  # file number list of a each_vn_dump while
  @each_vn_dump_vnodes = []

  @hdiv = Hash.new(0)

  @ext_name = 'db'

  @st_class = nil
  @divnum = 10

  @each_vn_dump_sleep = 0.001
  @each_vn_dump_sleep_count = 100
  @each_clean_up_sleep = 0.01
  @cleanup_regexp = nil
  @logic_clock_expire = 300

  @each_cache_lock = Mutex::new
  @each_clean_up_lock = Mutex::new
  @stat_lock = Mutex::new
end

Instance Attribute Details

#cleanup_regexpObject

Returns the value of attribute cleanup_regexp.



26
27
28
# File 'lib/roma/storage/basic_storage.rb', line 26

def cleanup_regexp
  @cleanup_regexp
end

#dbsObject (readonly)

Returns the value of attribute dbs.



15
16
17
# File 'lib/roma/storage/basic_storage.rb', line 15

def dbs
  @dbs
end

#divnumObject

Returns the value of attribute divnum.



22
23
24
# File 'lib/roma/storage/basic_storage.rb', line 22

def divnum
  @divnum
end

#do_each_vn_dumpObject

Returns the value of attribute do_each_vn_dump.



29
30
31
# File 'lib/roma/storage/basic_storage.rb', line 29

def do_each_vn_dump
  @do_each_vn_dump
end

#each_clean_up_sleepObject

Returns the value of attribute each_clean_up_sleep.



25
26
27
# File 'lib/roma/storage/basic_storage.rb', line 25

def each_clean_up_sleep
  @each_clean_up_sleep
end

#each_vn_dump_sleepObject

Returns the value of attribute each_vn_dump_sleep.



23
24
25
# File 'lib/roma/storage/basic_storage.rb', line 23

def each_vn_dump_sleep
  @each_vn_dump_sleep
end

#each_vn_dump_sleep_countObject

Returns the value of attribute each_vn_dump_sleep_count.



24
25
26
# File 'lib/roma/storage/basic_storage.rb', line 24

def each_vn_dump_sleep_count
  @each_vn_dump_sleep_count
end

#error_messageObject (readonly)

Returns the value of attribute error_message.



14
15
16
# File 'lib/roma/storage/basic_storage.rb', line 14

def error_message
  @error_message
end

#ext_nameObject (readonly)

Returns the value of attribute ext_name.



13
14
15
# File 'lib/roma/storage/basic_storage.rb', line 13

def ext_name
  @ext_name
end

#hdbObject (readonly)

Returns the value of attribute hdb.



11
12
13
# File 'lib/roma/storage/basic_storage.rb', line 11

def hdb
  @hdb
end

#hdivObject (readonly)

Returns the value of attribute hdiv.



12
13
14
# File 'lib/roma/storage/basic_storage.rb', line 12

def hdiv
  @hdiv
end

#logic_clock_expireObject

Returns the value of attribute logic_clock_expire.



27
28
29
# File 'lib/roma/storage/basic_storage.rb', line 27

def logic_clock_expire
  @logic_clock_expire
end

#option=(value) ⇒ Object (writeonly)

Sets the attribute option

Parameters:

  • value

    the value to set the attribute option to.



19
20
21
# File 'lib/roma/storage/basic_storage.rb', line 19

def option=(value)
  @option = value
end

#st_classObject

Returns the value of attribute st_class.



21
22
23
# File 'lib/roma/storage/basic_storage.rb', line 21

def st_class
  @st_class
end

#storage_path=(value) ⇒ Object (writeonly)

Sets the attribute storage_path

Parameters:

  • value

    the value to set the attribute storage_path to.



18
19
20
# File 'lib/roma/storage/basic_storage.rb', line 18

def storage_path=(value)
  @storage_path = value
end

#vn_list=(value) ⇒ Object (writeonly)

Sets the attribute vn_list

Parameters:

  • value

    the value to set the attribute vn_list to.



17
18
19
# File 'lib/roma/storage/basic_storage.rb', line 17

def vn_list=(value)
  @vn_list = value
end

Instance Method Details

#add(vn, k, d, expt, v) ⇒ Object



238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/roma/storage/basic_storage.rb', line 238

def add(vn, k, d, expt, v)
  buf = db_get(vn, k)
  clk = 0
  if buf
    vn, t, clk, expt2, v2 = unpack_data(buf)
    return nil if Time.now.to_i <= expt2
    clk = (clk + 1) & 0xffffffff
  end
  
  # not exist
  ret = [vn, Time.now.to_i, clk, expt, v]
  return ret if db_put(vn, k, pack_data(*ret))
  nil
end

#append(vn, k, d, expt, v) ⇒ Object



267
268
269
270
271
272
273
274
275
276
277
278
279
# File 'lib/roma/storage/basic_storage.rb', line 267

def append(vn, k, d, expt, v)
  buf = db_get(vn, k)
  return nil unless buf

  # buf != nil
  vn, t, clk, expt2, v2 = unpack_data(buf)
  return nil if Time.now.to_i > expt2
  clk = (clk + 1) & 0xffffffff

  ret = [vn, Time.now.to_i, clk, expt, v2 + v]
  return ret if db_put(vn, k, pack_data(*ret))
  nil
end

#cache_file_name(dn) ⇒ Object



674
675
676
# File 'lib/roma/storage/basic_storage.rb', line 674

def cache_file_name(dn)
  "#{@storage_path}/#{dn}.cache.#{@ext_name}"
end

#cas(vn, k, d, clk, expt, v) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/roma/storage/basic_storage.rb', line 195

def cas(vn, k, d, clk, expt, v)
  buf = db_get(vn ,k)
  return :not_found unless buf
  t = Time.now.to_i
  data = unpack_data(buf)
  return :not_found if t > data[3]
  return :exists if clk != data[2]
  clk = (data[2] + 1) & 0xffffffff
  ret = [vn, t, clk, expt, v]
  return ret if db_put(vn, k, pack_data(*ret))
  nil
end

#close_logObject



770
771
772
773
# File 'lib/roma/storage/basic_storage.rb', line 770

def close_log
  @log_fd.close if @log_fd
  @log_fd = nil
end

#closedbObject



130
131
132
133
134
135
136
137
# File 'lib/roma/storage/basic_storage.rb', line 130

def closedb
  stop_clean_up
  buf = @hdb; @hdb = []
  buf.each{ |h| close_db(h) if h }
  buf = @hdbc; @hdbc = []
  buf.each{ |h| close_db(h) if h }
  close_log
end

#db_get(vn, k) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/roma/storage/basic_storage.rb', line 162

def db_get(vn, k)
  n = @hdiv[vn]
  d = @hdb[n].get(k)
  return d if @dbs[n] == :normal

  c = @hdbc[n].get(k)
  return d unless c # in case of out of :normal status

  if @dbs[n] == :cachecleaning && d
    # in case of existing value is both @hdb and @hdbc
    vn, lat, clk, expt = unpack_header(d)
    cvn, clat, cclk, cexpt = unpack_header(c)
    return d if cmp_clk(clk, cclk) > 0 # if @hdb newer than @hdbc
  end
  c
end

#db_put(vn, k, v) ⇒ Object



179
180
181
182
183
184
185
186
187
# File 'lib/roma/storage/basic_storage.rb', line 179

def db_put(vn, k, v)
  n = @hdiv[vn]
  if @dbs[n] == :safecopy_flushing || @dbs[n] == :safecopy_flushed
    ret = @hdbc[n].put(k, v)
  else
    ret = @hdb[n].put(k, v)
  end
  ret
end

#decr(vn, k, d, v) ⇒ Object



387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
# File 'lib/roma/storage/basic_storage.rb', line 387

def decr(vn, k, d, v)
  buf = db_get(vn, k)
  return nil unless buf

  # buf != nil
  vn, t, clk, expt2, v2 = unpack_data(buf)
  return nil if Time.now.to_i > expt2
  clk = (clk + 1) & 0xffffffff

  v = (v2.to_i - v)
  v = 0 if v < 0
  v = v & 0xffffffffffffffff

  ret = [vn, Time.now.to_i, clk, expt2, v.to_s]
  return ret if db_put(vn, k, pack_data(*ret))
  nil
end

#delete(vn, k, d) ⇒ Object



342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/roma/storage/basic_storage.rb', line 342

def delete(vn, k, d)
  buf = db_get(vn, k)
  v = ret = nil
  clk = 0
  if buf
    vn, t, clk, expt, v2 = unpack_data(buf)
    return :deletemark if expt == 0
    clk = (clk + 1) & 0xffffffff
    v = v2 if v2 && v2.length != 0 && Time.now.to_i <= expt  
  end
 
  # [ 0.. 3] vn
  # [ 4.. 7] physical clock(unix time)
  # [ 8..11] logical clock
  # [12..15] exptime(unix time) => 0
  ret = [vn, Time.now.to_i, clk, 0, v]
  if db_put(vn, k, pack_header(*ret[0..-2]))
    return ret
  else
    return nil
  end
end

#dump(vn) ⇒ Object

Returns the vnode dump.



543
544
545
546
547
# File 'lib/roma/storage/basic_storage.rb', line 543

def dump(vn)
  buf = get_vnode_hash(vn)
  return nil if buf.length == 0
  Marshal.dump(buf)
end

#each_cache_by_keys(dn, keys) ⇒ Object

Calls the geven block, passes the cache(@hdbc) element.

dn

number of database

keys

key list



619
620
621
622
623
624
625
# File 'lib/roma/storage/basic_storage.rb', line 619

def each_cache_by_keys(dn, keys)
  keys.each do |k|
    v = @hdbc[dn].get(k)
    vn, last, clk, expt, val = unpack_data(v)
    yield [vn, last, clk, expt, k, val]
  end
end

#each_cache_dump_pack(dn, keys) ⇒ Object

Calls the geven block, passes the cache(@hdbc) element as the spushv command data format.

dn

number of database

keys

key list



631
632
633
634
635
636
637
# File 'lib/roma/storage/basic_storage.rb', line 631

def each_cache_dump_pack(dn, keys)
  keys.each do |k|
    v = @hdbc[dn].get(k)
    vn, last, clk, expt, val = unpack_data(v)
    yield vn_dump_pack(vn, last, clk, expt, k, val)
  end
end

#each_clean_up(t, vnhash) ⇒ Object



424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
# File 'lib/roma/storage/basic_storage.rb', line 424

def each_clean_up(t, vnhash)
  @do_clean_up = true

  f = nil;
  if @cleanup_regexp && File.exist?(@storage_path)
    f = open(@storage_path + '/klist.txt','w')
  end

  return unless @each_clean_up_lock.try_lock
  nt = Time.now.to_i
  @divnum.times do |i|
    next if @dbs[i] != :normal
    hdb = @hdb[i]
    hdb.each do |k, v|
      return unless @do_clean_up # 1st check
      vn, last, clk, expt = unpack_header(v)
      vn_stat = vnhash[vn]
      if f && @cleanup_regexp && k =~ /#{@cleanup_regexp}/
        # write klist
        f.puts("#{k},#{last},#{clk}") if hdb.get(k) == v
      end
      if vn_stat == :primary && ( (expt != 0 && nt > expt) || (expt == 0 && t > last) )
        if yield k, vn
          hdb.out(k) if hdb.get(k) == v
        end
      elsif vn_stat == nil && t > last
        if yield k, vn
          hdb.out(k) if hdb.get(k) == v
        end
      end
      return unless @do_clean_up # 2nd ckeck 
      sleep @each_clean_up_sleep
    end
  end
ensure
  @each_clean_up_lock.unlock if @each_clean_up_lock.locked?
  if f
    @cleanup_regexp = nil
    f.close
  end
end

#each_hdb_dump(i, except_vnh = nil) ⇒ Object



594
595
596
597
598
599
600
601
602
603
604
605
606
# File 'lib/roma/storage/basic_storage.rb', line 594

def each_hdb_dump(i,except_vnh = nil)
  count = 0
  @hdb[i].each{|k,v|
    vn, last, clk, expt, val = unpack_data(v)
    if except_vnh && except_vnh.key?(vn) || Time.now.to_i > expt
      count += 1
      sleep @each_vn_dump_sleep if count % @each_vn_dump_sleep_count == 0
    else
      yield [vn, last, clk, expt, k.length, k, val.length, val].pack("NNNNNa#{k.length}Na#{val.length}")
      sleep @each_vn_dump_sleep
    end
  }
end

#each_vn_dump(target_vn) ⇒ Object



549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
# File 'lib/roma/storage/basic_storage.rb', line 549

def each_vn_dump(target_vn)
  n = @hdiv[target_vn]
  @stat_lock.synchronize do
    return false if @dbs[n] != :normal
    return false if @each_vn_dump_vnodes.include?(target_vn)
    @each_vn_dump_vnodes << target_vn
  end

  begin
    @do_each_vn_dump = true
    each_unpacked_db(target_vn, @hdb) do |vn, last, clk, expt, k, val|
      return unless @do_each_vn_dump
      yield vn_dump_pack(vn, last, clk, expt, k, val)
    end
  ensure
    @each_vn_dump_vnodes.delete(target_vn)
  end

  true
end

#flush_db(dn) ⇒ Object



670
671
672
# File 'lib/roma/storage/basic_storage.rb', line 670

def flush_db(dn)
  @hdb[dn].sync
end

#get(vn, k, d) ⇒ Object



295
296
297
298
299
300
301
302
# File 'lib/roma/storage/basic_storage.rb', line 295

def get(vn, k, d)
  buf = db_get(vn, k)
  return nil unless buf
  vn, t, clk, expt, v = unpack_data(buf)

  return nil if Time.now.to_i > expt
  v
end

#get_context(vn, k, d) ⇒ Object



189
190
191
192
193
# File 'lib/roma/storage/basic_storage.rb', line 189

def get_context(vn, k, d)
  buf = db_get(vn, k)
  return nil unless buf
  unpack_header(buf)
end

#get_keys_in_cache(dn, kn = 100) ⇒ Object

Returns a key array in a cache(@hdbc).

dn

number of database

kn

number of keys which is return value



642
643
644
645
646
647
648
649
650
651
652
653
654
# File 'lib/roma/storage/basic_storage.rb', line 642

def get_keys_in_cache(dn, kn=100)
  return nil if @do_each_vn_dump
  ret = []
  return ret unless @hdbc[dn]
  count = 0
  @each_cache_lock.synchronize do
    @hdbc[dn].each do |k, v|
      ret << k
      break if (count+=1) >= kn
    end
  end
  ret
end

#get_logfile_listObject



731
732
733
734
735
736
737
738
739
740
741
# File 'lib/roma/storage/basic_storage.rb', line 731

def get_logfile_list
  l={}
  files=Dir.glob("#{@storage_path}/status.log.*")
  files.each{ |file|
    if /$.+status\.log\.(\d+)$/=~file
      l[$1.to_i]=$&
    end
  }
  # sorted by old order
  l.to_a.sort{|a,b| a[0]<=>b[0]}
end

#get_raw(vn, k, d) ⇒ Object



304
305
306
307
308
309
# File 'lib/roma/storage/basic_storage.rb', line 304

def get_raw(vn, k, d)
  buf = db_get(vn, k)
  return nil unless buf

  unpack_data(buf)
end

#get_raw2(k) ⇒ Object



311
312
313
314
315
316
317
# File 'lib/roma/storage/basic_storage.rb', line 311

def get_raw2(k)
  @hdb.each{|hdb|
    buf = hdb.get(k)
    return unpack_data(buf) if buf
  }
  nil
end

#get_statObject



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/roma/storage/basic_storage.rb', line 60

def get_stat
  ret = {}
  ret['storage.storage_path'] = File.expand_path(@storage_path)
  ret['storage.st_class'] = @st_class.to_s.match(/Roma::Storage::(.*)/)[1]
  ret['storage.divnum'] = @divnum
  ret['storage.option'] = @option
  ret['storage.each_vn_dump_sleep'] = @each_vn_dump_sleep
  ret['storage.each_vn_dump_sleep_count'] = @each_vn_dump_sleep_count
  ret['storage.each_vn_dump_files'] = @each_vn_dump_files.inspect
  ret['storage.each_clean_up_sleep'] = @each_clean_up_sleep
  ret['storage.cleanup_regexp'] = @cleanup_regexp
  ret['storage.logic_clock_expire'] = @logic_clock_expire
  ret['storage.safecopy_stats'] = @dbs.inspect
  ret
end

#incr(vn, k, d, v) ⇒ Object



369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
# File 'lib/roma/storage/basic_storage.rb', line 369

def incr(vn, k, d, v)
  buf = db_get(vn, k)
  return nil unless buf

  # buf != nil
  vn, t, clk, expt2, v2 = unpack_data(buf)
  return nil if Time.now.to_i > expt2
  clk = (clk + 1) & 0xffffffff

  v = (v2.to_i + v)
  v = 0 if v < 0
  v = v & 0xffffffffffffffff

  ret = [vn, Time.now.to_i, clk, expt2, v.to_s]
  return ret if db_put(vn, k, pack_data(*ret))
  nil
end

#load(dmp) ⇒ Object



478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
# File 'lib/roma/storage/basic_storage.rb', line 478

def load(dmp)
  n = 0
  h = Marshal.load(dmp)
  h.each_pair{ |k, v|
    # remort data
    r_vn, r_last, r_clk, r_expt = unpack_header(v)
    raise "An invalid vnode number is include.key=#{k} vn=#{r_vn}" unless @hdiv.key?(r_vn)
    local = @hdb[@hdiv[r_vn]].get(k)
    if local == nil
      n += 1
      @hdb[@hdiv[r_vn]].put(k, v)
    else
      # local data
      l_vn, l_last, l_clk, l_expt = unpack_data(local)
      if r_last - l_last < @logic_clock_expire && cmp_clk(r_clk,l_clk) <= 0
      else # remort is newer.
        n += 1
        @hdb[@hdiv[r_vn]].put(k, v)
      end
    end
    sleep @each_vn_dump_sleep
  }
  n
end

#load_stream_dump(vn, last, clk, expt, k, v) ⇒ Object



503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
# File 'lib/roma/storage/basic_storage.rb', line 503

def load_stream_dump(vn, last, clk, expt, k, v)
  buf = db_get(vn, k)
  if buf
    data = unpack_header(buf)
    if last - data[1] < @logic_clock_expire && cmp_clk(clk,data[2]) <= 0
      return nil
    end
  end

  ret = [vn, last, clk, expt, v]
  if expt == 0
    # for the deleted mark
    return ret if db_put(vn, k, pack_header(*ret[0..3]))
  else
    return ret if db_put(vn, k, pack_data(*ret))
  end
  nil
end

#load_stream_dump_for_cachecleaning(vn, last, clk, expt, k, v) ⇒ Object



522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
# File 'lib/roma/storage/basic_storage.rb', line 522

def load_stream_dump_for_cachecleaning(vn, last, clk, expt, k, v)
  n = @hdiv[vn]
  buf = @hdb[n].get(k)
  if buf
    data = unpack_header(buf)
    if last - data[1] < @logic_clock_expire && cmp_clk(clk,data[2]) <= 0
      return nil
    end
  end

  ret = [vn, last, clk, expt, v]
  if expt == 0
    # for the deleted mark
    return ret if @hdb[n].put(k, pack_header(*ret[0..3]))
  else
    return ret if @hdb[n].put(k, pack_data(*ret))
  end
  nil
end

#open_logObject



743
744
745
746
747
748
749
750
751
752
753
754
755
# File 'lib/roma/storage/basic_storage.rb', line 743

def open_log
  logs = get_logfile_list
  if logs.length == 0
    @log_name="#{@storage_path}/status.log.1"
  else
    if File::stat("#{@fname}.#{logs.last[0]}").size == 0
      @log_name="#{@fname}.#{logs.last[0]}"
    else
      @log_name="#{@fname}.#{logs.last[0]+1}"
    end
  end
  @log_fd=File.open(@log_name,"a")
end

#opendbObject



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/roma/storage/basic_storage.rb', line 112

def opendb
  create_div_hash
  mkdir_p(@storage_path)
  @divnum.times do |i|
    # open database file
    @hdb[i] = open_db("#{@storage_path}/#{i}.#{@ext_name}")
    # check cache file
    if File.exist?(cache_file_name(i))
      @hdbc[i] = open_db(cache_file_name(i))
      stop_clean_up { @dbs[i] = :safecopy_flushed }
    else
      @dbs[i] = :normal
      @hdbc[i] = nil
    end
  end
  open_log
end

#out(vn, k, d) ⇒ Object



365
366
367
# File 'lib/roma/storage/basic_storage.rb', line 365

def out(vn, k, d)
  @hdb[@hdiv[vn]].out(k)
end

#out_cache(dn, key) ⇒ Object

Remove a key for the cache(@hdbc).

dn

number of database

key

key



611
612
613
# File 'lib/roma/storage/basic_storage.rb', line 611

def out_cache(dn, key)
  @hdbc[dn].out(key)
end

#prepend(vn, k, d, expt, v) ⇒ Object



281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/roma/storage/basic_storage.rb', line 281

def prepend(vn, k, d, expt, v)
  buf = db_get(vn, k)
  return nil unless buf

  # buf != nil
  vn, t, clk, expt2, v2 = unpack_data(buf)
  return nil if Time.now.to_i > expt2
  clk = (clk + 1) & 0xffffffff

  ret = [vn, Time.now.to_i, clk, expt, v + v2]
  return ret if db_put(vn, k, pack_data(*ret))
  nil
end

#rdelete(vn, k, d, clk) ⇒ Object



319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/roma/storage/basic_storage.rb', line 319

def rdelete(vn, k, d, clk)
  buf = db_get(vn, k)
  t = Time.now.to_i
  if buf
    data = unpack_header(buf)
    if t - data[1] < @logic_clock_expire && cmp_clk(clk,data[2]) <= 0
      @error_message = "error:#{t-data[1]} < #{@logic_clock_expire} && cmp_clk(#{clk},#{data[2]})<=0"
      return nil 
    end
  end

  # [ 0.. 3] vn
  # [ 4.. 7] physical clock(unix time)
  # [ 8..11] logical clock
  # [12..15] exptime(unix time) => 0
  ret = [vn, t, clk, 0]
  if db_put(vn, k, pack_header(*ret))
    return ret
  else
    return nil
  end
end

#replace(vn, k, d, expt, v) ⇒ Object



253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/roma/storage/basic_storage.rb', line 253

def replace(vn, k, d, expt, v)
  buf = db_get(vn, k)
  return nil unless buf

  # buf != nil
  vn, t, clk, expt2, v2 = unpack_data(buf)
  return nil if Time.now.to_i > expt2
  clk = (clk + 1) & 0xffffffff

  ret = [vn, Time.now.to_i, clk, expt, v]
  return ret if db_put(vn, k, pack_data(*ret))
  nil
end

#rset(vn, k, d, clk, expt, v) ⇒ Object



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/roma/storage/basic_storage.rb', line 208

def rset(vn, k, d, clk, expt, v)
  buf = db_get(vn, k)
  t = Time.now.to_i
  if buf
    data = unpack_data(buf)
    if t - data[1] < @logic_clock_expire && cmp_clk(clk,data[2]) <= 0
      @error_message = "error:#{t-data[1]} < #{@logic_clock_expire} && cmp_clk(#{clk},#{data[2]})<=0"
      return nil
    end
  end

  ret = [vn, t, clk, expt, v]
  return ret if db_put(vn, k, pack_data(*ret))
  @error_message = "error:put"
  nil
end

#set(vn, k, d, expt, v) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/roma/storage/basic_storage.rb', line 225

def set(vn, k, d, expt, v)
  buf = db_get(vn, k)
  clk = 0
  if buf
    data = unpack_data(buf)
    clk = (data[2] + 1) & 0xffffffff
  end

  ret = [vn, Time.now.to_i, clk, expt, v]
  return ret if db_put(vn , k, pack_data(*ret))
  nil
end

#set_db_stat(dn, stat) ⇒ Object



678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
# File 'lib/roma/storage/basic_storage.rb', line 678

def set_db_stat(dn, stat)
  @stat_lock.synchronize do
    case @dbs[dn]
    when :normal
      @each_vn_dump_vnodes.each do |vn|
        return false if dn == @hdiv[vn]
      end
      if stat == :safecopy_flushing
        # open cache
        @hdbc[dn] = open_db(cache_file_name(dn))
        stop_clean_up { @dbs[dn] = stat }
        write_log("#{dn} #{stat.to_s}")
        stat
      else
        false
      end
    when :safecopy_flushing
      if stat == :safecopy_flushed
        write_log("#{dn} #{stat.to_s}")
        @dbs[dn] = stat
      else
        false
      end
    when :safecopy_flushed
      if stat == :cachecleaning
        write_log("#{dn} #{stat.to_s}")
        @dbs[dn] = stat
      else
        false
      end
    when :cachecleaning
      if stat == :normal
        write_log("#{dn} #{stat.to_s}")
        @dbs[dn] = stat
        # remove cache
        close_db(@hdbc[dn])
        @hdbc[dn] = nil
        if File.exist?("#{@storage_path}/#{dn}.cache.#{@ext_name}")
          File.unlink("#{@storage_path}/#{dn}.cache.#{@ext_name}")
        end
        stat
      elsif stat == :safecopy_flushing
        write_log("#{dn} #{stat.to_s}")
        @dbs[dn] = stat
      else
        false
      end
    else
      false
    end
  end
end

#set_expt(vn, k, d, expt) ⇒ Object

set expire time



406
407
408
409
410
411
412
413
414
415
416
# File 'lib/roma/storage/basic_storage.rb', line 406

def set_expt(vn, k, d, expt)
  buf = db_get(vn, k)
  if buf
    vn, t, clk, expt2, v = unpack_data(buf)
    return nil if Time.now.to_i > expt2
    clk = (clk + 1) & 0xffffffff
    ret = [vn, Time.now.to_i, clk, expt, v]
    return ret if db_put(vn, k, pack_data(*ret))
  end
  nil
end

#stop_clean_up(&block) ⇒ Object



466
467
468
469
470
471
472
473
474
475
476
# File 'lib/roma/storage/basic_storage.rb', line 466

def stop_clean_up(&block)
  @do_clean_up = false
  if block
    @each_clean_up_lock.lock
    begin
      block.call
    ensure
      @each_clean_up_lock.unlock
    end
  end
end

#true_lengthObject



418
419
420
421
422
# File 'lib/roma/storage/basic_storage.rb', line 418

def true_length
  res = 0
  @hdb.each{ |hdb| res += hdb.rnum }
  res
end

#write_log(line) ⇒ Object



757
758
759
760
761
762
763
764
765
766
767
768
# File 'lib/roma/storage/basic_storage.rb', line 757

def write_log(line)
  return unless @log_name
  # log rotation
  if File::stat(@log_name).size > 1000 * 1024
    close_log
    open_log
  end
  t = Time.now
  tstr = "#{t.strftime('%Y-%m-%dT%H:%M:%S')}.#{t.usec}"
  @log_fd.write("#{tstr} #{line}\n")
  @log_fd.flush
end