Class: Ruote::Exp::FlowExpression

Inherits:
Object
  • Object
show all
Includes:
WithH, WithMeta
Defined in:
lib/ruote/exp/ro_on_x.rb,
lib/ruote/exp/ro_timers.rb,
lib/ruote/exp/ro_filters.rb,
lib/ruote/exp/ro_persist.rb,
lib/ruote/exp/ro_variables.rb,
lib/ruote/exp/ro_attributes.rb,
lib/ruote/exp/flow_expression.rb

Overview

Ruote is a process definition interpreter. It doesn’t directly “read” process definitions, it relies on a parser/generator to produce “abstract syntax trees” that look like

[ expression_name, { ... attributes ... }, [ children_expressions ] ]

The nodes (and leaves) in the trees are expressions. This is the base class for all expressions.

The most visible expressions are “define”, “sequence” and “participant”. Think :

pdef = Ruote.process_definition do
  sequence do
    participant :ref => 'customer'
    participant :ref => 'accounting'
    participant :ref => 'logistics'
  end
end

Each node is an expression…

the states of an expression

nil

the normal state

‘cancelling’

the expression and its children are getting cancelled

‘dying’

the expression and its children are getting killed

‘failed’

the expression has finishing

‘failing’

the expression just failed and it’s cancelling its children

‘timing_out’

the expression just timedout and it’s cancelling its children

‘paused’

the expression is paused, it will store downstream messages and play them only when a ‘resume’ message comes from upstream.

Defined Under Namespace

Classes: HandlerEntry

Constant Summary collapse

COMMON_ATT_KEYS =
%w[
  if unless
  forget lose flank
  timeout timers
  on_error on_cancel on_timeout
]

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from WithMeta

#class_def, included

Methods included from WithH

included

Constructor Details

#initialize(context, h) ⇒ FlowExpression

Returns a new instance of FlowExpression.



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/ruote/exp/flow_expression.rb', line 124

def initialize(context, h)

  @context = context

  @msg = nil
    # contains generally the msg the expression got instantiated for

  self.h = h

  h._id ||= Ruote.to_storage_id(h.fei)
  h['type'] ||= 'expressions'
  h.name ||= self.class.expression_names.first
  h.children ||= []
  h.applied_workitem['fei'] = h.fei
  h.created_time ||= Ruote.now_to_utc_s

  h.on_cancel ||= attribute(:on_cancel)
  h.on_error ||= attribute(:on_error)
  h.on_timeout ||= attribute(:on_timeout)
  h.on_terminate ||= attribute(:on_terminate)
end

Instance Attribute Details

#contextObject (readonly)

Returns the value of attribute context.



114
115
116
# File 'lib/ruote/exp/flow_expression.rb', line 114

def context
  @context
end

#errorObject

Mostly used when the expression is returned via Ruote::Engine#ps(wfid) or Ruote::Engine#processes(). If an error occurred for this flow expression, #ps will set this error field so that it yields the ProcessError.

So, for short, usually, this attribute yields nil.



122
123
124
# File 'lib/ruote/exp/flow_expression.rb', line 122

def error
  @error
end

#hObject

Returns the value of attribute h.



99
100
101
# File 'lib/ruote/exp/flow_expression.rb', line 99

def h
  @h
end

Class Method Details

.do_action(context, msg) ⇒ Object

Called by the worker when it has something to do for a FlowExpression.



320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
# File 'lib/ruote/exp/flow_expression.rb', line 320

def self.do_action(context, msg)

  fei = msg['fei']
  action = msg['action']

  if action == 'reply' && fei['engine_id'] != context.engine_id
    #
    # the reply has to go to another engine, let's locate the
    # 'engine participant' and give it the workitem/reply
    #
    # see ft_37 for a test/example

    engine_participant =
      context.plist.lookup(fei['engine_id'], msg['workitem'])

    raise(
      "no EngineParticipant found under name '#{fei['engine_id']}'"
    ) unless engine_participant

    engine_participant.reply(fei, msg['workitem'])
    return
  end

  # normal case

  fexp = nil

  n = context.storage.class.name.match(/Couch/) ? 3 : 1
    #
  n.times do |i|
    if fexp = fetch(context, msg['fei']); break; end
    sleep 0.028 unless i == (n - 1)
  end
    #
    # Simplify that once ruote-couch behaves

  fexp.do(action, msg) if fexp
end

.dummy(h) ⇒ Object

Returns a dummy expression. Only used by the error_handler service.



53
54
55
56
57
58
59
60
61
# File 'lib/ruote/exp/ro_on_x.rb', line 53

def self.dummy(h)

  class << h; include Ruote::HashDot; end

  fe = self.allocate
  fe.instance_variable_set(:@h, h)

  fe
end

.fetch(context, fei) ⇒ Object

Fetches an expression from the storage and readies it for service.



293
294
295
296
297
298
299
300
# File 'lib/ruote/exp/flow_expression.rb', line 293

def self.fetch(context, fei)

  return nil if fei.nil?

  fexp = context.storage.get('expressions', Ruote.to_storage_id(fei))

  fexp ? from_h(context, fexp) : nil
end

.from_h(context, h) ⇒ Object

Instantiates expression back from hash.



282
283
284
285
286
287
288
289
# File 'lib/ruote/exp/flow_expression.rb', line 282

def self.from_h(context, h)

  return self.new(nil, h) unless context

  exp_class = context.expmap.expression_class(h['name'])

  exp_class.new(context, h)
end

.names(*exp_names) ⇒ Object

Keeping track of names and aliases for the expression



308
309
310
311
312
# File 'lib/ruote/exp/flow_expression.rb', line 308

def self.names(*exp_names)

  exp_names = exp_names.collect { |n| n.to_s }
  meta_def(:expression_names) { exp_names }
end

Instance Method Details

#ancestor?(fei) ⇒ Boolean

Returns true if the given fei points to an expression in the parent chain of this expression.

Returns:

  • (Boolean)


881
882
883
884
885
886
887
888
889
# File 'lib/ruote/exp/flow_expression.rb', line 881

def ancestor?(fei)

  fei = fei.to_h if fei.respond_to?(:to_h)

  return false unless h.parent_id
  return true if h.parent_id == fei

  parent.ancestor?(fei)
end

#applied_workitemObject

Returns a one-off Ruote::Workitem instance (the applied workitem).



256
257
258
259
# File 'lib/ruote/exp/flow_expression.rb', line 256

def applied_workitem

  @awi ||= Ruote::Workitem.new(h.applied_workitem)
end

#att(keys, values, opts = {}) ⇒ Object

Returns the value for attribute ‘key’, this value should be present in the array list ‘values’. If not, the default value is returned. By default, the default value is the first element of ‘values’.



75
76
77
78
79
80
81
82
# File 'lib/ruote/exp/ro_attributes.rb', line 75

def att(keys, values, opts={})

  default = opts[:default] || values.first

  val = Array(keys).collect { |key| attribute(key) }.compact.first.to_s

  values.include?(val) ? val : default
end

#att_text(workitem = h.applied_workitem) ⇒ Object

Equivalent to #attribute_text, but will return nil if there is no attribute whose values is nil.



139
140
141
142
143
144
# File 'lib/ruote/exp/ro_attributes.rb', line 139

def att_text(workitem=h.applied_workitem)

  text = attributes.keys.find { |k| attributes[k] == nil }

  text ? dsub(text.to_s, workitem) : nil
end

#attribute(n, workitem = h.applied_workitem, options = {}) ⇒ Object

Looks up the value for attribute n.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/ruote/exp/ro_attributes.rb', line 48

def attribute(n, workitem=h.applied_workitem, options={})

  n = n.to_s

  default = options[:default]
  escape = options[:escape]
  string = options[:to_s] || options[:string]

  v = attributes[n]

  v = if v == nil
    default
  elsif escape
    v
  else
    dsub(v, workitem)
  end

  v = v.to_s if v and string

  v
end

#attribute_text(workitem = h.applied_workitem) ⇒ Object

Given something like

sequence do
  participant 'alpha'
end

in the context of the participant expression

attribute_text()

will yield ‘alpha’.

Note : an empty text returns ”, not the nil value.



129
130
131
132
133
134
# File 'lib/ruote/exp/ro_attributes.rb', line 129

def attribute_text(workitem=h.applied_workitem)

  text = attributes.keys.find { |k| attributes[k] == nil }

  dsub(text.to_s, workitem)
end

#attributesObject

Returns the attributes of this expression (like { ‘ref’ => ‘toto’ } or { ‘timeout’ => ‘2d’ }.



935
936
937
938
# File 'lib/ruote/exp/flow_expression.rb', line 935

def attributes

  tree[1]
end

#await(att, msg) ⇒ Object

If the expression has an :await attribute, the expression gets into a special “awaiting” state until the condition in the value of :await gets triggered and the trigger calls resume on the expression.

Raises:

  • (::ArgumentError)


452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
# File 'lib/ruote/exp/flow_expression.rb', line 452

def await(att, msg)

  action, condition =
    Ruote::Exp::AwaitExpression.extract_await_ac(:await => att)

  raise ::ArgumentError.new(
    ":await does not understand #{att.inspect}"
  ) if action == nil

  msg.merge!('flavour' => 'awaiting')

  h.state = 'awaiting'
  h.paused_apply = msg

  persist_or_raise

  @context.tracker.add_tracker(
    h.fei['wfid'],
    action,
    Ruote.to_storage_id(h.fei),
    condition,
    { '_auto_remove' => true,
      'action' => 'resume',
      'fei' => h.fei,
      'flavour' => 'awaiting' })
end

#cancel(flavour) ⇒ Object

This default implementation cancels all the [registered] children of this expression.



742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
# File 'lib/ruote/exp/flow_expression.rb', line 742

def cancel(flavour)

  return reply_to_parent(h.applied_workitem) if h.children.empty?
    #
    # there are no children, nothing to cancel, let's just reply to
    # the parent expression

  do_persist || return
    #
    # before firing the cancel message to the children
    #
    # if the do_persist returns false, it means it failed, implying this
    # expression is stale, let's return, thus discarding this cancel message

  children.each do |child_fei|
    #
    # let's send a cancel message to each of the children
    #
    # maybe some of them are gone or have not yet been applied, anyway,
    # the messages are sent

    @context.storage.put_msg(
      'cancel',
      'fei' => child_fei,
      'parent_id' => h.fei, # indicating that this is a "cancel child"
      'flavour' => flavour)
  end
end

#cancel_flanks(flavour) ⇒ Object

Emits a cancel message for each flanking expression (if any).



724
725
726
727
728
729
730
731
732
733
734
735
736
737
# File 'lib/ruote/exp/flow_expression.rb', line 724

def cancel_flanks(flavour)

  return unless h.flanks

  h.flanks.each do |flank_fei|

    @context.storage.put_msg(
      'cancel',
      'fei' => flank_fei,
      'parent_id' => h.fei,
        # indicating that this is a "cancel child", well...
      'flavour' => flavour)
  end
end

#cfei_at(i) ⇒ Object

Given an index, returns the child fei (among the currently registered children feis) whose fei.expid ends with this index (whose child_id is equal to that index).

Returns nil if not found or a child fei as a Hash.



267
268
269
270
# File 'lib/ruote/exp/flow_expression.rb', line 267

def cfei_at(i)

  children.find { |cfei| Ruote.extract_child_id(cfei) == i }
end

#child_idObject

Returns the child_id for this expression. (The rightmost part of the fei.expid).



180
181
182
183
# File 'lib/ruote/exp/flow_expression.rb', line 180

def child_id

  fei.child_id
end

#child_idsObject

Returns the list of child_ids (last part of the fei.expid) for the currently registered (active) children.



275
276
277
278
# File 'lib/ruote/exp/flow_expression.rb', line 275

def child_ids

  children.collect { |cfei| Ruote.extract_child_id(cfei) }
end

#compile_atts(opts = {}) ⇒ Object

Returns a Hash containing all attributes set for an expression with their values resolved.



108
109
110
111
112
113
# File 'lib/ruote/exp/ro_attributes.rb', line 108

def compile_atts(opts={})

  attributes.keys.each_with_object({}) { |k, r|
    r[dsub(k)] = attribute(k, h.applied_workitem, opts)
  }
end

#compile_variablesObject

Returns a fresh hash of all the variables visible from this expression.

This is used mainly when forgetting an expression.



44
45
46
47
48
49
50
# File 'lib/ruote/exp/ro_variables.rb', line 44

def compile_variables

  vars = h.parent_id ? parent.compile_variables : {}
  vars.merge!(h.variables) if h.variables

  vars
end

#debug_idObject

Outputs ids like “0_2!d218c1b”, no wfid, only <expid>!<subid>[0, 7]



39
40
41
42
# File 'lib/ruote/exp/ro_persist.rb', line 39

def debug_id

  "#{h.fei['expid']}!#{h.fei['subid'][0, 7]}"
end

#deflate(err) ⇒ Object

Given this expression and an error, deflates the error into a hash (serializable).



37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/ruote/exp/ro_on_x.rb', line 37

def deflate(err)

  {
    'fei' => h.fei,
    'at' => Ruote.now_to_utc_s,
    'class' => err.class.to_s,
    'message' => err.message,
    'trace' => err.backtrace,
    'details' => err.respond_to?(:ruote_details) ? err.ruote_details : nil,
    'deviations' => err.respond_to?(:deviations) ? err.deviations : nil,
    'tree' => tree
  }
end

#do(action, msg) ⇒ Object

Wraps a call to “apply”, “reply”, etc… Makes sure to set @msg with a deep copy of the msg before.



362
363
364
365
366
367
# File 'lib/ruote/exp/flow_expression.rb', line 362

def do(action, msg)

  @msg = Ruote.fulldup(msg)

  send("do_#{action}", msg)
end

#do_apply(msg) ⇒ Object

Called by the worker when it has just created this FlowExpression and wants to apply it.



372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
# File 'lib/ruote/exp/flow_expression.rb', line 372

def do_apply(msg)

  if msg['state'] == 'paused'

    return pause_on_apply(msg)
  end

  if msg['flavour'].nil? && (aw = attribute(:await))

    return await(aw, msg)
  end

  unless Condition.apply?(attribute(:if), attribute(:unless))

    return do_reply_to_parent(h.applied_workitem)
  end

  pi = h.parent_id
  reply_immediately = false

  if attribute(:scope).to_s == 'true'

    h.variables ||= {}
  end

  if attribute(:forget).to_s == 'true'

    h.variables = compile_variables
    h.parent_id = nil
    h.forgotten = true

    reply_immediately = true

  elsif attribute(:lose).to_s == 'true'

    h.lost = true

  elsif msg['flanking'] or (attribute(:flank).to_s == 'true')

    h.flanking = true

    reply_immediately = true
  end

  if reply_immediately and pi

    @context.storage.put_msg(
      'reply',
      'fei' => pi,
      'workitem' => Ruote.fulldup(h.applied_workitem),
      'flanking' => h.flanking)
  end

  filter

  consider_tag
  consider_timers

  apply
end

#do_cancel(msg) ⇒ Object

The raw handling of messages passed to expressions (the fine handling is done in the #cancel method).



682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
# File 'lib/ruote/exp/flow_expression.rb', line 682

def do_cancel(msg)

  flavour = msg['flavour']

  return if h.state == 'cancelling' && flavour != 'kill'
    # cancel on cancel gets discarded

  return if h.state == 'failed' && flavour == 'timeout'
    # do not timeout expressions that are "in error" (failed)

  h.state = case flavour
    when 'kill' then 'dying'
    when 'timeout' then 'timing_out'
    else 'cancelling'
  end

  if h.state == 'timing_out'

    h.applied_workitem['fields']['__timed_out__'] = [
      h.fei, Ruote.now_to_utc_s, tree.first, compile_atts
    ]

  elsif h.state == 'cancelling'

    if t = msg['on_cancel']

      h.on_cancel = t

    elsif ra_opts = msg['re_apply']

      ra_opts = {} if ra_opts == true
      ra_opts['tree'] ||= tree

      h.on_re_apply = ra_opts
    end
  end

  cancel(flavour)
end

#do_fail(msg) ⇒ Object

Called when handling an on_error, will place itself in a ‘failing’ state and cancel the children (when the reply from the children comes back, the on_error will get triggered).



775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
# File 'lib/ruote/exp/flow_expression.rb', line 775

def do_fail(msg)

  @h['state'] = 'failing'
  @h['applied_workitem'] = msg['workitem']

  if h.children.size < 1

    reply_to_parent(@h['applied_workitem'])

  else

    flavour = msg['immediate'] ? 'kill' : nil

    persist_or_raise

    h.children.each do |i|
      @context.storage.put_msg('cancel', 'fei' => i, 'flavour' => flavour)
    end
  end
end

#do_pause(msg) ⇒ Object

Expression received a “pause” message. Will put the expression in the “paused” state and then pass the message to the children.

If the expression is in a non-nil state (failed, timed_out, …), the message will be ignored.



802
803
804
805
806
807
808
809
810
811
812
813
# File 'lib/ruote/exp/flow_expression.rb', line 802

def do_pause(msg)

  return if h.state != nil

  h['state'] = 'paused'

  do_persist || return

  h.children.each { |i|
    @context.storage.put_msg('pause', 'fei' => i)
  } unless msg['breakpoint']
end

#do_persistObject

Make sure to persist (retry if necessary).



106
107
108
109
# File 'lib/ruote/exp/ro_persist.rb', line 106

def do_persist

  do_p(true)
end

#do_reply(msg) ⇒ Object Also known as: do_receive

Wraps #reply (does the administrative part of the reply work).



622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
# File 'lib/ruote/exp/flow_expression.rb', line 622

def do_reply(msg)

  workitem = msg['workitem']
  fei = workitem['fei']

  removed = h.children.delete(fei)
    # accept without any check ?

  if msg['flanking']

    (h.flanks ||= []) << fei

    if (not removed) # then it's a timer

      do_persist
      return
    end
  end

  if ut = msg['updated_tree']

    ct = tree.dup
    ct.last[Ruote::FlowExpressionId.child_id(fei)] = ut
    update_tree(ct)
  end

  if h.state == 'paused'

    (h['paused_replies'] ||= []) << msg

    do_persist

  elsif h.state != nil # failing or timing out ...

    if h.children.size < 1
      reply_to_parent(workitem)
    else
      persist_or_raise # for the updated h.children
    end

  else # vanilla reply

    reply(workitem)
  end
end

#do_reply_to_parent(workitem, delete = true) ⇒ Object

The essence of the reply_to_parent job…



493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
# File 'lib/ruote/exp/flow_expression.rb', line 493

def do_reply_to_parent(workitem, delete=true)

  # propagate the cancel "flavour" back, so that one can know
  # why a branch got cancelled.

  flavour = if @msg.nil?
    nil
  elsif @msg['action'] == 'cancel'
    @msg['flavour'] || 'cancel'
  elsif h.state.nil?
    nil
  else
    @msg['flavour']
  end

  # deal with the timers and the schedules

  %w[ timeout_schedule_id job_id ].each do |sid|
    @context.storage.delete_schedule(h[sid]) if h[sid]
  end
    #
    # legacy schedule ids, to be removed for ruote 2.4.0

  @context.storage.delete_schedule(h.schedule_id) if h.schedule_id
    #
    # time-driven exps like cron, wait and once now all use h.schedule_id

  h.timers.each do |schedule_id, action|
    @context.storage.delete_schedule(schedule_id)
  end if h.timers

  # cancel flanking expressions if any

  cancel_flanks(h.state == 'dying' ? 'kill' : nil)

  # trigger or vanilla reply

  if h.state == 'failing' # on_error is implicit (#do_fail got called)

    trigger('on_error', workitem)

  elsif h.state == 'cancelling' && h.on_cancel

    trigger('on_cancel', workitem)

  elsif h.state == 'cancelling' && h.on_re_apply

    trigger('on_re_apply', workitem)

  elsif h.state == 'timing_out' && h.on_timeout

    trigger('on_timeout', workitem)

  elsif h.state == nil && h.on_reply

    trigger('on_reply', workitem)

  elsif h.flanking && h.state.nil?
    #
    # do vanish

    do_unpersist

  elsif h.lost && h.state.nil?
    #
    # do not reply, sit here (and wait for cancellation probably)

    do_persist

  elsif h.trigger && workitem['fields']["__#{h.trigger}__"]
    #
    # the "second take"

    trigger(h.trigger, workitem)

  else # vanilla reply

    filter(workitem) if h.state.nil?

    f = h.state.nil? && attribute(:vars_to_f)
    Ruote.set(workitem['fields'], f, h.variables) if f

    workitem['sub_wf_name'] = @h.applied_workitem['sub_wf_name']
    workitem['sub_wf_revision'] = @h.applied_workitem['sub_wf_revision']

    leave_tag(workitem) if h.tagname

    (do_unpersist || return) if delete
      # remove expression from storage

    if h.parent_id && ! h.attached

      @context.storage.put_msg(
        'reply',
        'fei' => h.parent_id,
        'workitem' => workitem.merge!('fei' => h.fei),
        'updated_tree' => h.updated_tree, # nil most of the time
        'flavour' => flavour)

    else

      @context.storage.put_msg(
        (h.forgotten || h.attached) ? 'ceased' : 'terminated',
        'wfid' => h.fei['wfid'],
        'fei' => h.fei,
        'workitem' => workitem,
        'variables' => h.variables,
        'flavour' => flavour)

      if
        h.state.nil? &&
        h.on_terminate == 'regenerate' &&
        ! (h.forgotten || h.attached)
      then
        @context.storage.put_msg(
          'regenerate',
          'wfid' => h.fei['wfid'],
          'tree' => h.original_tree,
          'workitem' => workitem,
          'variables' => h.variables,
          'flavour' => flavour)
          #'stash' =>
      end
    end
  end
end

#do_resume(msg) ⇒ Object

Will “unpause” the expression (if it was paused), and trigger any ‘paused_replies’ (replies that came while the expression was paused).



818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
# File 'lib/ruote/exp/flow_expression.rb', line 818

def do_resume(msg)

  return unless h.state == 'paused' || h.state == 'awaiting'

  h['state'] = nil

  m = h.delete('paused_apply')
  return do_apply(m) if m
    # if it's a paused apply, pipe it directly to #do_apply

  replies = h.delete('paused_replies') || []

  do_persist || return

  h.children.each { |i| @context.storage.put_msg('resume', 'fei' => i) }
    # resume children

  replies.each { |m| @context.storage.put_msg(m.delete('action'), m) }
    # trigger replies
end

#do_unpersistObject

Make sure to unpersist (retry if necessary).



113
114
115
116
# File 'lib/ruote/exp/ro_persist.rb', line 113

def do_unpersist

  do_p(false)
end

#feiObject

Returns the Ruote::FlowExpressionId for this expression.



154
155
156
157
# File 'lib/ruote/exp/flow_expression.rb', line 154

def fei

  Ruote::FlowExpressionId.new(h.fei)
end

#handle_on_error(msg, error) ⇒ Object

Looks up parent with on_error attribute and triggers it



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/ruote/exp/ro_on_x.rb', line 65

def handle_on_error(msg, error)

  return false if h.state == 'failing'

  err = deflate(error)
  oe_parent = lookup_on_error(err)

  return false unless oe_parent
    # no parent with on_error attribute found

  handler = oe_parent.local_on_error(err)

  return false if handler.to_s == ''
    # empty on_error handler nullifies ancestor's on_error

  workitem = msg['workitem']
  workitem['fields']['__error__'] = err

  immediate = if handler.is_a?(String)
    !! handler.match(/^!/)
  elsif handler.is_a?(Array)
    !! handler.first.to_s.match(/^!/)
  else
    false
  end

  # NOTE: why not pass the handler in the msg?
  #       no, because of HandlerEntry (not JSON serializable)

  @context.storage.put_msg(
    'fail',
    'fei' => oe_parent.h.fei,
    'workitem' => workitem,
    'immediate' => immediate)

  true # yes, error is being handled.
end

#has_attribute(*args) ⇒ Object Also known as: has_att

Given a list of attribute names, returns the first attribute name for which there is a value.



37
38
39
40
41
42
# File 'lib/ruote/exp/ro_attributes.rb', line 37

def has_attribute(*args)

  args.each { |a| a = a.to_s; return a if attributes[a] != nil }

  nil
end

#initial_persistObject

Persists and fetches the _rev identifier from the storage.

Only used by the worker when creating the expression.



48
49
50
51
52
53
54
55
56
57
# File 'lib/ruote/exp/ro_persist.rb', line 48

def initial_persist

  r = @context.storage.put(@h, :update_rev => true)

  #t = Thread.current.object_id.to_s[-3..-1]
  #puts "+ per #{debug_id} #{tree[0]} r#{h._rev} t#{t} -> #{r.class}"
  #Ruote.p_caller('+ per')

  raise_or_return('initial_persist failed', r)
end

#is_concurrent?Boolean

Concurrent expressions (expressions that apply more than one child at a time) are supposed to return true here.

Returns:

  • (Boolean)


241
242
243
244
# File 'lib/ruote/exp/flow_expression.rb', line 241

def is_concurrent?

  false
end

#iterative_var_lookup(k) ⇒ Object

TODO : rdoc rewrite needed

This method is mostly used by the worker when looking up a process name or participant name bound under a variable.



114
115
116
117
118
119
120
121
# File 'lib/ruote/exp/ro_variables.rb', line 114

def iterative_var_lookup(k)

  v = lookup_variable(k)

  return [ k, v ] unless (v.is_a?(String) or v.is_a?(Symbol))

  iterative_var_lookup(v)
end

#launch_sub(pos, subtree, opts = {}) ⇒ Object

Launches a subprocesses (usually called from the #apply of certain expression implementations.



846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
# File 'lib/ruote/exp/flow_expression.rb', line 846

def launch_sub(pos, subtree, opts={})

  i = h.fei.merge(
    'subid' => Ruote.generate_subid(h.fei.inspect),
    'expid' => pos)

  if ci = opts[:child_id]
    i['subid'] = "#{i['subid']}k#{ci}"
  end

  #p '=== launch_sub ==='
  #p [ :launcher, h.fei['expid'], h.fei['subid'], h.fei['wfid'] ]
  #p [ :launched, i['expid'], i['subid'], i['wfid'] ]

  forget = opts[:forget]

  register_child(i) unless forget

  variables = (
    forget ? compile_variables : {}
  ).merge!(opts[:variables] || {})

  @context.storage.put_msg(
    'launch',
    'fei' => i,
    'parent_id' => forget ? nil : h.fei,
    'tree' => subtree,
    'workitem' => opts[:workitem] || h.applied_workitem,
    'variables' => variables,
    'forgotten' => forget)
end

#lookup_val(att_options = {}) ⇒ Object



96
97
98
99
100
101
102
103
# File 'lib/ruote/exp/ro_attributes.rb', line 96

def lookup_val(att_options={})

  lval(
    VV,
    s_cartesian(%w[ v var variable ], VV),
    s_cartesian(%w[ f fld field ], VV),
    att_options)
end

#lookup_val_prefix(prefix, att_options = {}) ⇒ Object

prefix = ‘on’ => will lookup on, on_val, on_value, on_v, on_var, on_variable, on_f, on_fld, on_field…



87
88
89
90
91
92
93
94
# File 'lib/ruote/exp/ro_attributes.rb', line 87

def lookup_val_prefix(prefix, att_options={})

  lval(
    [ prefix ] + [ 'val', 'value' ].map { |s| "#{prefix}_#{s}" },
    %w[ v var variable ].map { |s| "#{prefix}_#{s}" },
    %w[ f fld field ].map { |s| "#{prefix}_#{s}" },
    att_options)
end

#lookup_variable(var, prefix = nil) ⇒ Object Also known as: v, lv

Looks up the value of a variable in expression tree (seen from a leaf, it looks more like a stack than a tree)



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/ruote/exp/ro_variables.rb', line 55

def lookup_variable(var, prefix=nil)

  var, prefix = split_prefix(var, prefix)

  if prefix == '//'
    return @context.storage.get_engine_variable(var)
  end

  if prefix == '/' && par = parent
    return par.lookup_variable(var, prefix)
  end

  if h.variables and Ruote.has_key?(h.variables, var)
    return Ruote.lookup(h.variables, var)
  end

  if h.parent_id && h.parent_id['engine_id'] == @context.engine_id
    #
    # do not lookup variables in a remote engine ...

    (return parent.lookup_variable(var, prefix)) rescue nil
      # if the lookup fails (parent gone) then rescue and let go
  end

  @context.storage.get_engine_variable(var)
end

#nameObject

Returns the name of this expression, like ‘sequence’, ‘participant’, ‘cursor’, etc…



927
928
929
930
# File 'lib/ruote/exp/flow_expression.rb', line 927

def name

  tree[0]
end

#parentObject

Fetches the parent expression, or returns nil if there is no parent expression.



188
189
190
191
192
193
# File 'lib/ruote/exp/flow_expression.rb', line 188

def parent

  h.parent_id ?
    Ruote::Exp::FlowExpression.fetch(@context, h.parent_id) :
    nil
end

#parent_idObject

Returns the Ruote::FlowExpressionIf of the parent expression, or nil if there is no parent expression.



170
171
172
173
174
175
# File 'lib/ruote/exp/flow_expression.rb', line 170

def parent_id

  h.parent_id ?
    Ruote::FlowExpressionId.new(h.parent_id) :
    nil
end

#pause_on_apply(msg) ⇒ Object

Called by #do_apply when msg == ‘paused’. Covers the “apply/launch it but it’s immediately paused” case. Freezes the apply message in h.paused_apply and saves the expression.



437
438
439
440
441
442
443
444
445
# File 'lib/ruote/exp/flow_expression.rb', line 437

def pause_on_apply(msg)

  msg['state'] = nil

  h.state = 'paused'
  h.paused_apply = msg

  persist_or_raise
end

#persist_or_raiseObject Also known as: persist



91
92
93
94
# File 'lib/ruote/exp/ro_persist.rb', line 91

def persist_or_raise

  p_or_raise(true)
end

#reply(workitem) ⇒ Object

A default implementation for all the expressions.



674
675
676
677
# File 'lib/ruote/exp/flow_expression.rb', line 674

def reply(workitem)

  reply_to_parent(workitem)
end

#reply_to_parent(workitem, delete = true) ⇒ Object

FlowExpression call this method when they’re done and they want their parent expression to take over (it will end up calling the #reply of the parent expression).

Expression implementations are free to override this method. The common behaviour is in #do_reply_to_parent.



486
487
488
489
# File 'lib/ruote/exp/flow_expression.rb', line 486

def reply_to_parent(workitem, delete=true)

  do_reply_to_parent(workitem, delete)
end

#root(stubborn = false) ⇒ Object

An expensive method, looks up all the expressions with the same wfid in the storage (for some storages this is not expensive at all), and determine the root of this expression. It does this by recursively going from an expression to its parent, starting with this expression. The root is when the parent can’t be reached.

By default, this method will always return an expression, but if stubborn is set to true and the top expression points to a gone parent then nil will be returned. The default (stubborn=true) is probably what you want anyway.



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/ruote/exp/flow_expression.rb', line 207

def root(stubborn=false)

  previous = nil
  current = @h

  exps = @context.storage.find_expressions(
    @h['fei']['wfid']
  ).each_with_object({}) { |exp, h|
    h[exp['fei']] = exp
  }

  while current && current['parent_id']
    previous = current
    current = exps[current['parent_id']]
  end

  current ||= previous unless stubborn

  current ? Ruote::Exp::FlowExpression.from_h(@context, current) : nil
end

#root_id(stubborn = false) ⇒ Object

Returns the fei of the root expression of this expression. The result is an instance of Ruote::FlowExpressionId.

Uses #root behind the scenes, hence the stubborn option.



233
234
235
236
# File 'lib/ruote/exp/flow_expression.rb', line 233

def root_id(stubborn=false)

  root(stubborn).fei
end

#set_variable(var, val, override = false) ⇒ Object

Sets a variable to a given value. (will set at the appropriate level).



93
94
95
96
97
98
# File 'lib/ruote/exp/ro_variables.rb', line 93

def set_variable(var, val, override=false)

  fexp, v = locate_set_var(var, override) || locate_var(var)

  fexp.un_set_variable(:set, v, val, (fexp.h.fei != h.fei)) if fexp
end

#to_hObject

Turns this FlowExpression instance into a Hash (well, just hands back the base hash behind it).



249
250
251
252
# File 'lib/ruote/exp/flow_expression.rb', line 249

def to_h

  @h
end

#treeObject

Returns the current version of the tree (returns the updated version if it got updated.



898
899
900
901
# File 'lib/ruote/exp/flow_expression.rb', line 898

def tree

  h.updated_tree || h.original_tree
end

#tree_childrenObject

Returns the “AST” view on the children of this expression…



942
943
944
945
# File 'lib/ruote/exp/flow_expression.rb', line 942

def tree_children

  tree[2]
end

#try_persistObject



59
60
61
62
63
64
65
66
67
68
69
# File 'lib/ruote/exp/ro_persist.rb', line 59

def try_persist

  r = @context.storage.put(@h)

  #t = Thread.current.object_id.to_s[-3..-1]
  #puts "+ per #{debug_id} #{tree[0]} r#{h._rev} t#{t} -> #{r.class}"
  #p self.h.children.collect { |i| Ruote.sid(i) }
  #Ruote.p_caller('+ per')

  r
end

#try_unpersistObject



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/ruote/exp/ro_persist.rb', line 71

def try_unpersist

  r = @context.storage.delete(@h)

  #t = Thread.current.object_id.to_s[-3..-1]
  #puts "- unp #{debug_id} #{tree[0]} r#{h._rev} t#{t} -> #{r.class}"
  #Ruote.p_caller('- unp')

  return r if r

  #if h.has_error
  err = @context.storage.get('errors', "err_#{Ruote.to_storage_id(h.fei)}")
  @context.storage.delete(err) if err
  #end
    # removes any error in the journal for this expression
    # since it will now be gone, no need to keep track of its errors

  nil
end

#unpersist_or_raiseObject Also known as: unpersist



96
97
98
99
# File 'lib/ruote/exp/ro_persist.rb', line 96

def unpersist_or_raise

  p_or_raise(false)
end

#unset_variable(var, override = false) ⇒ Object

Unbinds a variables.



102
103
104
105
106
107
# File 'lib/ruote/exp/ro_variables.rb', line 102

def unset_variable(var, override=false)

  fexp, v = locate_set_var(var, override) || locate_var(var)

  fexp.un_set_variable(:unset, v, nil, (fexp.h.fei != h.fei)) if fexp
end

#update_tree(t = nil) ⇒ Object

Updates the tree of this expression

update_tree(t)

will set the updated tree to t

update_tree

will copy (deep copy) the original tree as the updated_tree.

Adding a child to a sequence expression :

seq.update_tree
seq.updated_tree[2] << [ 'participant', { 'ref' => 'bob' }, [] ]
seq.do_persist


919
920
921
922
# File 'lib/ruote/exp/flow_expression.rb', line 919

def update_tree(t=nil)

  h.updated_tree = t || Ruote.fulldup(h.original_tree)
end

#variablesObject

A shortcut to the variables held in the expression (nil if none held).



35
36
37
38
# File 'lib/ruote/exp/ro_variables.rb', line 35

def variables

  @h['variables']
end

#wfidObject

Returns the workflow instance id of the workflow this expression belongs to.



162
163
164
165
# File 'lib/ruote/exp/flow_expression.rb', line 162

def wfid

  h.fei['wfid']
end