Module: Ruote::StorageBase

Included in:
CompositeStorage, FsStorage, HashStorage
Defined in:
lib/ruote/storage/base.rb

Overview

Base methods for storage implementations.

Instance Method Summary collapse

Instance Method Details

#clearObject

Used when doing integration tests, removes all msgs, schedules, errors, expressions and workitems.

NOTE that it doesn’t remove engine variables (danger)



270
271
272
273
274
275
# File 'lib/ruote/storage/base.rb', line 270

def clear

  %w[ msgs schedules errors expressions workitems ].each do |type|
    purge_type!(type)
  end
end

#contextObject

– misc ++



40
41
42
43
# File 'lib/ruote/storage/base.rb', line 40

def context

  @context ||= Ruote::Context.new(self)
end

#context=(c) ⇒ Object



45
46
47
48
# File 'lib/ruote/storage/base.rb', line 45

def context=(c)

  @context = c
end

#copy_to(target, opts = {}) ⇒ Object

Copies the content of this storage into a target storage.

Of course, the target storage may be a different implementation.



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/ruote/storage/base.rb', line 242

def copy_to(target, opts={})

  counter = 0

  %w[
    configurations errors expressions msgs schedules variables workitems
  ].each do |type|

    ids(type).each do |id|

      item = get(type, id)

      item.delete('_rev')
      target.put(item)

      counter += 1
      puts("  #{type}/#{item['_id']}") if opts[:verbose]
    end
  end

  counter
end

#delete_schedule(schedule_id) ⇒ Object



209
210
211
212
213
214
215
# File 'lib/ruote/storage/base.rb', line 209

def delete_schedule(schedule_id)

  return if schedule_id.nil?

  s = get('schedules', schedule_id)
  delete(s) if s
end

#dump(type) ⇒ Object



302
303
304
305
306
307
# File 'lib/ruote/storage/base.rb', line 302

def dump(type)

  require 'yaml'

  YAML.dump({ type => get_many(type) })
end

#empty?(type) ⇒ Boolean

Returns:

  • (Boolean)


103
104
105
106
# File 'lib/ruote/storage/base.rb', line 103

def empty?(type)

  (get_many(type, nil, :count => true) == 0)
end

#expression_wfids(opts) ⇒ Object

Given all the expressions stored here, returns a sorted list of unique wfids (this is used in Engine#processes(opts).

Understands the :skip, :limit and :descending options.

This is a base implementation, different storage implementations may come up with different implementations (think CouchDB, which could provide a view for it).



144
145
146
147
148
149
150
151
152
153
154
# File 'lib/ruote/storage/base.rb', line 144

def expression_wfids(opts)

  wfids = ids('expressions').collect { |fei| fei.split('!').last }.uniq.sort

  wfids = wfids.reverse if opts[:descending]

  skip = opts[:skip] || 0
  limit = opts[:limit] || wfids.length

  wfids[skip, limit]
end

#find_expressions(wfid) ⇒ Object

Given a wfid, returns all the expressions of that process instance.



114
115
116
117
# File 'lib/ruote/storage/base.rb', line 114

def find_expressions(wfid)

  get_many('expressions', wfid)
end

#find_root_expression(wfid) ⇒ Object

For a given wfid, fetches all the root expressions, sort by expid and return the first. Hopefully it’s the right root_expression.



130
131
132
133
# File 'lib/ruote/storage/base.rb', line 130

def find_root_expression(wfid)

  find_root_expressions(wfid).sort_by { |hexp| hexp['fei']['expid'] }.first
end

#find_root_expressions(wfid) ⇒ Object

For a given wfid, returns all the expressions (array of Hash instances) that have a nil ‘parent_id’.



122
123
124
125
# File 'lib/ruote/storage/base.rb', line 122

def find_root_expressions(wfid)

  find_expressions(wfid).select { |hexp| hexp['parent_id'].nil? }
end

#get_configuration(key) ⇒ Object

– configurations ++



70
71
72
73
# File 'lib/ruote/storage/base.rb', line 70

def get_configuration(key)

  get('configurations', key)
end

#get_engine_variable(k) ⇒ Object

– engine variables ++



221
222
223
224
# File 'lib/ruote/storage/base.rb', line 221

def get_engine_variable(k)

  get_engine_variables['variables'][k]
end

#get_msgsObject



98
99
100
101
# File 'lib/ruote/storage/base.rb', line 98

def get_msgs

  get_many('msgs', nil, :limit => 300).sort_by { |d| d['put_at'] }
end

#get_schedules(delta, now) ⇒ Object

– ats and crons ++



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/ruote/storage/base.rb', line 173

def get_schedules(delta, now)

  # TODO : bring that 'optimization' back in,
  #        maybe every minute, if min != last_min ...

  #if delta < 1.0
  #  at = now.strftime('%Y%m%d%H%M%S')
  #  get_many('schedules', /-#{at}$/)
  #elsif delta < 60.0
  #  at = now.strftime('%Y%m%d%H%M')
  #  scheds = get_many('schedules', /-#{at}\d\d$/)
  #  filter_schedules(scheds, now)
  #else # load all the schedules

  scheds = get_many('schedules')
  filter_schedules(scheds, now)

  #end
end

#get_trackers(wfid = nil) ⇒ Object

Some storage implementation might need the wfid information when adding or removing trackers.



163
164
165
166
167
# File 'lib/ruote/storage/base.rb', line 163

def get_trackers(wfid=nil)

  get('variables', 'trackers') ||
    { '_id' => 'trackers', 'type' => 'variables', 'trackers' => {} }
end

#put_engine_variable(k, v) ⇒ Object



226
227
228
229
230
231
232
# File 'lib/ruote/storage/base.rb', line 226

def put_engine_variable(k, v)

  vars = get_engine_variables
  vars['variables'][k] = v

  put_engine_variable(k, v) unless put(vars).nil?
end

#put_msg(action, options) ⇒ Object

– messages ++



91
92
93
94
95
96
# File 'lib/ruote/storage/base.rb', line 91

def put_msg(action, options)

  msg = prepare_msg_doc(action, options)

  put(msg)
end

#put_schedule(flavour, owner_fei, s, msg) ⇒ Object

Places schedule in storage. Returns the id of the ‘schedule’ document. If the schedule got triggered immediately, nil is returned.



196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/ruote/storage/base.rb', line 196

def put_schedule(flavour, owner_fei, s, msg)

  doc = prepare_schedule_doc(flavour, owner_fei, s, msg)

  return nil unless doc

  r = put(doc)

  raise "put_schedule failed" if r != nil

  doc['_id']
end

#remove_process(wfid) ⇒ Object

Removes a process by removing all its schedules, expressions, errors, workitems and trackers.

Warning: will not trigger any cancel behaviours at all, just removes the process.



283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# File 'lib/ruote/storage/base.rb', line 283

def remove_process(wfid)

  2.times do
    # two passes

    Thread.pass

    %w[ schedules expressions errors workitems ].each do |type|
      get_many(type, wfid).each { |d| delete(d) }
    end

    doc = get_trackers

    doc['trackers'].delete_if { |k, v| k.end_with?("!#{wfid}") }

    @context.storage.put(doc)
  end
end

#replace_engine_configuration(options) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
# File 'lib/ruote/storage/base.rb', line 75

def replace_engine_configuration(options)

  return if options.delete('preserve_configuration')

  conf = get('configurations', 'engine')

  doc = options.merge('type' => 'configurations', '_id' => 'engine')
  doc['_rev'] = conf['_rev'] if conf

  put(doc)
end

#reserve(doc) ⇒ Object

Attempts to delete a document, returns true if the deletion succeeded. This is used with msgs to reserve work on them.



53
54
55
56
# File 'lib/ruote/storage/base.rb', line 53

def reserve(doc)

  delete(doc).nil?
end

#workerObject

Warning, this is not equivalent to doing @context.worker, this method fetches the worker from the local thread variables.



61
62
63
64
# File 'lib/ruote/storage/base.rb', line 61

def worker

  Ruote.current_worker
end