Module: Ruote::StorageBase

Included in:
CompositeStorage, FsStorage, HashStorage
Defined in:
lib/ruote/storage/base.rb

Overview

Base methods for storage implementations.

Instance Method Summary collapse

Instance Method Details

#clearObject

Used when doing integration tests, removes all msgs, schedules, errors, expressions and workitems.

NOTE that it doesn't remove engine variables (danger)



267
268
269
270
271
272
# File 'lib/ruote/storage/base.rb', line 267

def clear

  %w[ msgs schedules errors expressions workitems ].each do |type|
    purge_type!(type)
  end
end

#contextObject

– misc ++



39
40
41
42
# File 'lib/ruote/storage/base.rb', line 39

def context

  @context ||= Ruote::Context.new(self)
end

#context=(c) ⇒ Object



44
45
46
47
# File 'lib/ruote/storage/base.rb', line 44

def context=(c)

  @context = c
end

#copy_to(target, opts = {}) ⇒ Object

Copies the content of this storage into a target storage.

Of course, the target storage may be a different implementation.



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'lib/ruote/storage/base.rb', line 239

def copy_to(target, opts={})

  counter = 0

  %w[
    configurations errors expressions msgs schedules variables workitems
  ].each do |type|

    ids(type).each do |id|

      item = get(type, id)

      item.delete('_rev')
      target.put(item)

      counter += 1
      puts("  #{type}/#{item['_id']}") if opts[:verbose]
    end
  end

  counter
end

#delete_schedule(schedule_id) ⇒ Object



206
207
208
209
210
211
212
# File 'lib/ruote/storage/base.rb', line 206

def delete_schedule(schedule_id)

  return if schedule_id.nil?

  s = get('schedules', schedule_id)
  delete(s) if s
end

#dump(type) ⇒ Object



299
300
301
302
303
304
# File 'lib/ruote/storage/base.rb', line 299

def dump(type)

  require 'yaml'

  YAML.dump({ type => get_many(type) })
end

#empty?(type) ⇒ Boolean

Returns:

  • (Boolean)


100
101
102
103
# File 'lib/ruote/storage/base.rb', line 100

def empty?(type)

  (get_many(type, nil, :count => true) == 0)
end

#expression_wfids(opts) ⇒ Object

Given all the expressions stored here, returns a sorted list of unique wfids (this is used in Engine#processes(opts).

Understands the :skip, :limit and :descending options.

This is a base implementation, different storage implementations may come up with different implementations (think CouchDB, which could provide a view for it).



141
142
143
144
145
146
147
148
149
150
151
# File 'lib/ruote/storage/base.rb', line 141

def expression_wfids(opts)

  wfids = ids('expressions').collect { |fei| fei.split('!').last }.uniq.sort

  wfids = wfids.reverse if opts[:descending]

  skip = opts[:skip] || 0
  limit = opts[:limit] || wfids.length

  wfids[skip, limit]
end

#find_expressions(wfid) ⇒ Object

Given a wfid, returns all the expressions of that process instance.



111
112
113
114
# File 'lib/ruote/storage/base.rb', line 111

def find_expressions(wfid)

  get_many('expressions', wfid)
end

#find_root_expression(wfid) ⇒ Object

For a given wfid, fetches all the root expressions, sort by expid and return the first. Hopefully it's the right root_expression.



127
128
129
130
# File 'lib/ruote/storage/base.rb', line 127

def find_root_expression(wfid)

  find_root_expressions(wfid).sort_by { |hexp| hexp['fei']['expid'] }.first
end

#find_root_expressions(wfid) ⇒ Object

For a given wfid, returns all the expressions (array of Hash instances) that have a nil 'parent_id'.



119
120
121
122
# File 'lib/ruote/storage/base.rb', line 119

def find_root_expressions(wfid)

  find_expressions(wfid).select { |hexp| hexp['parent_id'].nil? }
end

#get_configuration(key) ⇒ Object

– configurations ++



69
70
71
72
# File 'lib/ruote/storage/base.rb', line 69

def get_configuration(key)

  get('configurations', key)
end

#get_engine_variable(k) ⇒ Object

– engine variables ++



218
219
220
221
# File 'lib/ruote/storage/base.rb', line 218

def get_engine_variable(k)

  get_engine_variables['variables'][k]
end

#get_msgsObject



95
96
97
98
# File 'lib/ruote/storage/base.rb', line 95

def get_msgs

  get_many('msgs', nil, :limit => 300).sort_by { |d| d['put_at'] }
end

#get_schedules(delta, now) ⇒ Object

– ats and crons ++



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/ruote/storage/base.rb', line 170

def get_schedules(delta, now)

  # TODO : bring that 'optimization' back in,
  #        maybe every minute, if min != last_min ...

  #if delta < 1.0
  #  at = now.strftime('%Y%m%d%H%M%S')
  #  get_many('schedules', /-#{at}$/)
  #elsif delta < 60.0
  #  at = now.strftime('%Y%m%d%H%M')
  #  scheds = get_many('schedules', /-#{at}\d\d$/)
  #  filter_schedules(scheds, now)
  #else # load all the schedules

  scheds = get_many('schedules')
  filter_schedules(scheds, now)

  #end
end

#get_trackers(wfid = nil) ⇒ Object

Some storage implementation might need the wfid information when adding or removing trackers.



160
161
162
163
164
# File 'lib/ruote/storage/base.rb', line 160

def get_trackers(wfid=nil)

  get('variables', 'trackers') ||
    { '_id' => 'trackers', 'type' => 'variables', 'trackers' => {} }
end

#put_engine_variable(k, v) ⇒ Object



223
224
225
226
227
228
229
# File 'lib/ruote/storage/base.rb', line 223

def put_engine_variable(k, v)

  vars = get_engine_variables
  vars['variables'][k] = v

  put_engine_variable(k, v) unless put(vars).nil?
end

#put_msg(action, options) ⇒ Object

– messages ++



90
91
92
93
# File 'lib/ruote/storage/base.rb', line 90

def put_msg(action, options)

  put(prepare_msg_doc(action, options))
end

#put_schedule(flavour, owner_fei, s, msg) ⇒ Object

Places schedule in storage. Returns the id of the 'schedule' document. If the schedule got triggered immediately, nil is returned.



193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/ruote/storage/base.rb', line 193

def put_schedule(flavour, owner_fei, s, msg)

  doc = prepare_schedule_doc(flavour, owner_fei, s, msg)

  return nil unless doc

  r = put(doc)

  raise "put_schedule failed" if r != nil

  doc['_id']
end

#remove_process(wfid) ⇒ Object

Removes a process by removing all its schedules, expressions, errors, workitems and trackers.

Warning: will not trigger any cancel behaviours at all, just removes the process.



280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/ruote/storage/base.rb', line 280

def remove_process(wfid)

  2.times do
    # two passes

    Thread.pass

    %w[ schedules expressions errors workitems ].each do |type|
      get_many(type, wfid).each { |d| delete(d) }
    end

    doc = get_trackers

    doc['trackers'].delete_if { |k, v| k.end_with?("!#{wfid}") }

    @context.storage.put(doc)
  end
end

#replace_engine_configuration(options) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
# File 'lib/ruote/storage/base.rb', line 74

def replace_engine_configuration(options)

  return if options.delete('preserve_configuration')

  conf = get('configurations', 'engine')

  doc = options.merge('type' => 'configurations', '_id' => 'engine')
  doc['_rev'] = conf['_rev'] if conf

  put(doc)
end

#reserve(doc) ⇒ Object

Attempts to delete a document, returns true if the deletion succeeded. This is used with msgs to reserve work on them.



52
53
54
55
# File 'lib/ruote/storage/base.rb', line 52

def reserve(doc)

  delete(doc).nil?
end

#workerObject

Warning, this is not equivalent to doing @context.worker, this method fetches the worker from the local thread variables.



60
61
62
63
# File 'lib/ruote/storage/base.rb', line 60

def worker

  Ruote.current_worker
end