Class: Tupelo::Archiver::Persister
- Includes: Enumerable
- Defined in: lib/tupelo/archiver/persister.rb
Instance Attribute Summary
- #dir ⇒ Object (readonly)
  Returns the value of attribute dir.
Instance Method Summary
- #clear_excess_zeros ⇒ Object
- #db ⇒ Object
- #each ⇒ Object
- #flush(rec, next_id, tick) ⇒ Object
  Flushes the linked list of recs that starts at rec (chained via next_rec_to_save) to the db. This need not happen for every transaction.
- #initialize(dir) ⇒ Persister (constructor)
  Checks that dir is an existing directory. Notes on possible options and on wal recovery are under Constructor Details.
- #next_id ⇒ Object
- #wal ⇒ Object
Constructor Details
#initialize(dir) ⇒ Persister
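Design notes from the source comment. In the code below, the constructor takes only dir and none of these options are implemented.
- Possible options: time and size thresholds; sync wal; run in fork, at end of pipe, for the parallel case.
- Open question: how to read the old wal file in the recovery case?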
    # File 'lib/tupelo/archiver/persister.rb', line 14
    def initialize dir
      @dir = dir
      unless File.directory?(dir)
        raise "not a dir: #{dir.inspect} -- cannot set up persister"
      end
      ### raise unless blobber is msgpack (or json)?
    end
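The constructor's only check is that dir already exists as a directory; it does not create it. A minimal sketch of that behaviour follows. `DirChecked` is a hypothetical stand-in that copies only the guard clause, so the example runs without the tupelo gem.

```ruby
require "tmpdir"

# Hypothetical stand-in reproducing only the directory guard from
# Persister#initialize. It is not part of tupelo.
class DirChecked
  attr_reader :dir

  def initialize(dir)
    @dir = dir
    unless File.directory?(dir)
      raise "not a dir: #{dir.inspect} -- cannot set up persister"
    end
  end
end

Dir.mktmpdir do |tmp|
  DirChecked.new(tmp)                           # existing directory: accepted

  begin
    DirChecked.new(File.join(tmp, "missing"))   # never created: rejected
  rescue RuntimeError => e
    puts e.message
  end
end
```

A caller would therefore need to create the directory (for example with Dir.mkdir or FileUtils.mkdir_p) before constructing a persister.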
Instance Attribute Details
#dir ⇒ Object (readonly)
Returns the value of attribute dir.
    # File 'lib/tupelo/archiver/persister.rb', line 7
    def dir
      @dir
    end
Instance Method Details
#clear_excess_zeros ⇒ Object
    # File 'lib/tupelo/archiver/persister.rb', line 90
    def clear_excess_zeros
      db[:tuples].filter(count: 0).delete ## limit rows to delete? threshold?
    end
#db ⇒ Object
    # File 'lib/tupelo/archiver/persister.rb', line 28
    def db
      @db ||= begin
        db = Sequel.sqlite(:database => File.join(dir, "db"))
        #db.loggers << Logger.new($stderr) ## client.log ?

        db.create_table? :tuples do
          primary_key :id
          String :packed, null: false
          Integer :count, null: false
        end

        db.create_table? :subspaces do
          foreign_key :tuple_id, :tuples, index: true, null: false
          Integer :tag, null: false
        end

        db.create_table? :global do # one row
          Integer :tick
            # starts from 0 when system starts, but
            # we persist it in case of crash while 2 arcs running, to
            # determine which is more correct
          Integer :next_id
            # internal state to a single arc worker
        end

        if db[:global].count == 0
          db[:global] << {tick: 0, next_id: 0}
        end

        db
      end
    end
#each ⇒ Object
    # File 'lib/tupelo/archiver/persister.rb', line 61
    def each
      db[:tuples].each do |tuple|
        yield tuple
      end
    end
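Because the class includes Enumerable and defines each, the usual Enumerable methods become available on a persister. The following is a minimal sketch of that mechanism, not the library's code. `TupleStore` is a hypothetical class backed by an Array of row hashes instead of the SQLite table; the row keys mirror the tuples schema created in #db.

```ruby
# Hypothetical stand-in: an Array of row hashes replaces db[:tuples].
class TupleStore
  include Enumerable

  def initialize(rows)
    @rows = rows
  end

  # Same shape as Persister#each: yield one row at a time.
  def each
    @rows.each do |tuple|
      yield tuple
    end
  end
end

store = TupleStore.new([
  {id: 1, packed: "a", count: 2},
  {id: 2, packed: "b", count: 0},
])

# select and map come from Enumerable and are built on top of each.
live_ids = store.select { |t| t[:count] > 0 }.map { |t| t[:id] }
```

With a real persister the same calls would iterate over every row of the tuples table, so filtering in Ruby this way reads the whole table.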
#flush(rec, next_id, tick) ⇒ Object
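rec points to the head of a linked list of recs (chained via next_rec_to_save) to flush to the db. Flushing need not happen for every transaction; the source notes that this is meant to be configurable, but the code below has no threshold yet.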
    # File 'lib/tupelo/archiver/persister.rb', line 70
    def flush rec, next_id, tick
      ## if threshold etc.
      db.transaction do
        while rec
          n = db[:tuples].filter(id: rec.id).update(count: rec.count)
          if n == 0
            db[:tuples].insert(id: rec.id, count: rec.count, packed: rec.packed)
          end
          rec.unmark_to_save
          rec = rec.next_rec_to_save
        end
        db[:global].update(next_id: next_id, tick: tick)
      end
      ## rescue ???
    end
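The list traversal and the update-or-insert step in flush can be illustrated without a database. The following is a minimal sketch, not the library's code. `Rec` is a hypothetical stand-in for the archiver's record objects (only the names next_rec_to_save and unmark_to_save come from the source above), and a plain Hash stands in for the tuples table.

```ruby
# Hypothetical stand-in for the archiver's record type. The `marked`
# field is an assumption used to make unmark_to_save observable.
Rec = Struct.new(:id, :count, :packed, :next_rec_to_save, :marked) do
  def unmark_to_save
    self.marked = false
  end
end

# A Hash keyed by id stands in for the :tuples table, so the sketch
# runs without Sequel or SQLite.
def flush_sketch(table, rec)
  while rec
    row = table[rec.id]
    if row
      row[:count] = rec.count   # existing row: update the count only
    else
      # no row was updated: insert a new one, including the packed tuple
      table[rec.id] = {count: rec.count, packed: rec.packed}
    end
    rec.unmark_to_save
    rec = rec.next_rec_to_save  # follow the chain; nil ends the loop
  end
  table
end

table = {1 => {count: 1, packed: "a"}}
tail  = Rec.new(2, 1, "b", nil, true)
head  = Rec.new(1, 3, "a", tail, true)
flush_sketch(table, head)
```

The sketch omits two things the real method does: it wraps the loop in db.transaction, and it updates next_id and tick in the global row inside that same transaction, so tuple rows and global state are committed together.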
#next_id ⇒ Object
    # File 'lib/tupelo/archiver/persister.rb', line 86
    def next_id
      db[:global].first[:next_id]
    end
#wal ⇒ Object
    # File 'lib/tupelo/archiver/persister.rb', line 22
    def wal
      @wal ||= begin
        File.open File.join(dir, "wal"), "w"
      end
    end
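One detail bears on the recovery question raised in the constructor notes: File.open with mode "w" truncates an existing file, so the contents of a previous wal file are discarded the first time #wal is called. The sketch below contrasts "w" with append mode "a" in a temporary directory. The helper name and the file contents are illustrative only.

```ruby
require "tmpdir"

# Writes `first` to a fresh file, reopens it with `mode`, writes `second`,
# and returns the final file contents.
def reopen_and_write(mode, first, second)
  Dir.mktmpdir do |dir|
    path = File.join(dir, "wal")
    File.open(path, "w") { |f| f.write(first) }
    File.open(path, mode) { |f| f.write(second) }
    File.read(path)
  end
end

truncated = reopen_and_write("w", "old", "new")  # "w" discards earlier contents
appended  = reopen_and_write("a", "old", "new")  # "a" keeps them and appends
```

Reading an old wal for recovery would therefore have to happen before the file is opened for writing, or the file would have to be opened in a non-truncating mode. Neither is shown in the source above.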