Class: Ruote::Exp::ConcurrenceExpression
- Inherits: FlowExpression
  - Object
  - FlowExpression
  - Ruote::Exp::ConcurrenceExpression
- Defined in: lib/ruote/exp/fe_concurrence.rb
Overview
The ‘concurrence’ expression applies its child branches in parallel (on a best-effort basis).
concurrence do
alpha
bravo
end
attributes
The concurrence expression takes a number of attributes that allow for sophisticated control (especially at merge time).
:count
concurrence :count => 1 do
alpha
bravo
end
In that example, the concurrence terminates as soon as 1 (count) of the branches replies. The other branch gets cancelled.
:count and :wait_for may point to a negative integer, meaning “all but x”.
concurrence :count => -2 do # all the branches replied but 2
# ...
end
:count can be shortened to :c.
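As a minimal sketch of the "all but x" rule, the helper below turns a :count value into the number of replies to wait for. The name expected_replies is hypothetical (it is not part of ruote's API), and both the clamping to the number of branches and the floor of 1 for negative values are assumptions made for illustration.

```ruby
# Hypothetical helper, not ruote's API: resolves a :count value into the
# number of replies the concurrence waits for.
def expected_replies(count, branch_count)
  return branch_count if count.nil?               # no :count, wait for every branch
  return [count, branch_count].min if count >= 0  # assumed: never more than the branches
  [branch_count + count, 1].max                   # negative: "all but x", assumed floor of 1
end

expected_replies(nil, 4)  # => 4
expected_replies(1, 4)    # => 1
expected_replies(-2, 5)   # => 3
```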
:wait_for
This attribute accepts either an integer or a list of tags.
When used with the integer, it’s equivalent to the :count attribute:
concurrence :wait_for => 1 do
# ...
end
It waits for 1 branch to respond and then moves on (concurrence over).
When used with a string (or an array), it extracts a list of tags and waits for the branches with those tags. Once all the tags have replied, the concurrence is over.
concurrence :wait_for => 'alpha, bravo' do
sequence :tag => 'alpha' do
# ...
end
sequence :tag => 'bravo' do
# ...
end
sequence :tag => 'charly' do
# ...
end
end
This concurrence will be over when the branches alpha and bravo have replied. Whether the charly branch has replied or not doesn’t matter.
:wait_for can be shortened to :wf.
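The two readings of :wait_for (a count or a list of tags) can be sketched in plain Ruby as follows. COUNT_R is the pattern listed in the constant summary of this class; comma_split is a hypothetical stand-in for Ruote.comma_split, and interpret_wait_for is an illustrative name, not a ruote method.

```ruby
COUNT_R = /^-?\d+$/ # same pattern as the class constant

# Hypothetical stand-in for Ruote.comma_split; accepts a string or an array.
def comma_split(value)
  value.is_a?(Array) ? value.map(&:to_s) : value.to_s.split(/\s*,\s*/)
end

# Returns [:count, n] or [:tags, [...]], or nil when the attribute is absent.
def interpret_wait_for(value)
  return nil if value.nil?
  return [:count, value.to_i] if COUNT_R.match(value.to_s)
  [:tags, comma_split(value)]
end

interpret_wait_for(1)               # => [:count, 1]
interpret_wait_for('-2')            # => [:count, -2]
interpret_wait_for('alpha, bravo')  # => [:tags, ["alpha", "bravo"]]
```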
:over_if (and :over_unless) attribute
Just as the :count attribute controls how many branches have to reply before a concurrence ends, the :over_if attribute specifies a condition upon which the concurrence will [prematurely] end.
concurrence :over_if => '${f:over}' do
alpha
bravo
charly
end
This ends the concurrence as soon as one of the branches replies with a workitem whose field ‘over’ is set to true. The remaining branches get cancelled unless :remaining => :forget is set.
:over_unless is the inverse: the concurrence ends as soon as the condition does not hold.
:remaining
As said for :count, the remaining branches get cancelled. By setting :remaining to :forget (or ‘forget’), the remaining branches will continue their execution, forgotten.
concurrence :count => 1, :remaining => :forget do
alpha
bravo
end
:remaining can be shortened to :rem or :r.
The default is ‘cancel’, where all the remaining branches are cancelled while the hand is given back to the main flow.
There is a third setting, ‘wait’. It behaves like ‘cancel’, but the concurrence waits for the cancelled children to reply. The workitems from cancelled branches are merged in as well.
:merge
By default, the workitems override each other: the first workitem to reply wins.
sequence do
concurrence do
alpha
bravo
end
charly
end
In that example, if ‘alpha’ replied first, the workitem that reaches ‘charly’ once ‘bravo’ replied will have the payload as seen/modified by ‘alpha’.
The :merge attribute determines which branch wins the merge.
- first (default)
- last
- highest
- lowest
highest and lowest refer to the position in the list of branches. It’s useful to set a fixed winner.
concurrence :merge => :highest do
alpha
bravo
end
This makes sure that alpha’s version of the workitem wins.
:merge can be shortened to :m.
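The four :merge values can be illustrated with a small plain-Ruby sketch. The helper name winner and the [branch_index, fields] representation of a reply are hypothetical. The sketch assumes that 'highest' designates the branch listed first (index 0), which is consistent with the statement above that alpha wins under :merge => :highest.

```ruby
# Hypothetical helper: picks the winning reply for a given :merge value.
# `replies` is an array of [branch_index, fields] pairs in order of arrival.
# Assumption: 'highest' is the branch listed first (index 0).
def winner(replies, merge = 'first')
  case merge.to_s
  when 'first'   then replies.first
  when 'last'    then replies.last
  when 'highest' then replies.min_by { |index, _fields| index }
  when 'lowest'  then replies.max_by { |index, _fields| index }
  else raise ArgumentError, "unknown merge value: #{merge}"
  end
end

# bravo (index 1) replied before alpha (index 0)
replies = [[1, { 'seen_by' => 'bravo' }], [0, { 'seen_by' => 'alpha' }]]

winner(replies, :first).last['seen_by']    # => "bravo"
winner(replies, :highest).last['seen_by']  # => "alpha"
```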
:merge_type
:merge_type => :override (default)
By default, the merge type is set to ‘override’, which means that the ‘winning’ workitem’s payload supplants all other workitems’ payloads.
:merge_type => :mix
Setting :merge_type to :mix will attempt to merge field by field, making sure that the field values of the winner(s) are used.
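The difference between :override and :mix can be sketched with two hypothetical helpers working on plain field hashes. This only illustrates the described behaviour and is not ruote's implementation.

```ruby
# Hypothetical helpers operating on workitem field hashes.

# :override, the winner's payload supplants the other payload entirely.
def override(winner_fields, _other_fields)
  winner_fields
end

# :mix, field-by-field merge where the winner's values take precedence
# and fields that only the other workitem carries are kept.
def mix(winner_fields, other_fields)
  other_fields.merge(winner_fields)
end

winner_fields = { 'a' => 1, 'b' => 2 }
other_fields  = { 'b' => 0, 'c' => 3 }

override(winner_fields, other_fields).keys  # => ["a", "b"]
mix(winner_fields, other_fields)['b']       # => 2
mix(winner_fields, other_fields)['c']       # => 3
```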
:merge_type => :isolate
:isolate will rearrange the resulting workitem payload so that there is a new field for each branch. The name of each field is the index of the branch from ‘0’ to …
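A rough sketch of the :isolate layout, assuming the field names are the branch indexes rendered as strings. The helper name isolate and its input format (branch index mapped to that branch's fields) are hypothetical.

```ruby
# Hypothetical helper: `replies` maps a branch index to the fields of the
# workitem that branch replied with.
def isolate(replies)
  replies.each_with_object({}) do |(index, fields), result|
    result[index.to_s] = fields # one field per branch, named after its index
  end
end

isolated = isolate({ 0 => { 'verdict' => 'ok' }, 1 => { 'verdict' => 'reject' } })
isolated.keys             # => ["0", "1"]
isolated['1']['verdict']  # => "reject"
```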
:merge_type => :stack
:stack will stack the workitems coming back from the concurrence branches in an array whose order is determined by the :merge attribute. The array is placed in the ‘stack’ field of the resulting workitem. Note that the :stack merge_type also creates a ‘stack_attributes’ field and populates it with the expanded attributes of the concurrence.
Thus
sequence do
concurrence :merge => :highest, :merge_type => :stack do
reviewer1
reviewer2
end
editor
end
will see the ‘editor’ receive a workitem whose fields look like:
{ 'stack' => [{ ... reviewer1 fields ... }, { ... reviewer2 fields ... }],
'stack_attributes' => { 'merge'=> 'highest', 'merge_type' => 'stack' } }
This could prove useful for participants that have to deal with the results of multiple merge strategies.
:merge_type => :union
(Available from ruote 2.3.0)
Will override atomic fields, concat arrays and merge hashes…
The union of those two workitems
{ 'a' => 0, 'b' => [ 'x', 'y' ], 'c' => { 'aa' => 'bb' } }
{ 'a' => 1, 'b' => [ 'y', 'z' ], 'c' => { 'cc' => 'dd' } }
will be
{ 'a' => 1,
'b' => [ 'x', 'y', 'z' ],
'c' => { 'aa' => 'bb', 'cc' => 'dd' } }
Warning: duplicates in arrays present before the merge will be removed as well.
:merge_type => :concat
(Available from ruote 2.3.0)
Much like :union, but duplicates are not removed. Thus
{ 'a' => 0, 'b' => [ 'x', 'y' ], 'c' => { 'aa' => 'bb' } }
{ 'a' => 1, 'b' => [ 'y', 'z' ], 'c' => { 'cc' => 'dd' } }
will be
{ 'a' => 1,
'b' => [ 'x', 'y', 'y', 'z' ],
'c' => { 'aa' => 'bb', 'cc' => 'dd' } }
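The :union and :concat results shown above can be reproduced with a short plain-Ruby sketch. The helper merge_fields is hypothetical. It assumes that the second workitem is merged on top of the first; which workitem ends up on top depends on the :merge attribute.

```ruby
# Hypothetical helper covering both :union and :concat.
# `target` holds the fields merged so far, `source` the fields merged on top.
def merge_fields(target, source, type)
  source.each_with_object(target.dup) do |(key, value), result|
    current = result[key]
    result[key] =
      if current.is_a?(Array) && value.is_a?(Array)
        # :union drops duplicates (including pre-existing ones), :concat keeps them
        type == :union ? (current + value).uniq : current + value
      elsif current.is_a?(Hash) && value.is_a?(Hash)
        current.merge(value) # shallow hash merge
      else
        value # atomic values: the source overrides
      end
  end
end

a = { 'a' => 0, 'b' => %w[x y], 'c' => { 'aa' => 'bb' } }
b = { 'a' => 1, 'b' => %w[y z], 'c' => { 'cc' => 'dd' } }

merge_fields(a, b, :union)['b']   # => ["x", "y", "z"]
merge_fields(a, b, :concat)['b']  # => ["x", "y", "y", "z"]
```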
:merge_type => :deep
(Available from ruote 2.3.0)
Identical to :concat but hashes are merged with deep_merge (ActiveSupport flavour).
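For readers without ActiveSupport at hand, the hash part of :deep can be approximated as below. This is a plain-Ruby sketch of a recursive merge in the style of ActiveSupport's Hash#deep_merge, not the code ruote runs, and it leaves out the array concatenation that :deep shares with :concat.

```ruby
# Recursive hash merge: nested hashes are merged key by key, any other
# value from `source` replaces the one in `target`.
def deep_merge(target, source)
  target.merge(source) do |_key, old_value, new_value|
    if old_value.is_a?(Hash) && new_value.is_a?(Hash)
      deep_merge(old_value, new_value)
    else
      new_value
    end
  end
end

left  = { 'c' => { 'x' => { 'aa' => 1 } } }
right = { 'c' => { 'x' => { 'bb' => 2 } } }

deep_merge(left, right)['c']['x'].keys  # => ["aa", "bb"]
left.merge(right)['c']['x'].keys        # => ["bb"] (a shallow merge loses 'aa')
```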
:merge_type => :ignore
(Available from ruote 2.3.0)
A very simple merge type, the workitems given back by the branches are simply discarded and the workitem as passed to the concurrence expression is used to reply to the parent expression (of the concurrence expression).
:merge_type can be shortened to :mt.
Direct Known Subclasses
Constant Summary
- COUNT_R = /^-?\d+$/
Constants inherited from FlowExpression
FlowExpression::COMMON_ATT_KEYS
Instance Attribute Summary
Attributes inherited from FlowExpression
Instance Method Summary
- #apply ⇒ Object
- #is_concurrent? ⇒ Boolean
This method is used by some walking routines when analyzing execution trees.
- #reply(workitem) ⇒ Object
Methods inherited from FlowExpression
#ancestor?, #applied_workitem, #att, #att_text, #attribute, #attribute_text, #attributes, #await, #cancel, #cancel_flanks, #cfei_at, #child_id, #child_ids, #compile_atts, #compile_variables, #debug_id, #deflate, #do, do_action, #do_apply, #do_cancel, #do_fail, #do_pause, #do_persist, #do_reply, #do_reply_to_parent, #do_resume, #do_unpersist, dummy, #fei, fetch, from_h, #handle_on_error, #has_attribute, #initial_persist, #initialize, #iterative_var_lookup, #launch_sub, #lookup_val, #lookup_val_prefix, #lookup_variable, #name, names, #parent, #parent_id, #pause_on_apply, #persist_or_raise, #root, #root_id, #set_variable, #to_h, #tree, #tree_children, #try_persist, #try_unpersist, #unpersist_or_raise, #unset_variable, #update_tree, #variables, #wfid
Methods included from WithMeta
Methods included from WithH
Constructor Details
This class inherits a constructor from Ruote::Exp::FlowExpression
Instance Method Details
#apply ⇒ Object
# File 'lib/ruote/exp/fe_concurrence.rb', line 281

def apply

  return do_reply_to_parent(h.applied_workitem) if tree_children.empty?

  #
  # count and wait_for

  count = (attribute(:count) || attribute(:c)).to_s
  count = nil unless COUNT_R.match(count)

  wf = count || attribute(:wait_for) || attribute(:wf)

  if COUNT_R.match(wf.to_s)
    h.ccount = wf.to_i
  elsif wf
    h.wait_for = Ruote.comma_split(wf)
  end

  #
  # other attributes

  h.cmerge = att(
    [ :merge, :m ],
    %w[ first last highest lowest ])
  h.cmerge_type = att(
    [ :merge_type, :mt ],
    %w[ override mix isolate stack union ignore concat deep ])
  h.remaining = att(
    [ :remaining, :rem, :r ],
    %w[ cancel forget wait ])

  #h.workitems = (h.cmerge == 'first' || h.cmerge == 'last') ? [] : {}
    #
    # now merging iteratively, not keeping track of all the workitems,
    # but still able to deal with old flows with active h.workitems
    #
  h.workitems = [] if %w[ highest lowest ].include?(h.cmerge)
    #
    # still need to keep track of rank to get the right merging

  h.over = false

  apply_children

  @context.storage.put_msg(
    'reply', 'fei' => h.fei, 'workitem' => h.applied_workitem
  ) if h.ccount == 0
    #
    # force an immediate reply
end
#is_concurrent? ⇒ Boolean
This method is used by some walking routines when analyzing execution trees. Returns true for concurrence (and concurrent iterator).
# File 'lib/ruote/exp/fe_concurrence.rb', line 276

def is_concurrent?

  true
end
#reply(workitem) ⇒ Object
# File 'lib/ruote/exp/fe_concurrence.rb', line 332

def reply(workitem)

  workitem = Ruote.fulldup(workitem)
    #
    # since workitem field merging might happen, better to work on
    # a copy of the workitem (so that history, coming afterwards,
    # doesn't see a modified version of the workitem)

  if h.wait_for && tag = workitem['fields']['__left_tag__']
    h.wait_for.delete(tag)
  end

  over = h.over
  h.over = over || over?(workitem)

  keep(workitem)
    # is done after the over? determination for its looks at 'winner'

  if (not over) && h.over
    #
    # just became 'over'

    reply_to_parent(nil)

  elsif h.over && h.remaining == 'wait'

    reply_to_parent(nil)

  elsif h.children.empty?

    do_unpersist || return

    @context.storage.put_msg(
      'ceased',
      'wfid' => h.fei['wfid'], 'fei' => h.fei, 'workitem' => workitem)
  else

    do_persist
  end
end