Class: OpenWFE::ExpressionPool

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin, FeiMixin, OwfeObservable, OwfeServiceLocator, ServiceMixin, WorkqueueMixin
Defined in:
lib/openwfe/expool/expressionpool.rb

Overview

The ExpressionPool stores expressions (the pieces of workflow instances). It’s the core of the workflow engine. It relies on an expression storage for the actual persistence of the expressions.

Instance Attribute Summary collapse

Attributes included from ServiceMixin

#service_name

Attributes included from Contextual

#application_context

Instance Method Summary collapse

Methods included from WorkqueueMixin

#is_workqueue_busy?, #queue_work, #start_workqueue, #stop_workqueue

Methods included from OwfeObservable

#add_observer, #remove_observer

Methods included from OwfeServiceLocator

#get_engine, #get_error_journal, #get_expool, #get_expression_map, #get_expression_pool, #get_expression_storage, #get_expression_storages, #get_journal, #get_participant_map, #get_scheduler, #get_wfid_generator

Methods included from ServiceMixin

#service_init

Methods included from Contextual

#get_work_directory, #init_service, #lookup

Methods included from Logging

#ldebug, #ldebug_callstack, #lerror, #lfatal, #linfo, #llog, #lunknown, #lwarn

Constructor Details

#initialize(service_name, application_context) ⇒ ExpressionPool

The constructor for the expression pool.



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/openwfe/expool/expressionpool.rb', line 104

# Builds the expression pool.
#
# service_name and application_context are handed over to service_init,
# the application_context is also given to the MonitorProvider.
#
def initialize(service_name, application_context)

    # explicit empty argument list : nothing is forwarded to the
    # ancestors' initializers
    super()

    service_init(service_name, application_context)

    @paused_instances = {}
    @monitors = MonitorProvider.new(application_context)
    @observers = {}
    @stopped = false

    # return value ignored on purpose : the call only makes sure the
    # engine environment id gets computed right now
    engine_environment_id

    start_workqueue
end

Instance Attribute Details

#paused_instances ⇒ Object (readonly)

The hash containing the wfid of the process instances currently paused.



99
100
101
# File 'lib/openwfe/expool/expressionpool.rb', line 99

# Reader : hands back the @paused_instances hash itself (not a copy).
def paused_instances
    @paused_instances
end

Instance Method Details

#apply(exp, workitem) ⇒ Object

Applies a given expression (id or expression)



346
347
348
349
350
# File 'lib/openwfe/expool/expressionpool.rb', line 346

# Requests the application of an expression : the triple
# (:do_apply, exp, workitem) is handed to queue_work, do_apply is not
# called directly from here.
#
# Returns whatever queue_work returns.
#
def apply(exp, workitem)

    queue_work(:do_apply, exp, workitem)
end

#cancel(exp) ⇒ Object

Cancels the given expression. The param might be an expression instance or a FlowExpressionId instance.



366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
# File 'lib/openwfe/expool/expressionpool.rb', line 366

# Cancels an expression ('exp' is anything fetch() accepts).
#
# Returns nil when the expression cannot be found, else returns what
# the expression's own cancel() returned. Observers get a :cancel
# notification before the expression is cancelled and removed.
#
def cancel(exp)

    target, fei = fetch(exp)

    if target

        ldebug { "cancel() for  #{fei.to_debug_s}" }

        onotify(:cancel, target)

        inflowitem = target.cancel
        remove(target)

        inflowitem
    else

        ldebug { "cancel() cannot cancel missing  #{fei.to_debug_s}" }

        nil
    end
end

#cancel_expression(exp) ⇒ Object

Cancels the given expression and makes sure to resume the flow if the expression or one of its children were active.

If the cancelled branch was not active, this method will take care of removing the cancelled expression from the parent expression.



393
394
395
396
397
398
399
400
401
402
403
404
405
# File 'lib/openwfe/expool/expressionpool.rb', line 393

# Cancels an expression and keeps the rest of the flow consistent.
#
# 'exp' is anything fetch_expression() accepts.
#
# * expression not found : nothing to do, returns nil (cancel() is not
#   called with a nil expression)
# * cancel() yields a workitem (the branch was active) : the workitem is
#   replied to the parent, without removing the expression a second time
# * cancel() yields nothing (the branch was not active) : the expression
#   is removed from its parent's children, if it has a parent and that
#   parent can still be fetched
#
def cancel_expression(exp)

    exp = fetch_expression(exp)

    return nil unless exp

    wi = cancel(exp)

    if wi
        reply_to_parent(exp, wi, false)
    elsif exp.parent_id
        #
        # a root expression has no parent_id, nothing to clean then
        #
        parent_exp = fetch_expression(exp.parent_id)
        parent_exp.remove_child(exp.fei) if parent_exp
    end
end

#cancel_process(exp_or_wfid) ⇒ Object Also known as: cancel_flow

Given any expression of a process, cancels the complete process instance.



411
412
413
414
415
416
417
418
419
420
421
422
# File 'lib/openwfe/expool/expressionpool.rb', line 411

# Cancels a whole process instance.
#
# The wfid is derived from 'exp_or_wfid' via extract_wfid, the root
# expression for that wfid is looked up and handed to cancel().
#
# Raises a RuntimeError when no root expression is found.
#
def cancel_process(exp_or_wfid)

    wfid = extract_wfid(exp_or_wfid, false)

    ldebug { "cancel_process() '#{wfid}'" }

    root = fetch_root(wfid)

    return cancel(root) if root

    raise "no process to cancel '#{wfid}'"
end

#determine_rep(param) ⇒ Object

Gets the process definition (if necessary) and turns it into an expression tree (for storing into a RawExpression).



762
763
764
765
766
767
# File 'lib/openwfe/expool/expressionpool.rb', line 762

# Turns 'param' into whatever DefParser.parse makes of it.
#
# When 'param' is a URI, it is first replaced by the result of
# read_uri(param); any other value is passed to the parser as is.
#
def determine_rep(param)

    definition = param.is_a?(URI) ? read_uri(param) : param

    DefParser.parse(definition)
end

#engine_environment_id ⇒ Object

Returns the unique engine_environment FlowExpressionId instance. There is only one such environment in an engine, hence this ‘singleton’ method.



664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
# File 'lib/openwfe/expool/expressionpool.rb', line 664

# Returns the FlowExpressionId of the engine environment.
#
# The id is built on first call and memoized in @eei, later calls return
# that same instance.
#
# The id is assembled in a local variable and only stored in @eei once
# every field is set : if something raises midway (get_engine for
# example), nothing half-built gets cached and the next call starts over.
#
def engine_environment_id

    return @eei if @eei

    eei = FlowExpressionId.new
    eei.owfe_version = OPENWFERU_VERSION
    eei.engine_id = get_engine.service_name
    eei.initial_engine_id = eei.engine_id
    eei.workflow_definition_url = 'ee'
    eei.workflow_definition_name = 'ee'
    eei.workflow_definition_revision = '0'
    eei.workflow_instance_id = '0'
    eei.expression_name = EN_ENVIRONMENT
    eei.expression_id = '0'

    @eei = eei
end

#fetch(exp) ⇒ Object

Fetches a FlowExpression from the pool. Returns a tuple : the FlowExpression plus its FlowExpressionId.

The param ‘exp’ may be a FlowExpressionId or a FlowExpression that has to be reloaded.



537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
# File 'lib/openwfe/expool/expressionpool.rb', line 537

# Fetches an expression from the expression storage.
#
# 'exp' is either a FlowExpressionId or a FlowExpression (in which case
# its fei is used as key, i.e. the expression gets reloaded).
#
# Returns a two element array : [ expression (nil if the storage has
# nothing under that key), fei ].
#
# Raises a RuntimeError when 'exp' is of neither kind; the message
# shows the rejected argument and its class.
#
def fetch(exp)
    synchronize do

        fei =
            if exp.is_a?(FlowExpression)
                exp.fei
            elsif exp.is_a?(FlowExpressionId)
                exp
            else
                #
                # report 'exp' : 'fei' is still nil at this point
                #
                raise \
                    "Cannot fetch expression with key : " +
                    "'#{exp}' (#{exp.class})"
            end

        [ get_expression_storage[fei], fei ]
    end
end

#fetch_engine_environment ⇒ Object

Returns the engine environment (the top level environment)



578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
# File 'lib/openwfe/expool/expressionpool.rb', line 578

# Returns the engine environment, the one stored under
# engine_environment_id.
#
# When fetch() finds nothing under that id, a new environment is built
# via Environment.new_env, stored (store_itself) and returned.
#
# The lookup and the creation happen inside the same synchronize
# block, so that only one engine environment gets created.
#
def fetch_engine_environment
    synchronize do

        eei = engine_environment_id
        env, _fei = fetch(eei)

        unless env
            env = Environment.new_env(
                eei, nil, nil, @application_context, nil)
            env.store_itself
        end

        env
    end
end

#fetch_expression(exp) ⇒ Object

Fetches a FlowExpression (returns only the FlowExpression instance)

The param ‘exp’ may be a FlowExpressionId or a FlowExpression that has to be reloaded.



569
570
571
572
573
# File 'lib/openwfe/expool/expressionpool.rb', line 569

# Same lookup as fetch(), but only the first element of the pair
# (the expression) is returned, the fei is dropped.
#
def fetch_expression(exp)

    found, _fei = fetch(exp)

    found
end

#fetch_root(wfid) ⇒ Object

Fetches the root expression of a process (or a subprocess).



600
601
602
603
# File 'lib/openwfe/expool/expressionpool.rb', line 600

# Delegates to the expression storage's fetch_root for the given wfid
# and returns its result untouched.
#
def fetch_root(wfid)

    get_expression_storage.fetch_root(wfid)
end

#forget(parent_exp, exp) ⇒ Object

Forgets the given expression (makes sure to substitute its parent_id with the GONE_PARENT_ID constant)



429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
# File 'lib/openwfe/expool/expressionpool.rb', line 429

# Detaches an expression from its parent.
#
# Does nothing (returns nil) when fetch() cannot find 'exp'. Otherwise
# observers are notified (:forget), the fei is deleted from
# parent_exp.children, the expression's parent_id is replaced by
# GONE_PARENT_ID, its environment is duplicated (dup_environment) and
# the expression is stored again.
#
def forget(parent_exp, exp)

    forgotten, fei = fetch(exp)

    return unless forgotten

    onotify(:forget, forgotten)

    parent_exp.children.delete(fei)

    forgotten.parent_id = GONE_PARENT_ID
    forgotten.dup_environment
    forgotten.store_itself

    ldebug { "forget() forgot      #{fei}" }
end

#get_monitor(fei) ⇒ Object

Obtains a unique monitor for an expression. It avoids the need for the FlowExpression instances to include the monitor mixin by themselves



143
144
145
146
# File 'lib/openwfe/expool/expressionpool.rb', line 143

# Looks up, in @monitors, the monitor registered under the given fei.
#
def get_monitor(fei)

    @monitors[fei]
end

#launch(launchitem, options = {}) ⇒ Object

Instantiates a workflow definition and launches it.

This method call will return immediately, it could even return before the actual launch is completely over.

Returns the FlowExpressionId instance of the root expression of the newly launched flow.



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/openwfe/expool/expressionpool.rb', line 196

# Launches a new process instance out of a launchitem.
#
# Steps, in order :
#
# * the raw expression is prepared (prepare_raw_expression raises if a
#   requirement of the definition is not met)
# * it receives a new environment, as it is the root of a process
#   instance and has to hold the variables of that instance
# * when 'options' is not empty, the raw expression is replaced by the
#   result of wrap_in_schedule
# * observers are notified (:launch) and the expression is handed to
#   apply() along with the workitem built from the launchitem
#
# Returns the fei of the raw expression that got applied.
#
def launch(launchitem, options={})

    raw_expression = prepare_raw_expression(launchitem)

    raw_expression.new_environment

    if options.size > 0
        raw_expression = wrap_in_schedule(raw_expression, options)
    end

    root_fei = raw_expression.fei

    workitem = build_workitem(launchitem)

    onotify(:launch, root_fei, launchitem)

    apply(raw_expression, workitem)

    root_fei
end

#launch_template(requesting_expression, env_id, sub_id, template, workitem, params = nil) ⇒ Object

Launches a subprocess.



306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/openwfe/expool/expressionpool.rb', line 306

# Launches a subprocess out of a template.
#
# The raw expression comes from prepare_from_template (which receives
# requesting_expression, env_id, sub_id, template and params as is).
# The workitem's flow_expression_id is set to the fei of that raw
# expression, observers are notified (:launch_template) and the raw
# expression is handed to apply().
#
# Returns the fei of the raw expression.
#
def launch_template(
    requesting_expression, env_id, sub_id, template, workitem, params=nil)

    rawexp = prepare_from_template(
        requesting_expression, env_id, sub_id, template, params)

    sub_fei = rawexp.fei

    workitem.flow_expression_id = sub_fei

    onotify(:launch_template, sub_fei, workitem)

    apply(rawexp, workitem)

    sub_fei
end

#list_processes(options = {}) ⇒ Object

Lists all workflows (processes) currently in the expool (in the engine). This method will return a list of “process-definition” expressions (root of flows).



714
715
716
717
718
719
720
721
722
723
# File 'lib/openwfe/expool/expressionpool.rb', line 714

# Lists the process instances currently in the pool, by asking the
# expression storage for the expressions of class DefineExpression.
#
# 'options' is passed on to the storage's find_expressions, with
# :include_classes forced to DefineExpression. The hash given by the
# caller is left untouched (a merged copy is used), so a frozen or
# shared hash can safely be passed.
#
# Maybe it would be better to list root expressions instead, so that
# expressions like 'sequence' can be used as root expressions. Later...
#
def list_processes(options={})

    query = options.merge({ :include_classes => DefineExpression })

    get_expression_storage.find_expressions(query)
end

#notify_error(error, fei, message, workitem) ⇒ Object

This method is called when apply() or reply() failed for an expression. There are currently only two ‘users’, the ParticipantExpression class and the do_process_workelement method of this ExpressionPool class.



732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
# File 'lib/openwfe/expool/expressionpool.rb', line 732

# Reports a failed operation.
#
# error    : the exception that occurred
# fei      : passed through extract_fei before any use
# message  : the operation that failed (appears as ":message" in the log)
# workitem : passed along to the observers
#
# Observers get an :error notification carrying the fei, the message,
# the workitem, the error class name and the error rendered by
# OpenWFE::exception_to_s. A warning is then logged; its wording depends
# on whether the error is a PausedError or not.
#
def notify_error(error, fei, message, workitem)

    fei = extract_fei(fei)
        # densha requires that... :(

    trace = OpenWFE.exception_to_s(error)

    onotify(:error, fei, message, workitem, error.class.name, trace)

    paused = error.is_a?(PausedError)

    lwarn do
        head = "#{service_name} operation :#{message} on #{fei} "
        if paused
            head + "delayed because process '#{fei.wfid}' is in pause"
        else
            head + "failed with\n" + trace
        end
    end
end

#prepare_from_template(requesting_expression, env_id, sub_id, template, params = nil) ⇒ Object

Prepares a raw expression from a template. Returns that raw expression.

Used in the concurrent-iterator when building up the children list and of course used by the launch_template() method.



236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/openwfe/expool/expressionpool.rb', line 236

# Builds (and stores) the raw expression for a subprocess.
#
# template :
# * a RawExpression : used as is (its application_context gets set)
# * a FlowExpressionId : the expression is fetched from the pool
# * anything else : handed to build_raw_expression
#
# The result is always a copy (dup) of that raw expression, with a copy
# of its fei, so the template itself keeps its own fei.
#
# requesting_expression determines the parent and the wfid of the copy :
# * nil : no parent, brand new wfid from the wfid generator
# * a FlowExpressionId : it becomes the parent, wfid is
#   "parent wfid.sub_id"
# * a String : no parent, wfid is "string.sub_id"
# * else (a FlowExpression) : its fei becomes the parent, wfid is
#   "parent wfid.sub_id"
#
# When env_id is given, the copy is bound to that environment, else it
# gets an environment of its own, initialized with 'params'.
#
def prepare_from_template(
    requesting_expression, env_id, sub_id, template, params=nil)

    source =
        case template
        when RawExpression
            template.application_context = @application_context
            template
        when FlowExpressionId
            fetch_expression(template)
        else
            build_raw_expression(nil, template)
        end

    rawexp = source.dup
    rawexp.fei = rawexp.fei.dup

    parent_id, wfid =
        case requesting_expression
        when nil
            [ nil, get_wfid_generator.generate ]
        when FlowExpressionId
            [
                requesting_expression,
                "#{requesting_expression.workflow_instance_id}.#{sub_id}"
            ]
        when String
            [ nil, "#{requesting_expression}.#{sub_id}" ]
        else
            #
            # a FlowExpression
            #
            [
                requesting_expression.fei,
                "#{requesting_expression.fei.workflow_instance_id}.#{sub_id}"
            ]
        end

    rawexp.parent_id = parent_id
    rawexp.fei.workflow_instance_id = wfid

    if env_id
        rawexp.environment_id = env_id
    else
        #
        # the new scope gets its own environment
        #
        rawexp.new_environment(params)
    end

    rawexp.store_itself

    rawexp
end

#prepare_raw_expression(launchitem) ⇒ Object

This method is called by the launch method. It’s actually the first stage of that method. It may be interesting to use it to ‘validate’ a launchitem and its process definition, as it will raise an exception in case of ‘parameter’ mismatch.

There is a ‘pre_launch_check’ alias for this method in the Engine class.



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/openwfe/expool/expressionpool.rb', line 158

# First stage of launch() : turns a launchitem into a raw expression.
#
# The process definition is located via
# launchitem.workflow_definition_url :
#
# * "field:xyz" : the definition is taken out of the launchitem
#   attribute 'xyz' (the attribute is deleted from the launchitem)
# * anything else : the definition is obtained via read_uri
#
# The "field:" prefix is only recognized at the very beginning of the
# url (\A), not at the start of some later line.
#
# Raises a RuntimeError when the url is not set or when no definition
# is found. check_parameters will raise as well if the definition has
# requirements and one of them is not met by the launchitem.
#
# Returns the raw expression.
#
def prepare_raw_expression(launchitem)

    wfdurl = launchitem.workflow_definition_url

    raise "launchitem.workflow_definition_url not set, cannot launch" \
        unless wfdurl

    definition =
        if wfdurl.match(/\Afield:/)
            #
            # 6 is the length of "field:"
            #
            launchitem.attributes.delete(wfdurl[6..-1])
        else
            read_uri(wfdurl)
        end

    raise "didn't find process definition at '#{wfdurl}'" \
        unless definition

    raw_expression = build_raw_expression(launchitem, definition)

    raw_expression.check_parameters(launchitem)

    raw_expression
end

#process_stack(wfid, unapplied = false) ⇒ Object

Returns the list of applied expressions belonging to a given workflow instance.

If the unapplied optional parameter is set to true, all the expressions (even those not yet applied) that compose the process instance will be returned. Environments will be returned as well.



691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
# File 'lib/openwfe/expool/expressionpool.rb', line 691

# Asks the expression storage for the expressions of a process
# instance.
#
# wfid      : run through extract_wfid (second argument true) before
#             being used as :parent_wfid in the query
# unapplied : unless true, the query is restricted to applied
#             expressions (:applied => true)
#
def process_stack(wfid, unapplied=false)

    query = { :parent_wfid => extract_wfid(wfid, true) }

    query[:applied] = true unless unapplied

    get_expression_storage.find_expressions(query)
end

#remove(exp) ⇒ Object

Removes a flow expression from the pool (This method is mainly called from the pool itself)



609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
# File 'lib/openwfe/expool/expressionpool.rb', line 609

# Removes an expression from the pool.
#
# 'exp' may be a FlowExpressionId (the expression is then fetched) or
# the expression itself. Nothing happens when there is no expression.
#
# Observers get a :remove notification carrying the fei. Then, inside
# a synchronize block, the monitor kept for the fei is deleted and the
# environment of the expression is removed, if the expression owns it.
#
def remove(exp)

    if exp.is_a?(FlowExpressionId)
        exp, _fei = fetch(exp)
    end

    return unless exp

    fei = exp.fei

    ldebug { "remove() fe  #{fei.to_debug_s}" }

    onotify(:remove, fei)

    synchronize do

        @monitors.delete(fei)

        if exp.owns_its_environment?
            remove_environment(exp.environment_id)
        end
    end
end

#reply(exp, workitem) ⇒ Object

Replies to a given expression



355
356
357
358
359
# File 'lib/openwfe/expool/expressionpool.rb', line 355

# Requests a reply to an expression : the triple
# (:do_reply, exp, workitem) is handed to queue_work, do_reply is not
# called directly from here.
#
# Returns whatever queue_work returns.
#
def reply(exp, workitem)

    queue_work(:do_reply, exp, workitem)
end

#reply_to_parent(exp, workitem, remove = true) ⇒ Object

Replies to the parent of the given expression.



451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
# File 'lib/openwfe/expool/expressionpool.rb', line 451

# Replies to the parent of an expression.
#
# exp      : the expression that is done
# workitem : the workitem to pass on (its last_expression_id is set to
#            the fei of exp)
# remove   : when true (the default), exp is removed from the pool and
#            its children are cleaned
#
# If exp carries a "tag" attribute, the variable of that name is
# deleted, so that the tag can get 'redone' or 'undone'.
#
# Then :
# * no parent_id : the process is over, observers get a :terminate
#   notification, nil is returned
# * parent_id is GONE_PARENT_ID : nothing to reply to, nil is returned
# * else : the workitem is replied to the parent
#
def reply_to_parent(exp, workitem, remove=true)

    ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" }

    workitem.last_expression_id = exp.fei

    onotify(:reply_to_parent, exp, workitem)

    if remove
        #
        # 'remove' the parameter shadows remove() the method, hence
        # the call with parentheses
        #
        remove(exp)
        exp.clean_children
    end

    tagname = exp.attributes ? exp.attributes["tag"] : nil

    exp.delete_variable(tagname) if tagname

    parent_id = exp.parent_id

    if !parent_id

        ldebug do
            "reply_to_parent() process #{exp.fei.workflow_instance_id} terminated"
        end

        onotify(:terminate, exp, workitem)

        nil

    elsif parent_id == GONE_PARENT_ID

        ldebug do
            "reply_to_parent() parent is gone for  " + exp.fei.to_debug_s
        end

        nil
    else

        reply(parent_id, workitem)
    end
end

#reschedule ⇒ Object

This method is called at each expool (engine) [re]start. It roams through the previously saved (persisted) expressions to reschedule ones like ‘sleep’ or ‘cron’.



634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
# File 'lib/openwfe/expool/expressionpool.rb', line 634

# Reschedules the persisted expressions that are schedulable.
#
# Does nothing when the pool got stopped. Else, inside a synchronize
# block, every expression the storage returns for
# :include_classes => Rufus::Schedulable is announced to the observers
# (:reschedule) and gets its reschedule() method called with the
# scheduler. The time taken is logged at info level.
#
def reschedule

    return if @stopped

    synchronize do

        timer = OpenWFE::Timer.new

        linfo { "reschedule() initiating..." }

        schedulables = get_expression_storage.find_expressions(
            { :include_classes => Rufus::Schedulable })

        schedulables.each do |fexp|

            linfo { "reschedule() for  #{fexp.fei.to_s}..." }

            onotify(:reschedule, fexp.fei)

            fexp.reschedule(get_scheduler)
        end

        linfo { "reschedule() done. (took #{timer.duration} ms)" }
    end
end

#stop ⇒ Object

Stops this expression pool (especially its workqueue).



127
128
129
130
131
132
133
134
135
136
# File 'lib/openwfe/expool/expressionpool.rb', line 127

# Stops the pool : @stopped is raised (reschedule checks that flag),
# the workqueue is stopped, then the observers get a :stop notification.
#
def stop

    @stopped = true

    stop_workqueue

    onotify(:stop)
end

#update(flow_expression) ⇒ Object

Adds or updates a flow expression in this pool



514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
# File 'lib/openwfe/expool/expressionpool.rb', line 514

# Announces a new or changed expression : the pool itself only logs
# and emits an :update notification (fei + expression) to its observers.
#
# Returns the flow expression it was given.
#
def update(flow_expression)

    fei = flow_expression.fei

    ldebug { "update() for #{fei.to_debug_s}" }

    onotify(:update, fei, flow_expression)

    flow_expression
end