Class: Ruote::StorageParticipant

Inherits:
Object
  • Object
show all
Includes:
Enumerable, LocalParticipant
Defined in:
lib/ruote/part/storage_participant.rb

Overview

A participant that stores the workitem in the same storage used by the engine and the worker(s).

part = engine.register_participant 'alfred', Ruote::StorageParticipant

# ... a bit later

puts "workitems still open : "
part.each do |workitem|
  puts "#{workitem.fei.wfid} - #{workitem.fields['params']['task']}"
end

# ... when done with a workitem

part.reply(workitem)
  # this will remove the workitem from the storage and hand it back
  # to the engine

Does not thread by default (the engine will not spawn a dedicated thread to handle the delivery to this participant, the workitem will get stored via the main engine thread and basta).

Instance Attribute Summary

Attributes included from LocalParticipant

#context, #error, #fei, #flavour, #msg, #workitem

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Enumerable

#each_with_object

Methods included from LocalParticipant

#_accept?, #_dont_thread?, #_on_cancel, #_on_reply, #_on_workitem, #_rtimeout, #applied_workitem, #fexp, #is_cancelled?, #is_gone?, #lookup_variable, #participant_name, #re_dispatch, #reply_to_engine, #unschedule_re_dispatch

Methods included from ReceiverMixin

#fetch_flow_expression, #fetch_workitem, #launch, #receive, #sign

Constructor Details

#initialize(engine_or_options = {}, options = nil) ⇒ StorageParticipant

Returns a new instance of StorageParticipant.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/ruote/part/storage_participant.rb', line 58

def initialize(engine_or_options={}, options=nil)

  if engine_or_options.respond_to?(:context)
    @context = engine_or_options.context
  elsif engine_or_options.is_a?(Ruote::Context)
    @context = engine_or_options
  else
    @options = engine_or_options
  end

  @options ||= {}

  @store_name = @options['store_name']
end

Class Method Details

.matches?(hwi, pname, criteria) ⇒ Boolean

Used by #query when filtering workitems.

Returns:

  • (Boolean)


331
332
333
334
335
336
337
338
339
340
341
342
# File 'lib/ruote/part/storage_participant.rb', line 331

def self.matches?(hwi, pname, criteria)

  return false if pname && hwi['participant_name'] != pname

  fields = hwi['fields']

  criteria.each do |fname, fvalue|
    return false if fields[fname] != fvalue
  end

  true
end

Instance Method Details

#[](fei) ⇒ Object Also known as: by_fei

Given a fei (or its string version, a sid), returns the corresponding workitem (or nil).



144
145
146
147
148
149
# File 'lib/ruote/part/storage_participant.rb', line 144

def [](fei)

  doc = fetch(fei)

  doc ? Ruote::Workitem.new(doc) : nil
end

#all(opts = {}) ⇒ Object

Returns all the workitems stored in here.



209
210
211
212
213
214
# File 'lib/ruote/part/storage_participant.rb', line 209

def all(opts={})

  res = fetch_all(opts)

  res.is_a?(Array) ? res.map { |hwi| Ruote::Workitem.new(hwi) } : res
end

#by_field(field, value = nil, opts = {}) ⇒ Object

field : returns all the workitems with the given field name present.

field and value : returns all the workitems with the given field name and the given value for that field.

Warning : only some storages are optimized for such queries (like CouchStorage), the others will load all the workitems and then filter them.



257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/ruote/part/storage_participant.rb', line 257

def by_field(field, value=nil, opts={})

  (value, opts = nil, value) if value.is_a?(Hash)

  if @context.storage.respond_to?(:by_field)
    return @context.storage.by_field('workitems', field, value, opts)
  end

  do_select(opts) do |hwi|
    hwi['fields'].keys.include?(field) &&
    (value.nil? || hwi['fields'][field] == value)
  end
end

#by_participant(participant_name, opts = {}) ⇒ Object

Returns all workitems for the specified participant name



237
238
239
240
241
242
243
244
245
246
# File 'lib/ruote/part/storage_participant.rb', line 237

def by_participant(participant_name, opts={})

  return @context.storage.by_participant(
    'workitems', participant_name, opts
  ) if @context.storage.respond_to?(:by_participant)

  do_select(opts) do |hwi|
    hwi['participant_name'] == participant_name
  end
end

#by_wfid(wfid, opts = {}) ⇒ Object

Return all workitems for the specified wfid



226
227
228
229
230
231
232
233
# File 'lib/ruote/part/storage_participant.rb', line 226

def by_wfid(wfid, opts={})

  if @context.storage.respond_to?(:by_wfid)
    return @context.storage.by_wfid('workitems', wfid, opts)
  end

  wis(@context.storage.get_many('workitems', wfid, opts))
end

#delegate(workitem, new_owner) ⇒ Object

Delegates a currently owned workitem to a new owner.

Fails if the workitem can’t be found, belongs to noone, or if the workitem passed as argument is out of date (got modified in the mean time).

It’s OK to delegate to nil, thus freeing the workitem.

See #reserve for an an explanation of the reserve/delegate/proceed flow.



405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
# File 'lib/ruote/part/storage_participant.rb', line 405

def delegate(workitem, new_owner)

  hwi = fetch(workitem)

  fail ArgumentError.new(
    "workitem not found"
  ) if hwi == nil

  fail ArgumentError.new(
    "cannot delegate, workitem doesn't belong to anyone"
  ) if hwi['owner'] == nil

  fail ArgumentError.new(
    "cannot delegate, " +
    "workitem owned by '#{hwi['owner']}', not '#{workitem.owner}'"
  ) if hwi['owner'] != workitem.owner

  hwi['owner'] = new_owner

  r = @context.storage.put(hwi, :update_rev => true)

  fail ArgumentError.new("workitem is gone") if r == true
  fail ArgumentError.new("workitem got modified meanwhile") if r != nil

  Workitem.new(hwi)
end

#do_not_threadObject

No need for a separate thread when delivering to this participant.



75
# File 'lib/ruote/part/storage_participant.rb', line 75

def do_not_thread; true; end

#do_update(workitem = @workitem) ⇒ Object

Added for groups.google.com/forum/?fromgroups#!topic/openwferu-users/5bpV2yfKwM0

Makes sure the workitem get saved to the storage. Fails if the workitem is already gone. Returns nil in case of success.



115
116
117
118
119
120
121
122
123
124
# File 'lib/ruote/part/storage_participant.rb', line 115

def do_update(workitem=@workitem)

  r = update(workitem)

  fail ArgumentError.new("workitem is gone") if r == true
  return nil if r.nil?

  r.h['fields'] = workitem.fields
  do_update(r)
end

#each(&block) ⇒ Object

Iterates over the workitems stored in here.



202
203
204
205
# File 'lib/ruote/part/storage_participant.rb', line 202

def each(&block)

  all.each { |wi| block.call(wi) }
end

#firstObject

A convenience method (especially when testing), returns the first (only ?) workitem in the participant.



219
220
221
222
# File 'lib/ruote/part/storage_participant.rb', line 219

def first

  wi(fetch_all({}).first)
end

#flunk(workitem, err_class_or_instance, *err_arguments) ⇒ Object

Removes the workitem and hands it back to the flow with an error to raise for the participant expression that emitted the workitem.



169
170
171
172
173
174
175
176
177
178
# File 'lib/ruote/part/storage_participant.rb', line 169

def flunk(workitem, err_class_or_instance, *err_arguments)

  r = remove_workitem('reject', workitem)

  return flunk(workitem) if r != nil

  workitem.h.delete('_rev')

  super(workitem, err_class_or_instance, *err_arguments)
end

#on_cancelObject

Removes the document/workitem from the storage.

Warning: this method is called by the engine (worker), i.e. not by you.



130
131
132
133
134
135
136
137
138
139
# File 'lib/ruote/part/storage_participant.rb', line 130

def on_cancel

  doc = fetch(fei)

  return unless doc

  r = @context.storage.delete(doc)

  on_cancel(fei, flavour) if r != nil
end

#on_workitemObject

This is the method called by ruote when passing a workitem to this participant.



80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/ruote/part/storage_participant.rb', line 80

def on_workitem

  doc = workitem.to_h

  doc.merge!(
    'type' => 'workitems',
    '_id' => to_id(doc['fei']),
    'participant_name' => doc['participant_name'],
    'wfid' => doc['fei']['wfid'])

  doc['store_name'] = @store_name if @store_name

  @context.storage.put(doc)
end

#per_participantObject

Mostly a test method. Returns a Hash were keys are participant names and values are lists of workitems.



347
348
349
350
# File 'lib/ruote/part/storage_participant.rb', line 347

def per_participant

  each_with_object({}) { |wi, h| (h[wi.participant_name] ||= []) << wi }
end

#per_participant_countObject

Mostly a test method. Returns a Hash were keys are participant names and values are integers, the count of workitems for a given participant name.



356
357
358
359
# File 'lib/ruote/part/storage_participant.rb', line 356

def per_participant_count

  per_participant.remap { |(k, v), h| h[k] = v.size }
end

#proceed(workitem) ⇒ Object

Removes the workitem from the storage and replies to the engine.



155
156
157
158
159
160
161
162
163
164
# File 'lib/ruote/part/storage_participant.rb', line 155

def proceed(workitem)

  r = remove_workitem('proceed', workitem)

  return proceed(workitem) if r != nil

  workitem.h.delete('_rev')

  reply_to_engine(workitem)
end

#purge!Object

Cleans this participant out completely



324
325
326
327
# File 'lib/ruote/part/storage_participant.rb', line 324

def purge!

  fetch_all({}).each { |hwi| @context.storage.delete(hwi) }
end

#query(criteria) ⇒ Object

Queries the store participant for workitems.

Some examples :

part.query(:wfid => @wfid).size
part.query('place' => 'nara').size
part.query('place' => 'heiankyou').size
part.query(:wfid => @wfid, :place => 'heiankyou').size

There are two ‘reserved’ criterion : ‘wfid’ and ‘participant’ (‘participant_name’ as well). The rest of the criteria are considered constraints for fields.

‘offset’ and ‘limit’ are reserved as well. They should prove useful for pagination. ‘skip’ can be used instead of ‘offset’.

Note : the criteria is AND only, you’ll have to do ORs (aggregation) by yourself.



290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
# File 'lib/ruote/part/storage_participant.rb', line 290

def query(criteria)

  cr = Ruote.keys_to_s(criteria)

  if @context.storage.respond_to?(:query_workitems)
    return @context.storage.query_workitems(cr)
  end

  opts = {}
  opts[:skip] = cr.delete('offset') || cr.delete('skip')
  opts[:limit] = cr.delete('limit')
  opts[:count] = cr.delete('count')

  wfid = cr.delete('wfid')

  count = opts[:count]

  pname = cr.delete('participant_name') || cr.delete('participant')
  opts.delete(:count) if pname

  hwis = wfid ?
    @context.storage.get_many('workitems', wfid, opts) : fetch_all(opts)

  return hwis unless hwis.is_a?(Array)

  hwis = hwis.select { |hwi|
    Ruote::StorageParticipant.matches?(hwi, pname, cr)
  }

  count ? hwis.size : wis(hwis)
end

#reply(workitem) ⇒ Object

(soon to be removed)



182
183
184
185
186
187
188
189
190
191
# File 'lib/ruote/part/storage_participant.rb', line 182

def reply(workitem)

  puts '-' * 80
  puts '*** WARNING : please use the Ruote::StorageParticipant#proceed(wi)'
  puts '              instead of #reply(wi) which is deprecated'
  #caller.each { |l| puts l }
  puts '-' * 80

  proceed(workitem)
end

#reserve(workitem_or_fei, owner) ⇒ Object

Claims a workitem. Returns the [updated] workitem if successful.

Returns nil if the workitem is already reserved.

Fails if the workitem can’t be found, is gone, or got modified elsewhere.

Here is a mini-diagram explaining the reserve/delegate/proceed flow:

 in    delegate(nil)    delegate(other)
 |    +---------------+ +------+
 v    v               | |      v
+-------+  reserve   +----------+  proceed
| ready | ---------> | reserved | ---------> out
+-------+            +----------+


377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
# File 'lib/ruote/part/storage_participant.rb', line 377

def reserve(workitem_or_fei, owner)

  hwi = fetch(workitem_or_fei)

  fail ArgumentError.new("workitem not found") if hwi.nil?

  return nil if hwi['owner'] && hwi['owner'] != owner

  hwi['owner'] = owner

  r = @context.storage.put(hwi, :update_rev => true)

  fail ArgumentError.new("workitem is gone") if r == true
  fail ArgumentError.new("workitem got modified meanwhile") if r != nil

  Workitem.new(hwi)
end

#sizeObject

Returns the count of workitems stored in this participant.



195
196
197
198
# File 'lib/ruote/part/storage_participant.rb', line 195

def size

  fetch_all(:count => true)
end

#update(workitem) ⇒ Object

Used by client code when “saving” a workitem (fields may have changed). Calling #update won’t proceed the workitem.

Returns nil in case of success, true if the workitem is already gone and the newer version of the workitem if the workitem changed in the mean time.



102
103
104
105
106
107
# File 'lib/ruote/part/storage_participant.rb', line 102

def update(workitem)

  r = @context.storage.put(workitem.h)

  r.is_a?(Hash) ? Ruote::Workitem.new(r) : r
end