Module: Ruote::StorageBase
- Included in: CompositeStorage, FsStorage, HashStorage
- Defined in: lib/ruote/storage/base.rb
Overview
Base methods for storage implementations.
Instance Method Summary
- #clear ⇒ Object
  Used when doing integration tests; removes all msgs, schedules, errors, expressions and workitems.
- #context ⇒ Object
  Section comment from the source: misc.
- #context=(c) ⇒ Object
- #copy_to(target, opts = {}) ⇒ Object
  Copies the content of this storage into a target storage.
- #delete_schedule(schedule_id) ⇒ Object
- #dump(type) ⇒ Object
- #empty?(type) ⇒ Boolean
- #expression_wfids(opts) ⇒ Object
  Given all the expressions stored here, returns a sorted list of unique wfids (this is used in Engine#processes(opts)).
- #find_expressions(wfid) ⇒ Object
  Given a wfid, returns all the expressions of that process instance.
- #find_root_expression(wfid) ⇒ Object
  For a given wfid, fetches all the root expressions, sorts them by expid and returns the first.
- #find_root_expressions(wfid) ⇒ Object
  For a given wfid, returns all the expressions (array of Hash instances) that have a nil ‘parent_id’.
- #get_configuration(key) ⇒ Object
  Section comment from the source: configurations.
- #get_engine_variable(k) ⇒ Object
  Section comment from the source: engine variables.
- #get_msgs ⇒ Object
- #get_schedules(delta, now) ⇒ Object
  Section comment from the source: ats and crons.
- #get_trackers(wfid = nil) ⇒ Object
  Some storage implementations might need the wfid information when adding or removing trackers.
- #put_engine_variable(k, v) ⇒ Object
- #put_msg(action, options) ⇒ Object
  Section comment from the source: messages.
- #put_schedule(flavour, owner_fei, s, msg) ⇒ Object
  Places schedule in storage.
- #remove_process(wfid) ⇒ Object
  Removes a process by removing all its schedules, expressions, errors, workitems and trackers.
- #replace_engine_configuration(options) ⇒ Object
- #reserve(doc) ⇒ Object
  Attempts to delete a document and returns true if the deletion succeeded.
- #worker ⇒ Object
  Warning: this is not equivalent to @context.worker; this method fetches the worker from the thread-local variables.
Instance Method Details
#clear ⇒ Object
Used when doing integration tests, removes all msgs, schedules, errors, expressions and workitems.
Note that it does not remove engine variables (danger).
    # File 'lib/ruote/storage/base.rb', line 270
    def clear
      %w[ msgs schedules errors expressions workitems ].each do |type|
        purge_type!(type)
      end
    end
#context ⇒ Object
Section comment from the source: misc.
    # File 'lib/ruote/storage/base.rb', line 40
    def context
      @context ||= Ruote::Context.new(self)
    end
#context=(c) ⇒ Object
    # File 'lib/ruote/storage/base.rb', line 45
    def context=(c)
      @context = c
    end
#copy_to(target, opts = {}) ⇒ Object
Copies the content of this storage into a target storage.
Of course, the target storage may be a different implementation.
    # File 'lib/ruote/storage/base.rb', line 242
    def copy_to(target, opts={})
      counter = 0

      %w[
        configurations errors expressions msgs schedules variables workitems
      ].each do |type|
        ids(type).each do |id|
          item = get(type, id)

          # drop the source revision before writing to the target
          item.delete('_rev')
          target.put(item)

          counter += 1
          puts(" #{type}/#{item['_id']}") if opts[:verbose]
        end
      end

      counter
    end
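As a rough illustration of the copy loop above, here is a minimal sketch against a hypothetical in-memory storage. `SketchStorage`, `TYPES` and `copy_all` are names invented for this example; they are not ruote classes or methods, and they only mimic the `ids`, `get` and `put` calls that `copy_to` relies on.

```ruby
# Hypothetical in-memory storage, invented for this sketch (not a ruote class).
# Documents are hashes carrying 'type', '_id' and, once stored, '_rev'.
class SketchStorage
  def initialize
    @docs = Hash.new { |h, type| h[type] = {} }
  end

  # Stores a copy of the document and stamps it with a revision number.
  def put(doc)
    stored = doc.dup
    stored['_rev'] = (stored['_rev'] || 0) + 1
    @docs[doc['type']][doc['_id']] = stored
    nil
  end

  # Returns a copy of the stored document, or nil if there is none.
  def get(type, id)
    doc = @docs[type][id]
    doc && doc.dup
  end

  def ids(type)
    @docs[type].keys.sort
  end
end

TYPES = %w[ configurations errors expressions msgs schedules variables workitems ]

# Same shape as the copy loop above: drop '_rev' so the target assigns its own.
def copy_all(source, target)
  counter = 0
  TYPES.each do |type|
    source.ids(type).each do |id|
      item = source.get(type, id)
      item.delete('_rev')
      target.put(item)
      counter += 1
    end
  end
  counter
end

source = SketchStorage.new
source.put('type' => 'msgs', '_id' => 'm1', 'action' => 'launch')
source.put('type' => 'variables', '_id' => 'variables', 'variables' => { 'a' => 1 })

target = SketchStorage.new
copied = copy_all(source, target)
```

In this sketch `copied` counts the documents written, which matches the counter that `copy_to` returns.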
#delete_schedule(schedule_id) ⇒ Object
    # File 'lib/ruote/storage/base.rb', line 209
    def delete_schedule(schedule_id)
      return if schedule_id.nil?

      s = get('schedules', schedule_id)
      delete(s) if s
    end
#dump(type) ⇒ Object
    # File 'lib/ruote/storage/base.rb', line 302
    def dump(type)
      require 'yaml'

      YAML.dump({ type => get_many(type) })
    end
#empty?(type) ⇒ Boolean
    # File 'lib/ruote/storage/base.rb', line 103
    def empty?(type)
      (get_many(type, nil, :count => true) == 0)
    end
#expression_wfids(opts) ⇒ Object
Given all the expressions stored here, returns a sorted list of unique wfids (this is used in Engine#processes(opts)).
Understands the :skip, :limit and :descending options.
This is a base implementation; specific storage implementations may provide their own (think CouchDB, which could provide a view for it).
    # File 'lib/ruote/storage/base.rb', line 144
    def expression_wfids(opts)
      wfids = ids('expressions').collect { |fei| fei.split('!').last }.uniq.sort

      wfids = wfids.reverse if opts[:descending]

      skip = opts[:skip] || 0
      limit = opts[:limit] || wfids.length

      wfids[skip, limit]
    end
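To make the :skip, :limit and :descending handling concrete, here is a standalone sketch of the same logic. `page_of_wfids` is a name invented for this example, and the expression ids are made-up samples; the only assumption is that the wfid is the last '!'-separated segment, as the `split` in the listing above implies.

```ruby
# Standalone version of the skip/limit/descending logic (hypothetical helper).
def page_of_wfids(expression_ids, opts = {})
  # keep the last '!'-separated segment of each id, then dedupe and sort
  wfids = expression_ids.collect { |fei| fei.split('!').last }.uniq.sort
  wfids = wfids.reverse if opts[:descending]

  skip = opts[:skip] || 0
  limit = opts[:limit] || wfids.length

  wfids[skip, limit]
end

# Made-up expression ids: several expressions may share one wfid.
ids = %w[
  0!!20240101-alpha 0_0!!20240101-alpha
  0!!20240102-bravo
  0!!20240103-charlie 0_1!!20240103-charlie
]

first_page = page_of_wfids(ids, :limit => 2)
second_page = page_of_wfids(ids, :skip => 2, :limit => 2)
newest_first = page_of_wfids(ids, :descending => true, :limit => 1)
past_the_end = page_of_wfids(ids, :skip => 10)
```

One detail worth knowing when paging: Ruby's `Array#[]` with a start index beyond the end of the array returns nil rather than an empty array, so `past_the_end` is nil here.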
#find_expressions(wfid) ⇒ Object
Given a wfid, returns all the expressions of that process instance.
    # File 'lib/ruote/storage/base.rb', line 114
    def find_expressions(wfid)
      get_many('expressions', wfid)
    end
#find_root_expression(wfid) ⇒ Object
For a given wfid, fetches all the root expressions, sorts them by expid and returns the first. Hopefully it’s the right root expression.
    # File 'lib/ruote/storage/base.rb', line 130
    def find_root_expression(wfid)
      find_root_expressions(wfid).sort_by { |hexp| hexp['fei']['expid'] }.first
    end
#find_root_expressions(wfid) ⇒ Object
For a given wfid, returns all the expressions (array of Hash instances) that have a nil ‘parent_id’.
    # File 'lib/ruote/storage/base.rb', line 122
    def find_root_expressions(wfid)
      find_expressions(wfid).select { |hexp| hexp['parent_id'].nil? }
    end
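The two root-expression lookups can be tried out on plain hashes. The documents below are made up for illustration; only the 'parent_id' key and the 'expid' entry under 'fei' matter to the selection and the sort.

```ruby
# Made-up expression documents for one process instance.
expressions = [
  { 'fei' => { 'expid' => '0_1', 'wfid' => 'w1' }, 'parent_id' => { 'expid' => '0' } },
  { 'fei' => { 'expid' => '0_0', 'wfid' => 'w1' }, 'parent_id' => nil },
  { 'fei' => { 'expid' => '0', 'wfid' => 'w1' }, 'parent_id' => nil }
]

# Root expressions are the ones without a parent.
roots = expressions.select { |hexp| hexp['parent_id'].nil? }

# When there are several, the one with the smallest expid wins.
root = roots.sort_by { |hexp| hexp['fei']['expid'] }.first
```

Since expids are strings, the sort is a plain string comparison, which is why '0' sorts before '0_0' in this sample.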
#get_configuration(key) ⇒ Object
Section comment from the source: configurations.
    # File 'lib/ruote/storage/base.rb', line 70
    def get_configuration(key)
      get('configurations', key)
    end
#get_engine_variable(k) ⇒ Object
Section comment from the source: engine variables.
    # File 'lib/ruote/storage/base.rb', line 221
    def get_engine_variable(k)
      get_engine_variables['variables'][k]
    end
#get_msgs ⇒ Object
    # File 'lib/ruote/storage/base.rb', line 98
    def get_msgs
      get_many('msgs', nil, :limit => 300).sort_by { |d| d['put_at'] }
    end
#get_schedules(delta, now) ⇒ Object
Section comment from the source: ats and crons.
    # File 'lib/ruote/storage/base.rb', line 173
    def get_schedules(delta, now)
      # TODO : bring that 'optimization' back in,
      #        maybe every minute, if min != last_min ...

      #if delta < 1.0
      #  at = now.strftime('%Y%m%d%H%M%S')
      #  get_many('schedules', /-#{at}$/)
      #elsif delta < 60.0
      #  at = now.strftime('%Y%m%d%H%M')
      #  scheds = get_many('schedules', /-#{at}\d\d$/)
      #  filter_schedules(scheds, now)
      #else # load all the schedules

      scheds = get_many('schedules')
      filter_schedules(scheds, now)

      #end
    end
#get_trackers(wfid = nil) ⇒ Object
Some storage implementations might need the wfid information when adding or removing trackers.
    # File 'lib/ruote/storage/base.rb', line 163
    def get_trackers(wfid=nil)
      get('variables', 'trackers') ||
        { '_id' => 'trackers', 'type' => 'variables', 'trackers' => {} }
    end
#put_engine_variable(k, v) ⇒ Object
    # File 'lib/ruote/storage/base.rb', line 226
    def put_engine_variable(k, v)
      vars = get_engine_variables
      vars['variables'][k] = v

      # a non-nil result from put means the write did not go through: retry
      put_engine_variable(k, v) unless put(vars).nil?
    end
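The read, modify, put, retry pattern in `put_engine_variable` can be sketched in isolation. `RevStorage` and `put_variable` are invented for this example, and the revision check is an assumption about how a storage might reject a stale write; the only behaviour taken from the listings on this page is that `put` returns nil on success.

```ruby
# Hypothetical storage for this sketch (not a ruote class): put returns nil on
# success and the currently stored document when the caller's '_rev' is stale.
class RevStorage
  def initialize
    @doc = { '_id' => 'variables', 'type' => 'variables', 'variables' => {}, '_rev' => 0 }
  end

  def get
    Marshal.load(Marshal.dump(@doc)) # deep copy, so callers cannot mutate the store
  end

  def put(doc)
    return get if doc['_rev'] != @doc['_rev']
    @doc = Marshal.load(Marshal.dump(doc))
    @doc['_rev'] += 1
    nil
  end
end

# Re-reads and retries until the put goes through, like the recursion above.
def put_variable(storage, key, value, on_read = nil)
  vars = storage.get
  on_read.call if on_read # test hook: lets another writer sneak in
  vars['variables'][key] = value
  put_variable(storage, key, value) unless storage.put(vars).nil?
end

storage = RevStorage.new

# Simulate a concurrent writer landing between our read and our write.
interloper = lambda do
  other = storage.get
  other['variables']['b'] = 2
  storage.put(other)
end

put_variable(storage, 'a', 1, interloper)
final = storage.get['variables']
```

Because the retry starts from a fresh read, the interloper's 'b' entry survives alongside 'a' instead of being overwritten.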
#put_msg(action, options) ⇒ Object
Section comment from the source: messages.
    # File 'lib/ruote/storage/base.rb', line 91
    def put_msg(action, options)
      msg = prepare_msg_doc(action, options)

      put(msg)
    end
#put_schedule(flavour, owner_fei, s, msg) ⇒ Object
Places schedule in storage. Returns the id of the ‘schedule’ document. If the schedule got triggered immediately, nil is returned.
    # File 'lib/ruote/storage/base.rb', line 196
    def put_schedule(flavour, owner_fei, s, msg)
      doc = prepare_schedule_doc(flavour, owner_fei, s, msg)

      return nil unless doc

      r = put(doc)

      # a non-nil result from put signals failure
      raise "put_schedule failed" if r != nil

      doc['_id']
    end
#remove_process(wfid) ⇒ Object
Removes a process by removing all its schedules, expressions, errors, workitems and trackers.
Warning: will not trigger any cancel behaviours at all, just removes the process.
    # File 'lib/ruote/storage/base.rb', line 283
    def remove_process(wfid)
      2.times do
        # two passes

        Thread.pass

        %w[ schedules expressions errors workitems ].each do |type|
          get_many(type, wfid).each { |d| delete(d) }
        end

        doc = get_trackers

        doc['trackers'].delete_if { |k, v| k.end_with?("!#{wfid}") }

        @context.storage.put(doc)
      end
    end
#replace_engine_configuration(options) ⇒ Object
    # File 'lib/ruote/storage/base.rb', line 75
    def replace_engine_configuration(options)
      return if options.delete('preserve_configuration')

      conf = get('configurations', 'engine')

      doc = options.merge('type' => 'configurations', '_id' => 'engine')
      doc['_rev'] = conf['_rev'] if conf

      put(doc)
    end
#reserve(doc) ⇒ Object
Attempts to delete a document and returns true if the deletion succeeded. This is used with msgs to reserve work on them.
    # File 'lib/ruote/storage/base.rb', line 53
    def reserve(doc)
      # delete returns nil when the deletion succeeded
      delete(doc).nil?
    end
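Reservation by deletion can be demonstrated with a small sketch in which several threads compete for the same msg. `MsgStore` is invented for this example; its `delete` is assumed to return nil when it removed the document and true when the document was already gone, which is enough for `reserve` to work as described.

```ruby
# Hypothetical message store for this sketch (not a ruote class).
class MsgStore
  def initialize(docs)
    @docs = docs.each_with_object({}) { |d, h| h[d['_id']] = d }
    @mutex = Mutex.new
  end

  # nil when the document was removed, true when it was already gone
  def delete(doc)
    @mutex.synchronize { @docs.delete(doc['_id']) ? nil : true }
  end

  # Same one-liner as in the listing above.
  def reserve(doc)
    delete(doc).nil?
  end
end

msg = { '_id' => 'm1', 'action' => 'launch' }
store = MsgStore.new([msg])

# Several workers race for the same msg; only one reservation can succeed.
results = 5.times.map { Thread.new { store.reserve(msg) } }.map(&:value)
winners = results.count(true)
```

Whichever thread deletes the msg first gets true and may process it; the others get false and move on.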
#worker ⇒ Object
Warning: this is not equivalent to @context.worker; this method fetches the worker from the thread-local variables.
    # File 'lib/ruote/storage/base.rb', line 61
    def worker
      Ruote.current_worker
    end