Class: OFlow::Actors::Persister
- Inherits:
-
OFlow::Actor
- Object
- OFlow::Actor
- OFlow::Actors::Persister
- Defined in:
- lib/oflow/actors/persister.rb
Overview
Actor that persists records to the local file system as JSON representations of the records. Records can be the whole contents of the box received or a sub element of the contents. The key to the records are keys provided either in the record data or outside the data but somewhere else in the box received. Options for maintaining historic records and sequence number locking are included. If no sequence number is provide the Persister will assume there is no checking required and write anyway.
Records are stored as JSON with the filename as the key and sequence number. The format of the file name is <key>~<seq>.json. As an example, a record stored with a key of ‘first’ and a sequence number of 3 (third time saved) would be ‘first~3.json.
Defined Under Namespace
Classes: ExistsError, KeyError, NotFoundError, SeqError
Instance Attribute Summary collapse
-
#data_path ⇒ Object
readonly
Returns the value of attribute data_path.
-
#dir ⇒ Object
readonly
Returns the value of attribute dir.
-
#historic ⇒ Object
readonly
Returns the value of attribute historic.
-
#key_path ⇒ Object
readonly
Returns the value of attribute key_path.
-
#seq_path ⇒ Object
readonly
Returns the value of attribute seq_path.
Attributes inherited from OFlow::Actor
Instance Method Summary collapse
-
#caching? ⇒ Boolean
Returns true if the actor is caching records.
- #clear(box) ⇒ Object
- #delete(box) ⇒ Object
- #delete_historic(key, seq) ⇒ Object
-
#initialize(task, options) ⇒ Persister
constructor
Initializes the persister with options of:.
- #insert(box) ⇒ Object
- #insert_update(box) ⇒ Object
- #key_seq_from_path(path) ⇒ Object
- #load(path) ⇒ Object
- #perform(op, box) ⇒ Object
- #query(box) ⇒ Object
- #read(box) ⇒ Object
-
#save(rec, key, seq) ⇒ Object
internal use only.
- #update(box) ⇒ Object
Methods inherited from OFlow::Actor
#busy?, #inputs, #options, #outputs, #set_option, #with_own_thread
Constructor Details
#initialize(task, options) ⇒ Persister
Initializes the persister with options of:
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/oflow/actors/persister.rb', line 36 def initialize(task, ) super @dir = [:dir] if @dir.nil? @dir = File.join('db', task.full_name.gsub(':', '/')) end @dir = File.(@dir.strip) @key_path = .fetch(:key_path, 'key').strip @seq_path = .fetch(:seq_path, 'seq').strip @data_path = .fetch(:data_path, nil) # nil means all contents @data_path.strip! unless @data_path.nil? @results_path = [:results_path] @results_path.strip! unless @results_path.nil? if .fetch(:cache, true) # key is record key, value is [seq, rec] @cache = {} else @cache = nil end @historic = ('true' == .fetch(:historic, 'false').to_s) if Dir.exist?(@dir) unless @cache.nil? Dir.glob(File.join(@dir, '**', '*.json')).each do |path| if File.symlink?(path) rec = load(path) unless @cache.nil? key, seq = key_seq_from_path(path) @cache[key] = [seq, rec] end end end end else `mkdir -p #{@dir}` end end |
Instance Attribute Details
#data_path ⇒ Object (readonly)
Returns the value of attribute data_path.
23 24 25 |
# File 'lib/oflow/actors/persister.rb', line 23 def data_path @data_path end |
#dir ⇒ Object (readonly)
Returns the value of attribute dir.
20 21 22 |
# File 'lib/oflow/actors/persister.rb', line 20 def dir @dir end |
#historic ⇒ Object (readonly)
Returns the value of attribute historic.
24 25 26 |
# File 'lib/oflow/actors/persister.rb', line 24 def historic @historic end |
#key_path ⇒ Object (readonly)
Returns the value of attribute key_path.
21 22 23 |
# File 'lib/oflow/actors/persister.rb', line 21 def key_path @key_path end |
#seq_path ⇒ Object (readonly)
Returns the value of attribute seq_path.
22 23 24 |
# File 'lib/oflow/actors/persister.rb', line 22 def seq_path @seq_path end |
Instance Method Details
#caching? ⇒ Boolean
Returns true if the actor is caching records.
116 117 118 |
# File 'lib/oflow/actors/persister.rb', line 116 def caching?() !@cache.nil? end |
#clear(box) ⇒ Object
220 221 222 223 224 225 226 |
# File 'lib/oflow/actors/persister.rb', line 220 def clear(box) @cache = {} unless @cache.nil? `rm -rf #{@dir}` # remake the dir in preparation for future inserts `mkdir -p #{@dir}` nil end |
#delete(box) ⇒ Object
177 178 179 180 181 182 183 184 |
# File 'lib/oflow/actors/persister.rb', line 177 def delete(box) key = box.get(@key_path) @cache.delete(key) unless @cache.nil? linkpath = File.join(@dir, "#{key}.json") File.delete(linkpath) delete_historic(key, nil) unless @historic nil end |
#delete_historic(key, seq) ⇒ Object
249 250 251 252 253 254 255 |
# File 'lib/oflow/actors/persister.rb', line 249 def delete_historic(key, seq) Dir.glob(File.join(@dir, '**', "#{key}~*.json")).each do |path| _, s = key_seq_from_path(path) next if s == seq File.delete(path) end end |
#insert(box) ⇒ Object
106 107 108 109 110 111 112 113 |
# File 'lib/oflow/actors/persister.rb', line 106 def insert(box) key = box.get(@key_path) raise KeyError.new(:insert) if key.nil? box = box.set(@seq_path, 1) rec = box.get(@data_path) @cache[key] = [1, rec] unless @cache.nil? save(rec, key, 1) end |
#insert_update(box) ⇒ Object
169 170 171 172 173 174 175 |
# File 'lib/oflow/actors/persister.rb', line 169 def insert_update(box) begin insert(box) rescue ExistsError update(box) end end |
#key_seq_from_path(path) ⇒ Object
257 258 259 260 261 262 |
# File 'lib/oflow/actors/persister.rb', line 257 def key_seq_from_path(path) path = File.readlink(path) if File.symlink?(path) base = File.basename(path)[0..-6] # strip off '.json' a = base.split('~') [a[0..-2].join('~'), a[-1].to_i] end |
#load(path) ⇒ Object
244 245 246 247 |
# File 'lib/oflow/actors/persister.rb', line 244 def load(path) return nil unless File.exist?(path) Oj.load_file(path, :mode => :object) end |
#perform(op, box) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/oflow/actors/persister.rb', line 75 def perform(op, box) dest = box.contents[:dest] result = nil case op when :insert, :create result = insert(box) when :get, :read result = read(box) when :update result = update(box) when :insert_update result = insert_update(box) when :delete, :remove result = delete(box) when :query result = query(box) when :clear result = clear(box) else raise OpError.new(task.full_name, op) end unless dest.nil? if @results_path.nil? box = Box.new(result, box.tracker) else box = box.set(@results_path, result) end task.ship(dest, box) end end |
#query(box) ⇒ Object
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/oflow/actors/persister.rb', line 186 def query(box) recs = {} expr = box.get('expr') if expr.nil? if @cache.nil? Dir.glob(File.join(@dir, '**/*.json')).each do |path| recs[File.basename(path)[0..-6]] = load(path) if File.symlink?(path) end else @cache.each do |key,seq_rec| recs[key] = seq_rec[1] end end elsif expr.is_a?(Proc) if @cache.nil? Dir.glob(File.join(@dir, '**/*.json')).each do |path| next unless File.symlink?(path) rec = load(path) key, seq = key_seq_from_path(path) recs[key] = rec if expr.call(rec, key, seq) end else @cache.each do |key,seq_rec| rec = seq_rec[1] recs[key] = rec if expr.call(rec, key, seq_rec[0]) end end else # TBD add support for string safe expressions in the future raise Exception.new("expr can only be a Proc, not a #{expr.class}") end recs end |
#read(box) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/oflow/actors/persister.rb', line 120 def read(box) # Should be a Hash. key = box.contents[:key] raise KeyError(:read) if key.nil? if @cache.nil? linkpath = File.join(@dir, "#{key}.json") rec = load(linkpath) else unless (seq_rec = @cache[key]).nil? rec = seq_rec[1] end end # If not found rec will be nil, that is okay. rec end |
#save(rec, key, seq) ⇒ Object
internal use only
229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
# File 'lib/oflow/actors/persister.rb', line 229 def save(rec, key, seq) filename = "#{key}~#{seq}.json" path = File.join(@dir, filename) linkpath = File.join(@dir, "#{key}.json") raise ExistsError.new(key, seq) if File.exist?(path) Oj.to_file(path, rec, :mode => :object) begin File.delete(linkpath) rescue Exception # ignore end File.symlink(filename, linkpath) rec end |
#update(box) ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/oflow/actors/persister.rb', line 136 def update(box) key = box.get(@key_path) raise KeyError.new(:update) if key.nil? seq = box.get(@seq_path) if @cache.nil? if (seq_rec = @cache[key]).nil? raise NotFoundError.new(key) end seq = seq_rec[0] if seq.nil? else seq = 0 has_rec = false Dir.glob(File.join(@dir, '**', "#{key}*.json")).each do |path| if File.symlink?(path) has_rec = true next end _, s = key_seq_from_path(path) seq = s if seq < s end end raise NotFoundError.new(key) unless has_rec raise SeqError.new(:update, key) if seq.nil? || 0 == seq seq += 1 box = box.set(@seq_path, seq) rec = box.get(@data_path) @cache[key] = [seq, rec] unless @cache.nil? rec = save(rec, key, seq) delete_historic(key, seq) unless @historic rec end |