Class: Ruote::ActiveRecord::Storage

Inherits:
Object
  • Object
show all
Includes:
StorageBase
Defined in:
lib/ruote/ar/storage.rb

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Storage

Returns a new instance of Storage.



13
14
15
16
17
18
19
20
21
22
# File 'lib/ruote/ar/storage.rb', line 13

# Sets up the storage and records the identity of the local worker.
#
# options - Hash of settings. The 'table_name' entry names the table that
#           holds the documents (defaults to :documents). When the hash is
#           not empty it is also passed to replace_engine_configuration.
def initialize(options = {})

  # Symbolise the configured name as well as the default; previously only
  # the default went through to_sym, so the ivar's type depended on the
  # caller.
  @table_name = (options['table_name'] || 'documents').to_sym
  @ip = Ruote.local_ip
  @last_time = Time.at(0.0).utc

  # worker id is "<worker name>/<ip with dots replaced by underscores>"
  @worker = [current_worker_name, @ip.tr('.', '_')].join('/')

  replace_engine_configuration(options) unless options.empty?
end

Instance Method Details

#add_type(type) ⇒ Object

Mainly used by ruote’s test/unit/ut_17_storage.rb



192
193
194
# File 'lib/ruote/ar/storage.rb', line 192

# Accepts a document type and deliberately does nothing with it.
#
# type - the type name; unused by this implementation.
#
# Returns nil (the method body is empty).
def add_type(type)
  # does nothing, types are differentiated by the 'typ' column
end

#by_field(type, field, value, opts = {}) ⇒ Object

Querying workitems by field (warning, goes deep into the JSON structure)

Raises:

  • (NotImplementedError)


220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/ruote/ar/storage.rb', line 220

# Finds workitems whose serialized document contains the given field.
#
# type  - must be 'workitems', anything else raises NotImplementedError.
# field - name of the field, matched as "<field>": inside the 'doc' column.
# value - when not nil, its JSON encoding must directly follow the field
#         name; nil means "any value". false is a real value and is matched
#         as such.
# opts  - :count  => return the result of a count(*) query instead of
#                    workitems,
#         :limit  => passed to take,
#         :offset / :skip => passed to skip.
#
# Returns the count query's value when opts[:count] is set, otherwise an
# Array of Ruote::Workitem built from the rows kept by select_last_revs.
def by_field(type, field, value, opts={})

  raise NotImplementedError if type != 'workitems'

  # LIKE pattern over the JSON text: %"<field>":<encoded value>%
  parts = [ '%"', field, '":' ]
  parts.push(Rufus::Json.encode(value)) unless value.nil?
    # only nil stands for "any value"; a plain truthiness test would also
    # drop false and silently widen the query
  parts.push('%')

  scope = table.where(table[:typ].eq(type).and(table[:doc].matches(parts.join)))

  return connection.select_value(scope.project('count(*)')) if opts[:count]

  query = scope.project('*').
    order(table[:ide].asc, table[:rev].desc).
    take(opts[:limit]).
    skip(opts[:offset] || opts[:skip])

  rows = connection.select_all(query)

  select_last_revs(rows).collect { |d| Ruote::Workitem.from_json(d['doc']) }
end

#by_participant(type, participant_name, opts = {}) ⇒ Object

A provision made for workitems: allows querying them directly by participant name.

Raises:

  • (NotImplementedError)


205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/ruote/ar/storage.rb', line 205

# Looks workitems up through the participant_name column.
#
# type             - must be 'workitems', anything else raises
#                    NotImplementedError.
# participant_name - value the participant_name column has to equal.
# opts             - :count => return the value of a count(*) query,
#                    :limit => passed to take,
#                    :offset / :skip => passed to skip.
#
# Returns the count query's value when opts[:count] is set, otherwise an
# Array of Ruote::Workitem built from the rows kept by select_last_revs.
def by_participant(type, participant_name, opts={})

  raise NotImplementedError unless type == 'workitems'

  condition = table[:typ].eq(type).and(table[:participant_name].eq(participant_name))
  scope = table.where(condition)

  if opts[:count]
    return connection.select_value(scope.project('count(*)'))
  end

  ordered = scope.project('*').order(table[:ide].asc, table[:rev].desc)
  paged = ordered.take(opts[:limit]).skip(opts[:offset] || opts[:skip])
  rows = connection.select_all(paged)

  select_last_revs(rows).collect { |row| Ruote::Workitem.from_json(row['doc']) }
end

#close ⇒ Object

Grrr… I should sort the mess between close and shutdown… Tests vs production :-(



186
187
188
# File 'lib/ruote/ar/storage.rb', line 186

# Closes the storage by delegating to shutdown; whatever shutdown returns is
# returned as is.
def close
  shutdown
end

#delete(doc) ⇒ Object

Raises:

  • (true)


108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/ruote/ar/storage.rb', line 108

# Deletes the row matching the document's type, id and revision.
#
# doc - Hash with 'type', '_id' and '_rev' entries.
#
# Returns nil when a row was deleted. When nothing was deleted, returns the
# document currently stored under that type/id (as given by get), or true
# when there is none.
#
# Raises ArgumentError when doc is nil or carries no '_rev'.
def delete(doc)

  # `raise true` is not a valid exception and only surfaced as a confusing
  # TypeError; state the actual problem instead.
  raise ArgumentError, 'no doc given' if doc.nil?

  raise ArgumentError, 'no _rev for doc' unless doc['_rev']
    # usually not necessary, adding it not to forget it later on

  dm = Arel::DeleteManager.new Arel::Table.engine
  dm.from table
  dm.where table[:typ].eq(doc['type']).and(table[:ide].eq(doc['_id']).and(table[:rev].eq(doc['_rev'].to_i)))
  count = connection.delete(dm)

  return (get(doc['type'], doc['_id']) || true) if count < 1
    # failure

  nil
    # success
end

#done(doc) ⇒ Object

Removes the doc once its processing is over, whether it succeeded or failed. It’s important not to leave any message behind.



52
53
54
55
56
57
# File 'lib/ruote/ar/storage.rb', line 52

# Deletes the row for the given document, provided it is at revision 1 and
# its worker column equals this storage's @worker.
#
# doc - Hash with 'type' and '_id' entries.
#
# Returns whatever connection.delete returns.
def done(doc)
  # build the condition inside-out, keeping the original nesting:
  # typ AND (ide AND (rev = 1 AND worker = @worker))
  held_by_me = table[:rev].eq(1).and(table[:worker].eq(@worker))
  this_doc = table[:ide].eq(doc['_id']).and(held_by_me)
  condition = table[:typ].eq(doc['type']).and(this_doc)

  deletion = Arel::DeleteManager.new(Arel::Table.engine)
  deletion.from(table)
  deletion.where(condition)

  connection.delete(deletion)
end

#get(type, key) ⇒ Object



104
105
106
# File 'lib/ruote/ar/storage.rb', line 104

# Fetches a document by delegating to do_get.
#
# type - document type, passed through unchanged.
# key  - document key, passed through unchanged.
#
# Returns whatever do_get returns.
def get(type, key)
  do_get(type, key)
end

#get_many(type, key = nil, opts = {}) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/ruote/ar/storage.rb', line 127

# Fetches the documents of a given type.
#
# type - value the typ column has to equal. For 'msgs' only rows whose
#        worker column is NULL are considered.
# key  - nil, a single key or an Array of keys. When the first key is a
#        String the keys are matched against the wfid column in the query;
#        when it is a Regexp the decoded documents are filtered on '_id'
#        after the fetch.
# opts - :count      => return the value of a count query on wfid,
#        :descending => any truthy value orders by ide/rev descending
#                       (ascending otherwise),
#        :limit      => passed to take,
#        :skip / :offset => passed to skip.
#
# The opts hash is only read, never modified.
#
# Returns the count query's value when opts[:count] is set, otherwise an
# Array of documents decoded with Rufus::Json.decode.
def get_many(type, key=nil, opts={})

  keys = key ? Array(key) : nil

  cond = table[:typ].eq(type)
  cond = cond.and(table[:wfid].in(keys)) if keys && keys.first.is_a?(String)
  cond = cond.and(table[:worker].eq(nil)) if type == 'msgs'

  ds = table.where(cond)

  return connection.select_value(ds.project(table[:wfid].count)) if opts[:count]

  # Only the truthiness of opts[:descending] matters for the ordering, so
  # the value is used as is (it used to be rewritten in the caller's hash
  # without the result ever being read).
  ds =
    if opts[:descending]
      ds.order(table[:ide].desc, table[:rev].desc)
    else
      ds.order(table[:ide].asc, table[:rev].asc)
    end

  ds = ds.take(opts[:limit]).skip(opts[:skip] || opts[:offset])

  rows = select_last_revs(connection.select_all(ds.project('*')))
  docs = rows.collect { |row| Rufus::Json.decode(row['doc']) }

  return docs unless keys && keys.first.is_a?(Regexp)

  # (pass on the dataset.filter(:wfid => /regexp/) for now
  # since we have potentially multiple keys)
  docs.select { |doc| keys.any? { |k| k.match(doc['_id']) } }
end

#ids(type) ⇒ Object

Returns all the ids of the documents of a given type.



167
168
169
# File 'lib/ruote/ar/storage.rb', line 167

# Lists the distinct ide values of the rows of a given type, ordered by ide.
#
# type - value the typ column has to equal.
#
# Returns whatever connection.select_values yields for that query.
def ids(type)
  of_type = table.where(table[:typ].eq(type))
  query = of_type.project('distinct ide').order(table[:ide])

  connection.select_values(query)
end

#purge! ⇒ Object

Nukes all the documents in this storage.



173
174
175
# File 'lib/ruote/ar/storage.rb', line 173

# Purge entry point of the storage.
#
# NOTE(review): the body is empty, so calling this removes nothing and
# returns nil; the inline comment suggests it only exists for the test
# suite.
def purge!
  # just for test
end

#purge_type!(type) ⇒ Object

Nukes a db type and reputs it (losing all the documents that were in it).



198
199
200
# File 'lib/ruote/ar/storage.rb', line 198

# Per-type purge entry point of the storage.
#
# type - the type name; unused by this implementation.
#
# NOTE(review): the body is empty, so calling this removes nothing and
# returns nil; the inline comment suggests it only exists for the test
# suite.
def purge_type!(type)
  # just for test
end

#put(doc, opts = {}) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/ruote/ar/storage.rb', line 72

# Stores a new revision of a document.
#
# doc  - Hash with 'type' and '_id' entries, plus '_rev' when updating an
#        existing document.
# opts - :update_rev is handed to do_insert as its third argument.
#
# Returns nil on success. On failure returns true when the document is
# gone, or the currently stored document (as given by get) when the
# revisions differ or the insert raised.
def put(doc, opts={})

  if doc['_rev']

    current = get(doc['type'], doc['_id'])

    return true unless current
    return current if current['_rev'] != doc['_rev']
      # failures
  end

  nrev = doc['_rev'].to_i + 1

  begin

    do_insert(doc, nrev, opts[:update_rev])

  rescue StandardError => e
    # StandardError only: rescuing Exception would also swallow Interrupt,
    # SystemExit and friends and turn them into a "put failed" answer.
    puts "Error putting: #{e.message}: #{doc.inspect}"
    return (get(doc['type'], doc['_id']) || true)
      # failure
  end

  # the new revision is in, drop the older ones
  dm = Arel::DeleteManager.new Arel::Table.engine
  dm.from table
  dm.where table[:typ].eq(doc['type']).and(table[:ide].eq(doc['_id']).and(table[:rev].lt(nrev)))
  connection.delete(dm)

  nil
    # success
end

#put_msg(action, options) ⇒ Object



25
26
27
28
29
30
31
# File 'lib/ruote/ar/storage.rb', line 25

# Stores a message document at revision 1.
#
# action  - handed to prepare_msg_doc.
# options - handed to prepare_msg_doc.
#
# Always returns nil.
def put_msg(action, options)

  # a msg is only ever written once, so the checks done by put are skipped
  msg = prepare_msg_doc(action, options)
  do_insert(msg, 1)

  nil
end

#put_schedule(flavour, owner_fei, s, msg) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/ruote/ar/storage.rb', line 59

# Stores a schedule document at revision 1.
#
# flavour, owner_fei, s, msg - all handed to prepare_schedule_doc.
#
# Returns the '_id' of the stored document, or nil when
# prepare_schedule_doc produced no document (nothing is inserted then).
def put_schedule(flavour, owner_fei, s, msg)

  # a schedule is only ever written once, so the checks done by put are
  # skipped
  schedule = prepare_schedule_doc(flavour, owner_fei, s, msg)

  if schedule
    do_insert(schedule, 1)
    schedule['_id']
  end
end

#query_workitems(criteria) ⇒ Object



236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/ruote/ar/storage.rb', line 236

# Queries workitems with a Hash of criteria.
#
# criteria - Hash with String keys. These keys are interpreted here:
#            'wfid'             => the ide column must end with "!<wfid>",
#            'participant_name' (or 'participant') => the participant_name
#                                  column must equal the value,
#            'count'            => return a count instead of workitems,
#            'limit'            => passed to take,
#            'offset' (or 'skip') => passed to skip.
#            Every other entry is matched as "<key>":<json value> inside
#            the serialized document.
#            The hash is left untouched.
#
# Returns the value of the count query when 'count' is set, otherwise an
# Array of Ruote::Workitem built from the rows kept by select_last_revs.
def query_workitems(criteria)

  # the special keys are consumed below; do that on a copy so the caller's
  # hash survives the call
  criteria = criteria.dup

  cond = table[:typ].eq('workitems')

  wfid = criteria.delete('wfid')
  cond = cond.and(table[:ide].matches("%!#{wfid}")) if wfid

  pname = criteria.delete('participant_name') || criteria.delete('participant')
  cond = cond.and(table[:participant_name].eq(pname)) if pname

  count = criteria.delete('count')
  limit = criteria.delete('limit')
  offset = criteria.delete('offset') || criteria.delete('skip')

  # what is left are plain field criteria
  criteria.each do |k, v|
    cond = cond.and(table[:doc].matches("%\"#{k}\":#{Rufus::Json.encode(v)}%"))
  end

  ds = table.where(cond).take(limit).skip(offset)

  # select_value yields the count itself; select_one(...).first would give
  # back a [column, value] pair
  return connection.select_value(ds.project(table[:wfid].count)) if count

  rows = connection.select_all(ds.project('*'))

  select_last_revs(rows).collect { |d| Ruote::Workitem.from_json(d['doc']) }
end

#reserve(doc) ⇒ Object

Used to reserve ‘msgs’ and ‘schedules’. Simply runs an update and returns true if that update affected at least one row.



36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/ruote/ar/storage.rb', line 36

# Claims a document for this worker by writing @worker into the worker
# column of the row that has the document's type and id, is at revision 1
# and has no worker yet.
#
# doc - Hash with 'type' and '_id' entries (both are converted with to_s).
#
# Returns true when connection.update reports more than zero rows, false
# otherwise.
def reserve(doc)
  # build the condition inside-out, keeping the original nesting:
  # typ AND (ide AND (rev = 1 AND worker IS NULL))
  unclaimed = table[:rev].eq(1).and(table[:worker].eq(nil))
  this_doc = table[:ide].eq(doc['_id'].to_s).and(unclaimed)
  condition = table[:typ].eq(doc['type'].to_s).and(this_doc)

  update = Arel::UpdateManager.new(Arel::Table.engine)
  update.table(table)
  update.where(condition)
  update.set([[table[:worker], @worker]])

  connection.update(update.to_sql) > 0
end

#shutdown ⇒ Object

Returns connection to pool



178
179
180
181
# File 'lib/ruote/ar/storage.rb', line 178

# Releases the ActiveRecord connections used by this storage: first calls
# clear_active_connections! on ActiveRecord::Base, then closes the
# connection ActiveRecord::Base hands out.
#
# Returns whatever the connection's close returns.
def shutdown
  ar_base = ::ActiveRecord::Base

  ar_base.clear_active_connections!
  ar_base.connection.close
end