Class: Ruote::Exp::AwaitExpression

Inherits:
FlowExpression show all
Defined in:
lib/ruote/exp/fe_await.rb

Overview

The ‘await’ expression is the successor of the ‘listen’ expression (Ruote::Exp::ListenExpression). It’s been introduced in ruote 2.3.0.

Hopefully it has a simpler syntax than ‘listen’. The major difference between listen and await is that await, by default, listens only to events in the same process instance.

This expression blocks until an event occurs somewhere else in the same process instance.

concurrence do
  sequence do
    participant 'alice'
    await :left_tag => 'a'
    participant 'bob'
  end
  sequence :tag => 'a' do
    participant 'charly'
    participant 'doug'
  end
  sequence do
    participant 'eric'
  end
end

In this example, the flow between alice and bob, will block until ‘doug’ has replied (Yes, this example could have been written using :left_participant => ‘doug’).

modes

‘await’, like ‘listen’ works in two mode, “once” and “multiple times”.

In the ‘once’ mode, the await expression blocks the flow until the workitem (somewhere else in the process instance) leaves the alice participant:

sequence do
  await :left_participant => 'alice'
  participant 'eric'
end

In the ‘multiple times’ mode, the await expression triggers its children expressions each time a matching event occurs. It never replies to its parent expression. Here, the participant ‘post_alice’ will receive a workitem each time, somewhere else in the process

await :left_participant => 'alice' do
  participant 'post_alice'
end

Note, that, unlike “listen” in previous versions of ruote, it’s OK to consider the children of “await” as part of an implicit sequence:

await :left_participant => 'alice' do
  participant 'bob'
  participant 'charly'
end

Bob and charly will be applied in sequence.

tag, participant or error events

Await listens to 3 types of events: tag, participant and error events.

Here is a assortment of examples:

await :in_participant => 'alice'
await :reached_participant => 'alice'
await :out_participant => 'alice'
await :left_participant => 'alice'

await :participant => 'alice'
await :participants => 'alice' # looks better with an array

# implicit OR with arrays:
#
await :in_participant => /^al/
await :in_participant => %w[ alice alfred ]
await :in_participant => [ /^al/, /fred$/ ]

await :in_tag => 'phase2'
await :reached_tag => 'phase2'
await :out_tag => 'phase2'
await :left_tag => 'phase2'

await :tags => 'phase2'

await :error
await :error => 'ArgumentError'
await :error => 'RuntimeError, ArgumentError'
await :error => %w[ RuntimeError ArgumentError ]

Basically, the attribute is composed of a left part and a right part. The right part is one of “in”, “reached” or “out”, “left”. “in” and “reached” are equivalent, as are “out” and “left”.

The right part is “tag” or “participant”.

“error” can be used alone.

When “tags” and “participant(s)” are used alone, they are synonymous with “reached_tag” and “reached_participant” respectively.

absolute tags

It’s OK to specify absolute tags, like in:

pdef = Ruote.define do
  concurrence do
    sequence do
      await :tag => 'a/b'
      echo 'a/b'
    end
    sequence :tag => 'a' do
      noop
      sequence :tag => 'b' do
        echo 'b'
      end
    end
  end
end

:where condition

The “await” expression accepts an optional attribute which adds another guard which is checked to determine if the trigger should occur or not.

pdef = Ruote.process_definition do
  concurrence :wait_for => 1 do
    await :left_participant => 'a', :where => "${task} == 'sing'" do
      echo 'sing-a'
    end
    await :left_participant => 'a' do
      echo 'any-a'
    end
    concurrence do
      participant 'a', :task => 'talk'
      participant 'a', :task => 'sing'
    end
  end
end

In this example, the message ‘sing-a’ will be echoed only once (twice for ‘any-a’).

:global => false by default

Unlike the ‘listen’ expression, ‘await’, by default, only triggers for events in the same process instance. To react on events whatever the process instance, :global => true (or “true”) can be used.

await :left_tag => 'phase1', :global => true do
  participant 'supervisor', :msg => 'phase1 over'
end

:merge => nil/ignore

In the listen expression, the default is for the event’s workitem to get merged into the waiting workitem. With ‘await’, the default is the event’s workitem completely overriding the awaiting workitem.

Using the :merge attribute, other behaviours are possible.

await :left_tag => 'phase3', :merge => 'ignore'
await :left_tag => 'phase3', :merge => 'drop'
  # the event's workitem is ignored, the awaiting workitem is used

await :left_tag => 'phase3', :merge => 'override'
  # the event's workitem is used, this is the default

await :left_tag => 'phase3', :merge => 'incoming'
  # a hash merge happens, the incoming (event) workitem wins
  # workitem = awaiting.merge(incoming)

await :left_tag => 'phase3', :merge => 'awaiting'
  # a hash merge happens, the awaiting workitem wins
  # workitem = incoming.merge(awaiting)

Note: the :where guard is always about the event’s workitem (not the workitem as it reached the ‘await’ expression).

note: the “await” attribute

(since ruote 3.2.1)

“listen” and “await” are both ruote expressions. There is also an attribute common to all the expressions: :await.

concurrence do
  sequence do
    alice
    sequence :tag => 'stage2' do
      bob
    end
  end
  sequence do
    charly
    diana :await => 'left_tag:stage2'
    eliza
  end
  frank
end

The expressions with an await attribute suspends its application until the awaited event happens.

This await attribute, defaults to “left_tag:”, so

diana :await => 'stage2'
  # is equivalent to
diana :await => 'left_tag:stage2'

Constant Summary

Constants inherited from FlowExpression

FlowExpression::COMMON_ATT_KEYS

Instance Attribute Summary

Attributes inherited from FlowExpression

#context, #error, #h

Instance Method Summary collapse

Methods inherited from FlowExpression

#ancestor?, #applied_workitem, #att, #att_text, #attribute, #attribute_text, #attributes, #await, #cancel, #cancel_flanks, #cfei_at, #child_id, #child_ids, #compile_atts, #compile_variables, #debug_id, #deflate, #do, do_action, #do_apply, #do_cancel, #do_fail, #do_pause, #do_persist, #do_reply, #do_reply_to_parent, #do_resume, #do_unpersist, dummy, #fei, fetch, from_h, #handle_on_error, #has_attribute, #initial_persist, #initialize, #is_concurrent?, #iterative_var_lookup, #launch_sub, #lookup_val, #lookup_val_prefix, #lookup_variable, #name, names, #parent, #parent_id, #pause_on_apply, #persist_or_raise, #root, #root_id, #set_variable, #to_h, #tree, #tree_children, #try_persist, #try_unpersist, #unpersist_or_raise, #unset_variable, #update_tree, #variables, #wfid

Methods included from WithMeta

#class_def, included

Methods included from WithH

included

Constructor Details

This class inherits a constructor from Ruote::Exp::FlowExpression

Instance Method Details

#applyObject

Raises:

  • (ArgumentError)


251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# File 'lib/ruote/exp/fe_await.rb', line 251

def apply

  #
  # gathering info

  action, condition = self.class.extract_await_ac(attributes)

  raise ArgumentError.new(
    "couldn't determine which event to listen to from: " +
    attributes.inspect
  ) unless action

  global = (attribute(:global).to_s == 'true')
  global = false if action == 'error_intercepted'

  h.amerge = attribute(:merge).to_s

  persist_or_raise

  #
  # adding a new tracker

  @context.tracker.add_tracker(
    global ? nil : h.fei['wfid'],
    action,
    Ruote.to_storage_id(h.fei),
    condition,
    { 'action' => 'reply',
      'fei' => h.fei,
      'workitem' => 'replace',
      'flavour' => 'await' })
end

#reply(workitem) ⇒ Object



284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
# File 'lib/ruote/exp/fe_await.rb', line 284

def reply(workitem)

  #
  # :where guard

  where = attribute(:where, workitem)
  return if where && Condition.false?(where)

  #
  # merge

  wi = h.applied_workitem.dup

  wi['fields'] =
    case h.amerge
      when 'ignore', 'drop' then wi['fields']
      when 'incoming' then wi['fields'].merge(workitem['fields'])
      when 'awaiting' then workitem['fields'].merge(wi['fields'])
      else workitem['fields'] # 'override'
    end

  #
  # actual trigger

  if tree_children.any?

    i, t = if tree_children.size == 1
      [ "#{h.fei['expid']}_0", tree_children[0] ]
    else
      [ h.fei['expid'], [ 'sequence', {}, tree_children ] ]
    end

    launch_sub(i, t, :forget => true, :workitem => wi)

  else

    reply_to_parent(wi)
  end
end