Class: OpenWFE::Engine

Inherits:
Service show all
Includes:
FeiMixin, OwfeServiceLocator, StatusMixin
Defined in:
lib/openwfe/engine/engine.rb

Overview

The simplest implementation of the OpenWFE workflow engine. No persistence is used, everything is stored in memory.

Direct Known Subclasses

FilePersistedEngine

Instance Attribute Summary

Attributes included from ServiceMixin

#service_name

Attributes included from Contextual

#application_context

Instance Method Summary collapse

Methods included from StatusMixin

#is_paused?, #process_status, #process_statuses

Methods included from OwfeServiceLocator

#get_engine, #get_error_journal, #get_expool, #get_expression_map, #get_expression_pool, #get_expression_storage, #get_expression_storages, #get_journal, #get_participant_map, #get_scheduler, #get_wfid_generator

Methods included from ServiceMixin

#service_init

Methods included from Contextual

#get_work_directory, #init_service, #lookup

Methods included from Logging

#ldebug, #ldebug_callstack, #lerror, #lfatal, #linfo, #llog, #lunknown, #lwarn

Constructor Details

#initialize(application_context = {}) ⇒ Engine

Builds an OpenWFEru engine.

Accepts an optional initial application_context (containing initialization params for services for example).

The engine itself uses one param :logger, used to define where all the log output for OpenWFEru should go. By default, this output goes to logs/openwferu.log



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/openwfe/engine/engine.rb', line 82

def initialize (application_context={})

    super S_ENGINE, application_context

    $OWFE_LOG = application_context[:logger]

    unless $OWFE_LOG
        #puts "Creating logs in " + FileUtils.pwd
        FileUtils.mkdir("logs") unless File.exist?("logs")
        $OWFE_LOG = Logger.new "logs/openwferu.log", 10, 1024000
        $OWFE_LOG.level = Logger::INFO
    end

    # build order matters.
    #
    # especially for the expstorage which 'observes' the expression
    # pool and thus needs to be instantiated after it.

    build_scheduler
        #
        # for delayed or repetitive executions (it's the engine's clock)
        # see http://openwferu.rubyforge.org/scheduler.html

    build_expression_map
        #
        # mapping expression names ('sequence', 'if', 'concurrence', 
        # 'when'...) to their implementations (SequenceExpression,
        # IfExpression, ConcurrenceExpression, ...)

    build_wfid_generator
        #
        # the workflow instance (process instance) id generator
        # making sure each process instance has a unique identifier

    build_expression_pool
        #
        # the core (hairy ball) of the engine

    build_expression_storage
        #
        # the engine persistence (persisting the expression instances
        # that make up process instances)

    build_participant_map
        #
        # building the services that maps participant names to 
        # participant implementations / instances.

    build_error_journal
        #
        # builds the error journal (keeping track of failures
        # in business process executions, and an opportunity to
        # fix and replay)

    linfo { "new() --- engine started --- #{self.object_id}" }
end

Instance Method Details

#add_workitem_listener(listener, freq = nil) ⇒ Object

Adds a workitem listener to this engine.

The ‘freq’ parameters if present might indicate how frequently the resource should be polled for incoming workitems.

engine.add_workitem_listener(listener, "3m10s")
   # every 3 minutes and 10 seconds

engine.add_workitem_listener(listener, "0 22 * * 1-5")
   # every weekday at 10pm

TODO : block handling…



349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# File 'lib/openwfe/engine/engine.rb', line 349

def add_workitem_listener (listener, freq=nil)

    name = nil

    if listener.kind_of? Class

        listener = init_service nil, listener

        name = listener.service_name
    else

        name = listener.name if listener.respond_to? :name
        name = "#{listener.class}::#{listener.object_id}" unless name

        @application_context[name] = listener
    end

    result = nil

    if freq

        freq = freq.to_s.strip

        result = if Rufus::Scheduler.is_cron_string(freq)

            get_scheduler.schedule(freq, listener)
        else

            get_scheduler.schedule_every(freq, listener)
        end
    end

    linfo { "add_workitem_listener() added '#{name}'" }

    result
end

#cancel_expression(exp_or_fei) ⇒ Object

Cancels the given expression (and its children if any) (warning : advanced method)

Cancelling the root expression of a process is equivalent to cancelling the process.



595
596
597
598
# File 'lib/openwfe/engine/engine.rb', line 595

def cancel_expression (exp_or_fei)

    get_expression_pool.cancel_expression exp_or_fei
end

#cancel_process(exp_or_wfid) ⇒ Object Also known as: cancel_flow, abort_process

Given any expression of a process, cancels the complete process instance.



581
582
583
584
# File 'lib/openwfe/engine/engine.rb', line 581

def cancel_process (exp_or_wfid)

    get_expression_pool.cancel_process exp_or_wfid
end

#enable_irb_consoleObject

Enabling the console means that hitting CTRL-C on the window / term / dos box / whatever does run the OpenWFEru engine will open an IRB interactive console for directly manipulating the engine instance.

Hit CTRL-D to get out of the console.



426
427
428
429
# File 'lib/openwfe/engine/engine.rb', line 426

def enable_irb_console

    OpenWFE::trap_int_irb(binding)
end

#forget_expression(exp_or_fei) ⇒ Object

Forgets the given expression (make it an orphan) (warning : advanced method)



604
605
606
607
# File 'lib/openwfe/engine/engine.rb', line 604

def forget_expression (exp_or_fei)

    get_expression_pool.forget exp_or_fei 
end

#get_participant(participant_name) ⇒ Object

Given a participant name, returns the participant in charge of handling workitems for that name. May be useful in some embedded contexts.



321
322
323
324
# File 'lib/openwfe/engine/engine.rb', line 321

def get_participant (participant_name)

    get_participant_map.lookup_participant participant_name
end

#get_variables(fei_or_wfid = nil) ⇒ Object

Returns the variables set for a process or an expression.

If a process (wfid) is given, variables of the process environment will be returned, else variables in the environment valid for the expression (fei) will be returned.

If nothing (or nil) is given, the variables set in the engine environment will be returned.



696
697
698
699
700
701
702
# File 'lib/openwfe/engine/engine.rb', line 696

def get_variables (fei_or_wfid=nil)

    return get_expression_pool.fetch_engine_environment.variables \
        unless fei_or_wfid

    fetch_exp(fei_or_wfid).get_environment.variables
end

#joinObject

Makes the current thread join the engine’s scheduler thread

You can thus make an engine standalone with something like :

require 'openwfe/engine/engine'

the_engine = OpenWFE::Engine.new
the_engine.join

And you’ll have to hit CTRL-C to make it stop.



398
399
400
401
# File 'lib/openwfe/engine/engine.rb', line 398

def join

    get_scheduler.join
end

#join_until_idleObject

Calling this method makes the control flow block until the workflow engine is inactive.

TODO : implement idle_for



409
410
411
412
413
414
415
416
# File 'lib/openwfe/engine/engine.rb', line 409

def join_until_idle

    storage = get_expression_storage

    while storage.size > 1
        sleep 1
    end
end

#launch(launch_object, options = {}) ⇒ Object

Launches a [business] process. The ‘launch_object’ param may contain either a LaunchItem instance, either a String containing the URL of the process definition to launch (with an empty LaunchItem created on the fly).

The launch object can also be a String containing the XML process definition or directly a class extending OpenWFE::ProcessDefinition (Ruby process definition).

Returns the FlowExpressionId instance of the expression at the root of the newly launched process.

Options for scheduled launches like :at, :in and :cron are accepted via the ‘options’ optional parameter. For example :

engine.launch(launch_item)
    # will launch immediately

engine.launch(launch_item, :in => "1d20m")
    # will launch in one day and twenty minutes

engine.launch(launch_item, :at => "Tue Sep 11 20:23:02 +0900 2007")
    # will launch at that point in time

engine.launch(launch_item, :cron => "0 5 * * *")
    # will launch that same process every day,
    # five minutes after midnight (see "man 5 crontab")


198
199
200
201
202
203
204
205
206
207
208
# File 'lib/openwfe/engine/engine.rb', line 198

def launch (launch_object, options={})

    launchitem = extract_launchitem launch_object

    fei = get_expression_pool.launch launchitem, options

    fei.dup
        #
        # so that users of this launch() method can play with their
        # fei without breaking things
end

#list_processes(options = {}) ⇒ Object Also known as: list_workflows

Lists all workflow (process) instances currently in the expool (in the engine). This method will return a list of “process-definition” expressions (i.e. OpenWFE::DefineExpression objects – each representing the root element of a flow).

:wfid

will list only one process, :wfid => '20071208-gipijiwozo'

:parent_wfid

will list only one process, and its subprocesses, :parent_wfid => '20071208-gipijiwozo'

:consider_subprocesses

if true, “process-definition” expressions of subprocesses will be returned as well.

:wfid_prefix

allows your to query for specific workflow instance id prefixes. for example : :wfid_prefix => "200712" for the processes started in December.

:wfname

will return only the process instances who belongs to the given workflow [name].

:wfrevision

usued in conjuction with :wfname, returns only the process instances of a given workflow revision.



571
572
573
574
# File 'lib/openwfe/engine/engine.rb', line 571

def list_processes (options={})

    get_expression_pool.list_processes options
end

#lookup_processes(var_name, value = nil) ⇒ Object

Returns an array of wfid (workflow instance ids) whose root environment containes the given variable

If there are no matches, an empty array will be returned.

Regular expressions are accepted as values.

If no value is given, all processes with the given variable name set will be returned.



715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
# File 'lib/openwfe/engine/engine.rb', line 715

def lookup_processes (var_name, value=nil)

    # TODO : maybe this would be better in the ExpressionPool

    regexp = if value
        if value.is_a?(Regexp)
            value
        else
            Regexp.compile(value.to_s)
        end
    else
        nil
    end

    envs = get_expression_storage.find_expressions(
        :include_classes => Environment)

    envs = envs.find_all do |env|
        val = env.variables[var_name]
        (val and ((not regexp) or (regexp.match(val))))
    end
    envs.collect do |env|
        env.fei.wfid
    end

    #envs.inject([]) do |r, env|
    #    val = env.variables[var_name]
    #    r << env.fei.wfid      #        if (val and ((not regexp) or (regexp.match(val))))
    #    r
    #end
        #
        # seems slower...
end

#lookup_variable(var_name, fei_or_wfid = nil) ⇒ Object

Looks up a process variable in a process. If fei_or_wfid is not given, will simply look in the ‘engine environment’ (where the top level variables ‘//’ do reside).



678
679
680
681
682
683
684
# File 'lib/openwfe/engine/engine.rb', line 678

def lookup_variable (var_name, fei_or_wfid=nil)

    return get_expression_pool.fetch_engine_environment[var_name] \
        unless fei_or_wfid

    fetch_exp(fei_or_wfid).lookup_variable var_name
end

#pause_process(wfid) ⇒ Object

Pauses a process (sets its /__paused__ variable to true).



612
613
614
615
616
617
618
619
620
# File 'lib/openwfe/engine/engine.rb', line 612

def pause_process (wfid)

    wfid = extract_wfid wfid

    root_expression = get_expression_pool.fetch_root wfid

    get_expression_pool.paused_instances[wfid] = true
    root_expression.set_variable VAR_PAUSED, true
end

#pre_launch_check(launchitem) ⇒ Object

When ‘parameters’ are used at the top of a process definition, this method can be used to assert a launchitem before launch. An expression will be raised if the parameters do not match the requirements.

Note that the launch method will raise those exceptions as well. This method can be useful in some scenarii though.



163
164
165
166
# File 'lib/openwfe/engine/engine.rb', line 163

def pre_launch_check (launchitem)

    get_expression_pool.prepare_raw_expression(launchitem)
end

#process_stack(workflow_instance_id, unapplied = false) ⇒ Object Also known as: get_process_stack, get_flow_stack

Returns the list of applied expressions belonging to a given workflow instance. May be used to determine where a process instance currently is.

This method returns all the expressions (the stack) a process went through to reach its current state.

If the unapplied optional parameter is set to true, all the expressions (even those not yet applied) that compose the process instance will be returned.



536
537
538
539
# File 'lib/openwfe/engine/engine.rb', line 536

def process_stack (workflow_instance_id, unapplied=false)

    get_expression_pool.process_stack workflow_instance_id, unapplied
end

#register_participant(regex, participant = nil, &block) ⇒ Object

Registers a participant in this [embedded] engine. This method is a shortcut to the ParticipantMap method with the same name.

engine.register_participant "user-.*", HashParticipant

or

engine.register_participant "user-.*" do |wi|
    puts "participant '#{wi.participant_name}' received a workitem"
    #
    # and did nothing with it
    # as a block participant implicitely returns the workitem
    # to the engine
end

Returns the participant instance.

The participant parameter can be set to hash like in

engine.register_participant(
    "alpha", 
    { :participant => HashParticipant, :position => :first })

or

engine.register_participant("alpha", :position => :first) do
    puts "first !"
end

There are some times where you have to position a participant first (especially with the regex technique).

see ParticipantMap#register_participant



302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/openwfe/engine/engine.rb', line 302

def register_participant (regex, participant=nil, &block)

    #get_participant_map.register_participant(
    #    regex, participant, &block)

    params = if participant.class == Hash
        participant
    else
        { :participant => participant }
    end

    get_participant_map.register_participant regex, params, &block
end

#replay_at_error(error) ⇒ Object

Takes care of removing an error from the error journal and they replays its process at that point.



661
662
663
664
665
666
667
668
669
670
671
# File 'lib/openwfe/engine/engine.rb', line 661

def replay_at_error (error)

    get_error_journal.remove_errors(
        error.fei.parent_wfid, 
        error)

    get_expression_pool.queue_work(
        error.message,
        error.fei,
        error.workitem)
end

#reply(workitem) ⇒ Object Also known as: forward, proceed

This method is used to feed a workitem back to the engine (after it got sent to a worklist or wherever by a participant). Participant implementations themselves do call this method usually.

This method also accepts LaunchItem instances.

Since OpenWFEru 0.9.16, this reply method accepts InFlowWorkitem that don’t belong to a process instance (ie whose flow_expression_id is nil). It will simply notify the participant_map of the reply for the given participant_name. If there is no participant_name specified for this orphan workitem, an exception will be raised.



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/openwfe/engine/engine.rb', line 223

def reply (workitem)

    if workitem.is_a?(InFlowWorkItem)

        if workitem.flow_expression_id
            #
            # vanilla case, workitem coming back
            # (from listener probably)

            return get_expression_pool.reply(
                workitem.flow_expression_id, workitem)
        end

        if workitem.participant_name
            #
            # a workitem that doesn't belong to a process instance
            # but bears a participant name.
            # Notify, there may be something listening on
            # this channel (see the 'listen' expression).

            return get_participant_map.onotify(
                workitem.participant_name, :reply, workitem)
        end

        raise \
            "InFlowWorkitem doesn't belong to a process instance" +
            " nor to a participant"
    end

    return get_expression_pool.launch(workitem) \
        if workitem.is_a?(LaunchItem)
            #
            # launchitem coming from listener
            # let's attempt to launch a new process instance

    raise \
        "engine.reply() " +
        "cannot handle instances of #{workitem.class}"
end

#rescheduleObject Also known as: reload

Call this method once the participants for a persisted engine have been [re]added.

If this method is called too soon, missing participants will cause trouble… Call this method after all the participants have been added.



147
148
149
150
# File 'lib/openwfe/engine/engine.rb', line 147

def reschedule

    get_expression_pool.reschedule()
end

#resume_process(wfid) ⇒ Object

Restarts a process : removes its ‘paused’ flag (variable) and makes sure to ‘replay’ events (replies) that came for it while it was in pause.



627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
# File 'lib/openwfe/engine/engine.rb', line 627

def resume_process (wfid)

    wfid = extract_wfid wfid

    root_expression = get_expression_pool.fetch_root wfid

    #
    # remove 'paused' flag

    get_expression_pool.paused_instances.delete wfid
    root_expression.unset_variable VAR_PAUSED 

    #
    # replay
    #
    # select PausedError instances in separate list

    errors = get_error_journal.get_error_log wfid
    error_class = PausedError.name
    paused_errors = errors.select { |e| e.error_class == error_class }

    return if paused_errors.size < 1

    # replay select PausedError instances

    paused_errors.each do |e|
        replay_at_error e
    end
end

#stopObject

Stopping the engine will stop all the services in the application context.



447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
# File 'lib/openwfe/engine/engine.rb', line 447

def stop

    linfo { "stop() stopping engine '#{@service_name}'" }

    @application_context.each do |sname, service|

        next if sname == self.service_name

        #if service.kind_of?(ServiceMixin)
        if service.respond_to?(:stop)

            service.stop

            linfo do 
                "stop() stopped service '#{sname}' (#{service.class})"
            end
        end
    end

    linfo { "stop() stopped engine '#{@service_name}'" }

    nil
end

#unregister_participant(participant_name) ⇒ Object

Removes the first participant matching the given name from the participant map kept by the engine.



330
331
332
333
# File 'lib/openwfe/engine/engine.rb', line 330

def unregister_participant (participant_name)

    get_participant_map.unregister_participant participant_name
end

#update_expression(fexp) ⇒ Object

Replaces an expression in the pool with a newer version of it.

(useful when fixing processes on the fly)



801
802
803
804
805
806
# File 'lib/openwfe/engine/engine.rb', line 801

def update_expression (fexp)

    fexp.application_context = application_context

    get_expression_pool.update fexp
end

#update_expression_data(fei, data) ⇒ Object

Use only when doing “process gardening”.

This method updates an expression, the ‘data’ parameter is expected to be a hash. If the expression is an Environment, the variables will be merged with the ones found in the data param. If the expression is not an Environment, the data will be merged into the ‘applied_workitem’ if any.

If the merge is not possible, an exception will be raised.



761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
# File 'lib/openwfe/engine/engine.rb', line 761

def update_expression_data (fei, data)

    fexp = fetch_exp fei

    original = if fexp.is_a?(Environment)

        fexp.variables
    else

        fexp.applied_workitem.attributes
    end

    original.merge! data

    get_expression_pool.update fexp
end

#update_raw_expression(fei, representation) ⇒ Object

A variant of update_expression() that directly replaces the raw representation stored within a RawExpression.

Useful for modifying [not yet reached] segments of processes.



784
785
786
787
788
789
790
791
792
793
794
# File 'lib/openwfe/engine/engine.rb', line 784

def update_raw_expression (fei, representation)

    fexp = fetch_exp fei

    raise "cannot update already applied expression" \
        unless fexp.is_a?(RawExpression)

    fexp.raw_representation = representation

    get_expression_pool.update fexp
end

#wait_for(fei_or_wfid) ⇒ Object

Waits for a given process instance to terminate. The method only exits when the flow terminates, but beware : if the process already terminated, the method will never exit.

The parameter can be a FlowExpressionId instance, for example the one given back by a launch(), or directly a workflow instance id (String).

This method is mainly used in utests.



482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
# File 'lib/openwfe/engine/engine.rb', line 482

def wait_for (fei_or_wfid)

    wfid = if fei_or_wfid.kind_of?(FlowExpressionId)
        fei_or_wfid.workflow_instance_id
    else
        fei_or_wfid
    end

    t = Thread.new { Thread.stop }

    to = get_expression_pool.add_observer(:terminate) do |c, fe, wi|
        t.wakeup if (fe.fei.workflow_instance_id == wfid and t.alive?)
    end
    te = get_expression_pool.add_observer(:error) do |c, fei, m, i, e|
        t.wakeup if (fei.parent_wfid == wfid and t.alive?)
    end
    #tc = get_expression_pool.add_observer(:cancel) do |c, fe|
    #    if (fe.fei.wfid == wfid and fe.fei.expid == "0" and t.alive?)
    #        sleep 0.500
    #        t.wakeup 
    #    end
    #end

    linfo { "wait_for() #{wfid}" }

    t.join

    get_expression_pool.remove_observer(to, :terminate)
    get_expression_pool.remove_observer(te, :error)
    #get_expression_pool.remove_observer(tc, :cancel)
        #
        # it would work as well without specifying the channel,
        # but it's thus a little bit faster
end