Class: OFlow::Actors::Recorder
Overview
Actor that saves records to the local file system as JSON representations of the records as lines in a single file associated with one of the elements of the JSON record. The message that triggers the store must have a ‘table’ element, a ‘key’, and a ‘rec’ element.
Defined Under Namespace
Classes: ExistsError, KeyError, NotFoundError, SeqError, TableError
Instance Attribute Summary collapse
Attributes inherited from OFlow::Actor
#task
Instance Method Summary
collapse
#busy?, #inputs, #options, #outputs, #set_option, #with_own_thread
Constructor Details
#initialize(task, options) ⇒ Recorder
Initializes the recorder with options of:
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
# File 'lib/oflow/actors/recorder.rb', line 19
def initialize(task, options)
super
@cache = {}
@dir = options[:dir]
if @dir.nil?
@dir = File.join('db', task.full_name.gsub(':', '/'))
end
@dir = File.expand_path(@dir.strip)
@results_path = options[:results_path]
@results_path.strip! unless @results_path.nil?
if Dir.exist?(@dir)
Dir.glob(File.join(@dir, '*.json')).each do |path|
load(path)
end
else
`mkdir -p #{@dir}`
end
end
|
Instance Attribute Details
#dir ⇒ Object
Returns the value of attribute dir.
12
13
14
|
# File 'lib/oflow/actors/recorder.rb', line 12
def dir
@dir
end
|
Instance Method Details
#clear(box) ⇒ Object
129
130
131
132
133
134
135
|
# File 'lib/oflow/actors/recorder.rb', line 129
def clear(box)
@cache = {}
`rm -rf #{@dir}`
`mkdir -p #{@dir}`
nil
end
|
#delete(box) ⇒ Object
102
103
104
105
106
107
108
109
110
111
112
113
114
|
# File 'lib/oflow/actors/recorder.rb', line 102
def delete(box)
table = box.get('table')
key = box.get('key')
raise KeyError.new(:read) if table.nil?
raise KeyError.new(:read) if key.nil?
tc = @cache[table]
unless tc.nil?
tc.delete(key)
write(table)
end
nil
end
|
#insert(box) ⇒ Object
Also known as:
update, insert_update
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
# File 'lib/oflow/actors/recorder.rb', line 70
def insert(box)
table = box.get('table')
key = box.get('key')
rec = box.get('rec')
raise KeyError.new(:insert) if table.nil?
raise KeyError.new(:insert) if key.nil?
tc = @cache[table]
if tc.nil?
tc = {}
@cache[table] = tc
end
tc[key] = rec
write(table)
end
|
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
# File 'lib/oflow/actors/recorder.rb', line 39
def perform(op, box)
dest = box.contents[:dest]
result = nil
case op
when :insert, :create
result = insert(box)
when :get, :read
result = read(box)
when :update
result = update(box)
when :insert_update
result = insert_update(box)
when :delete, :remove
result = delete(box)
when :query
result = query(box)
when :clear
result = clear(box)
else
raise OpError.new(task.full_name, op)
end
unless dest.nil?
if @results_path.nil?
box = Box.new(result, box.tracker)
else
box = box.set(@results_path, result)
end
task.ship(dest, box)
end
end
|
#query(box) ⇒ Object
116
117
118
119
120
121
122
123
124
125
126
127
|
# File 'lib/oflow/actors/recorder.rb', line 116
def query(box)
recs = {}
expr = box.get('expr')
table = box.get('table')
raise KeyError.new(:query) if table.nil?
tc = @cache[table]
tc.each do |key,rec|
recs[key] = rec if (expr.nil? || expr.call(rec, key))
end
recs
end
|
#read(box) ⇒ Object
89
90
91
92
93
94
95
96
97
98
99
100
|
# File 'lib/oflow/actors/recorder.rb', line 89
def read(box)
table = box.get('table')
key = box.get('key')
raise KeyError.new(:read) if table.nil?
raise KeyError.new(:read) if key.nil?
tc = @cache[table]
return nil if tc.nil?
rec = tc[key]
rec
end
|