Class: Tupelo::Archiver::Persister

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/tupelo/archiver/persister.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(dir) ⇒ Persister

Options: time and size thresholds; sync WAL; run in a fork, at the end of a pipe, for the parallel case. Open question: how to read the old WAL file in the recovery case?



14
15
16
17
18
19
20
# File 'lib/tupelo/archiver/persister.rb', line 14

# Stores +dir+ in @dir and verifies that it names an existing directory.
#
# dir:: path expected to be a directory (checked with File.directory?).
#
# Raises RuntimeError when +dir+ is not a directory. Note that @dir has
# already been assigned by the time the check runs.
def initialize(dir)
  @dir = dir
  ### raise unless blobber is msgpack (or json)?
  return if File.directory?(dir)

  raise "not a dir: #{dir.inspect} -- cannot set up persister"
end

Instance Attribute Details

#dir ⇒ Object (readonly)

Returns the value of attribute dir.



7
8
9
# File 'lib/tupelo/archiver/persister.rb', line 7

# Reader for @dir, the directory path assigned in #initialize.
def dir
  @dir
end

Instance Method Details

#clear_excess_zeros ⇒ Object



90
91
92
# File 'lib/tupelo/archiver/persister.rb', line 90

# Deletes every row of the :tuples table whose count column is 0.
# Returns whatever the dataset's #delete call returns.
def clear_excess_zeros
  ## limit rows to delete? threshold?
  zero_count_rows = db[:tuples].filter(count: 0)
  zero_count_rows.delete
end

#db ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/tupelo/archiver/persister.rb', line 28

# Returns the database handle, opening it on first use and memoizing it in
# @db. The SQLite database lives in the file "db" inside #dir.
#
# On first use the :tuples, :subspaces and :global tables are created if
# missing (create_table?), and :global is seeded with a single
# {tick: 0, next_id: 0} row when it has no rows.
def db
  return @db if @db

  conn = Sequel.sqlite(database: File.join(dir, "db"))
  #conn.loggers << Logger.new($stderr) ## client.log ?

  conn.create_table? :tuples do
    primary_key   :id
    String        :packed,  null: false
    Integer       :count,   null: false
  end

  conn.create_table? :subspaces do
    foreign_key   :tuple_id, :tuples, index: true, null: false
    Integer       :tag,     null: false
  end

  conn.create_table? :global do # one row
    # tick starts from 0 when system starts, but
    # we persist it in case of crash while 2 arcs running, to
    # determine which is more correct
    Integer       :tick
    # next_id is internal state to a single arc worker
    Integer       :next_id
  end

  # Seed the single :global row only when the table has no rows yet.
  global = conn[:global]
  global << {tick: 0, next_id: 0} if global.count.zero?

  @db = conn
end

#each ⇒ Object



61
62
63
64
65
# File 'lib/tupelo/archiver/persister.rb', line 61

# Yields each row of the :tuples table to the given block and returns the
# result of the underlying dataset's #each.
#
# When called without a block, returns an Enumerator over the same rows
# rather than raising LocalJumpError from the inner yield.
def each(&block)
  return enum_for(:each) unless block

  db[:tuples].each(&block)
end

#flush(rec, next_id, tick) ⇒ Object

rec points to a linked list (via next_rec_to_save) of recs to flush to the db, but we don’t have to do that for every transaction (configurable).



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/tupelo/archiver/persister.rb', line 70

# Writes a chain of records to the :tuples table and then updates the
# :global row, all inside one db.transaction block.
#
# rec::     head of the chain; each record is followed via
#           #next_rec_to_save until that returns nil. A nil +rec+ skips
#           the tuple writes entirely.
# next_id:: value stored in the next_id column of :global.
# tick::    value stored in the tick column of :global.
#
# For every record the row with a matching id gets its count updated; if
# the update touched no rows, a new row (id, count, packed) is inserted.
# Each record is unmarked (#unmark_to_save) before moving to the next one.
def flush(rec, next_id, tick)
  ## if threshold etc.
  db.transaction do
    tuples = db[:tuples]
    current = rec
    while current
      updated = tuples.filter(id: current.id).update(count: current.count)
      # No existing row for this id: fall back to an insert.
      tuples.insert(id: current.id, count: current.count, packed: current.packed) if updated == 0
      current.unmark_to_save
      current = current.next_rec_to_save
    end
    db[:global].update(next_id: next_id, tick: tick)
  end
## rescue ???
end

#next_id ⇒ Object



86
87
88
# File 'lib/tupelo/archiver/persister.rb', line 86

# Reads the next_id column from the first row of the :global table.
def next_id
  global_row = db[:global].first
  global_row[:next_id]
end

#wal ⇒ Object



22
23
24
25
26
# File 'lib/tupelo/archiver/persister.rb', line 22

# Returns the File handle for "wal" inside #dir, opening it on first use
# and memoizing it in @wal. The file is opened in "w" mode, so an existing
# file at that path is truncated when the handle is first created.
def wal
  return @wal if @wal

  wal_path = File.join(dir, "wal")
  @wal = File.open(wal_path, "w")
end