Class: OFlow::Actors::Recorder

Inherits:
OFlow::Actor show all
Defined in:
lib/oflow/actors/recorder.rb

Overview

Actor that saves records to the local file system as JSON representations of the records as lines in a single file associated with one of the elements of the JSON record. The message that triggers the store must have a ‘table’ element, a ‘key’, and a ‘rec’ element.

Defined Under Namespace

Classes: ExistsError, KeyError, NotFoundError, SeqError, TableError

Instance Attribute Summary collapse

Attributes inherited from OFlow::Actor

#task

Instance Method Summary collapse

Methods inherited from OFlow::Actor

#busy?, #inputs, #options, #outputs, #set_option, #with_own_thread

Constructor Details

#initialize(task, options) ⇒ Recorder

Initializes the recorder with options of:

Parameters:

  • options (Hash)

    with keys of

    • :dir [String] directory to store the persisted records

    • :results_path [String] path to where the results should be placed in

      the request (default: nil or ship only results)
      


19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/oflow/actors/recorder.rb', line 19

def initialize(task, options)
  super
	@cache = {}
  @dir = options[:dir]
  if @dir.nil?
    @dir = File.join('db', task.full_name.gsub(':', '/'))
  end
  @dir = File.expand_path(@dir.strip)
  @results_path = options[:results_path]
  @results_path.strip! unless @results_path.nil?

  if Dir.exist?(@dir)
    Dir.glob(File.join(@dir, '*.json')).each do |path|
      load(path)
    end
  else
    `mkdir -p #{@dir}`
  end
end

Instance Attribute Details

#dirObject (readonly)

Returns the value of attribute dir.



12
13
14
# File 'lib/oflow/actors/recorder.rb', line 12

def dir
  @dir
end

Instance Method Details

#clear(box) ⇒ Object



129
130
131
132
133
134
135
# File 'lib/oflow/actors/recorder.rb', line 129

def clear(box)
  @cache = {}
  `rm -rf #{@dir}`
  # remake the dir in preparation for future inserts
  `mkdir -p #{@dir}`
  nil
end

#delete(box) ⇒ Object

Raises:



102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/oflow/actors/recorder.rb', line 102

def delete(box)
  table = box.get('table')
  key = box.get('key')
  raise KeyError.new(:read) if table.nil?
  raise KeyError.new(:read) if key.nil?

	tc = @cache[table]
	unless tc.nil?
	  tc.delete(key)
	  write(table)
	end
  nil
end

#insert(box) ⇒ Object Also known as: update, insert_update

Raises:



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/oflow/actors/recorder.rb', line 70

def insert(box)
  table = box.get('table')
  key = box.get('key')
  rec = box.get('rec')
  raise KeyError.new(:insert) if table.nil?
  raise KeyError.new(:insert) if key.nil?

	tc = @cache[table]
	if tc.nil?
	  tc = {}
	  @cache[table] = tc
	end
  tc[key] = rec
	write(table)
end

#perform(op, box) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/oflow/actors/recorder.rb', line 39

def perform(op, box)
  dest = box.contents[:dest]
  result = nil
  case op
  when :insert, :create
    result = insert(box)
  when :get, :read
    result = read(box)
  when :update
    result = update(box)
  when :insert_update
    result = insert_update(box)
  when :delete, :remove
    result = delete(box)
  when :query
    result = query(box)
  when :clear
    result = clear(box)
  else
    raise OpError.new(task.full_name, op)
  end
  unless dest.nil?
    if @results_path.nil?
      box = Box.new(result, box.tracker)
    else
      box = box.set(@results_path, result)
    end
    task.ship(dest, box)
  end
end

#query(box) ⇒ Object

Raises:



116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/oflow/actors/recorder.rb', line 116

def query(box)
  recs = {}
  expr = box.get('expr')
  table = box.get('table')
  raise KeyError.new(:query) if table.nil?

	tc = @cache[table]
	tc.each do |key,rec|
    recs[key] = rec if (expr.nil? || expr.call(rec, key))
  end
  recs
end

#read(box) ⇒ Object

Raises:



89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/oflow/actors/recorder.rb', line 89

def read(box)
  table = box.get('table')
  key = box.get('key')
  raise KeyError.new(:read) if table.nil?
  raise KeyError.new(:read) if key.nil?

	tc = @cache[table]
	return nil if tc.nil?

	rec = tc[key]
  rec
end